sched/mqueue: minor code tuning of message queue

1. remove unnecessary temporary variables
2. adjust the protection scope of the critical section

Signed-off-by: chao.an <anchao@xiaomi.com>
This commit is contained in:
chao.an 2022-06-08 20:39:03 +08:00 committed by Xiang Xiao
parent 8da798926d
commit be33d66c05
7 changed files with 82 additions and 140 deletions

View File

@ -55,8 +55,6 @@
void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg) void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
{ {
irqstate_t flags;
/* If this is a generally available pre-allocated message, /* If this is a generally available pre-allocated message,
* then just put it back in the free list. * then just put it back in the free list.
*/ */
@ -67,9 +65,7 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
* list from interrupt handlers. * list from interrupt handlers.
*/ */
flags = enter_critical_section();
sq_addlast((FAR sq_entry_t *)mqmsg, &g_msgfree); sq_addlast((FAR sq_entry_t *)mqmsg, &g_msgfree);
leave_critical_section(flags);
} }
/* If this is a message pre-allocated for interrupts, /* If this is a message pre-allocated for interrupts,
@ -82,9 +78,7 @@ void nxmq_free_msg(FAR struct mqueue_msg_s *mqmsg)
* list from interrupt handlers. * list from interrupt handlers.
*/ */
flags = enter_critical_section();
sq_addlast((FAR sq_entry_t *)mqmsg, &g_msgfreeirq); sq_addlast((FAR sq_entry_t *)mqmsg, &g_msgfreeirq);
leave_critical_section(flags);
} }
/* Otherwise, deallocate it. Note: interrupt handlers /* Otherwise, deallocate it. Note: interrupt handlers

View File

@ -123,12 +123,10 @@ int nxmq_verify_receive(FAR struct mqueue_inode_s *msgq,
int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq, int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
int oflags, FAR struct mqueue_msg_s **rcvmsg) int oflags, FAR struct mqueue_msg_s **rcvmsg)
{ {
FAR struct tcb_s *rtcb;
FAR struct mqueue_msg_s *newmsg; FAR struct mqueue_msg_s *newmsg;
int ret; FAR struct tcb_s *rtcb;
DEBUGASSERT(rcvmsg != NULL); DEBUGASSERT(rcvmsg != NULL);
*rcvmsg = NULL; /* Assume failure */
#ifdef CONFIG_CANCELLATION_POINTS #ifdef CONFIG_CANCELLATION_POINTS
/* nxmq_wait_receive() is not a cancellation point, but it may be called /* nxmq_wait_receive() is not a cancellation point, but it may be called
@ -181,10 +179,9 @@ int nxmq_wait_receive(FAR struct mqueue_inode_s *msgq,
* errno value (should be either EINTR or ETIMEDOUT). * errno value (should be either EINTR or ETIMEDOUT).
*/ */
ret = rtcb->errcode; if (rtcb->errcode != OK)
if (ret != OK)
{ {
return -ret; return -rtcb->errcode;
} }
} }
else else
@ -247,7 +244,6 @@ ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
FAR char *ubuffer, unsigned int *prio) FAR char *ubuffer, unsigned int *prio)
{ {
FAR struct tcb_s *btcb; FAR struct tcb_s *btcb;
irqstate_t flags;
ssize_t rcvmsglen; ssize_t rcvmsglen;
/* Get the length of the message (also the return value) */ /* Get the length of the message (also the return value) */
@ -279,7 +275,6 @@ ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
* messages can be sent from interrupt handlers. * messages can be sent from interrupt handlers.
*/ */
flags = enter_critical_section();
for (btcb = (FAR struct tcb_s *)g_waitingformqnotfull.head; for (btcb = (FAR struct tcb_s *)g_waitingformqnotfull.head;
btcb && btcb->msgwaitq != msgq; btcb && btcb->msgwaitq != msgq;
btcb = btcb->flink) btcb = btcb->flink)
@ -296,8 +291,6 @@ ssize_t nxmq_do_receive(FAR struct mqueue_inode_s *msgq,
btcb->msgwaitq = NULL; btcb->msgwaitq = NULL;
msgq->nwaitnotfull--; msgq->nwaitnotfull--;
up_unblock_task(btcb); up_unblock_task(btcb);
leave_critical_section(flags);
} }
/* Return the length of the message transferred to the user buffer */ /* Return the length of the message transferred to the user buffer */

View File

@ -79,7 +79,6 @@ ssize_t file_mq_receive(FAR struct file *mq, FAR char *msg, size_t msglen,
irqstate_t flags; irqstate_t flags;
ssize_t ret; ssize_t ret;
inode = mq->f_inode;
if (!inode) if (!inode)
{ {
return -EBADF; return -EBADF;
@ -108,7 +107,6 @@ ssize_t file_mq_receive(FAR struct file *mq, FAR char *msg, size_t msglen,
/* Get the message from the message queue */ /* Get the message from the message queue */
ret = nxmq_wait_receive(msgq, mq->f_oflags, &mqmsg); ret = nxmq_wait_receive(msgq, mq->f_oflags, &mqmsg);
leave_critical_section(flags);
/* Check if we got a message from the message queue. We might /* Check if we got a message from the message queue. We might
* not have a message if: * not have a message if:
@ -117,12 +115,13 @@ ssize_t file_mq_receive(FAR struct file *mq, FAR char *msg, size_t msglen,
* - The wait was interrupted by a signal * - The wait was interrupted by a signal
*/ */
if (ret >= 0) if (ret == OK)
{ {
DEBUGASSERT(mqmsg != NULL);
ret = nxmq_do_receive(msgq, mqmsg, msg, prio); ret = nxmq_do_receive(msgq, mqmsg, msg, prio);
} }
leave_critical_section(flags);
return ret; return ret;
} }

View File

@ -70,13 +70,12 @@
int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen, int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen,
unsigned int prio) unsigned int prio)
{ {
FAR struct mqueue_msg_s *mqmsg = NULL;
FAR struct inode *inode = mq->f_inode; FAR struct inode *inode = mq->f_inode;
FAR struct mqueue_inode_s *msgq; FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags; irqstate_t flags;
int ret; int ret;
inode = mq->f_inode;
if (!inode) if (!inode)
{ {
return -EBADF; return -EBADF;
@ -101,9 +100,7 @@ int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen,
* non-FULL. This would fail with EAGAIN, EINTR, or ETIMEOUT. * non-FULL. This would fail with EAGAIN, EINTR, or ETIMEOUT.
*/ */
mqmsg = NULL;
flags = enter_critical_section(); flags = enter_critical_section();
ret = OK;
if (!up_interrupt_context()) /* In an interrupt handler? */ if (!up_interrupt_context()) /* In an interrupt handler? */
{ {
@ -121,8 +118,7 @@ int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen,
/* ret can only be negative if nxmq_wait_send failed */ /* ret can only be negative if nxmq_wait_send failed */
leave_critical_section(flags); if (ret == OK)
if (ret >= 0)
{ {
/* Now allocate the message. */ /* Now allocate the message. */
@ -130,16 +126,6 @@ int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen,
/* Check if the message was successfully allocated */ /* Check if the message was successfully allocated */
ret = (mqmsg == NULL) ? -ENOMEM : OK;
}
/* Check if we were able to get a message structure -- this can fail
* either because we cannot send the message (and didn't bother trying
* to allocate it) or because the allocation failed.
*/
if (mqmsg != NULL)
{
/* The allocation was successful (implying that we can also send the /* The allocation was successful (implying that we can also send the
* message). Perform the message send. * message). Perform the message send.
* *
@ -149,9 +135,12 @@ int file_mq_send(FAR struct file *mq, FAR const char *msg, size_t msglen,
* to be exceeded in that case. * to be exceeded in that case.
*/ */
ret = nxmq_do_send(msgq, mqmsg, msg, msglen, prio); ret = (mqmsg == NULL) ? -ENOMEM :
nxmq_do_send(msgq, mqmsg, msg, msglen, prio);
} }
leave_critical_section(flags);
return ret; return ret;
} }

View File

@ -126,7 +126,6 @@ int nxmq_verify_send(FAR struct mqueue_inode_s *msgq, int oflags,
FAR struct mqueue_msg_s *nxmq_alloc_msg(void) FAR struct mqueue_msg_s *nxmq_alloc_msg(void)
{ {
FAR struct mqueue_msg_s *mqmsg; FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
/* If we were called from an interrupt handler, then try to get the message /* If we were called from an interrupt handler, then try to get the message
* from generally available list of messages. If this fails, then try the * from generally available list of messages. If this fails, then try the
@ -154,9 +153,7 @@ FAR struct mqueue_msg_s *nxmq_alloc_msg(void)
* Disable interrupts -- we might be called from an interrupt handler. * Disable interrupts -- we might be called from an interrupt handler.
*/ */
flags = enter_critical_section();
mqmsg = (FAR struct mqueue_msg_s *)sq_remfirst(&g_msgfree); mqmsg = (FAR struct mqueue_msg_s *)sq_remfirst(&g_msgfree);
leave_critical_section(flags);
/* If we cannot a message from the free list, then we will have to /* If we cannot a message from the free list, then we will have to
* allocate one. * allocate one.
@ -214,7 +211,6 @@ FAR struct mqueue_msg_s *nxmq_alloc_msg(void)
int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags) int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
{ {
FAR struct tcb_s *rtcb; FAR struct tcb_s *rtcb;
int ret;
#ifdef CONFIG_CANCELLATION_POINTS #ifdef CONFIG_CANCELLATION_POINTS
/* nxmq_wait_send() is not a cancellation point, but may be called via /* nxmq_wait_send() is not a cancellation point, but may be called via
@ -233,7 +229,11 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
/* Verify that the queue is indeed full as the caller thinks */ /* Verify that the queue is indeed full as the caller thinks */
if (msgq->nmsgs >= msgq->maxmsgs) /* Loop until there are fewer than max allowable messages in the
* receiving message queue
*/
while (msgq->nmsgs >= msgq->maxmsgs)
{ {
/* Should we block until there is sufficient space in the /* Should we block until there is sufficient space in the
* message queue? * message queue?
@ -246,51 +246,36 @@ int nxmq_wait_send(FAR struct mqueue_inode_s *msgq, int oflags)
return -EAGAIN; return -EAGAIN;
} }
/* Yes... We will not return control until the message queue is /* Block until the message queue is no longer full.
* available or we receive a signal or at timeout occurs. * When we are unblocked, we will try again
*/ */
else rtcb = this_task();
rtcb->msgwaitq = msgq;
msgq->nwaitnotfull++;
/* Initialize the errcode used to communication wake-up error
* conditions.
*/
rtcb->errcode = OK;
/* Make sure this is not the idle task, descheduling that
* isn't going to end well.
*/
DEBUGASSERT(NULL != rtcb->flink);
up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* per-task errno value (should be EINTR or ETIMEOUT).
*/
if (rtcb->errcode != OK)
{ {
/* Loop until there are fewer than max allowable messages in the return -rtcb->errcode;
* receiving message queue
*/
while (msgq->nmsgs >= msgq->maxmsgs)
{
/* Block until the message queue is no longer full.
* When we are unblocked, we will try again
*/
rtcb = this_task();
rtcb->msgwaitq = msgq;
msgq->nwaitnotfull++;
/* Initialize the errcode used to communication wake-up error
* conditions.
*/
rtcb->errcode = OK;
/* Make sure this is not the idle task, descheduling that
* isn't going to end well.
*/
DEBUGASSERT(NULL != rtcb->flink);
up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
/* When we resume at this point, either (1) the message queue
* is no longer empty, or (2) the wait has been interrupted by
* a signal. We can detect the latter case be examining the
* per-task errno value (should be EINTR or ETIMEOUT).
*/
ret = rtcb->errcode;
if (ret != OK)
{
return -ret;
}
}
} }
} }
@ -325,7 +310,6 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
FAR struct tcb_s *btcb; FAR struct tcb_s *btcb;
FAR struct mqueue_msg_s *next; FAR struct mqueue_msg_s *next;
FAR struct mqueue_msg_s *prev; FAR struct mqueue_msg_s *prev;
irqstate_t flags;
/* Construct the message header info */ /* Construct the message header info */
@ -336,11 +320,8 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
memcpy((FAR void *)mqmsg->mail, (FAR const void *)msg, msglen); memcpy((FAR void *)mqmsg->mail, (FAR const void *)msg, msglen);
/* Insert the new message in the message queue */ /* Insert the new message in the message queue
* Search the message list to find the location to insert the new
flags = enter_critical_section();
/* Search the message list to find the location to insert the new
* message. Each is list is maintained in ascending priority order. * message. Each is list is maintained in ascending priority order.
*/ */
@ -367,8 +348,6 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
nxmq_pollnotify(msgq, POLLIN); nxmq_pollnotify(msgq, POLLIN);
} }
leave_critical_section(flags);
/* Check if we need to notify any tasks that are attached to the /* Check if we need to notify any tasks that are attached to the
* message queue * message queue
*/ */
@ -398,7 +377,6 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
/* Check if any tasks are waiting for the MQ not empty event. */ /* Check if any tasks are waiting for the MQ not empty event. */
flags = enter_critical_section();
if (msgq->nwaitnotempty > 0) if (msgq->nwaitnotempty > 0)
{ {
/* Find the highest priority task that is waiting for /* Find the highest priority task that is waiting for
@ -423,6 +401,5 @@ int nxmq_do_send(FAR struct mqueue_inode_s *msgq,
up_unblock_task(btcb); up_unblock_task(btcb);
} }
leave_critical_section(flags);
return OK; return OK;
} }

View File

@ -145,7 +145,6 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
irqstate_t flags; irqstate_t flags;
int ret; int ret;
inode = mq->f_inode;
if (!inode) if (!inode)
{ {
return -EBADF; return -EBADF;
@ -188,23 +187,23 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
* disabled here so that this time stays valid until the wait begins. * disabled here so that this time stays valid until the wait begins.
*/ */
int result = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks); ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
/* If the time has already expired and the message queue is empty, /* If the time has already expired and the message queue is empty,
* return immediately. * return immediately.
*/ */
if (result == OK && ticks <= 0) if (ret == OK && ticks <= 0)
{ {
result = ETIMEDOUT; ret = ETIMEDOUT;
} }
/* Handle any time-related errors */ /* Handle any time-related errors */
if (result != OK) if (ret != OK)
{ {
leave_critical_section(flags); ret = -ret;
return -result; goto errout_in_critical_section;
} }
/* Start the watchdog */ /* Start the watchdog */
@ -222,10 +221,6 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
wd_cancel(&rtcb->waitdog); wd_cancel(&rtcb->waitdog);
/* We can now restore interrupts */
leave_critical_section(flags);
/* Check if we got a message from the message queue. We might /* Check if we got a message from the message queue. We might
* not have a message if: * not have a message if:
* *
@ -234,12 +229,17 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
* - The watchdog timeout expired * - The watchdog timeout expired
*/ */
if (ret >= 0) if (ret == OK)
{ {
DEBUGASSERT(mqmsg != NULL); DEBUGASSERT(mqmsg != NULL);
ret = nxmq_do_receive(msgq, mqmsg, msg, prio); ret = nxmq_do_receive(msgq, mqmsg, msg, prio);
} }
/* We can now restore interrupts */
errout_in_critical_section:
leave_critical_section(flags);
return ret; return ret;
} }

View File

@ -146,16 +146,14 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio, size_t msglen, unsigned int prio,
FAR const struct timespec *abstime) FAR const struct timespec *abstime)
{ {
FAR struct tcb_s *rtcb = this_task();
FAR struct inode *inode = mq->f_inode; FAR struct inode *inode = mq->f_inode;
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_inode_s *msgq; FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg; FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags; irqstate_t flags;
sclock_t ticks; sclock_t ticks;
int result;
int ret; int ret;
inode = mq->f_inode;
if (!inode) if (!inode)
{ {
return -EBADF; return -EBADF;
@ -173,6 +171,10 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
return ret; return ret;
} }
/* Disable interruption */
flags = enter_critical_section();
/* Pre-allocate a message structure */ /* Pre-allocate a message structure */
mqmsg = nxmq_alloc_msg(); mqmsg = nxmq_alloc_msg();
@ -182,7 +184,8 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
* errno value. * errno value.
*/ */
return -ENOMEM; ret = -ENOMEM;
goto errout_in_critical_section;
} }
/* OpenGroup.org: "Under no circumstance shall the operation fail with a /* OpenGroup.org: "Under no circumstance shall the operation fail with a
@ -205,7 +208,7 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
* Currently nxmq_do_send() always returns OK. * Currently nxmq_do_send() always returns OK.
*/ */
return nxmq_do_send(msgq, mqmsg, msg, msglen, prio); goto out_send_message;
} }
/* The message queue is full... We are going to wait. Now we must have a /* The message queue is full... We are going to wait. Now we must have a
@ -215,7 +218,8 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
if (!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) if (!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{ {
ret = -EINVAL; ret = -EINVAL;
goto errout_with_mqmsg; nxmq_free_msg(mqmsg);
goto errout_in_critical_section;
} }
/* We are not in an interrupt handler and the message queue is full. /* We are not in an interrupt handler and the message queue is full.
@ -225,23 +229,22 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
* disabled here so that this time stays valid until the wait begins. * disabled here so that this time stays valid until the wait begins.
*/ */
flags = enter_critical_section(); ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
result = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
/* If the time has already expired and the message queue is empty, /* If the time has already expired and the message queue is empty,
* return immediately. * return immediately.
*/ */
if (result == OK && ticks <= 0) if (ret == OK && ticks <= 0)
{ {
result = ETIMEDOUT; ret = ETIMEDOUT;
} }
/* Handle any time-related errors */ /* Handle any time-related errors */
if (result != OK) if (ret != OK)
{ {
ret = -result; ret = -ret;
goto errout_in_critical_section; goto errout_in_critical_section;
} }
@ -261,26 +264,19 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
/* Check if nxmq_wait_send() failed */ /* Check if nxmq_wait_send() failed */
if (ret < 0) if (ret == OK)
{ {
/* nxmq_wait_send() failed. */ /* If any of the above failed, set the errno. Otherwise, there should
* be space for another message in the message queue. NOW we can
* allocate the message structure.
*
* Currently nxmq_do_send() always returns OK.
*/
goto errout_in_critical_section; out_send_message:
ret = nxmq_do_send(msgq, mqmsg, msg, msglen, prio);
} }
/* That is the end of the atomic operations */
leave_critical_section(flags);
/* If any of the above failed, set the errno. Otherwise, there should
* be space for another message in the message queue. NOW we can allocate
* the message structure.
*
* Currently nxmq_do_send() always returns OK.
*/
return nxmq_do_send(msgq, mqmsg, msg, msglen, prio);
/* Exit here with (1) the scheduler locked, (2) a message allocated, (3) a /* Exit here with (1) the scheduler locked, (2) a message allocated, (3) a
* wdog allocated, and (4) interrupts disabled. * wdog allocated, and (4) interrupts disabled.
*/ */
@ -288,12 +284,6 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
errout_in_critical_section: errout_in_critical_section:
leave_critical_section(flags); leave_critical_section(flags);
/* Exit here with (1) the scheduler locked and 2) a message allocated. The
* error code is in 'result'
*/
errout_with_mqmsg:
nxmq_free_msg(mqmsg);
return ret; return ret;
} }