Squashed commit of the following:

Change all calls to mq_receive() and mq_timedreceive() in the OS to calls to nxmq_receive() and nxmq_timedreceive(), making appropriate changes for differences in return values.

    sched/mqueue:  Add nxmq_receive() and mxmq_timedreceive() which are functionally equivalent to the standard mq_receive and mq_timedreceive() except that (1) they do not create cancellation points, and (2) the do not modify the application's errno variable.
This commit is contained in:
Gregory Nutt 2017-10-10 09:57:40 -06:00
parent fca07be1df
commit e11e3b2607
14 changed files with 379 additions and 192 deletions

View File

@ -438,7 +438,7 @@ static void *null_workerthread(pthread_addr_t pvarg)
{
/* Wait for messages from our message queue */
msglen = mq_receive(priv->mq, (FAR char *)&msg, sizeof(msg), &prio);
msglen = nxmq_receive(priv->mq, (FAR char *)&msg, sizeof(msg), &prio);
/* Handle the case when we return with no message */

View File

@ -1599,7 +1599,7 @@ static void *cs43l22_workerthread(pthread_addr_t pvarg)
/* Wait for messages from our message queue */
msglen = mq_receive(priv->mq, (FAR char *)&msg, sizeof(msg), &prio);
msglen = nxmq_receive(priv->mq, (FAR char *)&msg, sizeof(msg), &prio);
/* Handle the case when we return with no message */

View File

@ -1288,7 +1288,7 @@ static void *vs1053_workerthread(pthread_addr_t pvarg)
/* Wait for messages from our message queue */
size = mq_receive(dev->mq, (FAR char *)&msg, sizeof(msg), &prio);
size = nxmq_receive(dev->mq, (FAR char *)&msg, sizeof(msg), &prio);
/* Handle the case when we return with no message */

View File

@ -64,6 +64,7 @@
#include <nuttx/clock.h>
#include <nuttx/wqueue.h>
#include <nuttx/signal.h>
#include <nuttx/mqueue.h>
#include <nuttx/i2c/i2c_master.h>
#include <nuttx/fs/fs.h>
#include <nuttx/fs/ioctl.h>
@ -2051,7 +2052,7 @@ static void *wm8904_workerthread(pthread_addr_t pvarg)
/* Wait for messages from our message queue */
msglen = mq_receive(priv->mq, (FAR char *)&msg, sizeof(msg), &prio);
msglen = nxmq_receive(priv->mq, (FAR char *)&msg, sizeof(msg), &prio);
/* Handle the case when we return with no message */

View File

@ -54,6 +54,7 @@
#include <fcntl.h>
#include "cc3000drv.h"
#include <nuttx/mqueue.h>
#include <nuttx/wireless/cc3000.h>
#include <nuttx/wireless/cc3000/cc3000_common.h>
@ -159,8 +160,8 @@ uint8_t *cc3000_wait(void)
{
DEBUGASSERT(spiconf.cc3000fd >= 0);
mq_receive(spiconf.queue, (FAR char *)&spiconf.rx_buffer,
sizeof(spiconf.rx_buffer), 0);
(void)nxmq_receive(spiconf.queue, (FAR char *)&spiconf.rx_buffer,
sizeof(spiconf.rx_buffer), 0);
return spiconf.rx_buffer.pbuffer;
}
@ -199,8 +200,8 @@ static void *unsoliced_thread_func(void *parameter)
while (spiconf.run)
{
memset(&spiconf.rx_buffer, 0, sizeof(spiconf.rx_buffer));
nbytes = mq_receive(spiconf.queue, (FAR char *)&spiconf.rx_buffer,
sizeof(spiconf.rx_buffer), 0);
nbytes = nxmq_receive(spiconf.queue, (FAR char *)&spiconf.rx_buffer,
sizeof(spiconf.rx_buffer), 0);
if (nbytes > 0)
{
ninfo("%d Processed\n", nbytes);

View File

@ -1,7 +1,7 @@
/****************************************************************************
* graphics/nxmu/nxmu_server.c
*
* Copyright (C) 2008-2012 Gregory Nutt. All rights reserved.
* Copyright (C) 2008-2012, 2017 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org>
*
* Redistribution and use in source and binary forms, with or without
@ -48,7 +48,9 @@
#include <errno.h>
#include <debug.h>
#include <nuttx/mqueue.h>
#include <nuttx/nx/nx.h>
#include "nxfe.h"
/****************************************************************************
@ -329,14 +331,16 @@ int nx_runinstance(FAR const char *mqname, FAR NX_DRIVERTYPE *dev)
{
/* Receive the next server message */
nbytes = mq_receive(fe.conn.crdmq, buffer, NX_MXSVRMSGLEN, 0);
nbytes = nxmq_receive(fe.conn.crdmq, buffer, NX_MXSVRMSGLEN, 0);
if (nbytes < 0)
{
if (errno != EINTR)
if (nbytes != -EINTR)
{
gerr("ERROR: mq_receive failed: %d\n", errno);
goto errout; /* mq_receive sets errno */
gerr("ERROR: nxmq_receive() failed: %d\n", nbytes);
ret = nbytes;
goto errout;
}
continue;
}
@ -550,7 +554,11 @@ int nx_runinstance(FAR const char *mqname, FAR NX_DRIVERTYPE *dev)
}
}
errout:
nxmu_shutdown(&fe);
return OK;
errout:
nxmu_shutdown(&fe);
set_errno(-ret);
return ERROR;
}

