sched/mqueue: add support of tick based send() and receive()

Add tick based send()/receive() to reduce the cost of time conversion

Signed-off-by: chao an <anchao@lixiang.com>
This commit is contained in:
chao an 2024-05-10 17:21:40 +08:00 committed by Xiang Xiao
parent 3a4560ed40
commit 66509e490e
3 changed files with 339 additions and 81 deletions

View File

@ -588,6 +588,52 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime);
/****************************************************************************
* Name: file_mq_ticksend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_ticksend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_ticksend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_ticksend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio, sclock_t ticks);
/****************************************************************************
* Name: file_mq_receive
*
@ -656,6 +702,44 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime);
/****************************************************************************
* Name: file_mq_tickreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_tickreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_tickreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
sclock_t ticks);
/****************************************************************************
* Name: file_mq_setattr
*

View File

@ -102,22 +102,12 @@ static void nxmq_rcvtimeout(wdparm_t pid)
****************************************************************************/
/****************************************************************************
* Name: file_mq_timedreceive
* Name: file_mq_timedreceive_internal
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
* This is an internal function of file_mq_timedreceive()/
* file_mq_tickreceive(), please refer to the detailed description for
* more information.
*
* Input Parameters:
* mq - Message Queue Descriptor
@ -125,6 +115,8 @@ static void nxmq_rcvtimeout(wdparm_t pid)
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
@ -134,9 +126,11 @@ static void nxmq_rcvtimeout(wdparm_t pid)
*
****************************************************************************/
ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime)
static ssize_t
file_mq_timedreceive_internal(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_inode_s *msgq;
@ -156,11 +150,6 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
return ret;
}
if (!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{
return -EINVAL;
}
msgq = mq->f_inode->i_private;
/* Furthermore, nxmq_wait_receive() expects to have interrupts disabled
@ -175,28 +164,37 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
if (list_is_empty(&msgq->msglist))
{
sclock_t ticks;
if (abstime != NULL)
{
if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{
ret = -EINVAL;
}
else
{
/* Convert the timespec to clock ticks.
* We must have interrupts disabled here so that
* this time stays valid until the wait begins.
*/
/* Convert the timespec to clock ticks. We must have interrupts
* disabled here so that this time stays valid until the wait begins.
*/
ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
}
ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
/* Handle any time-related errors */
if (ret != OK)
{
goto errout_in_critical_section;
}
}
/* If the time has already expired and the message queue is empty,
* return immediately.
*/
if (ret == OK && ticks <= 0)
if (ticks <= 0)
{
ret = ETIMEDOUT;
}
/* Handle any time-related errors */
if (ret != OK)
{
ret = -ret;
ret = -ETIMEDOUT;
goto errout_in_critical_section;
}
@ -238,6 +236,87 @@ errout_in_critical_section:
return ret;
}
/****************************************************************************
* Name: file_mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
return file_mq_timedreceive_internal(mq, msg, msglen, prio, abstime, 0);
}
/****************************************************************************
* Name: file_mq_tickreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_tickreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_tickreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/
ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
sclock_t ticks)
{
return file_mq_timedreceive_internal(mq, msg, msglen, prio, NULL, ticks);
}
/****************************************************************************
* Name: nxmq_timedreceive
*
@ -284,7 +363,7 @@ ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
return ret;
}
return file_mq_timedreceive(filep, msg, msglen, prio, abstime);
return file_mq_timedreceive_internal(filep, msg, msglen, prio, abstime, 0);
}
/****************************************************************************

View File

@ -101,23 +101,11 @@ static void nxmq_sndtimeout(wdparm_t pid)
****************************************************************************/
/****************************************************************************
* Name: file_mq_timedsend
* Name: file_mq_timedsend_internal
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_timedsend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_timedsend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
* This is an internal function of file_mq_timedsend()/file_mq_ticksend(),
* please refer to the detailed description for more information.
*
* Input Parameters:
* mq - Message queue descriptor
@ -125,6 +113,8 @@ static void nxmq_sndtimeout(wdparm_t pid)
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
@ -142,15 +132,16 @@ static void nxmq_sndtimeout(wdparm_t pid)
*
****************************************************************************/
int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime)
static int
file_mq_timedsend_internal(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime,
sclock_t ticks)
{
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_inode_s *msgq;
FAR struct mqueue_msg_s *mqmsg;
irqstate_t flags;
sclock_t ticks;
int ret;
DEBUGASSERT(up_interrupt_context() == false);
@ -205,40 +196,46 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
goto out_send_message;
}
/* The message queue is full... We are going to wait. Now we must have a
* valid time value.
/* The message queue is full... We are going to wait.
* Now we must have a valid time value.
*/
if (!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
if (abstime != NULL)
{
ret = -EINVAL;
nxmq_free_msg(mqmsg);
goto errout_in_critical_section;
if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
{
ret = -EINVAL;
}
else
{
/* We are not in an interrupt handler and the message queue
* is full. Set up a timed wait for the message queue to
* become non-full.
*
* Convert the timespec to clock ticks. We must have interrupts
* disabled here so that this time stays valid until the wait
* begins.
*/
ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
}
/* Handle any time-related errors */
if (ret != OK)
{
nxmq_free_msg(mqmsg);
goto errout_in_critical_section;
}
}
/* We are not in an interrupt handler and the message queue is full.
* Set up a timed wait for the message queue to become non-full.
*
* Convert the timespec to clock ticks. We must have interrupts
* disabled here so that this time stays valid until the wait begins.
*/
ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
/* If the time has already expired and the message queue is empty,
* return immediately.
*/
if (ret == OK && ticks <= 0)
if (ticks <= 0)
{
ret = ETIMEDOUT;
}
/* Handle any time-related errors */
if (ret != OK)
{
ret = -ret;
ret = -ETIMEDOUT;
nxmq_free_msg(mqmsg);
goto errout_in_critical_section;
}
@ -282,6 +279,104 @@ errout_in_critical_section:
return ret;
}
/****************************************************************************
* Name: file_mq_timedsend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_timedsend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_timedsend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* abstime - the absolute time to wait until a timeout is decleared
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime)
{
return file_mq_timedsend_internal(mq, msg, msglen, prio, abstime, 0);
}
/****************************************************************************
* Name: file_mq_ticksend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_ticksend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_ticksend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/
int file_mq_ticksend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio, sclock_t ticks)
{
return file_mq_timedsend_internal(mq, msg, msglen, prio, NULL, ticks);
}
/****************************************************************************
* Name: nxmq_timedsend
*
@ -336,7 +431,7 @@ int nxmq_timedsend(mqd_t mqdes, FAR const char *msg, size_t msglen,
return ret;
}
return file_mq_timedsend(filep, msg, msglen, prio, abstime);
return file_mq_timedsend_internal(filep, msg, msglen, prio, abstime, 0);
}
/****************************************************************************