sched/msgq: add support of System V message queue

https://man.openbsd.org/msgget.2

Signed-off-by: chao an <anchao@xiaomi.com>
This commit is contained in:
chao an 2022-10-19 22:28:43 +08:00 committed by Xiang Xiao
parent 96c3debe6a
commit 205c8934a3
16 changed files with 1281 additions and 11 deletions

View File

@ -411,7 +411,7 @@ static FAR const char * const g_statenames[] =
"Inactive",
"Waiting,Semaphore",
"Waiting,Signal"
#ifndef CONFIG_DISABLE_MQUEUE
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
, "Waiting,MQ empty"
, "Waiting,MQ full"
#endif

View File

@ -269,7 +269,7 @@ int inode_stat(FAR struct inode *inode, FAR struct stat *buf, int resolve)
}
else
#endif
#if !defined(CONFIG_DISABLE_MQUEUE)
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
/* Check for a message queue */
if (INODE_IS_MQUEUE(inode))

View File

@ -205,7 +205,7 @@ enum tstate_e
TSTATE_TASK_INACTIVE, /* BLOCKED - Initialized but not yet activated */
TSTATE_WAIT_SEM, /* BLOCKED - Waiting for a semaphore */
TSTATE_WAIT_SIG, /* BLOCKED - Waiting for a signal */
#ifndef CONFIG_DISABLE_MQUEUE
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
TSTATE_WAIT_MQNOTEMPTY, /* BLOCKED - Waiting for a MQ to become not empty. */
TSTATE_WAIT_MQNOTFULL, /* BLOCKED - Waiting for a MQ to become not full. */
#endif

213
include/sys/msg.h Normal file
View File