View File

@ -77,15 +77,21 @@
*/
#if defined(CONFIG_BUILD_FLAT) || defined(__KERNEL__)
# define _MQ_SEND(d,m,l,p) nxmq_send(d,m,l,p)
# define _MQ_TIMEDSEND(d,m,l,p,t) nxmq_send(d,m,l,p,t)
# define _MQ_ERRNO(r) (-(r))
# define _MQ_ERRVAL(r) (r)
# define _MQ_SEND(d,m,l,p) nxmq_send(d,m,l,p)
# define _MQ_TIMEDSEND(d,m,l,p,t) nxmq_timedsend(d,m,l,p,t)
# define _MQ_RECEIVE(d,m,l,p) nxmq_receive(d,m,l,p)
# define _MQ_TIMEDRECIEVE(d,m,l,p,t) nxmq_timedreceive(d,m,l,p,t)
# define _MQ_GETERRNO(r) (-(r))
# define _MQ_SETERRNO(r) set_errno(-(r))
# define _MQ_GETERRVAL(r) (r)
#else
# define _MQ_SEND(d,m,l,p) mq_send(d,m,l,p)
# define _MQ_TIMEDSEND(d,m,l,p,t) mq_send(d,m,l,p,t)
# define _MQ_ERRNO(r) errno
# define _MQ_ERRVAL(r) (-errno)
# define _MQ_SEND(d,m,l,p) mq_send(d,m,l,p)
# define _MQ_TIMEDSEND(d,m,l,p,t) mq_timedsend(d,m,l,p,t)
# define _MQ_RECEIVE(d,m,l,p) mq_receive(d,m,l,p)
# define _MQ_TIMEDRECIEVE(d,m,l,p,t) mq_timedreceive(d,m,l,p,t)
# define _MQ_GETERRNO(r) errno
# define _MQ_SETERRNO(r)
# define _MQ_GETERRVAL(r) (-errno)
#endif
/****************************************************************************
@ -223,6 +229,73 @@ int nxmq_send(mqd_t mqdes, FAR const char *msg, size_t msglen, int prio);
int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen, int prio,
FAR const struct timespec *abstime);
/****************************************************************************
* Name: nxmq_receive
*
* Description:
* This function receives the oldest of the highest priority messages
* from the message queue specified by "mqdes." This is an internal OS
* interface. It is functionally equivalent to mq_receive except that:
*
* - It is not a cancellaction point, and
* - It does not modify the errno value.
*
* See comments with mq_receive() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_receive() for the list list valid return values).
*
****************************************************************************/
ssize_t nxmq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio);
/****************************************************************************
* Name: nxmq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the message queue is empty
* and O_NONBLOCK was not set, nxmq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* nxmq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellaction point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio, FAR const struct timespec *abstime);
/****************************************************************************
* Name: nxmq_free_msgq
*
@ -236,7 +309,7 @@ int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen, int prio,
* Inputs:
* msgq - Named essage queue to be freed
*
* Return Value:
* Returned Value:
* None
*
****************************************************************************/
@ -250,13 +323,13 @@ void nxmq_free_msgq(FAR struct mqueue_inode_s *msgq);
* This function implements a part of the POSIX message queue open logic.
* It allocates and initializes a structu mqueue_inode_s structure.
*
* Parameters:
* Input Parameters:
* mode - mode_t value is ignored
* attr - The mq_maxmsg attribute is used at the time that the message
* queue is created to determine the maximum number of
* messages that may be placed in the message queue.
*
* Return Value:
* Returned Value:
* The allocated and initalized message queue structure or NULL in the
* event of a failure.
*
@ -271,12 +344,12 @@ FAR struct mqueue_inode_s *nxmq_alloc_msgq(mode_t mode,
* Description:
* Create a message queue descriptor for the specified TCB
*
* Inputs:
* Input Parameters:
* TCB - task that needs the descriptor.
* msgq - Named message queue containing the message
* oflags - access rights for the descriptor
*
* Return Value:
* Returned Value:
* On success, the message queue descriptor is returned. NULL is returned
* on a failure to allocate.
*
@ -294,11 +367,11 @@ mqd_t nxmq_create_des(FAR struct tcb_s *mtcb,
* deallocates any system resources allocated by the system for use by
* this task for its message queue.
*
* Parameters:
* Input Parameters:
* mqdes - Message queue descriptor.
* group - Group that has the open descriptor.
*
* Return Value:
* Returned Value:
* Zero (OK) if the message queue is closed successfully. Otherwise, a
* negated errno value is returned.
*
@ -313,11 +386,11 @@ int nxmq_close_group(mqd_t mqdes, FAR struct task_group_s *group);
* This function performs the portion of the mq_close operation related
* to freeing resource used by the message queue descriptor itself.
*
* Parameters:
* Input Parameters:
* mqdes - Message queue descriptor.
* group - Group that has the open descriptor.
*
* Return Value:
* Returned Value:
* None.
*
* Assumptions:

View File

@ -1,7 +1,7 @@
/****************************************************************************
* libnx/nxmu/nx_eventhandler.c
*
* Copyright (C) 2008-2009, 2011-2013 Gregory Nutt. All rights reserved.
* Copyright (C) 2008-2009, 2011-2013, 2017 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org>
*
* Redistribution and use in source and binary forms, with or without
@ -46,6 +46,7 @@
#include <errno.h>
#include <debug.h>
#include <nuttx/mqueue.h>
#include <nuttx/nx/nx.h>
#include <nuttx/nx/nxbe.h>
#include <nuttx/nx/nxmu.h>
@ -129,16 +130,18 @@ int nx_eventhandler(NXHANDLE handle)
do
{
nbytes = mq_receive(conn->crdmq, buffer, NX_MXCLIMSGLEN, 0);
nbytes = _MQ_RECEIVE(conn->crdmq, buffer, NX_MXCLIMSGLEN, 0);
if (nbytes < 0)
{
int errcode = _MQ_GETERRNO(ret);
/* EINTR is not an error. The wait was interrupted by a signal and
* we just need to try reading again.
*/
if (errno != EINTR)
if (errcode != EINTR)
{
if (errno == EAGAIN)
if (errcode == EAGAIN)
{
/* EAGAIN is not an error. It occurs because the MQ is opened with
* O_NONBLOCK and there is no message available now.
@ -148,7 +151,8 @@ int nx_eventhandler(NXHANDLE handle)
}
else
{
gerr("ERROR: mq_receive failed: %d\n", errno);
gerr("ERROR: _MQ_RECEIVE failed: %d\n", errcode);
_MQ_SETERRNO(ret);
return ERROR;
}
}

View File

@ -100,10 +100,10 @@
* task is waiting for the message queue to become non-empty. This is
* inconsistent with the POSIX specification which says, "If a process
* has registered for notification of message a arrival at a message
* queue and some process is blocked in mq_receive() waiting to receive
* queue and some process is blocked in [nx]mq_receive() waiting to receive
* a message when a message arrives at the queue, the arriving message
* message shall satisfy mq_receive()... The resulting behavior is as if
* the message queue remains empty, and no notification shall be sent."
* message shall satisfy [nx]mq_receive()... The resulting behavior is as
* if the message queue remains empty, and no notification shall be sent."
*
****************************************************************************/

View File

@ -1,7 +1,8 @@
/****************************************************************************
* sched/mqueue/mq_rcvinternal.c
*
* Copyright (C) 2007, 2008, 2012-2013, 2016 Gregory Nutt. All rights reserved.
* Copyright (C) 2007, 2008, 2012-2013, 2016-2017 Gregory Nutt. All rights
* reserved.
* Author: Gregory Nutt <gnutt@nuttx.org>
*
* Redistribution and use in source and binary forms, with or without
@ -63,26 +64,24 @@
* Name: nxmq_verify_receive
*
* Description:
* This is internal, common logic shared by both mq_receive and
* mq_timedreceive. This function verifies the input parameters that are
* common to both functions.
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function verifies the input parameters that
* are common to both functions.
*
* Parameters:
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
*
* Return Value:
* One success, 0 (OK) is returned. On failure, -1 (ERROR) is returned and
* the errno is set appropriately:
* Returned Value:
* One success, zero (OK) is returned. A negated errno value is returned
* on any failure:
*
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the message
* queue.
* EINVAL Invalid 'msg' or 'mqdes'
*
* Assumptions:
*
****************************************************************************/
int nxmq_verify_receive(mqd_t mqdes, FAR char *msg, size_t msglen)
@ -91,20 +90,17 @@ int nxmq_verify_receive(mqd_t mqdes, FAR char *msg, size_t msglen)
if (!msg || !mqdes)
{
set_errno(EINVAL);
return ERROR;
return -EINVAL;
}
if ((mqdes->oflags & O_RDOK) == 0)
{
set_errno(EPERM);
return ERROR;
return -EPERM;
}
if (msglen < (size_t)mqdes->msgq->maxmsgsize)
{
set_errno(EMSGSIZE);
return ERROR;
return -EMSGSIZE;
}
return OK;
@ -114,18 +110,19 @@ int nxmq_verify_receive(mqd_t mqdes, FAR char *msg, size_t msglen)
* Name: nxmq_wait_receive
*
* Description:
* This is internal, common logic shared by both mq_receive and
* mq_timedreceive. This function waits for a message to be received on
* the specified message queue, removes the message from the queue, and
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function waits for a message to be received
* on the specified message queue, removes the message from the queue, and
* returns it.
*
* Parameters:
* mqdes - Message queue descriptor
* Input Parameters:
* mqdes - Message queue descriptor
* rcvmsg - The caller-provided location in which to return the newly
* received message.
*
* Return Value:
* On success, a reference to the received message. If the wait was
* interrupted by a signal or a timeout, then the errno will be set
* appropriately and NULL will be returned.
* Returned Value:
* One success, zero (OK) is returned. A negated errno value is returned
* on any failure.
*
* Assumptions:
* - The caller has provided all validity checking of the input parameters
@ -136,28 +133,30 @@ int nxmq_verify_receive(mqd_t mqdes, FAR char *msg, size_t msglen)
*
****************************************************************************/
FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes)
int nxmq_wait_receive(mqd_t mqdes, FAR struct mqueue_msg_s **rcvmsg)
{
FAR struct tcb_s *rtcb;
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *rcvmsg;
FAR struct mqueue_msg_s *newmsg;
int ret;
/* nxmq_wait_receive() is not a cancellation point, but it is always called
* from a cancellation point.
DEBUGASSERT(rcvmsg != NULL);
*rcvmsg = NULL; /* Assume failure */
#ifdef CONFIG_CANCELLATION_POINTS
/* nxmq_wait_receive() is not a cancellation point, but it may be called
* from mq_receive() or mq_timedreceive() which are cancellation point.
*/
if (enter_cancellation_point())
if (check_cancellation_point())
{
#ifdef CONFIG_CANCELLATION_POINTS
/* If there is a pending cancellation, then do not perform
* the wait. Exit now with ECANCELED.
*/
set_errno(ECANCELED);
leave_cancellation_point();
return NULL;
#endif
return -ECANCELED;
}
#endif
/* Get a pointer to the message queue */
@ -165,7 +164,7 @@ FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes)
/* Get the message from the head of the queue */
while ((rcvmsg = (FAR struct mqueue_msg_s *)sq_remfirst(&msgq->msglist)) == NULL)
while ((newmsg = (FAR struct mqueue_msg_s *)sq_remfirst(&msgq->msglist)) == NULL)
{
/* The queue is empty! Should we block until there the above condition
* has been satisfied?
@ -173,13 +172,21 @@ FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes)
if ((mqdes->oflags & O_NONBLOCK) == 0)
{
int saved_errno;
/* Yes.. Block and try again */
rtcb = this_task();
rtcb = this_task();
rtcb->msgwaitq = msgq;
msgq->nwaitnotempty++;
set_errno(OK);
/* "Borrow" the per-task errno to communication wake-up error
* conditions.
*/
saved_errno = rtcb->pterrno;
rtcb->pterrno = OK;
up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY);
/* When we resume at this point, either (1) the message queue
@ -188,9 +195,12 @@ FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes)
* errno value (should be either EINTR or ETIMEDOUT).
*/
if (get_errno() != OK)
ret = rtcb->pterrno;
rtcb->pterrno = saved_errno;
if (ret != OK)
{
break;
return -ret;
}
}
else
@ -199,8 +209,7 @@ FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes)
* message queue description referred to by 'mqdes'.
*/
set_errno(EAGAIN);
break;
return -EAGAIN;
}
}
@ -208,32 +217,32 @@ FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes)
* the queue while we are still in the critical section
*/
if (rcvmsg)
if (newmsg)
{
msgq->nmsgs--;
}
leave_cancellation_point();
return rcvmsg;
*rcvmsg = newmsg;
return OK;
}
/****************************************************************************
* Name: nxmq_do_receive
*
* Description:
* This is internal, common logic shared by both mq_receive and
* mq_timedreceive. This function accepts the message obtained by
* This is internal, common logic shared by both [nx]mq_receive and
* [nx]mq_timedreceive. This function accepts the message obtained by
* mq_waitmsg, provides the message content to the user, notifies any
* threads that were waiting for the message queue to become non-full,
* and disposes of the message structure
*
* Parameters:
* Input Parameters:
* mqdes - Message queue descriptor
* mqmsg - The message obtained by mq_waitmsg()
* ubuffer - The address of the user provided buffer to receive the message
* prio - The user-provided location to return the message priority.
*
* Return Value:
* Returned Value:
* Returns the length of the received message. This function does not fail.
*
* Assumptions:
@ -293,7 +302,7 @@ ssize_t nxmq_do_receive(mqd_t mqdes, FAR struct mqueue_msg_s *mqmsg,
* time the task is unblocked
*/
ASSERT(btcb);
DEBUGASSERT(btcb);
btcb->msgwaitq = NULL;
msgq->nwaitnotfull--;

