From 4a4b3ac537e76b57ec169cd762fc658b66b0d092 Mon Sep 17 00:00:00 2001 From: Gregory Nutt Date: Fri, 10 Oct 2014 16:24:50 -0600 Subject: [PATCH] Add support for multiple low-priority worker threads --- include/nuttx/wqueue.h | 18 +++----- libc/wqueue/work_cancel.c | 2 +- libc/wqueue/work_queue.c | 7 +-- libc/wqueue/work_signal.c | 2 +- libc/wqueue/work_usrthread.c | 24 +++++----- libc/wqueue/wqueue.h | 11 ++++- sched/wqueue/kwork_cancel.c | 7 +-- sched/wqueue/kwork_hpthread.c | 24 +++++----- sched/wqueue/kwork_inherit.c | 9 ++-- sched/wqueue/kwork_lpthread.c | 82 ++++++++++++++++++++++++++++------- sched/wqueue/kwork_process.c | 6 ++- sched/wqueue/kwork_queue.c | 15 ++++--- sched/wqueue/kwork_signal.c | 22 +++++++++- sched/wqueue/wqueue.h | 52 ++++++++++++++++++++-- 14 files changed, 204 insertions(+), 77 deletions(-) diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h index b2d51cb0f6..4de65ff687 100644 --- a/include/nuttx/wqueue.h +++ b/include/nuttx/wqueue.h @@ -82,6 +82,8 @@ * then an additional, lower-priority work queue will also be created. This * lower priority work queue is better suited for more extended processing * (such as file system clean-up operations) + * CONFIG_SCHED_LPNTHREADS - The number of thread in the low-priority queue's + * thread pool. Default: 1 * CONFIG_SCHED_LPWORKPRIORITY - The minimum execution priority of the lower * priority worker thread. Default: 50 * CONFIG_SCHED_LPWORKPRIOMAX - The maximum execution priority of the lower @@ -189,6 +191,10 @@ #ifdef CONFIG_SCHED_LPWORK +# ifndef CONFIG_SCHED_LPNTHREADS +# define CONFIG_SCHED_LPNTHREADS 1 +#endif + # ifndef CONFIG_SCHED_LPWORKPRIORITY # define CONFIG_SCHED_LPWORKPRIORITY 50 # endif @@ -293,18 +299,6 @@ #ifndef __ASSEMBLY__ -/* This structure defines the state on one work queue. This structure is - * used internally by the OS and worker queue logic and should not be - * accessed by application logic. - */ - -struct wqueue_s -{ - uint32_t delay; /* Delay between polling cycles (ticks) */ - struct dq_queue_s q; /* The queue of pending work */ - pid_t pid[1]; /* The task ID of the worker thread(s) */ -}; - /* Defines the work callback */ typedef void (*worker_t)(FAR void *arg); diff --git a/libc/wqueue/work_cancel.c b/libc/wqueue/work_cancel.c index fa04f6ca9f..322a85ae70 100644 --- a/libc/wqueue/work_cancel.c +++ b/libc/wqueue/work_cancel.c @@ -92,7 +92,7 @@ * ****************************************************************************/ -static int work_qcancel(FAR struct wqueue_s *wqueue, FAR struct work_s *work) +static int work_qcancel(FAR struct usr_wqueue_s *wqueue, FAR struct work_s *work) { int ret = -ENOENT; diff --git a/libc/wqueue/work_queue.c b/libc/wqueue/work_queue.c index a17d8d14b9..19e441178e 100644 --- a/libc/wqueue/work_queue.c +++ b/libc/wqueue/work_queue.c @@ -102,8 +102,9 @@ * ****************************************************************************/ -static int work_qqueue(FAR struct wqueue_s *wqueue, FAR struct work_s *work, - worker_t worker, FAR void *arg, uint32_t delay) +static int work_qqueue(FAR struct usr_wqueue_s *wqueue, + FAR struct work_s *work, worker_t worker, + FAR void *arg, uint32_t delay) { DEBUGASSERT(work != NULL); @@ -122,7 +123,7 @@ static int work_qqueue(FAR struct wqueue_s *wqueue, FAR struct work_s *work, work->qtime = clock_systimer(); /* Time work queued */ dq_addlast((FAR dq_entry_t *)work, &wqueue->q); - kill(wqueue->pid[0], SIGWORK); /* Wake up the worker thread */ + kill(wqueue->pid, SIGWORK); /* Wake up the worker thread */ work_unlock(); return OK; diff --git a/libc/wqueue/work_signal.c b/libc/wqueue/work_signal.c index 813b9e8d68..1ab861dfed 100644 --- a/libc/wqueue/work_signal.c +++ b/libc/wqueue/work_signal.c @@ -97,7 +97,7 @@ int work_signal(int qid) { /* Signal the worker thread */ - ret = kill(g_usrwork.pid[0], SIGWORK); + ret = kill(g_usrwork.pid, SIGWORK); if (ret < 0) { int errcode = errno; diff --git a/libc/wqueue/work_usrthread.c b/libc/wqueue/work_usrthread.c index 0b4606e2e7..48ccf1f8bc 100644 --- a/libc/wqueue/work_usrthread.c +++ b/libc/wqueue/work_usrthread.c @@ -83,7 +83,7 @@ /* The state of the user mode work queue. */ -struct wqueue_s g_usrwork; +struct usr_wqueue_s g_usrwork; /* This semaphore supports exclusive access to the user-mode work queue */ @@ -118,7 +118,7 @@ extern pthread_mutex_t g_usrmutex; * ****************************************************************************/ -void work_process(FAR struct wqueue_s *wqueue) +void work_process(FAR struct usr_wqueue_s *wqueue) { volatile FAR struct work_s *work; worker_t worker; @@ -351,21 +351,21 @@ int work_usrstart(void) /* Start a user-mode worker thread for use by applications. */ - g_usrwork.pid[0] = task_create("uwork", - CONFIG_SCHED_USRWORKPRIORITY, - CONFIG_SCHED_USRWORKSTACKSIZE, - (main_t)work_usrthread, - (FAR char * const *)NULL); + g_usrwork.pid = task_create("uwork", + CONFIG_SCHED_USRWORKPRIORITY, + CONFIG_SCHED_USRWORKSTACKSIZE, + (main_t)work_usrthread, + (FAR char * const *)NULL); - DEBUGASSERT(g_usrwork.pid[0] > 0); - if (g_usrwork.pid[0] < 0) + DEBUGASSERT(g_usrwork.pid > 0); + if (g_usrwork.pid < 0) { int errcode = errno; DEBUGASSERT(errcode > 0); return -errcode; } - return g_usrwork.pid[0]; + return g_usrwork.pid; } #else { @@ -398,8 +398,8 @@ int work_usrstart(void) (void)pthread_detach(usrwork); - g_usrwork.pid[0] = (pid_t)usrwork; - return g_usrwork.pid[0]; + g_usrwork.pid = (pid_t)usrwork; + return g_usrwork.pid; } #endif } diff --git a/libc/wqueue/wqueue.h b/libc/wqueue/wqueue.h index 9fcbabdb81..13bfacf0a0 100644 --- a/libc/wqueue/wqueue.h +++ b/libc/wqueue/wqueue.h @@ -57,6 +57,15 @@ * Public Type Definitions ****************************************************************************/ +/* This structure defines the state of one user-modework queue. */ + +struct usr_wqueue_s +{ + uint32_t delay; /* Delay between polling cycles (ticks) */ + struct dq_queue_s q; /* The queue of pending work */ + pid_t pid; /* The task ID of the worker thread(s) */ +}; + /**************************************************************************** * Public Data ****************************************************************************/ @@ -64,7 +73,7 @@ #if defined(CONFIG_SCHED_USRWORK) && !defined(__KERNEL__) /* The state of the user mode work queue */ -extern struct wqueue_s g_usrwork; +extern struct usr_wqueue_s g_usrwork; /* This semaphore/mutex supports exclusive access to the user-mode work queue */ diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c index 589389c52d..8aa133aaab 100644 --- a/sched/wqueue/kwork_cancel.c +++ b/sched/wqueue/kwork_cancel.c @@ -91,7 +91,8 @@ * ****************************************************************************/ -static int work_qcancel(FAR struct wqueue_s *wqueue, FAR struct work_s *work) +static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work) { irqstate_t flags; int ret = -ENOENT; @@ -156,7 +157,7 @@ int work_cancel(int qid, FAR struct work_s *work) { /* Cancel high priority work */ - return work_qcancel(&g_hpwork, work); + return work_qcancel((FAR struct kwork_wqueue_s *)&g_hpwork, work); } else #endif @@ -165,7 +166,7 @@ int work_cancel(int qid, FAR struct work_s *work) { /* Cancel low priority work */ - return work_qcancel(&g_lpwork, work); + return work_qcancel((FAR struct kwork_wqueue_s *)&g_lpwork, work); } else #endif diff --git a/sched/wqueue/kwork_hpthread.c b/sched/wqueue/kwork_hpthread.c index 5457452d31..bb78973aee 100644 --- a/sched/wqueue/kwork_hpthread.c +++ b/sched/wqueue/kwork_hpthread.c @@ -66,7 +66,7 @@ /* The state of the kernel mode, high priority work queue. */ -struct wqueue_s g_hpwork; +struct hp_wqueue_s g_hpwork; /**************************************************************************** * Private Data @@ -123,7 +123,7 @@ static int work_hpthread(int argc, char *argv[]) * we process items in the work list. */ - work_process(&g_hpwork); + work_process((FAR struct kwork_wqueue_s *)&g_hpwork, 0); } return OK; /* To keep some compilers happy */ @@ -150,22 +150,24 @@ static int work_hpthread(int argc, char *argv[]) int work_hpstart(void) { + pid_t pid; + /* Initialize work queue data structures */ - g_hpwork.delay = CONFIG_SCHED_WORKPERIOD / USEC_PER_TICK; + g_hpwork.delay = CONFIG_SCHED_WORKPERIOD / USEC_PER_TICK; dq_init(&g_hpwork.q); /* Start the high-priority, kernel mode worker thread */ svdbg("Starting high-priority kernel worker thread\n"); - g_hpwork.pid[0] = kernel_thread(HPWORKNAME, CONFIG_SCHED_WORKPRIORITY, - CONFIG_SCHED_WORKSTACKSIZE, - (main_t)work_hpthread, - (FAR char * const *)NULL); + pid = kernel_thread(HPWORKNAME, CONFIG_SCHED_WORKPRIORITY, + CONFIG_SCHED_WORKSTACKSIZE, + (main_t)work_hpthread, + (FAR char * const *)NULL); - DEBUGASSERT(g_hpwork.pid[0] > 0); - if (g_hpwork.pid[0] < 0) + DEBUGASSERT(pid > 0); + if (pid < 0) { int errcode = errno; DEBUGASSERT(errcode > 0); @@ -174,7 +176,9 @@ int work_hpstart(void) return -errcode; } - return g_hpwork.pid[0]; + g_hpwork.worker[0].pid = pid; + g_hpwork.worker[0].busy = true; + return pid; } #endif /* CONFIG_SCHED_WORKQUEUE && CONFIG_SCHED_HPWORK*/ diff --git a/sched/wqueue/kwork_inherit.c b/sched/wqueue/kwork_inherit.c index af2698f5db..056997d0f6 100644 --- a/sched/wqueue/kwork_inherit.c +++ b/sched/wqueue/kwork_inherit.c @@ -86,12 +86,11 @@ void lpwork_boostpriority(uint8_t reqprio) reqprio = CONFIG_SCHED_LPWORKPRIOMAX; } - /* Get the process ID of the low priority worker thread from the low - * priority work queue. Then get the TCB of the low priority worker - * thread from the process ID. + /* Get the process ID of one low priority worker thread. Then get the TCB + * of the low priority worker thread from the process ID. */ - wpid = g_lpwork.pid[0]; + wpid = g_lpwork.worker[0].pid; wtcb = sched_gettcb(wpid); /* Prevent context switches until we get the priorities right */ @@ -214,7 +213,7 @@ void lpwork_restorepriority(uint8_t reqprio) * thread from the process ID. */ - wpid = g_lpwork.pid[0]; + wpid = g_lpwork.worker[0].pid; wtcb = sched_gettcb(wpid); /* Prevent context switches until we get the priorities right */ diff --git a/sched/wqueue/kwork_lpthread.c b/sched/wqueue/kwork_lpthread.c index 30e4eb50a1..d42029e7f0 100644 --- a/sched/wqueue/kwork_lpthread.c +++ b/sched/wqueue/kwork_lpthread.c @@ -39,6 +39,9 @@ #include +#include +#include +#include #include #include #include @@ -66,7 +69,7 @@ /* The state of the kernel mode, low priority work queue(s). */ -struct wqueue_s g_lpwork; +struct lp_wqueue_s g_lpwork; /**************************************************************************** * Private Data @@ -103,6 +106,28 @@ struct wqueue_s g_lpwork; static int work_lpthread(int argc, char *argv[]) { + int wndx; + + /* Find out thread index by search the workers in g_lpwork */ + + { + pid_t me = getpid(); + int i; + + /* Check each entry if we have to */ + + for (wndx = 0, i = 0; i < CONFIG_SCHED_LPNTHREADS; i++) + { + if (g_lpwork.worker[i].pid == me) + { + wndx = i; + break; + } + } + + DEBUGASSERT(i < CONFIG_SCHED_LPNTHREADS); + } + /* Loop forever */ for (;;) @@ -112,15 +137,21 @@ static int work_lpthread(int argc, char *argv[]) * context (for example, if the memory was freed from an interrupt handler). * NOTE: If the work thread is disabled, this clean-up is performed by * the IDLE thread (at a very, very low priority). + * + * In the event of multiple low priority threads, on index == 0 will do + * the garbage collection. */ - sched_garbagecollection(); + if (wndx == 0) + { + sched_garbagecollection(); + } /* Then process queued work. We need to keep interrupts disabled while * we process items in the work list. */ - work_process(&g_lpwork); + work_process((FAR struct kwork_wqueue_s *)&g_lpwork, wndx); } return OK; /* To keep some compilers happy */ @@ -147,31 +178,50 @@ static int work_lpthread(int argc, char *argv[]) int work_lpstart(void) { + pid_t pid; + int wndx; + /* Initialize work queue data structures */ + memset(&g_lpwork, 0, sizeof(struct kwork_wqueue_s)); + g_lpwork.delay = CONFIG_SCHED_LPWORKPERIOD / USEC_PER_TICK; dq_init(&g_lpwork.q); + /* Don't permit any of the threads to run until we have fully initialized + * g_lpwork. + */ + + sched_lock(); + /* Start the low-priority, kernel mode worker thread(s) */ - svdbg("Starting low-priority kernel worker thread\n"); + svdbg("Starting low-priority kernel worker thread(s)\n"); - g_lpwork.pid[0] = kernel_thread(LPWORKNAME, CONFIG_SCHED_LPWORKPRIORITY, - CONFIG_SCHED_LPWORKSTACKSIZE, - (main_t)work_lpthread, - (FAR char * const *)NULL); - - DEBUGASSERT(g_lpwork.pid[0] > 0); - if (g_lpwork.pid[0] < 0) + for (wndx = 0; wndx < CONFIG_SCHED_LPNTHREADS; wndx++) { - int errcode = errno; - DEBUGASSERT(errcode > 0); + pid = kernel_thread(LPWORKNAME, CONFIG_SCHED_LPWORKPRIORITY, + CONFIG_SCHED_LPWORKSTACKSIZE, + (main_t)work_lpthread, + (FAR char * const *)NULL); - slldbg("kernel_thread failed: %d\n", errcode); - return -errcode; + DEBUGASSERT(pid > 0); + if (pid < 0) + { + int errcode = errno; + DEBUGASSERT(errcode > 0); + + slldbg("kernel_thread %d failed: %d\n", wndx, errcode); + sched_unlock(); + return -errcode; + } + + g_lpwork.worker[wndx].pid = pid; + g_lpwork.worker[wndx].busy = true; } - return g_lpwork.pid[0]; + sched_unlock(); + return g_lpwork.worker[0].pid; } #endif /* CONFIG_SCHED_WORKQUEUE && CONFIG_SCHED_LPWORK */ diff --git a/sched/wqueue/kwork_process.c b/sched/wqueue/kwork_process.c index 1943e3e5fd..a51195fe5a 100644 --- a/sched/wqueue/kwork_process.c +++ b/sched/wqueue/kwork_process.c @@ -48,6 +48,8 @@ #include +#include "wqueue/wqueue.h" + #ifdef CONFIG_SCHED_WORKQUEUE /**************************************************************************** @@ -105,7 +107,7 @@ * ****************************************************************************/ -void work_process(FAR struct wqueue_s *wqueue) +void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx) { volatile FAR struct work_s *work; worker_t worker; @@ -239,7 +241,9 @@ void work_process(FAR struct wqueue_s *wqueue) * Interrupts will be re-enabled while we wait. */ + wqueue->worker[wndx].busy = false; usleep(next * USEC_PER_TICK); + wqueue->worker[wndx].busy = true; } } diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c index e385d76b75..842fe656ff 100644 --- a/sched/wqueue/kwork_queue.c +++ b/sched/wqueue/kwork_queue.c @@ -97,12 +97,13 @@ * is invoked. Zero means to perform the work immediately. * * Returned Value: - * Zero on success, a negated errno on failure + * None * ****************************************************************************/ -static int work_qqueue(FAR struct wqueue_s *wqueue, FAR struct work_s *work, - worker_t worker, FAR void *arg, uint32_t delay) +static void work_qqueue(FAR struct kwork_wqueue_s *wqueue, + FAR struct work_s *work, worker_t worker, + FAR void *arg, uint32_t delay) { irqstate_t flags; @@ -123,10 +124,8 @@ static int work_qqueue(FAR struct wqueue_s *wqueue, FAR struct work_s *work, work->qtime = clock_systimer(); /* Time work queued */ dq_addlast((FAR dq_entry_t *)work, &wqueue->q); - kill(wqueue->pid[0], SIGWORK); /* Wake up the worker thread */ irqrestore(flags); - return OK; } /**************************************************************************** @@ -170,7 +169,8 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, { /* Cancel high priority work */ - return work_qqueue(&g_hpwork, work, worker, arg, delay); + work_qqueue((FAR struct kwork_wqueue_s *)&g_hpwork, work, worker, arg, delay); + return work_signal(HPWORK); } else #endif @@ -179,7 +179,8 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker, { /* Cancel low priority work */ - return work_qqueue(&g_lpwork, work, worker, arg, delay); + work_qqueue((FAR struct kwork_wqueue_s *)&g_lpwork, work, worker, arg, delay); + return work_signal(LPWORK); } else #endif diff --git a/sched/wqueue/kwork_signal.c b/sched/wqueue/kwork_signal.c index 03253eff34..ccc342e1b7 100644 --- a/sched/wqueue/kwork_signal.c +++ b/sched/wqueue/kwork_signal.c @@ -90,6 +90,7 @@ int work_signal(int qid) { pid_t pid; + int wndx; int ret; /* Get the process ID of the worker thread */ @@ -97,14 +98,31 @@ int work_signal(int qid) #ifdef CONFIG_SCHED_HPWORK if (qid == HPWORK) { - pid = g_hpwork.pid[0]; + pid = g_hpwork.worker[0].pid; } else #endif #ifdef CONFIG_SCHED_LPWORK if (qid == LPWORK) { - pid = g_lpwork.pid[0]; + int i; + + /* Find an IDLE worker thread */ + + for (wndx = 0, i = 0; i < CONFIG_SCHED_LPNTHREADS; i++) + { + if (g_lpwork.worker[i].busy) + { + wndx = i; + break; + } + } + + /* Use the process ID of the IDLE thread (or thread 0 is the are all + * busy) + */ + + pid = g_lpwork.worker[wndx].pid; } else #endif diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h index 5307750724..2a93e88f14 100644 --- a/sched/wqueue/wqueue.h +++ b/sched/wqueue/wqueue.h @@ -42,6 +42,10 @@ #include +#include +#include +#include + #ifdef CONFIG_SCHED_WORKQUEUE /**************************************************************************** @@ -64,6 +68,48 @@ /**************************************************************************** * Public Type Definitions ****************************************************************************/ +/* This represents one worker */ + +struct kworker_s +{ + pid_t pid; /* The task ID of the worker thread */ + volatile bool busy; /* True: Worker is not available */ +}; + +/* This structure defines the state of one high-priority work queue */ + +struct kwork_wqueue_s +{ + uint32_t delay; /* Delay between polling cycles (ticks) */ + struct dq_queue_s q; /* The queue of pending work */ + struct kworker_s worker[1]; /* Describes a worker thread */ +}; + + +/* This structure defines the state of one high-priority work queue. This + * structure must be cast-compatible with kwork_wqueue_s. + */ + +struct hp_wqueue_s +{ + uint32_t delay; /* Delay between polling cycles (ticks) */ + struct dq_queue_s q; /* The queue of pending work */ + struct kworker_s worker[1]; /* Describes the single high priority worker */ +}; + +/* This structure defines the state of one high-priority work queue. This + * structure must be cast compatible with kwork_wqueue_s + */ + +struct lp_wqueue_s +{ + uint32_t delay; /* Delay between polling cycles (ticks) */ + struct dq_queue_s q; /* The queue of pending work */ + + /* Describes each thread in the low priority queue's thread pool */ + + struct kworker_s worker[CONFIG_SCHED_LPNTHREADS]; +}; /**************************************************************************** * Public Data @@ -72,13 +118,13 @@ #ifdef CONFIG_SCHED_HPWORK /* The state of the kernel mode, high priority work queue. */ -extern struct wqueue_s g_hpwork; +extern struct hp_wqueue_s g_hpwork; #endif #ifdef CONFIG_SCHED_LPWORK /* The state of the kernel mode, low priority work queue(s). */ -extern struct wqueue_s g_lpwork; +extern struct lp_wqueue_s g_lpwork; #endif /**************************************************************************** @@ -140,7 +186,7 @@ int work_lpstart(void); * ****************************************************************************/ -void work_process(FAR struct wqueue_s *wqueue); +void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx); #endif /* CONFIG_SCHED_WORKQUEUE */ #endif /* __SCHED_WQUEUE_WQUEUE_H */