@ -0,0 +1,213 @@
/****************************************************************************
* include/sys/msg.h
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
#ifndef __INCLUDE_SYS_MSG_H
#define __INCLUDE_SYS_MSG_H
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/config.h>
#include <sys/ipc.h>
#include <time.h>
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
/* The MSG_NOERROR identifier value, the msqid_ds struct and the msg struct
* are as defined by the SV API Intel 386 Processor Supplement.
*/
#define MSG_NOERROR 010000 /* No error if message is too big */
#define MSG_EXCEPT 020000 /* Recv any msg except of specified type.*/
#define MSG_COPY 040000 /* Copy (not remove) all queue messages */
/****************************************************************************
* Public Type Declarations
****************************************************************************/
typedef unsigned long msgqnum_t;
typedef unsigned long msglen_t;
struct msqid_ds
{
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long msg_cbytes; /* Current number of bytes in
* queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
* in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
* allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
/* Structure describing a message. The SVID doesn't suggest any
* particular name for this structure. There is a reference in the
* msgop man page that reads "The structure mymsg is an example of what
* this user defined buffer might look like, and includes the following
* members:". This sentence is followed by two lines equivalent
* to the mtype and mtext field declarations below. It isn't clear
* if "mymsg" refers to the name of the structure type or the name of an
* instance of the structure...
*/
struct mymsg
{
long mtype; /* message type (+ve integer) */
char mtext[1]; /* message body */
};
#ifdef __cplusplus
#define EXTERN extern "C"
extern "C"
{
#else
#define EXTERN extern
#endif
/****************************************************************************
* Public Function Prototypes
****************************************************************************/
/****************************************************************************
* Name: msgctl
*
* Description:
* System V message control operations.
* msgctl() performs the control operation specified by cmd on the
* System V message queue with identifier msqid.
*
* Input Parameters:
* msqid - System V message queue identifier
* cmd - Command operations
* msqid_ds - Defines a message queue
*
* Returned Value:
* On success, IPC_STAT, IPC_SET, and IPC_RMID return 0. A
* successful IPC_INFO or MSG_INFO operation returns the index of
* the highest used entry in the kernel's internal array recording
* information about all message queues. (This information can be
* used with repeated MSG_STAT or MSG_STAT_ANY operations to obtain
* information about all queues on the system.) A successful
* MSG_STAT or MSG_STAT_ANY operation returns the identifier of the
* queue whose index was given in msqid.
*
* On failure, -1 is returned and errno is set to indicate the error.
*
****************************************************************************/
int msgctl(int msqid, int cmd, FAR struct msqid_ds *buf);
/****************************************************************************
* Name: msgget
*
* Description:
* Get a System V message queue identifier
* The msgget() system call returns the System V message queue
* identifier associated with the value of the key argument. It may
* be used either to obtain the identifier of a previously created
* message queue (when msgflg is zero and key does not have the
* value IPC_PRIVATE), or to create a new set.
*
* Input Parameters:
* key - Key associated with the message queue
* msgflg - Operations and permissions flag
*
* Returned Value:
* On success, msgget() returns the message queue identifier (a
* nonnegative integer). On failure, -1 is returned, and errno is
* set to indicate the error.
*
****************************************************************************/
int msgget(key_t key, int msgflg);
/****************************************************************************
* Name: msgsnd
*
* Description:
* The msgsnd() function is used to send a message to the queue
* associated with the message queue identifier specified by msqid.
* The msgp argument points to a user-defined buffer that must contain
* first a field of type long int that will specify the type of the
* message, and then a data portion that will hold the data bytes of
* the message.
*
* Input Parameters:
* msqid - Message queue identifier
* msgp - Pointer to a buffer with the message to be sent
* msgsz - Length of the data part of the message to be sent
* msgflg - Operations flags
*
* Returned Value:
* On success, mq_send() returns 0 (OK); on error, -1 (ERROR)
* is returned, with errno set to indicate the error:
*
* EAGAIN The queue was full and the O_NONBLOCK flag was set for the
* message queue description referred to by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EPERM Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int msgsnd(int msqid, FAR const void *msgp, size_t msgsz, int msgflg);
/****************************************************************************
* Name: msgrcv
*
* Description:
* The msgrcv() function reads a message from the message queue specified
* by the msqid parameter and places it in the user-defined buffer
* pointed to by the *msgp parameter.
*
* Input Parameters:
* msqid - Message queue identifier
* msgp - Pointer to a buffer in which the received message will be
* stored
* msgsz - Length of the data part of the buffer
* msgtyp - Type of message to be received.
* msgflg - Operations flags.
*
* Returned Value:
* On success, msgrcv() returns the number of bytes actually copied
* into the mtext array.
* On failure, both functions return -1, and set errno to indicate
* the error.
*
****************************************************************************/
ssize_t msgrcv(int msqid, FAR void *msgp,
size_t msgsz, long msgtyp, int msgflg);
#undef EXTERN
#ifdef __cplusplus
}
#endif
#endif /* __INCLUDE_SYS_MSG_H */

View File

@ -34,6 +34,12 @@ config DISABLE_MQUEUE
bool "Disable POSIX message queue support"
default DEFAULT_SMALL
config DISABLE_MQUEUE_SYSV
bool "Disable System V message queue support"
default DISABLE_MQUEUE
---help---
Disable System V message queue support
config DISABLE_ENVIRON
bool "Disable environment variable support"
default DEFAULT_SMALL
@ -1571,8 +1577,8 @@ config SIG_SIGWORK
endmenu # Signal Numbers
endmenu # Signal Configuration
menu "POSIX Message Queue Options"
depends on !DISABLE_MQUEUE
menu "Message Queue Options"
depends on !DISABLE_MQUEUE && !DISABLE_MQUEUE_SYSV
config PREALLOC_MQ_MSGS
int "Number of pre-allocated messages"

View File

@ -48,6 +48,7 @@
#include "signal/signal.h"
#include "semaphore/semaphore.h"
#include "mqueue/mqueue.h"
#include "mqueue/msg.h"
#include "clock/clock.h"
#include "timer/timer.h"
#include "irq/irq.h"
@ -575,12 +576,18 @@ void nx_start(void)
nxsig_initialize();
#ifndef CONFIG_DISABLE_MQUEUE
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
/* Initialize the named message queue facility (if in link) */
nxmq_initialize();
#endif
#ifndef CONFIG_DISABLE_MQUEUE_SYSV
/* Initialize the System V message queue facility (if in link) */
nxmsg_initialize();
#endif
#ifdef CONFIG_NET
/* Initialize the networking system */

View File

@ -25,9 +25,15 @@ CSRCS += mq_timedreceive.c mq_rcvinternal.c mq_initialize.c
CSRCS += mq_msgfree.c mq_msgqalloc.c mq_msgqfree.c mq_recover.c
CSRCS += mq_setattr.c mq_waitirq.c mq_notify.c mq_getattr.c
endif
ifneq ($(CONFIG_DISABLE_MQUEUE_SYSV),y)
CSRCS += msgctl.c msgget.c msginternal.c msgrcv.c msgsnd.c
endif
# Include mqueue build support
DEPPATH += --dep-path mqueue
VPATH += :mqueue
endif

132
sched/mqueue/msg.h Normal file
View File

@ -0,0 +1,132 @@
/****************************************************************************
* sched/mqueue/msg.h
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
#ifndef __SCHED_MQUEUE_MSG_H
#define __SCHED_MQUEUE_MSG_H
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/config.h>
#include <nuttx/compiler.h>
#include <nuttx/irq.h>
#include <nuttx/mqueue.h>
#include <sys/msg.h>
#include <errno.h>
#if defined(CONFIG_MQ_MAXMSGSIZE) && CONFIG_MQ_MAXMSGSIZE > 0
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
#define MSG_MAX_BYTES CONFIG_MQ_MAXMSGSIZE
#define MSG_MAX_MSGS 16
/****************************************************************************
* Public Type Definitions
****************************************************************************/
struct msgq_s
{
struct mqueue_cmn_s cmn;
struct list_node msglist; /* Prioritized message list */
key_t key;
int16_t maxmsgs; /* Maximum number of messages in the queue */
int16_t nmsgs; /* Number of message in the queue */
uint16_t maxmsgsize; /* Max size of message in message queue */
};
struct msgbuf_s
{
struct list_node node;
uint16_t msize; /* Message data length */
long mtype; /* Message type, must be > 0 */
char mtext[MSG_MAX_BYTES]; /* Message data */
};
/****************************************************************************
* Public Data
****************************************************************************/
#ifdef __cplusplus
#define EXTERN extern "C"
extern "C"
{
#else
#define EXTERN extern
#endif
EXTERN struct list_node g_msgfreelist;
/****************************************************************************
* Public Function Prototypes
****************************************************************************/
/****************************************************************************
* Name: nxmsg_initialize
*
* Description:
* Initialize the message queue
*
****************************************************************************/
void nxmsg_initialize(void);
/****************************************************************************
* Name: nxmsg_alloc
*
* Description:
* Allocate a message queue instance
*
****************************************************************************/
int nxmsg_alloc(FAR struct msgq_s **pmsgq);
/****************************************************************************
* Name: nxmsg_free
*
* Description:
* Free the message queue instance
*
****************************************************************************/
void nxmsg_free(FAR struct msgq_s *msgq);
/****************************************************************************
* Name: nxmsg_lookup
*
* Description:
* Find the message queue in look-up table
*
****************************************************************************/
FAR struct msgq_s *nxmsg_lookup(key_t key);
#undef EXTERN
#ifdef __cplusplus
}
#endif
#endif /* defined(CONFIG_MQ_MAXMSGSIZE) && CONFIG_MQ_MAXMSGSIZE > 0 */
#endif /* __SCHED_MQUEUE_MSG_H */

124
sched/mqueue/msgctl.c Normal file
View File

@ -0,0 +1,124 @@
/****************************************************************************
* sched/mqueue/msgctl.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include "mqueue/msg.h"
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: msgctl
*
* Description:
* System V message control operations.
* msgctl() performs the control operation specified by cmd on the
* System V message queue with identifier msqid.
*
* Input Parameters:
* msqid - System V message queue identifier
* cmd - Command operations
* msqid_ds - Defines a message queue
*
* Returned Value:
* On success, IPC_STAT, IPC_SET, and IPC_RMID return 0. A
* successful IPC_INFO or MSG_INFO operation returns the index of
* the highest used entry in the kernel's internal array recording
* information about all message queues. (This information can be
* used with repeated MSG_STAT or MSG_STAT_ANY operations to obtain
* information about all queues on the system.) A successful
* MSG_STAT or MSG_STAT_ANY operation returns the identifier of the
* queue whose index was given in msqid.
*
* On failure, -1 is returned and errno is set to indicate the error.
*
****************************************************************************/
int msgctl(int msqid, int cmd, FAR struct msqid_ds *buf)
{
FAR struct msgq_s *msgq;
irqstate_t flags;
int ret = OK;
flags = enter_critical_section();
msgq = nxmsg_lookup(msqid);
if (msgq == NULL)
{
ret = -EINVAL;
goto errout_with_critical;
}
switch (cmd)
{
case IPC_RMID:
{
nxmsg_free(msgq);
break;
}
case IPC_SET:
{
if (buf == NULL)
{
ret = -EFAULT;
break;
}
msgq->maxmsgs = buf->msg_qbytes / CONFIG_MQ_MAXMSGSIZE;
break;
}
case IPC_STAT:
{
if (buf == NULL)
{
ret = -EFAULT;
break;
}
buf->msg_qnum = list_length(&msgq->msglist);
buf->msg_cbytes = buf->msg_qnum * CONFIG_MQ_MAXMSGSIZE;
buf->msg_qbytes = msgq->maxmsgs * CONFIG_MQ_MAXMSGSIZE;
break;
}
default:
{
ret = -EINVAL;
break;
}
}
errout_with_critical:
leave_critical_section(flags);
if (ret < 0)
{
set_errno(-ret);
return ERROR;
}
return ret;
}

84
sched/mqueue/msgget.c Normal file
View File

@ -0,0 +1,84 @@
/****************************************************************************
* sched/mqueue/msgget.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include "mqueue/msg.h"
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: msgget
*
* Description:
* Get a System V message queue identifier
* The msgget() system call returns the System V message queue
* identifier associated with the value of the key argument. It may
* be used either to obtain the identifier of a previously created
* message queue (when msgflg is zero and key does not have the
* value IPC_PRIVATE), or to create a new set.
*
* Input Parameters:
* key - Key associated with the message queue
* msgflg - Operations and permissions flag
*
* Returned Value:
* On success, msgget() returns the message queue identifier (a
* nonnegative integer). On failure, -1 is returned, and errno is
* set to indicate the error.
*
****************************************************************************/
int msgget(key_t key, int msgflg)
{
FAR struct msgq_s *msgq;
irqstate_t flags;
int ret = OK;
flags = enter_critical_section();
msgq = nxmsg_lookup(key);
if (msgq)
{
if ((msgflg & IPC_CREAT) && (msgflg & IPC_EXCL))
{
ret = -EEXIST;
}
}
else
{
ret = (key != IPC_PRIVATE && !(msgflg & IPC_CREAT)) ?
-ENOENT : nxmsg_alloc(&msgq);
}
leave_critical_section(flags);
if (ret < 0)
{
set_errno(-ret);
return ERROR;
}
return msgq->key;
}

188
sched/mqueue/msginternal.c Normal file
View File

@ -0,0 +1,188 @@
/****************************************************************************
* sched/mqueue/msginternal.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/kmalloc.h>
#include "mqueue/msg.h"
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
#define MQ_PERBLOCK 10
/****************************************************************************
* Private Data
****************************************************************************/
static uint8_t g_nmsgq; /* The number of groups of msgs array */
static FAR struct msgq_s **g_msgqs; /* The pointer of two layer file descriptors array */
/****************************************************************************
* Public Data
****************************************************************************/
struct list_node g_msgfreelist = LIST_INITIAL_VALUE(g_msgfreelist);
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: nxmsg_alloc_internal
****************************************************************************/
static FAR struct msgq_s *nxmsg_alloc_internal(void)
{
FAR struct msgq_s *msgq;
FAR struct msgq_s **tmp;
int i;
msgq = kmm_zalloc(sizeof(struct msgq_s));
if (msgq == NULL)
{
return NULL;
}
for (i = 0; i < g_nmsgq; i++)
{
if (g_msgqs[i] == NULL)
{
g_msgqs[i] = msgq;
msgq->key = i + 1;
return msgq;
}
}
tmp = kmm_realloc(g_msgqs, sizeof(FAR void *) *
(g_nmsgq + MQ_PERBLOCK));
if (tmp == NULL)
{
kmm_free(msgq);
return NULL;
}
g_msgqs = tmp;
memset(&g_msgqs[g_nmsgq], 0, sizeof(FAR void *) * MQ_PERBLOCK);
g_msgqs[g_nmsgq] = msgq;
msgq->key = g_nmsgq + 1;
g_nmsgq += MQ_PERBLOCK;
return msgq;
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: nxmsg_initialize
****************************************************************************/
void nxmsg_initialize(void)
{
FAR struct msgbuf_s *msg;
msg = (FAR struct msgbuf_s *)kmm_malloc(sizeof(*msg) *
CONFIG_PREALLOC_MQ_MSGS);
if (msg)
{
int i;
for (i = 0; i < CONFIG_PREALLOC_MQ_MSGS; i++)
{
list_add_tail(&g_msgfreelist, &msg->node);
msg++;
}
}
}
/****************************************************************************
* Name: nxmsg_alloc
****************************************************************************/
int nxmsg_alloc(FAR struct msgq_s **pmsgq)
{
FAR struct msgq_s *msgq;
msgq = nxmsg_alloc_internal();
if (msgq == NULL)
{
return -ENOMEM;
}
/* Initialize the new named message queue */
list_initialize(&msgq->msglist);
msgq->maxmsgs = MSG_MAX_MSGS;
msgq->maxmsgsize = MSG_MAX_BYTES;
*pmsgq = msgq;
return OK;
}
/****************************************************************************
* Name: nxmsg_free
****************************************************************************/
void nxmsg_free(FAR struct msgq_s *msgq)
{
FAR struct msgbuf_s *entry;
FAR struct msgbuf_s *tmp;
int index;
if (msgq == NULL || msgq->key <= 0 || msgq->key > g_nmsgq)
{
return;
}
index = msgq->key - 1;
list_for_every_entry_safe(&msgq->msglist, entry,
tmp, struct msgbuf_s, node)
{
list_delete(&entry->node);
list_add_tail(&g_msgfreelist, &entry->node);
}
kmm_free(g_msgqs[index]);
g_msgqs[index] = NULL;
}
/****************************************************************************
* Name: nxmsg_lookup
****************************************************************************/
FAR struct msgq_s *nxmsg_lookup(key_t key)
{
if (key <= 0 || key > g_nmsgq)
{
return NULL;
}
return g_msgqs[key - 1];
}

258
sched/mqueue/msgrcv.c Normal file
View File

@ -0,0 +1,258 @@
/****************************************************************************
* sched/mqueue/msgrcv.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <assert.h>
#include <nuttx/cancelpt.h>
#include "sched/sched.h"
#include "mqueue/msg.h"
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: msgrcv_wait
****************************************************************************/
static int msgrcv_wait(FAR struct msgq_s *msgq, FAR struct msgbuf_s **rcvmsg,
long msgtyp, int msgflg)
{
FAR struct msgbuf_s *newmsg = NULL;
FAR struct msgbuf_s *tmp;
FAR struct tcb_s *rtcb;
#ifdef CONFIG_CANCELLATION_POINTS
/* msgrcv_wait() is not a cancellation point, but it may be called
* from msgrcv() which are cancellation point.
*/
if (check_cancellation_point())
{
/* If there is a pending cancellation, then do not perform
* the wait. Exit now with ECANCELED.
*/
return -ECANCELED;
}
#endif
/* Get the message from the head of the queue */
while (1)
{
list_for_every_entry(&msgq->msglist, tmp, struct msgbuf_s, node)
{
/* Unless MSG_COPY is specified in msgflg (see below), the msgtyp
* argument specifies the type of message requested, as follows:
*
* 1. If msgtyp is 0, then the first message in the queue is read.
*
* 2. If msgtyp is greater than 0, then the first message in the
* queue of type msgtyp is read, unless MSG_EXCEPT was
* specified in msgflg, in which case the first message in the
* queue of type not equal to msgtyp will be read.
*
* 3. If msgtyp is less than 0, then the first message in the
* queue with the lowest type less than or equal to the
* absolute value of msgtyp will be read.
*/
if (msgtyp < 0)
{
if (newmsg == NULL || newmsg->mtype > tmp->mtype)
{
newmsg = tmp;
}
}
else if (msgtyp == 0 ||
((msgflg & MSG_EXCEPT) != 0) == (tmp->mtype != msgtyp))
{
newmsg = tmp;
break;
}
}
if (newmsg)
{
list_delete(&newmsg->node);
goto found;
}
if ((msgflg & IPC_NOWAIT) != 0)
{
return -EAGAIN;
}
/* The queue is empty! Should we block until there the above condition
* has been satisfied?
*/
rtcb = this_task();
rtcb->waitobj = msgq;
msgq->cmn.nwaitnotempty++;
/* Initialize the 'errcode" used to communication wake-up error
* conditions.
*/
rtcb->errcode = OK;
/* Make sure this is not the idle task, descheduling that
* isn't going to end well.
*/
DEBUGASSERT(NULL != rtcb->flink);
up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* errno value (should be either EINTR or ETIMEDOUT).
*/
if (rtcb->errcode != OK)
{
return -rtcb->errcode;
}
}
found:
msgq->nmsgs--;
*rcvmsg = newmsg;
return OK;
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: msgrcv
*
* Description:
* The msgrcv() function reads a message from the message queue specified
* by the msqid parameter and places it in the user-defined buffer
* pointed to by the *msgp parameter.
*
* Input Parameters:
* msqid - Message queue identifier
* msgp - Pointer to a buffer in which the received message will be
* stored
* msgsz - Length of the data part of the buffer
* msgtyp - Type of message to be received.
* msgflg - Operations flags.
*
* Returned Value:
* On success, msgrcv() returns the number of bytes actually copied
* into the mtext array.
* On failure, both functions return -1, and set errno to indicate
* the error.
*
****************************************************************************/
ssize_t msgrcv(int msqid, FAR void *msgp, size_t msgsz, long msgtyp,
int msgflg)
{
FAR struct msgbuf_s *msg = NULL;
FAR struct mymsg *buf = msgp;
FAR struct msgq_s *msgq;
FAR struct tcb_s *btcb;
irqstate_t flags;
int ret;
if (msgp == NULL)
{
ret = -EFAULT;
goto errout;
}
flags = enter_critical_section();
msgq = nxmsg_lookup(msqid);
if (msgq == NULL)
{
ret = -EINVAL;
goto errout_with_critical;
}
if (msgsz < msgq->maxmsgsize &&
((msgflg & MSG_NOERROR) == 0))
{
ret = -EMSGSIZE;
goto errout_with_critical;
}
ret = msgrcv_wait(msgq, &msg, msgtyp, msgflg);
if (ret < 0)
{
goto errout_with_critical;
}
ret = msgsz > msg->msize ? msg->msize : msgsz;
buf->mtype = msg->mtype;
memcpy(buf->mtext, msg->mtext, ret);
list_add_tail(&g_msgfreelist, &msg->node);
/* Check if any tasks are waiting for the MQ not full event. */
if (msgq->cmn.nwaitnotfull > 0)
{
/* Find the highest priority task that is waiting for
* this queue to be not-full in g_waitingformqnotfull list.
* This must be performed in a critical section because
* messages can be sent from interrupt handlers.
*/
btcb = (FAR struct tcb_s *)dq_peek(MQ_WNFLIST(msgq->cmn));
/* If one was found, unblock it. NOTE: There is a race
* condition here: the queue might be full again by the
* time the task is unblocked
*/
DEBUGASSERT(btcb != NULL);
if (WDOG_ISACTIVE(&btcb->waitdog))
{
wd_cancel(&btcb->waitdog);
}
msgq->cmn.nwaitnotfull--;
up_unblock_task(btcb);
}
errout_with_critical:
leave_critical_section(flags);
errout:
if (ret < 0)
{
set_errno(-ret);
return ERROR;
}
return ret;
}

252
sched/mqueue/msgsnd.c Normal file
View File

@ -0,0 +1,252 @@
/****************************************************************************
* sched/mqueue/msgsnd.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <assert.h>
#include <nuttx/cancelpt.h>
#include "sched/sched.h"
#include "mqueue/msg.h"
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: msgsnd_wait
****************************************************************************/
static int msgsnd_wait(FAR struct msgq_s *msgq, int msgflg)
{
FAR struct tcb_s *rtcb;
#ifdef CONFIG_CANCELLATION_POINTS
/* msgsnd_wait() is not a cancellation point, but may be called via
* msgsnd() which are cancellation points.
*/
if (check_cancellation_point())
{
/* If there is a pending cancellation, then do not perform
* the wait. Exit now with ECANCELED.
*/
return -ECANCELED;
}
#endif
/* Verify that the queue is indeed full as the caller thinks
* Loop until there are fewer than max allowable messages in the
* receiving message queue
*/
while (msgq->nmsgs >= msgq->maxmsgs)
{
/* Should we block until there is sufficient space in the
* message queue?
*/
if ((msgflg & IPC_NOWAIT) != 0)
{
return -EAGAIN;
}
/* Block until the message queue is no longer full.
* When we are unblocked, we will try again
*/
rtcb = this_task();
rtcb->waitobj = msgq;
msgq->cmn.nwaitnotfull++;
/* Initialize the errcode used to communication wake-up error
* conditions.
*/
rtcb->errcode = OK;
/* Make sure this is not the idle task, descheduling that
* isn't going to end well.
*/
DEBUGASSERT(NULL != rtcb->flink);
up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* per-task errno value (should be EINTR or ETIMEOUT).
*/
if (rtcb->errcode != OK)
{
return -rtcb->errcode;
}
}
return OK;
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: msgsnd
*
* Description:
* The msgsnd() function is used to send a message to the queue
* associated with the message queue identifier specified by msqid.
* The msgp argument points to a user-defined buffer that must contain
* first a field of type long int that will specify the type of the
* message, and then a data portion that will hold the data bytes of
* the message.
*
* Input Parameters:
* msqid - Message queue identifier
* msgp - Pointer to a buffer with the message to be sent
* msgsz - Length of the data part of the message to be sent
* msgflg - Operations flags
*
* Returned Value:
* On success, mq_send() returns 0 (OK); on error, -1 (ERROR)
* is returned, with errno set to indicate the error:
*
* EAGAIN The queue was full and the O_NONBLOCK flag was set for the
* message queue description referred to by mqdes.
* EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EPERM Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int msgsnd(int msqid, FAR const void *msgp, size_t msgsz, int msgflg)
{
FAR const struct mymsg *buf = msgp;
FAR struct msgbuf_s *msg;
FAR struct msgq_s *msgq;
FAR struct tcb_s *btcb;
irqstate_t flags;
int ret = OK;
if (msgp == NULL)
{
ret = -EFAULT;
goto errout;
}
flags = enter_critical_section();
msgq = nxmsg_lookup(msqid);
if (msgq == NULL)
{
ret = -EINVAL;
goto errout_with_critical;
}
if (msgsz > msgq->maxmsgsize)
{
ret = -EMSGSIZE;
goto errout_with_critical;
}
/* Is the message queue FULL? */
if (msgq->nmsgs >= msgq->maxmsgs)
{
if (!up_interrupt_context()) /* In an interrupt handler? */
{
/* Yes.. the message queue is full. Wait for space to become
* available in the message queue.
*/
ret = msgsnd_wait(msgq, msgflg);
}
else if ((msgflg & IPC_NOWAIT) != 0)
{
ret = -EAGAIN;
}
}
if (ret == OK)
{
/* Now allocate the message. */
msg = (FAR struct msgbuf_s *)list_remove_head(&g_msgfreelist);
if (msg == NULL)
{
ret = -ENOMEM;
goto errout_with_critical;
}
/* Check if the message was successfully allocated */
msg->msize = msgsz;
msg->mtype = buf->mtype;
memcpy(msg->mtext, buf->mtext, msgsz);
/* Insert the new message in the message queue */
list_add_tail(&msgq->msglist, &msg->node);
msgq->nmsgs++;
if (msgq->cmn.nwaitnotempty > 0)
{
/* Find the highest priority task that is waiting for
* this queue to be non-empty in g_waitingformqnotempty
* list. enter_critical_section() should give us sufficient
* protection since interrupts should never cause a change
* in this list
*/
btcb = (FAR struct tcb_s *)dq_peek(MQ_WNELIST(msgq->cmn));
/* If one was found, unblock it */
DEBUGASSERT(btcb);
if (WDOG_ISACTIVE(&btcb->waitdog))
{
wd_cancel(&btcb->waitdog);
}
msgq->cmn.nwaitnotempty--;
up_unblock_task(btcb);
}
}
errout_with_critical:
leave_critical_section(flags);
errout:
if (ret < 0)
{
set_errno(-ret);
return ERROR;
}
return OK;
}

View File

@ -439,7 +439,7 @@ int nxsig_tcbdispatch(FAR struct tcb_s *stcb, siginfo_t *info)
nxsem_wait_irq(stcb, EINTR);
}
#ifndef CONFIG_DISABLE_MQUEUE
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
/* If the task is blocked waiting on a message queue, then that task
* must be unblocked when a signal is received.
*/

View File

@ -392,7 +392,7 @@ bool nxnotify_cancellation(FAR struct tcb_s *tcb)
nxsig_wait_irq(tcb, ECANCELED);
}
#ifndef CONFIG_DISABLE_MQUEUE
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
/* If the thread is blocked waiting on a message queue, then
* the thread must be unblocked to handle the cancellation.
*/

View File

@ -78,7 +78,7 @@ void nxtask_recover(FAR struct tcb_s *tcb)
nxsem_recover(tcb);
#ifndef CONFIG_DISABLE_MQUEUE
#if !defined(CONFIG_DISABLE_MQUEUE) && !defined(CONFIG_DISABLE_MQUEUE_SYSV)
/* Handle cases where the thread was waiting for a message queue event */
nxmq_recover(tcb);