usrwqueue: implement order work queue

Signed-off-by: Jiuzhu Dong <dongjiuzhu1@xiaomi.com>
This commit is contained in:
Jiuzhu Dong 2021-07-26 10:36:13 +08:00 committed by Xiang Xiao
parent 00854f0f94
commit 23d87ff9df
7 changed files with 152 additions and 254 deletions

View File

@ -61,7 +61,7 @@
# define _SEM_TIMEDWAIT(s,t) nxsem_timedwait(s,t) # define _SEM_TIMEDWAIT(s,t) nxsem_timedwait(s,t)
# define _SEM_CLOCKWAIT(s,c,t) nxsem_clockwait(s,c,t) # define _SEM_CLOCKWAIT(s,c,t) nxsem_clockwait(s,c,t)
# define _SEM_POST(s) nxsem_post(s) # define _SEM_POST(s) nxsem_post(s)
# define _SEM_GETVALUE(s) nxsem_get_value(s) # define _SEM_GETVALUE(s,v) nxsem_get_value(s,v)
# define _SEM_GETPROTOCOL(s,p) nxsem_get_protocol(s,p) # define _SEM_GETPROTOCOL(s,p) nxsem_get_protocol(s,p)
# define _SEM_SETPROTOCOL(s,p) nxsem_set_protocol(s,p) # define _SEM_SETPROTOCOL(s,p) nxsem_set_protocol(s,p)
# define _SEM_ERRNO(r) (-(r)) # define _SEM_ERRNO(r) (-(r))

View File

@ -22,7 +22,7 @@ ifeq ($(CONFIG_LIB_USRWORK),y)
# Add the work queue C files to the build # Add the work queue C files to the build
CSRCS += work_usrthread.c work_queue.c work_cancel.c work_signal.c CSRCS += work_usrthread.c work_queue.c work_cancel.c
# Add the wqueue directory to the build # Add the wqueue directory to the build

View File

