diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h index 2e423e24b7..971ab630d5 100644 --- a/include/nuttx/wqueue.h +++ b/include/nuttx/wqueue.h @@ -232,6 +232,10 @@ #ifndef __ASSEMBLY__ +/* Work queue forward declaration */ + +struct kwork_wqueue_s; + /* Defines the work callback */ typedef CODE void (*worker_t)(FAR void *arg); @@ -247,13 +251,14 @@ struct work_s { struct { - struct dq_entry_s dq; /* Implements a double linked list */ - clock_t qtime; /* Time work queued */ + struct dq_entry_s dq; /* Implements a double linked list */ + clock_t qtime; /* Time work queued */ } s; - struct wdog_s timer; /* Delay expiry timer */ + struct wdog_s timer; /* Delay expiry timer */ } u; - worker_t worker; /* Work callback */ - FAR void *arg; /* Callback argument */ + worker_t worker; /* Work callback */ + FAR void *arg; /* Callback argument */ + FAR struct kwork_wqueue_s *wq; /* Work queue */ }; /* This is an enumeration of the various events that may be @@ -330,7 +335,50 @@ int work_usrstart(void); #endif /**************************************************************************** - * Name: work_queue + * Name: work_queue_create + * + * Description: + * Create a new work queue. The work queue is identified by its work + * queue ID, which is used to queue works to the work queue and to + * perform other operations on the work queue. + * This function will create a work thread pool with nthreads threads. + * The work queue ID is returned on success. + * + * Input Parameters: + * name - Name of the new task + * priority - Priority of the new task + * stack_size - size (in bytes) of the stack needed + * nthreads - Number of work thread should be created + * + * Returned Value: + * The work queue handle returned on success. Otherwise, NULL + * + ****************************************************************************/ + +FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name, + int priority, + int stack_size, int nthreads); + +/**************************************************************************** + * Name: work_queue_free + * + * Description: + * Destroy a work queue. The work queue is identified by its work queue ID. + * All worker threads will be destroyed and the work queue will be freed. + * The work queue ID is invalid after this function returns. + * + * Input Parameters: + * wqueue - The work queue handle + * + * Returned Value: + * Zero on success, a negated errno value on failure. + * + ****************************************************************************/ + +int work_queue_free(FAR struct kwork_wqueue_s *wqueue); + +/**************************************************************************** + * Name: work_queue/work_queue_wq * * Description: * Queue work to be performed at a later time. All queued work will be @@ -344,7 +392,8 @@ int work_usrstart(void); * pending work will be canceled and lost. * * Input Parameters: - * qid - The work queue ID + * qid - The work queue ID (must be HPWORK or LPWORK) + * wqueue - The work queue handle * work - The work structure to queue * worker - The worker callback to be invoked. The callback will be * invoked on the worker thread of execution. @@ -360,9 +409,12 @@ int work_usrstart(void); int work_queue(int qid, FAR struct work_s *work, worker_t worker, FAR void *arg, clock_t delay); +int work_queue_wq(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work, worker_t worker, + FAR void *arg, clock_t delay); /**************************************************************************** - * Name: work_cancel + * Name: work_cancel/work_cancel_wq * * Description: * Cancel previously queued work. This removes work from the work queue. @@ -370,7 +422,8 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, * work_queue() again. * * Input Parameters: - * qid - The work queue ID + * qid - The work queue ID (must be HPWORK or LPWORK) + * wqueue - The work queue handle * work - The previously queued work structure to cancel * * Returned Value: @@ -382,9 +435,11 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, ****************************************************************************/ int work_cancel(int qid, FAR struct work_s *work); +int work_cancel_wq(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work); /**************************************************************************** - * Name: work_cancel_sync + * Name: work_cancel_sync/work_cancel_sync_wq * * Description: * Blocked cancel previously queued user-mode work. This removes work @@ -393,6 +448,7 @@ int work_cancel(int qid, FAR struct work_s *work); * * Input Parameters: * qid - The work queue ID (must be HPWORK or LPWORK) + * wqueue - The work queue handle * work - The previously queued work structure to cancel * * Returned Value: @@ -405,6 +461,8 @@ int work_cancel(int qid, FAR struct work_s *work); ****************************************************************************/ int work_cancel_sync(int qid, FAR struct work_s *work); +int work_cancel_sync_wq(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work); /**************************************************************************** * Name: work_available diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c index 351d48d8b6..2aceb4a65a 100644 --- a/sched/wqueue/kwork_cancel.c +++ b/sched/wqueue/kwork_cancel.c @@ -49,10 +49,9 @@ * work_queue() again. * * Input Parameters: - * wqueue - The work queue to use. Must be HPWORK or LPWORK - * nthread - The number of threads in the work queue - * > 0 unsynchronous cancel - * < 0 synchronous cancel + * wqueue - The work queue to use + * sync - true: synchronous cancel + * false: asynchronous cancel * work - The previously queued work structure to cancel * * Returned Value: @@ -64,13 +63,16 @@ * ****************************************************************************/ -static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread, +static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync, FAR struct work_s *work) { irqstate_t flags; int ret = -ENOENT; - DEBUGASSERT(work != NULL); + if (wqueue == NULL || work == NULL) + { + return -EINVAL; + } /* Cancelling the work is simply a matter of removing the work structure * from the work queue. This must be done with interrupts disabled because @@ -96,11 +98,11 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread, work->worker = NULL; ret = OK; } - else if (!up_interrupt_context() && !sched_idletask() && nthread > 0) + else if (!up_interrupt_context() && !sched_idletask() && sync) { int wndx; - for (wndx = 0; wndx < nthread; wndx++) + for (wndx = 0; wndx < wqueue->nthreads; wndx++) { if (wqueue->worker[wndx].work == work && wqueue->worker[wndx].pid != nxsched_gettid()) @@ -121,20 +123,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread, ****************************************************************************/ /**************************************************************************** - * Name: work_cancel + * Name: work_cancel/work_cancel_wq * * Description: - * Cancel previously queued user-mode work. This removes work from the - * user mode work queue. After work has been cancelled, it may be - * requeued by calling work_queue() again. + * Cancel previously queued work. This removes work from the work queue. + * After work has been cancelled, it may be requeued by calling + * work_queue() again. * * Input Parameters: * qid - The work queue ID (must be HPWORK or LPWORK) + * wqueue - The work queue handle * work - The previously queued work structure to cancel * * Returned Value: - * Zero (OK) on success, a negated errno on failure. This error may be - * reported: + * Zero on success, a negated errno on failure * * -ENOENT - There is no such work queued. * -EINVAL - An invalid work queue was specified @@ -143,33 +145,17 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, int nthread, int work_cancel(int qid, FAR struct work_s *work) { -#ifdef CONFIG_SCHED_HPWORK - if (qid == HPWORK) - { - /* Cancel high priority work */ + return work_qcancel(work_qid2wq(qid), false, work); +} - return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork, - -1, work); - } - else -#endif -#ifdef CONFIG_SCHED_LPWORK - if (qid == LPWORK) - { - /* Cancel low priority work */ - - return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork, - -1, work); - } - else -#endif - { - return -EINVAL; - } +int work_cancel_wq(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work) +{ + return work_qcancel(wqueue, false, work); } /**************************************************************************** - * Name: work_cancel_sync + * Name: work_cancel_sync/work_cancel_sync_wq * * Description: * Blocked cancel previously queued user-mode work. This removes work @@ -178,6 +164,7 @@ int work_cancel(int qid, FAR struct work_s *work) * * Input Parameters: * qid - The work queue ID (must be HPWORK or LPWORK) + * wqueue - The work queue handle * work - The previously queued work structure to cancel * * Returned Value: @@ -191,29 +178,13 @@ int work_cancel(int qid, FAR struct work_s *work) int work_cancel_sync(int qid, FAR struct work_s *work) { -#ifdef CONFIG_SCHED_HPWORK - if (qid == HPWORK) - { - /* Cancel high priority work */ + return work_qcancel(work_qid2wq(qid), true, work); +} - return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork, - CONFIG_SCHED_HPNTHREADS, work); - } - else -#endif -#ifdef CONFIG_SCHED_LPWORK - if (qid == LPWORK) - { - /* Cancel low priority work */ - - return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork, - CONFIG_SCHED_LPNTHREADS, work); - } - else -#endif - { - return -EINVAL; - } +int work_cancel_sync_wq(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work) +{ + return work_qcancel(wqueue, true, work); } #endif /* CONFIG_SCHED_WORKQUEUE */ diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c index 60576ac143..4891b4c2e6 100644 --- a/sched/wqueue/kwork_queue.c +++ b/sched/wqueue/kwork_queue.c @@ -60,37 +60,25 @@ ****************************************************************************/ /**************************************************************************** - * Name: hp_work_timer_expiry + * Name: work_timer_expiry ****************************************************************************/ -#ifdef CONFIG_SCHED_HPWORK -static void hp_work_timer_expiry(wdparm_t arg) +static void work_timer_expiry(wdparm_t arg) { + FAR struct work_s *work = (FAR struct work_s *)arg; irqstate_t flags = enter_critical_section(); - queue_work(&g_hpwork, arg); + + queue_work(work->wq, work); leave_critical_section(flags); } -#endif - -/**************************************************************************** - * Name: lp_work_timer_expiry - ****************************************************************************/ - -#ifdef CONFIG_SCHED_LPWORK -static void lp_work_timer_expiry(wdparm_t arg) -{ - irqstate_t flags = enter_critical_section(); - queue_work(&g_lpwork, arg); - leave_critical_section(flags); -} -#endif static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads, FAR struct work_s *work) { int semcount; + int wndx; - for (int wndx = 0; wndx < nthreads; wndx++) + for (wndx = 0; wndx < nthreads; wndx++) { if (kworkers[wndx].work == work) { @@ -110,12 +98,11 @@ static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads, ****************************************************************************/ /**************************************************************************** - * Name: work_queue + * Name: work_queue/work_queue_wq * * Description: - * Queue kernel-mode work to be performed at a later time. All queued - * work will be performed on the worker thread of execution (not the - * caller's). + * Queue work to be performed at a later time. All queued work will be + * performed on the worker thread of execution (not the caller's). * * The work structure is allocated and must be initialized to all zero by * the caller. Otherwise, the work structure is completely managed by the @@ -125,12 +112,13 @@ static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads, * pending work will be canceled and lost. * * Input Parameters: - * qid - The work queue ID (index) + * qid - The work queue ID (must be HPWORK or LPWORK) + * wqueue - The work queue handle * work - The work structure to queue * worker - The worker callback to be invoked. The callback will be * invoked on the worker thread of execution. * arg - The argument that will be passed to the worker callback when - * int is invoked. + * it is invoked. * delay - Delay (in clock ticks) from the time queue until the worker * is invoked. Zero means to perform the work immediately. * @@ -139,15 +127,18 @@ static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads, * ****************************************************************************/ -int work_queue(int qid, FAR struct work_s *work, worker_t worker, - FAR void *arg, clock_t delay) +int work_queue_wq(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work, worker_t worker, + FAR void *arg, clock_t delay) { - FAR struct kwork_wqueue_s * wqueue; - wdentry_t expiry; irqstate_t flags; - int nthreads; int ret = OK; + if (wqueue == NULL || work == NULL || worker == NULL) + { + return -EINVAL; + } + /* Interrupts are disabled so that this logic can be called from with * task logic or from interrupt handling logic. */ @@ -158,33 +149,10 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, if (work->worker != NULL) { - work_cancel(qid, work); + work_cancel_wq(wqueue, work); } -#ifdef CONFIG_SCHED_HPWORK - if (qid == HPWORK) - { - wqueue = (FAR struct kwork_wqueue_s *)&g_hpwork; - expiry = hp_work_timer_expiry; - nthreads = CONFIG_SCHED_HPNTHREADS; - } - else -#endif -#ifdef CONFIG_SCHED_LPWORK - if (qid == LPWORK) - { - wqueue = (FAR struct kwork_wqueue_s *)&g_lpwork; - expiry = lp_work_timer_expiry; - nthreads = CONFIG_SCHED_LPNTHREADS; - } - else -#endif - { - ret = -EINVAL; - goto out; - } - - if (work_is_canceling(wqueue->worker, nthreads, work)) + if (work_is_canceling(wqueue->worker, wqueue->nthreads, work)) { goto out; } @@ -192,7 +160,8 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, /* Initialize the work structure. */ work->worker = worker; /* Work callback. non-NULL means queued */ - work->arg = arg; /* Callback argument */ + work->arg = arg; /* Callback argument */ + work->wq = wqueue; /* Work queue */ /* Queue the new work */ @@ -202,8 +171,7 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, } else { - wd_start(&work->u.timer, delay, expiry, - (wdparm_t)work); + wd_start(&work->u.timer, delay, work_timer_expiry, (wdparm_t)work); } out: @@ -211,4 +179,10 @@ out: return ret; } +int work_queue(int qid, FAR struct work_s *work, worker_t worker, + FAR void *arg, clock_t delay) +{ + return work_queue_wq(work_qid2wq(qid), work, worker, arg, delay); +} + #endif /* CONFIG_SCHED_WORKQUEUE */ diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c index f693a7f6c8..5351b78900 100644 --- a/sched/wqueue/kwork_thread.c +++ b/sched/wqueue/kwork_thread.c @@ -37,6 +37,7 @@ #include #include #include +#include #include "sched/sched.h" #include "wqueue/wqueue.h" @@ -82,6 +83,8 @@ struct hp_wqueue_s g_hpwork = { {NULL, NULL}, SEM_INITIALIZER(0), + SEM_INITIALIZER(0), + CONFIG_SCHED_HPNTHREADS, }; #endif /* CONFIG_SCHED_HPWORK */ @@ -93,6 +96,8 @@ struct lp_wqueue_s g_lpwork = { {NULL, NULL}, SEM_INITIALIZER(0), + SEM_INITIALIZER(0), + CONFIG_SCHED_LPNTHREADS, }; #endif /* CONFIG_SCHED_LPWORK */ @@ -144,7 +149,7 @@ static int work_thread(int argc, FAR char *argv[]) /* Loop forever */ - for (; ; ) + while (!wqueue->exit) { /* And check each entry in the work queue. Since we have disabled * interrupts we know: (1) we will not be suspended unless we do @@ -209,7 +214,8 @@ static int work_thread(int argc, FAR char *argv[]) leave_critical_section(flags); - return OK; /* To keep some compilers happy */ + nxsem_post(&wqueue->exsem); + return OK; } /**************************************************************************** @@ -223,7 +229,6 @@ static int work_thread(int argc, FAR char *argv[]) * name - Name of the new task * priority - Priority of the new task * stack_size - size (in bytes) of the stack needed - * nthread - Number of work thread should be created * wqueue - Work queue instance * * Returned Value: @@ -232,7 +237,7 @@ static int work_thread(int argc, FAR char *argv[]) ****************************************************************************/ static int work_thread_create(FAR const char *name, int priority, - int stack_size, int nthread, + int stack_size, FAR struct kwork_wqueue_s *wqueue) { FAR char *argv[3]; @@ -242,12 +247,12 @@ static int work_thread_create(FAR const char *name, int priority, int pid; /* Don't permit any of the threads to run until we have fully initialized - * g_hpwork and g_lpwork. + * all of them. */ sched_lock(); - for (wndx = 0; wndx < nthread; wndx++) + for (wndx = 0; wndx < wqueue->nthreads; wndx++) { nxsem_init(&wqueue->worker[wndx].wait, 0, 0); @@ -279,21 +284,131 @@ static int work_thread_create(FAR const char *name, int priority, * Public Functions ****************************************************************************/ +/**************************************************************************** + * Name: work_queue_create + * + * Description: + * Create a new work queue. The work queue is identified by its work + * queue ID, which is used to queue works to the work queue and to + * perform other operations on the work queue. + * This function will create a work thread pool with nthreads threads. + * The work queue ID is returned on success. + * + * Input Parameters: + * name - Name of the new task + * priority - Priority of the new task + * stack_size - size (in bytes) of the stack needed + * nthreads - Number of work thread should be created + * + * Returned Value: + * The work queue handle returned on success. Otherwise, NULL + * + ****************************************************************************/ + +FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name, + int priority, + int stack_size, int nthreads) +{ + FAR struct kwork_wqueue_s *wqueue; + int ret; + + if (nthreads < 1) + { + return NULL; + } + + /* Allocate a new work queue */ + + wqueue = kmm_zalloc(sizeof(struct kwork_wqueue_s) + + nthreads * sizeof(struct kworker_s)); + if (wqueue == NULL) + { + return NULL; + } + + /* Initialize the work queue structure */ + + dq_init(&wqueue->q); + nxsem_init(&wqueue->sem, 0, 0); + nxsem_init(&wqueue->exsem, 0, 0); + wqueue->nthreads = nthreads; + + /* Create the work queue thread pool */ + + ret = work_thread_create(name, priority, stack_size, wqueue); + if (ret < 0) + { + kmm_free(wqueue); + return NULL; + } + + return wqueue; +} + +/**************************************************************************** + * Name: work_queue_free + * + * Description: + * Destroy a work queue. The work queue is identified by its work queue ID. + * All worker threads will be destroyed and the work queue will be freed. + * The work queue ID is invalid after this function returns. + * + * Input Parameters: + * qid - The work queue ID + * + * Returned Value: + * Zero on success, a negated errno value on failure. + * + ****************************************************************************/ + +int work_queue_free(FAR struct kwork_wqueue_s *wqueue) +{ + int wndx; + + if (wqueue == NULL) + { + return -EINVAL; + } + + /* Mark the work queue as exiting */ + + wqueue->exit = true; + + /* Queue a exit work for all threads */ + + for (wndx = 0; wndx < wqueue->nthreads; wndx++) + { + nxsem_post(&wqueue->sem); + } + + for (wndx = 0; wndx < wqueue->nthreads; wndx++) + { + nxsem_wait_uninterruptible(&wqueue->exsem); + } + + nxsem_destroy(&wqueue->sem); + nxsem_destroy(&wqueue->exsem); + kmm_free(wqueue); + + return OK; +} + /**************************************************************************** * Name: work_start_highpri * * Description: - * Start the high-priority, kernel-mode worker thread(s) + * Start the high-priority, kernel-mode work queue. * * Input Parameters: * None * * Returned Value: - * A negated errno value is returned on failure. + * Return zero (OK) on success. A negated errno value is returned on + * errno value is returned on failure. * ****************************************************************************/ -#if defined(CONFIG_SCHED_HPWORK) +#ifdef CONFIG_SCHED_HPWORK int work_start_highpri(void) { /* Start the high-priority, kernel mode worker thread(s) */ @@ -302,7 +417,6 @@ int work_start_highpri(void) return work_thread_create(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY, CONFIG_SCHED_HPWORKSTACKSIZE, - CONFIG_SCHED_HPNTHREADS, (FAR struct kwork_wqueue_s *)&g_hpwork); } #endif /* CONFIG_SCHED_HPWORK */ @@ -317,11 +431,12 @@ int work_start_highpri(void) * None * * Returned Value: - * A negated errno value is returned on failure. + * Return zero (OK) on success. A negated errno value is returned on + * errno value is returned on failure. * ****************************************************************************/ -#if defined(CONFIG_SCHED_LPWORK) +#ifdef CONFIG_SCHED_LPWORK int work_start_lowpri(void) { /* Start the low-priority, kernel mode worker thread(s) */ @@ -330,7 +445,6 @@ int work_start_lowpri(void) return work_thread_create(LPWORKNAME, CONFIG_SCHED_LPWORKPRIORITY, CONFIG_SCHED_LPWORKSTACKSIZE, - CONFIG_SCHED_LPNTHREADS, (FAR struct kwork_wqueue_s *)&g_lpwork); } #endif /* CONFIG_SCHED_LPWORK */ diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h index 66f2ed5a4c..4700878b63 100644 --- a/sched/wqueue/wqueue.h +++ b/sched/wqueue/wqueue.h @@ -32,6 +32,7 @@ #include #include +#include #ifdef CONFIG_SCHED_WORKQUEUE @@ -63,7 +64,10 @@ struct kwork_wqueue_s { struct dq_queue_s q; /* The queue of pending work */ sem_t sem; /* The counting semaphore of the wqueue */ - struct kworker_s worker[1]; /* Describes a worker thread */ + sem_t exsem; /* Sync waiting for thread exit */ + uint8_t nthreads; /* Number of worker threads */ + bool exit; /* A flag to request the thread to exit */ + struct kworker_s worker[0]; /* Describes a worker thread */ }; /* This structure defines the state of one high-priority work queue. This @@ -75,6 +79,9 @@ struct hp_wqueue_s { struct dq_queue_s q; /* The queue of pending work */ sem_t sem; /* The counting semaphore of the wqueue */ + sem_t exsem; /* Sync waiting for thread exit */ + uint8_t nthreads; /* Number of worker threads */ + bool exit; /* A flag to request the thread to exit */ /* Describes each thread in the high priority queue's thread pool */ @@ -91,6 +98,9 @@ struct lp_wqueue_s { struct dq_queue_s q; /* The queue of pending work */ sem_t sem; /* The counting semaphore of the wqueue */ + sem_t exsem; /* Sync waiting for thread exit */ + uint8_t nthreads; /* Number of worker threads */ + bool exit; /* A flag to request the thread to exit */ /* Describes each thread in the low priority queue's thread pool */ @@ -118,6 +128,27 @@ extern struct lp_wqueue_s g_lpwork; * Public Function Prototypes ****************************************************************************/ +static inline_function FAR struct kwork_wqueue_s *work_qid2wq(int qid) +{ +#ifdef CONFIG_SCHED_HPWORK + if (qid == HPWORK) + { + return (FAR struct kwork_wqueue_s *)&g_hpwork; + } + else +#endif +#ifdef CONFIG_SCHED_LPWORK + if (qid == LPWORK) + { + return (FAR struct kwork_wqueue_s *)&g_lpwork; + } + else +#endif + { + return NULL; + } +} + /**************************************************************************** * Name: work_start_highpri * @@ -128,7 +159,7 @@ extern struct lp_wqueue_s g_lpwork; * None * * Returned Value: - * The task ID of the worker thread is returned on success. A negated + * Return zero (OK) on success. A negated errno value is returned on * errno value is returned on failure. * ****************************************************************************/ @@ -147,7 +178,7 @@ int work_start_highpri(void); * None * * Returned Value: - * The task ID of the worker thread is returned on success. A negated + * Return zero (OK) on success. A negated errno value is returned on * errno value is returned on failure. * ****************************************************************************/