Squashed commit of the following:

sched/wqueue:  Modify high priority work queue to support multiple threads.

    sched/wqueue and libs/libc/wqueue:  workqueues don't need set global data to zero since .bss is cleared automatically.  Removing this unnecessary initialization also avoids the loss the work items queued before initialization.
This commit is contained in:
ligd 2018-08-25 14:52:13 -06:00 committed by Gregory Nutt
parent 72fd2a5e34
commit 631071cded
8 changed files with 161 additions and 85 deletions

View File

@ -65,6 +65,8 @@
* (which runs at the lowest of priority and may not be appropriate * (which runs at the lowest of priority and may not be appropriate
* if memory reclamation is of high priority). If CONFIG_SCHED_HPWORK * if memory reclamation is of high priority). If CONFIG_SCHED_HPWORK
* is enabled, then the following options can also be used: * is enabled, then the following options can also be used:
* CONFIG_SCHED_HPNTHREADS - The number of thread in the high-priority queue's
* thread pool. Default: 1
* CONFIG_SCHED_HPWORKPRIORITY - The execution priority of the high- * CONFIG_SCHED_HPWORKPRIORITY - The execution priority of the high-
* priority worker thread. Default: 224 * priority worker thread. Default: 224
* CONFIG_SCHED_HPWORKPERIOD - How often the worker thread checks for * CONFIG_SCHED_HPWORKPERIOD - How often the worker thread checks for
@ -146,6 +148,10 @@
#ifdef CONFIG_SCHED_HPWORK #ifdef CONFIG_SCHED_HPWORK
# ifndef CONFIG_SCHED_HPNTHREADS
# define CONFIG_SCHED_HPNTHREADS 1
# endif
# ifndef CONFIG_SCHED_HPWORKPRIORITY # ifndef CONFIG_SCHED_HPWORKPRIORITY
# define CONFIG_SCHED_HPWORKPRIORITY 224 # define CONFIG_SCHED_HPWORKPRIORITY 224
# endif # endif

View File

@ -341,7 +341,6 @@ int work_usrstart(void)
/* Initialize work queue data structures */ /* Initialize work queue data structures */
g_usrwork.delay = CONFIG_LIB_USRWORKPERIOD / USEC_PER_TICK; g_usrwork.delay = CONFIG_LIB_USRWORKPERIOD / USEC_PER_TICK;
dq_init(&g_usrwork.q);
#ifdef CONFIG_BUILD_PROTECTED #ifdef CONFIG_BUILD_PROTECTED
{ {

View File

@ -1302,6 +1302,24 @@ config SCHED_HPWORK
if SCHED_HPWORK if SCHED_HPWORK
config SCHED_HPNTHREADS
int "Number of high-priority worker threads"
default 1
---help---
This options selects multiple, high-priority threads. This is
essentially a "thread pool" that provides multi-threaded servicing
of the high-priority work queue. This breaks the serialization
of the "queue" (hence, it is no longer a queue at all).
CAUTION: Some drivers may use the work queue to serialize
operations. They may also use the high-priority work queue if it is
available. If there are multiple high-priority worker threads, then
this can result in the loss of that serialization. There may be
concurrent driver operations running on different HP threads and
this could lead to a failure. You may need to visit the use of the
HP work queue on your configuration is you select
CONFIG_SCHED_HPNTHREADS > 1
config SCHED_HPWORKPRIORITY config SCHED_HPWORKPRIORITY
int "High priority worker thread priority" int "High priority worker thread priority"
default 224 default 224
@ -1370,8 +1388,8 @@ config SCHED_LPNTHREADS
example). example).
CAUTION: Some drivers may use the work queue to serialize CAUTION: Some drivers may use the work queue to serialize
operations. The may also use the low-priority work queue if it is operations. They may also use the low-priority work queue if it is
available. If there are multiple low-priority worker thread, then available. If there are multiple low-priority worker threads, then
this can result in the loss of that serialization. There may be this can result in the loss of that serialization. There may be
concurrent driver operations running on different LP threads and concurrent driver operations running on different LP threads and
this could lead to a failure. You may need to visit the use of the this could lead to a failure. You may need to visit the use of the

View File

