From 23d87ff9df116f0f43465a13adc2fbe720fc3466 Mon Sep 17 00:00:00 2001 From: Jiuzhu Dong Date: Mon, 26 Jul 2021 10:36:13 +0800 Subject: [PATCH] usrwqueue: implement order work queue Signed-off-by: Jiuzhu Dong --- include/nuttx/semaphore.h | 2 +- libs/libc/wqueue/Make.defs | 2 +- libs/libc/wqueue/work_cancel.c | 53 +++++++--- libs/libc/wqueue/work_queue.c | 82 +++++++++++---- libs/libc/wqueue/work_signal.c | 99 ------------------ libs/libc/wqueue/work_usrthread.c | 164 ++++++++---------------------- libs/libc/wqueue/wqueue.h | 4 +- 7 files changed, 152 insertions(+), 254 deletions(-) delete mode 100644 libs/libc/wqueue/work_signal.c diff --git a/include/nuttx/semaphore.h b/include/nuttx/semaphore.h index 42bd308f65..83fb3c12b4 100644 --- a/include/nuttx/semaphore.h +++ b/include/nuttx/semaphore.h @@ -61,7 +61,7 @@ # define _SEM_TIMEDWAIT(s,t) nxsem_timedwait(s,t) # define _SEM_CLOCKWAIT(s,c,t) nxsem_clockwait(s,c,t) # define _SEM_POST(s) nxsem_post(s) -# define _SEM_GETVALUE(s) nxsem_get_value(s) +# define _SEM_GETVALUE(s,v) nxsem_get_value(s,v) # define _SEM_GETPROTOCOL(s,p) nxsem_get_protocol(s,p) # define _SEM_SETPROTOCOL(s,p) nxsem_set_protocol(s,p) # define _SEM_ERRNO(r) (-(r)) diff --git a/libs/libc/wqueue/Make.defs b/libs/libc/wqueue/Make.defs index 72862bfd84..a0633d1419 100644 --- a/libs/libc/wqueue/Make.defs +++ b/libs/libc/wqueue/Make.defs @@ -22,7 +22,7 @@ ifeq ($(CONFIG_LIB_USRWORK),y) # Add the work queue C files to the build -CSRCS += work_usrthread.c work_queue.c work_cancel.c work_signal.c +CSRCS += work_usrthread.c work_queue.c work_cancel.c # Add the wqueue directory to the build diff --git a/libs/libc/wqueue/work_cancel.c b/libs/libc/wqueue/work_cancel.c index 54e084cfb3..77a4c34961 100644 --- a/libs/libc/wqueue/work_cancel.c +++ b/libs/libc/wqueue/work_cancel.c @@ -48,8 +48,8 @@ * work_queue() again. * * Input Parameters: - * qid - The work queue ID - * work - The previously queued work structure to cancel + * wqueue - The work queue + * work - The previously queue work structure to cancel * * Returned Value: * Zero (OK) on success, a negated errno on failure. This error may be @@ -63,7 +63,10 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue, FAR struct work_s *work) { + FAR sq_entry_t *prev = NULL; + FAR sq_entry_t *curr; int ret = -ENOENT; + int semcount; DEBUGASSERT(work != NULL); @@ -78,18 +81,44 @@ static int work_qcancel(FAR struct usr_wqueue_s *wqueue, if (work->worker != NULL) { - /* A little test of the integrity of the work queue */ - - DEBUGASSERT(work->dq.flink != NULL || - (FAR dq_entry_t *)work == wqueue->q.tail); - DEBUGASSERT(work->dq.blink != NULL || - (FAR dq_entry_t *)work == wqueue->q.head); - - /* Remove the entry from the work queue and make sure that it is - * marked as available (i.e., the worker field is nullified). + /* Search the work activelist for the target work. We can't + * use sq_rem to do this because there are additional operations that + * need to be done. */ - dq_rem((FAR dq_entry_t *)work, &wqueue->q); + curr = wqueue->q.head; + while (curr && curr != &work->u.s.sq) + { + prev = curr; + curr = curr->flink; + } + + /* Check if the work was found in the list. If not, then an OS + * error has occurred because the work is marked active! + */ + + DEBUGASSERT(curr); + + /* Now, remove the work from the work queue */ + + if (prev) + { + /* Remove the work from mid- or end-of-queue */ + + sq_remafter(prev, &wqueue->q); + } + else + { + /* Remove the work at the head of the queue */ + + sq_remfirst(&wqueue->q); + _SEM_GETVALUE(&wqueue->wake, &semcount); + if (semcount < 1) + { + _SEM_POST(&wqueue->wake); + } + } + work->worker = NULL; ret = OK; } diff --git a/libs/libc/wqueue/work_queue.c b/libs/libc/wqueue/work_queue.c index 5f631f2d8c..7470b4aa1c 100644 --- a/libs/libc/wqueue/work_queue.c +++ b/libs/libc/wqueue/work_queue.c @@ -32,6 +32,7 @@ #include #include +#include #include "wqueue/wqueue.h" @@ -56,7 +57,7 @@ * and remove it from the work queue. * * Input Parameters: - * qid - The work queue ID (index) + * wqueue - The work queue * work - The work structure to queue * worker - The worker callback to be invoked. The callback will be * invoked on the worker thread of execution. @@ -74,35 +75,72 @@ static int work_qqueue(FAR struct usr_wqueue_s *wqueue, FAR struct work_s *work, worker_t worker, FAR void *arg, clock_t delay) { - DEBUGASSERT(work != NULL); + FAR sq_entry_t *prev = NULL; + FAR sq_entry_t *curr; + sclock_t delta; + int semcount; /* Get exclusive access to the work queue */ while (_SEM_WAIT(&wqueue->lock) < 0); - /* Is there already pending work? */ - - if (work->worker != NULL) - { - /* Remove the entry from the work queue. It will be requeued at the - * end of the work queue. - */ - - dq_rem((FAR dq_entry_t *)work, &wqueue->q); - } - /* Initialize the work structure */ - work->worker = worker; /* Work callback. non-NULL means queued */ - work->arg = arg; /* Callback argument */ - work->delay = delay; /* Delay until work performed */ + work->worker = worker; /* Work callback. non-NULL means queued */ + work->arg = arg; /* Callback argument */ + work->u.s.qtime = clock() + delay; /* Delay until work performed */ - /* Now, time-tag that entry and put it in the work queue. */ + /* Do the easy case first -- when the work queue is empty. */ - work->qtime = clock(); /* Time work queued */ + if (wqueue->q.head == NULL) + { + /* Add the watchdog to the head == tail of the queue. */ - dq_addlast((FAR dq_entry_t *)work, &wqueue->q); - kill(wqueue->pid, SIGWORK); /* Wake up the worker thread */ + sq_addfirst(&work->u.s.sq, &wqueue->q); + _SEM_POST(&wqueue->wake); + } + + /* There are other active watchdogs in the timer queue */ + + else + { + curr = wqueue->q.head; + + /* Check if the new work must be inserted before the curr. */ + + do + { + delta = work->u.s.qtime - ((FAR struct work_s *)curr)->u.s.qtime; + if (delta < 0) + { + break; + } + + prev = curr; + curr = curr->flink; + } + while (curr != NULL); + + /* Insert the new watchdog in the list */ + + if (prev == NULL) + { + /* Insert the watchdog at the head of the list */ + + sq_addfirst(&work->u.s.sq, &wqueue->q); + _SEM_GETVALUE(&wqueue->wake, &semcount); + if (semcount < 1) + { + _SEM_POST(&wqueue->wake); + } + } + else + { + /* Insert the watchdog in mid- or end-of-queue */ + + sq_addafter(prev, &work->u.s.sq, &wqueue->q); + } + } _SEM_POST(&wqueue->lock); return OK; @@ -146,6 +184,10 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, { if (qid == USRWORK) { + /* Is there already pending work? */ + + work_cancel(qid, work); + return work_qqueue(&g_usrwork, work, worker, arg, delay); } else diff --git a/libs/libc/wqueue/work_signal.c b/libs/libc/wqueue/work_signal.c deleted file mode 100644 index eae548b851..0000000000 --- a/libs/libc/wqueue/work_signal.c +++ /dev/null @@ -1,99 +0,0 @@ -/**************************************************************************** - * libs/libc/wqueue/work_signal.c - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The - * ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - * - ****************************************************************************/ - -/**************************************************************************** - * Included Files - ****************************************************************************/ - -#include - -#include -#include - -#include - -#include "wqueue/wqueue.h" - -#if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__) - -/**************************************************************************** - * Pre-processor Definitions - ****************************************************************************/ - -/**************************************************************************** - * Private Type Declarations - ****************************************************************************/ - -/**************************************************************************** - * Public Data - ****************************************************************************/ - -/**************************************************************************** - * Private Data - ****************************************************************************/ - -/**************************************************************************** - * Private Functions - ****************************************************************************/ - -/**************************************************************************** - * Public Functions - ****************************************************************************/ - -/**************************************************************************** - * Name: work_signal - * - * Description: - * Signal the worker thread to process the work queue now. This function - * is used internally by the work logic but could also be used by the - * user to force an immediate re-assessment of pending work. - * - * Input Parameters: - * qid - The work queue ID - * - * Returned Value: - * Zero on success, a negated errno on failure - * - ****************************************************************************/ - -int work_signal(int qid) -{ - int ret; - - if (qid == USRWORK) - { - /* Signal the worker thread */ - - ret = kill(g_usrwork.pid, SIGWORK); - if (ret < 0) - { - int errcode = get_errno(); - ret = -errcode; - } - } - else - { - ret = -EINVAL; - } - - return ret; -} - -#endif /* CONFIG_LIB_USRWORK && !__KERNEL__ */ diff --git a/libs/libc/wqueue/work_usrthread.c b/libs/libc/wqueue/work_usrthread.c index 0c71b8a49a..8c2b345783 100644 --- a/libs/libc/wqueue/work_usrthread.c +++ b/libs/libc/wqueue/work_usrthread.c @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -45,26 +44,12 @@ * Pre-processor Definitions ****************************************************************************/ -/* Use CLOCK_MONOTONIC if it is available. CLOCK_REALTIME can cause bad - * delays if the time is changed. - */ - -#ifdef CONFIG_CLOCK_MONOTONIC -# define WORK_CLOCK CLOCK_MONOTONIC -#else -# define WORK_CLOCK CLOCK_REALTIME -#endif - #ifdef CONFIG_SYSTEM_TIME64 # define WORK_DELAY_MAX UINT64_MAX #else # define WORK_DELAY_MAX UINT32_MAX #endif -#ifndef MIN -# define MIN(a,b) ((a) < (b) ? (a) : (b)) -#endif - /**************************************************************************** * Private Type Declarations ****************************************************************************/ @@ -98,17 +83,12 @@ struct usr_wqueue_s g_usrwork; * ****************************************************************************/ -void work_process(FAR struct usr_wqueue_s *wqueue) +static void work_process(FAR struct usr_wqueue_s *wqueue) { volatile FAR struct work_s *work; - sigset_t sigset; - sigset_t oldset; - worker_t worker; + worker_t worker; FAR void *arg; - clock_t elapsed; - clock_t remaining; - clock_t stick; - clock_t ctick; + sclock_t elapsed; clock_t next; int ret; @@ -125,15 +105,6 @@ void work_process(FAR struct usr_wqueue_s *wqueue) return; } - /* Set up the signal mask */ - - sigemptyset(&sigset); - sigaddset(&sigset, SIGWORK); - - /* Get the time that we started this polling cycle in clock ticks. */ - - stick = clock(); - /* And check each entry in the work queue. Since we have locked the * work queue we know: (1) we will not be suspended unless we do * so ourselves, and (2) there will be no changes to the work queue @@ -142,19 +113,21 @@ void work_process(FAR struct usr_wqueue_s *wqueue) work = (FAR struct work_s *)wqueue->q.head; while (work) { - /* Is this work ready? It is ready if there is no delay or if - * the delay has elapsed. qtime is the time that the work was added - * to the work queue. It will always be greater than or equal to - * zero. Therefore a delay of zero will always execute immediately. + /* Is this work ready? It is ready if there is no delay or if + * the delay has elapsed. is the time that the work was added + * to the work queue. Therefore a delay of equal or less than + * zero will always execute immediately. */ - ctick = clock(); - elapsed = ctick - work->qtime; - if (elapsed >= work->delay) + elapsed = clock() - work->u.s.qtime; + + /* Is this delay work ready? */ + + if (elapsed >= 0) { /* Remove the ready-to-execute work from the list */ - dq_rem((struct dq_entry_s *)work, &wqueue->q); + sq_remfirst(&wqueue->q); /* Extract the work description from the entry (in case the work * instance by the re-used after it has been de-queued). @@ -195,65 +168,26 @@ void work_process(FAR struct usr_wqueue_s *wqueue) return; } - - work = (FAR struct work_s *)wqueue->q.head; } - else - { - /* Canceled.. Just move to the next work in the list with - * the work queue still locked. - */ - work = (FAR struct work_s *)work->dq.flink; - } + work = (FAR struct work_s *)wqueue->q.head; } - else /* elapsed < work->delay */ + else { - /* This one is not ready. - * - * NOTE that elapsed is relative to the current time, - * not the time of beginning of this queue processing pass. - * So it may need an adjustment. - */ - - elapsed += (ctick - stick); - if (elapsed > work->delay) - { - /* The delay has expired while we are processing */ - - elapsed = work->delay; - } - - /* Will it be ready before the next scheduled wakeup interval? */ - - remaining = work->delay - elapsed; - if (remaining < next) - { - /* Yes.. Then schedule to wake up when the work is ready */ - - next = remaining; - } - - /* Then try the next in the list. */ - - work = (FAR struct work_s *)work->dq.flink; + next = work->u.s.qtime - clock(); + break; } } - /* Unlock the work queue before waiting. In order to assure that we do - * not lose the SIGWORK signal before waiting, we block the SIGWORK - * signals before unlocking the work queue. That will cause in SIGWORK - * signals directed to the worker thread to pend. - */ + /* Unlock the work queue before waiting. */ - sigprocmask(SIG_BLOCK, &sigset, &oldset); _SEM_POST(&wqueue->lock); if (next == WORK_DELAY_MAX) { - /* Wait indefinitely until signaled with SIGWORK */ + /* Wait indefinitely until work_queue has new items */ - sigwaitinfo(&sigset, NULL); + _SEM_WAIT(&wqueue->wake); } else { @@ -261,7 +195,7 @@ void work_process(FAR struct usr_wqueue_s *wqueue) time_t sec; /* Wait awhile to check the work list. We will wait here until - * either the time elapses or until we are awakened by a signal. + * either the time elapses or until we are awakened by a semaphore. * Interrupts will be re-enabled while we wait. */ @@ -269,10 +203,8 @@ void work_process(FAR struct usr_wqueue_s *wqueue) rqtp.tv_sec = sec; rqtp.tv_nsec = (next - (sec * 1000000)) * 1000; - sigtimedwait(&sigset, NULL, &rqtp); + _SEM_TIMEDWAIT(&wqueue->wake, &rqtp); } - - sigprocmask(SIG_SETMASK, &oldset, NULL); } /**************************************************************************** @@ -339,53 +271,48 @@ static pthread_addr_t work_usrthread(pthread_addr_t arg) int work_usrstart(void) { + int ret; +#ifndef CONFIG_BUILD_PROTECTED + pthread_t usrwork; + pthread_attr_t attr; + struct sched_param param; +#endif + /* Set up the work queue lock */ _SEM_INIT(&g_usrwork.lock, 0, 1); + _SEM_INIT(&g_usrwork.wake, 0, 0); + _SEM_SETPROTOCOL(&g_usrwork.wake, SEM_PRIO_NONE); + + /* Initialize the work queue */ + + sq_init(&g_usrwork.q); + #ifdef CONFIG_BUILD_PROTECTED /* Start a user-mode worker thread for use by applications. */ - g_usrwork.pid = task_create("uwork", - CONFIG_LIB_USRWORKPRIORITY, - CONFIG_LIB_USRWORKSTACKSIZE, - (main_t)work_usrthread, - (FAR char * const *)NULL); - - DEBUGASSERT(g_usrwork.pid > 0); - if (g_usrwork.pid < 0) + ret = task_create("uwork", + CONFIG_LIB_USRWORKPRIORITY, + CONFIG_LIB_USRWORKSTACKSIZE, + (main_t)work_usrthread, + ((FAR char * const *)NULL)); + if (ret < 0) { int errcode = get_errno(); DEBUGASSERT(errcode > 0); return -errcode; } - return g_usrwork.pid; + return ret; #else - pthread_t usrwork; - pthread_attr_t attr; - struct sched_param param; - int ret; - /* Start a user-mode worker thread for use by applications. */ pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, CONFIG_LIB_USRWORKSTACKSIZE); -#ifdef CONFIG_SCHED_SPORADIC - /* Get the current sporadic scheduling parameters. Those will not be - * modified. - */ - - ret = set_getparam(pid, ¶m); - if (ret < 0) - { - int errcode = get_errno(); - return -errcode; - } -#endif - + pthread_attr_getschedparam(&attr, ¶m); param.sched_priority = CONFIG_LIB_USRWORKPRIORITY; pthread_attr_setschedparam(&attr, ¶m); @@ -401,8 +328,7 @@ int work_usrstart(void) pthread_detach(usrwork); - g_usrwork.pid = (pid_t)usrwork; - return g_usrwork.pid; + return (pid_t)usrwork; #endif } diff --git a/libs/libc/wqueue/wqueue.h b/libs/libc/wqueue/wqueue.h index e84c2296e3..4c0342199a 100644 --- a/libs/libc/wqueue/wqueue.h +++ b/libs/libc/wqueue/wqueue.h @@ -46,9 +46,9 @@ struct usr_wqueue_s { - struct dq_queue_s q; /* The queue of pending work */ + struct sq_queue_s q; /* The queue of pending work */ sem_t lock; /* exclusive access to user-mode work queue */ - pid_t pid; /* The task ID of the worker thread(s) */ + sem_t wake; /* The wake-up semaphore of the usrthread */ }; /****************************************************************************