@ -48,8 +48,8 @@
* work_queue() again. * work_queue() again.
* *
* Input Parameters: * Input Parameters:
* qid - The work queue ID * wqueue - The work queue
* work - The previously queued work structure to cancel * work - The previously queue work structure to cancel
* *
* Returned Value: * Returned Value:
* Zero (OK) on success, a negated errno on failure. This error may be * Zero (OK) on success, a negated errno on failure. This error may be
@ -63,7 +63,10 @@
static int work_qcancel(FAR struct usr_wqueue_s *wqueue, static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
FAR struct work_s *work) FAR struct work_s *work)
{ {
FAR sq_entry_t *prev = NULL;
FAR sq_entry_t *curr;
int ret = -ENOENT; int ret = -ENOENT;
int semcount;
DEBUGASSERT(work != NULL); DEBUGASSERT(work != NULL);
@ -78,18 +81,44 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue,
if (work->worker != NULL) if (work->worker != NULL)
{ {
/* A little test of the integrity of the work queue */ /* Search the work activelist for the target work. We can't
* use sq_rem to do this because there are additional operations that
DEBUGASSERT(work->dq.flink != NULL || * need to be done.
(FAR dq_entry_t *)work == wqueue->q.tail);
DEBUGASSERT(work->dq.blink != NULL ||
(FAR dq_entry_t *)work == wqueue->q.head);
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/ */
dq_rem((FAR dq_entry_t *)work, &wqueue->q); curr = wqueue->q.head;
while (curr && curr != &work->u.s.sq)
{
prev = curr;
curr = curr->flink;
}
/* Check if the work was found in the list. If not, then an OS
* error has occurred because the work is marked active!
*/
DEBUGASSERT(curr);
/* Now, remove the work from the work queue */
if (prev)
{
/* Remove the work from mid- or end-of-queue */
sq_remafter(prev, &wqueue->q);
}
else
{
/* Remove the work at the head of the queue */
sq_remfirst(&wqueue->q);
_SEM_GETVALUE(&wqueue->wake, &semcount);
if (semcount < 1)
{
_SEM_POST(&wqueue->wake);
}
}
work->worker = NULL; work->worker = NULL;
ret = OK; ret = OK;
} }

View File

@ -32,6 +32,7 @@
#include <nuttx/clock.h> #include <nuttx/clock.h>
#include <nuttx/wqueue.h> #include <nuttx/wqueue.h>
#include <nuttx/semaphore.h>
#include "wqueue/wqueue.h" #include "wqueue/wqueue.h"
@ -56,7 +57,7 @@
* and remove it from the work queue. * and remove it from the work queue.
* *
* Input Parameters: * Input Parameters:
* qid - The work queue ID (index) * wqueue - The work queue
* work - The work structure to queue * work - The work structure to queue
* worker - The worker callback to be invoked. The callback will be * worker - The worker callback to be invoked. The callback will be
* invoked on the worker thread of execution. * invoked on the worker thread of execution.
@ -74,35 +75,72 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue,
FAR struct work_s *work, worker_t worker, FAR struct work_s *work, worker_t worker,
FAR void *arg, clock_t delay) FAR void *arg, clock_t delay)
{ {
DEBUGASSERT(work != NULL); FAR sq_entry_t *prev = NULL;
FAR sq_entry_t *curr;
sclock_t delta;
int semcount;
/* Get exclusive access to the work queue */ /* Get exclusive access to the work queue */
while (_SEM_WAIT(&wqueue->lock) < 0); while (_SEM_WAIT(&wqueue->lock) < 0);
/* Is there already pending work? */
if (work->worker != NULL)
{
/* Remove the entry from the work queue. It will be requeued at the
* end of the work queue.
*/
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}
/* Initialize the work structure */ /* Initialize the work structure */
work->worker = worker; /* Work callback. non-NULL means queued */ work->worker = worker; /* Work callback. non-NULL means queued */
work->arg = arg; /* Callback argument */ work->arg = arg; /* Callback argument */
work->delay = delay; /* Delay until work performed */ work->u.s.qtime = clock() + delay; /* Delay until work performed */
/* Now, time-tag that entry and put it in the work queue. */ /* Do the easy case first -- when the work queue is empty. */
work->qtime = clock(); /* Time work queued */ if (wqueue->q.head == NULL)
{
/* Add the watchdog to the head == tail of the queue. */
dq_addlast((FAR dq_entry_t *)work, &wqueue->q); sq_addfirst(&work->u.s.sq, &wqueue->q);
kill(wqueue->pid, SIGWORK); /* Wake up the worker thread */ _SEM_POST(&wqueue->wake);
}
/* There are other active watchdogs in the timer queue */
else
{
curr = wqueue->q.head;
/* Check if the new work must be inserted before the curr. */
do
{
delta = work->u.s.qtime - ((FAR struct work_s *)curr)->u.s.qtime;
if (delta < 0)
{
break;
}
prev = curr;
curr = curr->flink;
}
while (curr != NULL);
/* Insert the new watchdog in the list */
if (prev == NULL)
{
/* Insert the watchdog at the head of the list */
sq_addfirst(&work->u.s.sq, &wqueue->q);
_SEM_GETVALUE(&wqueue->wake, &semcount);
if (semcount < 1)
{
_SEM_POST(&wqueue->wake);
}
}
else
{
/* Insert the watchdog in mid- or end-of-queue */
sq_addafter(prev, &work->u.s.sq, &wqueue->q);
}
}
_SEM_POST(&wqueue->lock); _SEM_POST(&wqueue->lock);
return OK; return OK;
@ -146,6 +184,10 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
{ {
if (qid == USRWORK) if (qid == USRWORK)
{ {
/* Is there already pending work? */
work_cancel(qid, work);
return work_qqueue(&g_usrwork, work, worker, arg, delay); return work_qqueue(&g_usrwork, work, worker, arg, delay);
} }
else else

View File

@ -1,99 +0,0 @@
/****************************************************************************
* libs/libc/wqueue/work_signal.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <nuttx/config.h>
#include <signal.h>
#include <errno.h>
#include <nuttx/wqueue.h>
#include "wqueue/wqueue.h"
#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
/****************************************************************************
* Private Type Declarations
****************************************************************************/
/****************************************************************************
* Public Data
****************************************************************************/
/****************************************************************************
* Private Data
****************************************************************************/
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* Name: work_signal
*
* Description:
* Signal the worker thread to process the work queue now. This function
* is used internally by the work logic but could also be used by the
* user to force an immediate re-assessment of pending work.
*
* Input Parameters:
* qid - The work queue ID
*
* Returned Value:
* Zero on success, a negated errno on failure
*
****************************************************************************/
int work_signal(int qid)
{
int ret;
if (qid == USRWORK)
{
/* Signal the worker thread */
ret = kill(g_usrwork.pid, SIGWORK);
if (ret < 0)
{
int errcode = get_errno();
ret = -errcode;
}
}
else
{
ret = -EINVAL;
}
return ret;
}
#endif /* CONFIG_LIB_USRWORK && !__KERNEL__ */

View File

@ -27,7 +27,6 @@
#include <stdint.h> #include <stdint.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h> #include <pthread.h>
#include <signal.h>
#include <sched.h> #include <sched.h>
#include <errno.h> #include <errno.h>
#include <assert.h> #include <assert.h>
@ -45,26 +44,12 @@
* Pre-processor Definitions * Pre-processor Definitions
****************************************************************************/ ****************************************************************************/
/* Use CLOCK_MONOTONIC if it is available. CLOCK_REALTIME can cause bad
* delays if the time is changed.
*/
#ifdef CONFIG_CLOCK_MONOTONIC
# define WORK_CLOCK CLOCK_MONOTONIC
#else
# define WORK_CLOCK CLOCK_REALTIME
#endif
#ifdef CONFIG_SYSTEM_TIME64 #ifdef CONFIG_SYSTEM_TIME64
# define WORK_DELAY_MAX UINT64_MAX # define WORK_DELAY_MAX UINT64_MAX
#else #else
# define WORK_DELAY_MAX UINT32_MAX # define WORK_DELAY_MAX UINT32_MAX
#endif #endif
#ifndef MIN
# define MIN(a,b) ((a) < (b) ? (a) : (b))
#endif
/**************************************************************************** /****************************************************************************
* Private Type Declarations * Private Type Declarations
****************************************************************************/ ****************************************************************************/
@ -98,17 +83,12 @@ struct usr_wqueue_s g_usrwork;
* *
****************************************************************************/ ****************************************************************************/
void work_process(FAR struct usr_wqueue_s *wqueue) static void work_process(FAR struct usr_wqueue_s *wqueue)
{ {
volatile FAR struct work_s *work; volatile FAR struct work_s *work;
sigset_t sigset; worker_t worker;
sigset_t oldset;
worker_t worker;
FAR void *arg; FAR void *arg;
clock_t elapsed; sclock_t elapsed;
clock_t remaining;
clock_t stick;
clock_t ctick;
clock_t next; clock_t next;
int ret; int ret;
@ -125,15 +105,6 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
return; return;
} }
/* Set up the signal mask */
sigemptyset(&sigset);
sigaddset(&sigset, SIGWORK);
/* Get the time that we started this polling cycle in clock ticks. */
stick = clock();
/* And check each entry in the work queue. Since we have locked the /* And check each entry in the work queue. Since we have locked the
* work queue we know: (1) we will not be suspended unless we do * work queue we know: (1) we will not be suspended unless we do
* so ourselves, and (2) there will be no changes to the work queue * so ourselves, and (2) there will be no changes to the work queue
@ -142,19 +113,21 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
work = (FAR struct work_s *)wqueue->q.head; work = (FAR struct work_s *)wqueue->q.head;
while (work) while (work)
{ {
/* Is this work ready? It is ready if there is no delay or if /* Is this work ready? It is ready if there is no delay or if
* the delay has elapsed. qtime is the time that the work was added * the delay has elapsed. is the time that the work was added
* to the work queue. It will always be greater than or equal to * to the work queue. Therefore a delay of equal or less than
* zero. Therefore a delay of zero will always execute immediately. * zero will always execute immediately.
*/ */
ctick = clock(); elapsed = clock() - work->u.s.qtime;
elapsed = ctick - work->qtime;
if (elapsed >= work->delay) /* Is this delay work ready? */
if (elapsed >= 0)
{ {
/* Remove the ready-to-execute work from the list */ /* Remove the ready-to-execute work from the list */
dq_rem((struct dq_entry_s *)work, &wqueue->q); sq_remfirst(&wqueue->q);
/* Extract the work description from the entry (in case the work /* Extract the work description from the entry (in case the work
* instance by the re-used after it has been de-queued). * instance by the re-used after it has been de-queued).
@ -195,65 +168,26 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
return; return;
} }
work = (FAR struct work_s *)wqueue->q.head;
} }
else
{
/* Canceled.. Just move to the next work in the list with
* the work queue still locked.
*/
work = (FAR struct work_s *)work->dq.flink; work = (FAR struct work_s *)wqueue->q.head;
}
} }
else /* elapsed < work->delay */ else
{ {
/* This one is not ready. next = work->u.s.qtime - clock();
* break;
* NOTE that elapsed is relative to the current time,
* not the time of beginning of this queue processing pass.
* So it may need an adjustment.
*/
elapsed += (ctick - stick);
if (elapsed > work->delay)
{
/* The delay has expired while we are processing */
elapsed = work->delay;
}
/* Will it be ready before the next scheduled wakeup interval? */
remaining = work->delay - elapsed;
if (remaining < next)
{
/* Yes.. Then schedule to wake up when the work is ready */
next = remaining;
}
/* Then try the next in the list. */
work = (FAR struct work_s *)work->dq.flink;
} }
} }
/* Unlock the work queue before waiting. In order to assure that we do /* Unlock the work queue before waiting. */
* not lose the SIGWORK signal before waiting, we block the SIGWORK
* signals before unlocking the work queue. That will cause in SIGWORK
* signals directed to the worker thread to pend.
*/
sigprocmask(SIG_BLOCK, &sigset, &oldset);
_SEM_POST(&wqueue->lock); _SEM_POST(&wqueue->lock);
if (next == WORK_DELAY_MAX) if (next == WORK_DELAY_MAX)
{ {
/* Wait indefinitely until signaled with SIGWORK */ /* Wait indefinitely until work_queue has new items */
sigwaitinfo(&sigset, NULL); _SEM_WAIT(&wqueue->wake);
} }
else else
{ {
@ -261,7 +195,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
time_t sec; time_t sec;
/* Wait awhile to check the work list. We will wait here until /* Wait awhile to check the work list. We will wait here until
* either the time elapses or until we are awakened by a signal. * either the time elapses or until we are awakened by a semaphore.
* Interrupts will be re-enabled while we wait. * Interrupts will be re-enabled while we wait.
*/ */
@ -269,10 +203,8 @@ void work_process(FAR struct usr_wqueue_s *wqueue)
rqtp.tv_sec = sec; rqtp.tv_sec = sec;
rqtp.tv_nsec = (next - (sec * 1000000)) * 1000; rqtp.tv_nsec = (next - (sec * 1000000)) * 1000;
sigtimedwait(&sigset, NULL, &rqtp); _SEM_TIMEDWAIT(&wqueue->wake, &rqtp);
} }
sigprocmask(SIG_SETMASK, &oldset, NULL);
} }
/**************************************************************************** /****************************************************************************
@ -339,53 +271,48 @@ static pthread_addr_t work_usrthread(pthread_addr_t arg)
int work_usrstart(void) int work_usrstart(void)
{ {
int ret;
#ifndef CONFIG_BUILD_PROTECTED
pthread_t usrwork;
pthread_attr_t attr;
struct sched_param param;
#endif
/* Set up the work queue lock */ /* Set up the work queue lock */
_SEM_INIT(&g_usrwork.lock, 0, 1); _SEM_INIT(&g_usrwork.lock, 0, 1);
_SEM_INIT(&g_usrwork.wake, 0, 0);
_SEM_SETPROTOCOL(&g_usrwork.wake, SEM_PRIO_NONE);
/* Initialize the work queue */
sq_init(&g_usrwork.q);
#ifdef CONFIG_BUILD_PROTECTED #ifdef CONFIG_BUILD_PROTECTED
/* Start a user-mode worker thread for use by applications. */ /* Start a user-mode worker thread for use by applications. */
g_usrwork.pid = task_create("uwork", ret = task_create("uwork",
CONFIG_LIB_USRWORKPRIORITY, CONFIG_LIB_USRWORKPRIORITY,
CONFIG_LIB_USRWORKSTACKSIZE, CONFIG_LIB_USRWORKSTACKSIZE,
(main_t)work_usrthread, (main_t)work_usrthread,
(FAR char * const *)NULL); ((FAR char * const *)NULL));
if (ret < 0)
DEBUGASSERT(g_usrwork.pid > 0);
if (g_usrwork.pid < 0)
{ {
int errcode = get_errno(); int errcode = get_errno();
DEBUGASSERT(errcode > 0); DEBUGASSERT(errcode > 0);
return -errcode; return -errcode;
} }
return g_usrwork.pid; return ret;
#else #else
pthread_t usrwork;
pthread_attr_t attr;
struct sched_param param;
int ret;
/* Start a user-mode worker thread for use by applications. */ /* Start a user-mode worker thread for use by applications. */
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, CONFIG_LIB_USRWORKSTACKSIZE); pthread_attr_setstacksize(&attr, CONFIG_LIB_USRWORKSTACKSIZE);
#ifdef CONFIG_SCHED_SPORADIC pthread_attr_getschedparam(&attr, &param);
/* Get the current sporadic scheduling parameters. Those will not be
* modified.
*/
ret = set_getparam(pid, &param);
if (ret < 0)
{
int errcode = get_errno();
return -errcode;
}
#endif
param.sched_priority = CONFIG_LIB_USRWORKPRIORITY; param.sched_priority = CONFIG_LIB_USRWORKPRIORITY;
pthread_attr_setschedparam(&attr, &param); pthread_attr_setschedparam(&attr, &param);
@ -401,8 +328,7 @@ int work_usrstart(void)
pthread_detach(usrwork); pthread_detach(usrwork);
g_usrwork.pid = (pid_t)usrwork; return (pid_t)usrwork;
return g_usrwork.pid;
#endif #endif
} }

View File

@ -46,9 +46,9 @@
struct usr_wqueue_s struct usr_wqueue_s
{ {
struct dq_queue_s q; /* The queue of pending work */ struct sq_queue_s q; /* The queue of pending work */
sem_t lock; /* exclusive access to user-mode work queue */ sem_t lock; /* exclusive access to user-mode work queue */
pid_t pid; /* The task ID of the worker thread(s) */ sem_t wake; /* The wake-up semaphore of the usrthread */
}; };
/**************************************************************************** /****************************************************************************