View File

@ -1,7 +1,7 @@
/****************************************************************************
* sched/mqueue/mq_receive.c
*
* Copyright (C) 2007, 2009, 2016 Gregory Nutt. All rights reserved.
* Copyright (C) 2007, 2009, 2016-2017 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org>
*
* Redistribution and use in source and binary forms, with or without
@ -47,6 +47,7 @@
#include <nuttx/irq.h>
#include <nuttx/arch.h>
#include <nuttx/mqueue.h>
#include <nuttx/cancelpt.h>
#include "mqueue/mqueue.h"
@ -56,66 +57,50 @@
****************************************************************************/
/****************************************************************************
* Name: mq_receive
* Name: nxmq_receive
*
* Description:
* This function receives the oldest of the highest priority messages
* from the message queue specified by "mqdes." If the size of the
* buffer in bytes (msglen) is less than the "mq_msgsize" attribute of
* the message queue, mq_receive will return an error. Otherwise, the
* selected message is removed from the queue and copied to "msg."
* from the message queue specified by "mqdes." This is an internal OS
* interface. It is functionally equivalent to mq_receive except that:
*
* If the message queue is empty and O_NONBLOCK was not set,
* mq_receive() will block until a message is added to the message
* queue. If more than one task is waiting to receive a message, only
* the task with the highest priority that has waited the longest will
* be unblocked.
* - It is not a cancellaction point, and
* - It does not modify the errno value.
*
* If the queue is empty and O_NONBLOCK is set, ERROR will be returned.
* See comments with mq_receive() for a more complete description of the
* behavior of this function
*
* Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* prio - If not NULL, the location to store message priority.
*
* Return Value:
* One success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes'
*
* Assumptions:
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_receive() for the list list valid return values).
*
****************************************************************************/
ssize_t mq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio)
ssize_t nxmq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio)
{
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
ssize_t ret = ERROR;
ssize_t ret;
DEBUGASSERT(up_interrupt_context() == false);
/* mq_receive() is a cancellation point */
(void)enter_cancellation_point();
/* Verify the input parameters and, in case of an error, set
* errno appropriately.
*/
if (nxmq_verify_receive(mqdes, msg, msglen) != OK)
ret = nxmq_verify_receive(mqdes, msg, msglen);
if (ret < 0)
{
leave_cancellation_point();
return ERROR;
return ret;
}
/* Get the next message from the message queue. We will disable
@ -135,7 +120,7 @@ ssize_t mq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
/* Get the message from the message queue */
mqmsg = nxmq_wait_receive(mqdes);
ret = nxmq_wait_receive(mqdes, &mqmsg);
leave_critical_section(flags);
/* Check if we got a message from the message queue. We might
@ -145,12 +130,72 @@ ssize_t mq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
* - The wait was interrupted by a signal
*/
if (mqmsg)
if (ret >= 0)
{
DEBUGASSERT(mqmsg != NULL);
ret = nxmq_do_receive(mqdes, mqmsg, msg, prio);
}
sched_unlock();
return ret;
}
/****************************************************************************
* Name: mq_receive
*
* Description:
* This function receives the oldest of the highest priority messages
* from the message queue specified by "mqdes." If the size of the
* buffer in bytes (msglen) is less than the "mq_msgsize" attribute of
* the message queue, mq_receive will return an error. Otherwise, the
* selected message is removed from the queue and copied to "msg."
*
* If the message queue is empty and O_NONBLOCK was not set,
* mq_receive() will block until a message is added to the message
* queue. If more than one task is waiting to receive a message, only
* the task with the highest priority that has waited the longest will
* be unblocked.
*
* If the queue is empty and O_NONBLOCK is set, ERROR will be returned.
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
*
* Returned Value:
* One success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes'
*
****************************************************************************/
ssize_t mq_receive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio)
{
int ret;
/* mq_receive() is a cancellation point */
(void)enter_cancellation_point();
/* Let nxmq_receive do all of the work */
ret = nxmq_receive(mqdes, msg, msglen, prio);
if (ret < 0)
{
set_errno(-ret);
ret = ERROR;
}
leave_cancellation_point();
return ret;
}