@ -1,7 +1,7 @@
/**************************************************************************** /****************************************************************************
* sched/wqueue/work_hothread.c * sched/wqueue/work_hpthread.c
* *
* Copyright (C) 2009-2014 Gregory Nutt. All rights reserved. * Copyright (C) 2009-2014, 2018 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org> * Author: Gregory Nutt <gnutt@nuttx.org>
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -39,6 +39,9 @@
#include <nuttx/config.h> #include <nuttx/config.h>
#include <unistd.h>
#include <sched.h>
#include <string.h>
#include <errno.h> #include <errno.h>
#include <queue.h> #include <queue.h>
#include <debug.h> #include <debug.h>
@ -56,7 +59,7 @@
* Public Data * Public Data
****************************************************************************/ ****************************************************************************/
/* The state of the kernel mode, high priority work queue. */ /* The state of the kernel mode, high priority work queue(s). */
struct hp_wqueue_s g_hpwork; struct hp_wqueue_s g_hpwork;
@ -68,10 +71,10 @@ struct hp_wqueue_s g_hpwork;
* Name: work_hpthread * Name: work_hpthread
* *
* Description: * Description:
* This is the worker thread that performs the actions placed on the high * These are the worker threads that performs the actions placed on the high
* priority work queue. * priority work queue.
* *
* This, along with the lower priority worker thread(s) are the kernel * These, along with the lower priority worker thread(s) are the kernel
* mode work queues (also build in the flat build). One of these threads * mode work queues (also build in the flat build). One of these threads
* also performs periodic garbage collection (that would otherwise be * also performs periodic garbage collection (that would otherwise be
* performed by the idle thread if CONFIG_SCHED_WORKQUEUE is not defined). * performed by the idle thread if CONFIG_SCHED_WORKQUEUE is not defined).
@ -92,31 +95,68 @@ struct hp_wqueue_s g_hpwork;
static int work_hpthread(int argc, char *argv[]) static int work_hpthread(int argc, char *argv[])
{ {
#if CONFIG_SCHED_HPNTHREADS > 1
int wndx;
pid_t me = getpid();
int i;
/* Find out thread index by search the workers in g_hpwork */
for (wndx = 0, i = 0; i < CONFIG_SCHED_HPNTHREADS; i++)
{
if (g_hpwork.worker[i].pid == me)
{
wndx = i;
break;
}
}
DEBUGASSERT(i < CONFIG_SCHED_HPNTHREADS);
#endif
/* Loop forever */ /* Loop forever */
for (; ; ) for (; ; )
{ {
#ifndef CONFIG_SCHED_LPWORK #if CONFIG_SCHED_HPNTHREADS > 1
/* First, perform garbage collection. This cleans-up memory /* Thread 0 is special. Only thread 0 performs period garbage collection */
* de-allocations that were queued because they could not be freed in
* that execution context (for example, if the memory was freed from
* an interrupt handler).
*
* NOTE: If the work thread is disabled, this clean-up is performed by
* the IDLE thread (at a very, very low priority). If the low-priority
* work thread is enabled, then the garbage collection is done on that
* thread instead.
*/
sched_garbage_collection(); if (wndx > 0)
{
/* The other threads will perform work, waiting indefinitely until
* signalled for the next work availability.
*
* The special value of zero for the poll period instructs work_process
* to wait indefinitely until a signal is received.
*/
work_process((FAR struct kwork_wqueue_s *)&g_hpwork, 0, wndx);
}
else
#endif
{
#ifndef CONFIG_SCHED_LPWORK
/* First, perform garbage collection. This cleans-up memory
* de-allocations that were queued because they could not be freed in
* that execution context (for example, if the memory was freed from
* an interrupt handler).
*
* NOTE: If the work thread is disabled, this clean-up is performed by
* the IDLE thread (at a very, very low priority). If the low-priority
* work thread is enabled, then the garbage collection is done on that
* thread instead.
*/
sched_garbage_collection();
#endif #endif
/* Then process queued work. work_process will not return until: (1) /* Then process queued work. work_process will not return until: (1)
* there is no further work in the work queue, and (2) the polling * there is no further work in the work queue, and (2) the polling
* period provided by g_hpwork.delay expires. * period provided by g_hpwork.delay expires.
*/ */
work_process((FAR struct kwork_wqueue_s *)&g_hpwork, g_hpwork.delay, 0); work_process((FAR struct kwork_wqueue_s *)&g_hpwork, g_hpwork.delay, 0);
}
} }
return OK; /* To keep some compilers happy */ return OK; /* To keep some compilers happy */
@ -130,7 +170,7 @@ static int work_hpthread(int argc, char *argv[])
* Name: work_hpstart * Name: work_hpstart
* *
* Description: * Description:
* Start the high-priority, kernel-mode work queue. * Start the high-priority, kernel-mode worker thread(s)
* *
* Input Parameters: * Input Parameters:
* None * None
@ -144,31 +184,43 @@ static int work_hpthread(int argc, char *argv[])
int work_hpstart(void) int work_hpstart(void)
{ {
pid_t pid; pid_t pid;
int wndx;
/* Initialize work queue data structures */ /* Initialize work queue data structures */
g_hpwork.delay = CONFIG_SCHED_HPWORKPERIOD / USEC_PER_TICK; g_hpwork.delay = CONFIG_SCHED_HPWORKPERIOD / USEC_PER_TICK;
dq_init(&g_hpwork.q);
/* Start the high-priority, kernel mode worker thread */ /* Don't permit any of the threads to run until we have fully initialized
* g_hpwork.
*/
sinfo("Starting high-priority kernel worker thread\n"); sched_lock();
pid = kthread_create(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY, /* Start the high-priority, kernel mode worker thread(s) */
CONFIG_SCHED_HPWORKSTACKSIZE,
(main_t)work_hpthread,
(FAR char * const *)NULL);
DEBUGASSERT(pid > 0); sinfo("Starting high-priority kernel worker thread(s)\n");
if (pid < 0)
for (wndx = 0; wndx < CONFIG_SCHED_HPNTHREADS; wndx++)
{ {
serr("ERROR: kthread_create failed: %d\n", (int)pid); pid = kthread_create(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY,
return (int)pid; CONFIG_SCHED_HPWORKSTACKSIZE,
(main_t)work_hpthread,
(FAR char * const *)NULL);
DEBUGASSERT(pid > 0);
if (pid < 0)
{
serr("ERROR: kthread_create %d failed: %d\n", wndx, (int)pid);
sched_unlock();
return (int)pid;
}
g_hpwork.worker[wndx].pid = pid;
g_hpwork.worker[wndx].busy = true;
} }
g_hpwork.worker[0].pid = pid; sched_unlock();
g_hpwork.worker[0].busy = true; return g_hpwork.worker[0].pid;
return pid;
} }
#endif /* CONFIG_SCHED_HPWORK */ #endif /* CONFIG_SCHED_HPWORK */

View File

@ -94,7 +94,7 @@ struct lp_wqueue_s g_lpwork;
static int work_lpthread(int argc, char *argv[]) static int work_lpthread(int argc, char *argv[])
{ {
#if CONFIG_SCHED_LPNTHREADS > 0 #if CONFIG_SCHED_LPNTHREADS > 1
int wndx; int wndx;
pid_t me = getpid(); pid_t me = getpid();
int i; int i;
@ -117,7 +117,7 @@ static int work_lpthread(int argc, char *argv[])
for (; ; ) for (; ; )
{ {
#if CONFIG_SCHED_LPNTHREADS > 0 #if CONFIG_SCHED_LPNTHREADS > 1
/* Thread 0 is special. Only thread 0 performs period garbage collection */ /* Thread 0 is special. Only thread 0 performs period garbage collection */
if (wndx > 0) if (wndx > 0)
@ -184,10 +184,7 @@ int work_lpstart(void)
/* Initialize work queue data structures */ /* Initialize work queue data structures */
memset(&g_lpwork, 0, sizeof(struct kwork_wqueue_s));
g_lpwork.delay = CONFIG_SCHED_LPWORKPERIOD / USEC_PER_TICK; g_lpwork.delay = CONFIG_SCHED_LPWORKPERIOD / USEC_PER_TICK;
dq_init(&g_lpwork.q);
/* Don't permit any of the threads to run until we have fully initialized /* Don't permit any of the threads to run until we have fully initialized
* g_lpwork. * g_lpwork.

View File

@ -1,7 +1,7 @@
/**************************************************************************** /****************************************************************************
* sched/wqueue/work_process.c * sched/wqueue/work_process.c
* *
* Copyright (C) 2009-2014, 2016-2017 Gregory Nutt. All rights reserved. * Copyright (C) 2009-2014, 2016-2018 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org> * Author: Gregory Nutt <gnutt@nuttx.org>
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -215,12 +215,13 @@ void work_process(FAR struct kwork_wqueue_s *wqueue, clock_t period, int wndx)
} }
} }
#if defined(CONFIG_SCHED_LPWORK) && CONFIG_SCHED_LPNTHREADS > 0 #if (defined(CONFIG_SCHED_HPWORK) && CONFIG_SCHED_HPNTHREADS > 1) \
|| (defined(CONFIG_SCHED_LPWORK) && CONFIG_SCHED_LPNTHREADS > 1)
/* Value of zero for period means that we should wait indefinitely until /* Value of zero for period means that we should wait indefinitely until
* signalled. This option is used only for the case where there are * signalled. This option is used only for the case where there are
* multiple, low-priority worker threads. In that case, only one of * multiple worker threads. In that case, only one of the threads does
* the threads does the poll... the others simple. In all other cases * the poll... the others simple. In all other cases period will be
* period will be non-zero and equal to wqueue->delay. * non-zero and equal to wqueue->delay.
*/ */
if (period == 0) if (period == 0)

View File

@ -1,7 +1,7 @@
/**************************************************************************** /****************************************************************************
* sched/wqueue/work_signal.c * sched/wqueue/work_signal.c
* *
* Copyright (C) 2014, 2016-2017 Gregory Nutt. All rights reserved. * Copyright (C) 2014, 2016-2018 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org> * Author: Gregory Nutt <gnutt@nuttx.org>
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -71,46 +71,25 @@
int work_signal(int qid) int work_signal(int qid)
{ {
pid_t pid; FAR struct kwork_wqueue_s *work;
int threads;
int i;
/* Get the process ID of the worker thread */ /* Get the process ID of the worker thread */
#ifdef CONFIG_SCHED_HPWORK #ifdef CONFIG_SCHED_HPWORK
if (qid == HPWORK) if (qid == HPWORK)
{ {
pid = g_hpwork.worker[0].pid; work = (FAR struct kwork_wqueue_s *)&g_hpwork;
threads = CONFIG_SCHED_HPNTHREADS;
} }
else else
#endif #endif
#ifdef CONFIG_SCHED_LPWORK #ifdef CONFIG_SCHED_LPWORK
if (qid == LPWORK) if (qid == LPWORK)
{ {
int i; work = (FAR struct kwork_wqueue_s *)&g_lpwork;
threads = CONFIG_SCHED_LPNTHREADS;
/* Find an IDLE worker thread */
for (i = 0; i < CONFIG_SCHED_LPNTHREADS; i++)
{
/* Is this worker thread busy? */
if (!g_lpwork.worker[i].busy)
{
/* No.. select this thread */
break;
}
}
/* If all of the IDLE threads are busy, then just return successfully */
if (i >= CONFIG_SCHED_LPNTHREADS)
{
return OK;
}
/* Otherwise, signal the first IDLE thread found */
pid = g_lpwork.worker[i].pid;
} }
else else
#endif #endif
@ -118,9 +97,30 @@ int work_signal(int qid)
return -EINVAL; return -EINVAL;
} }
/* Signal the worker thread */ /* Find an IDLE worker thread */
return nxsig_kill(pid, SIGWORK); for (i = 0; i < threads; i++)
{
/* Is this worker thread busy? */
if (!work->worker[i].busy)
{
/* No.. select this thread */
break;
}
}
/* If all of the IDLE threads are busy, then just return successfully */
if (i >= threads)
{
return OK;
}
/* Otherwise, signal the first IDLE thread found */
return nxsig_kill(work->worker[i].pid, SIGWORK);
} }
#endif /* CONFIG_SCHED_WORKQUEUE */ #endif /* CONFIG_SCHED_WORKQUEUE */

View File

@ -1,7 +1,7 @@
/**************************************************************************** /****************************************************************************
* sched/wqueue/wqueue.h * sched/wqueue/wqueue.h
* *
* Copyright (C) 2014, 2016 Gregory Nutt. All rights reserved. * Copyright (C) 2014, 2016, 2018 Gregory Nutt. All rights reserved.
* Author: Gregory Nutt <gnutt@nuttx.org> * Author: Gregory Nutt <gnutt@nuttx.org>
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -88,11 +88,14 @@ struct hp_wqueue_s
{ {
clock_t delay; /* Delay between polling cycles (ticks) */ clock_t delay; /* Delay between polling cycles (ticks) */
struct dq_queue_s q; /* The queue of pending work */ struct dq_queue_s q; /* The queue of pending work */
struct kworker_s worker[1]; /* Describes the single high priority worker */
/* Describes each thread in the high priority queue's thread pool */
struct kworker_s worker[CONFIG_SCHED_HPNTHREADS];
}; };
#endif #endif
/* This structure defines the state of one high-priority work queue. This /* This structure defines the state of one low-priority work queue. This
* structure must be cast compatible with kwork_wqueue_s * structure must be cast compatible with kwork_wqueue_s
*/ */