View File

@ -1,7 +1,8 @@
/****************************************************************************
* sched/mqueue/mq_timedreceive.c
*
* Copyright (C) 2007-2009, 2011, 2013-2016 Gregory Nutt. All rights reserved.
* Copyright (C) 2007-2009, 2011, 2013-2017 Gregory Nutt. All rights
* reserved.
* Author: Gregory Nutt <gnutt@nuttx.org>
*
* Redistribution and use in source and binary forms, with or without
@ -50,6 +51,7 @@
#include <nuttx/irq.h>
#include <nuttx/arch.h>
#include <nuttx/wdog.h>
#include <nuttx/mqueue.h>
#include <nuttx/cancelpt.h>
#include "sched/sched.h"
@ -67,11 +69,11 @@
* This function is called if the timeout elapses before the message queue
* becomes non-empty.
*
* Parameters:
* Input Parameters:
* argc - the number of arguments (should be 1)
* pid - the task ID of the task to wakeup
*
* Return Value:
* Returned Value:
* None
*
* Assumptions:
@ -116,84 +118,61 @@ static void nxmq_rcvtimeout(int argc, wdparm_t pid)
****************************************************************************/
/****************************************************************************
* Name: mq_timedreceive
* Name: nxmq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the size of the buffer in
* bytes (msglen) is less than the "mq_msgsize" attribute of the message
* queue, mq_timedreceive will return an error. Otherwise, the selected
* message is removed from the queue and copied to "msg."
* the message queue specified by "mqdes." If the message queue is empty
* and O_NONBLOCK was not set, nxmq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* If the message queue is empty and O_NONBLOCK was not set,
* mq_timedreceive() will block until a message is added to the message
* queue (or until a timeout occurs). If more than one task is waiting
* to receive a message, only the task with the highest priority that has
* waited the longest will be unblocked.
* nxmq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* mq_timedreceive() behaves just like mq_receive(), except that if the
* queue is empty and the O_NONBLOCK flag is not enabled for the message
* queue description, then abstime points to a structure which specifies a
* ceiling on the time for which the call will block. This ceiling is an
* absolute timeout in seconds and nanoseconds since the Epoch (midnight
* on the morning of 1 January 1970).
* - It is not a cancellaction point, and
* - It does not modify the errno value.
*
* If no message is available, and the timeout has already expired by the
* time of the call, mq_timedreceive() returns immediately.
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Return Value:
* One success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
* Assumptions:
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio, FAR const struct timespec *abstime)
{
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
int ret = ERROR;
int ret;
DEBUGASSERT(up_interrupt_context() == false && rtcb->waitdog == NULL);
/* mq_timedreceive() is a cancellation point */
(void)enter_cancellation_point();
/* Verify the input parameters and, in case of an error, set
* errno appropriately.
*/
if (nxmq_verify_receive(mqdes, msg, msglen) != OK)
ret = nxmq_verify_receive(mqdes, msg, msglen);
if (ret < 0)
{
leave_cancellation_point();
return ERROR;
return ret;
}
if (!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{
set_errno(EINVAL);
leave_cancellation_point();
return ERROR;
return -EINVAL;
}
/* Create a watchdog. We will not actually need this watchdog
@ -204,9 +183,7 @@ ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
rtcb->waitdog = wd_create();
if (!rtcb->waitdog)
{
set_errno(EINVAL);
leave_cancellation_point();
return ERROR;
return -ENOMEM;
}
/* Get the next message from the message queue. We will disable
@ -257,9 +234,7 @@ ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
wd_delete(rtcb->waitdog);
rtcb->waitdog = NULL;
set_errno(result);
leave_cancellation_point();
return ERROR;
return -result;
}
/* Start the watchdog */
@ -270,7 +245,7 @@ ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
/* Get the message from the message queue */
mqmsg = nxmq_wait_receive(mqdes);
ret = nxmq_wait_receive(mqdes, &mqmsg);
/* Stop the watchdog timer (this is not harmful in the case where
* it was never started)
@ -290,14 +265,84 @@ ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
* - The watchdog timeout expired
*/
if (mqmsg)
if (ret >= 0)
{
DEBUGASSERT(mqmsg != NULL);
ret = nxmq_do_receive(mqdes, mqmsg, msg, prio);
}
sched_unlock();
wd_delete(rtcb->waitdog);
rtcb->waitdog = NULL;
return ret;
}
/****************************************************************************
* Name: mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mqdes." If the size of the buffer in
* bytes (msglen) is less than the "mq_msgsize" attribute of the message
* queue, mq_timedreceive will return an error. Otherwise, the selected
* message is removed from the queue and copied to "msg."
*
* If the message queue is empty and O_NONBLOCK was not set,
* mq_timedreceive() will block until a message is added to the message
* queue (or until a timeout occurs). If more than one task is waiting
* to receive a message, only the task with the highest priority that has
* waited the longest will be unblocked.
*
* mq_timedreceive() behaves just like mq_receive(), except that if the
* queue is empty and the O_NONBLOCK flag is not enabled for the message
* queue description, then abstime points to a structure which specifies a
* ceiling on the time for which the call will block. This ceiling is an
* absolute timeout in seconds and nanoseconds since the Epoch (midnight
* on the morning of 1 January 1970).
*
* If no message is available, and the timeout has already expired by the
* time of the call, mq_timedreceive() returns immediately.
*
* Input Parameters:
* mqdes - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* One success, the length of the selected message in bytes is returned.
* On failure, -1 (ERROR) is returned and the errno is set appropriately:
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set
* for the message queue description referred to by 'mqdes'.
* EPERM Message queue opened not opened for reading.
* EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
* EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
* ETIMEDOUT The call timed out before a message could be transferred.
*
****************************************************************************/
ssize_t mq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
FAR int *prio, FAR const struct timespec *abstime)
{
int ret;
/* mq_timedreceive() is a cancellation point */
(void)enter_cancellation_point();
/* Let nxmq_timedreceive do all of the work */
ret = nxmq_timedreceive(mqdes, msg, msglen, prio, abstime);
if (ret < 0)
{
set_errno(-ret);
ret = ERROR;
}
leave_cancellation_point();
return ret;
}

View File

@ -58,7 +58,8 @@
* Description:
* This function is called when a signal or a timeout is received by a
* task that is waiting on a message queue -- either for a queue to
* becoming not full (on mq_send) or not empty (on mq_receive).
* becoming not full (on mq_send and friends) or not empty (on mq_receive
* and friends).
*
* Parameters:
* wtcb - A pointer to the TCB of the task that is waiting on a message

View File

@ -152,7 +152,7 @@ void nxmq_wait_irq(FAR struct tcb_s *wtcb, int errcode);
/* mq_rcvinternal.c ********************************************************/
int nxmq_verify_receive(mqd_t mqdes, FAR char *msg, size_t msglen);
FAR struct mqueue_msg_s *nxmq_wait_receive(mqd_t mqdes);
int nxmq_wait_receive(mqd_t mqdes, FAR struct mqueue_msg_s **rcvmsg);
ssize_t nxmq_do_receive(mqd_t mqdes, FAR struct mqueue_msg_s *mqmsg,
FAR char *ubuffer, FAR int *prio);