revise threadpool comments and docs

This commit is contained in:
John Cupitt 2021-05-11 10:23:05 +01:00
parent e00ca4e6b7
commit eef3f9568d

View File

@ -56,9 +56,9 @@
*/ */
/* /*
*/
#define VIPS_DEBUG #define VIPS_DEBUG
#define VIPS_DEBUG_RED #define VIPS_DEBUG_RED
*/
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include <config.h> #include <config.h>
@ -104,8 +104,8 @@
* can be used to give feedback. * can be used to give feedback.
*/ */
/* Maximum number of concurrent threads we allow. No reason for the limit, /* Maximum number of concurrent threads we allow. It prevents huge values of
* it's just there to stop mad values for VIPS_CONCURRENCY killing the system. * VIPS_CONCURRENCY killing the system.
*/ */
#define MAX_THREADS (1024) #define MAX_THREADS (1024)
@ -120,7 +120,8 @@ int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT;
*/ */
int vips__concurrency = 0; int vips__concurrency = 0;
/* Set this GPrivate to indicate that this is a vips worker. /* Set this GPrivate to indicate that this thread is a worker inside
* the vips threadpool.
*/ */
static GPrivate *is_worker_key = NULL; static GPrivate *is_worker_key = NULL;
@ -213,6 +214,9 @@ vips_thread_main_loop( gpointer thread_data, gpointer pool_data )
g_free( exec ); g_free( exec );
/* Free all thread-private caches, since they probably won't be valid
* for the next task this thread is given.
*/
vips_thread_shutdown(); vips_thread_shutdown();
} }
@ -318,13 +322,14 @@ vips__concurrency_get_default( void )
else else
nthr = get_num_processors(); nthr = get_num_processors();
if( nthr < 1 || nthr > MAX_THREADS ) { if( nthr < 1 ||
nthr > MAX_THREADS ) {
nthr = VIPS_CLIP( 1, nthr, MAX_THREADS ); nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );
g_warning( _( "threads clipped to %d" ), nthr ); g_warning( _( "threads clipped to %d" ), nthr );
} }
return nthr; return( nthr );
} }
/** /**
@ -371,7 +376,7 @@ vips_concurrency_set( int concurrency )
* If vips_concurrency_set() has not been called and no command-line argument * If vips_concurrency_set() has not been called and no command-line argument
* was used, vips uses the value of the environment variable VIPS_CONCURRENCY, * was used, vips uses the value of the environment variable VIPS_CONCURRENCY,
* *
* If VIPS_CONCURRENCY has not been set, vips find the number of hardware * If VIPS_CONCURRENCY has not been set, vips finds the number of hardware
* threads that the host machine can run in parallel and uses that value. * threads that the host machine can run in parallel and uses that value.
* *
* The final value is clipped to the range 1 - 1024. * The final value is clipped to the range 1 - 1024.
@ -386,6 +391,10 @@ vips_concurrency_get( void )
return( vips__concurrency ); return( vips__concurrency );
} }
/* The VipsThreadStartFn arg to vips_threadpool_run() is called once for each
* thread to make one of these things to hold the thread state.
*/
G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT ); G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
static void static void
@ -441,7 +450,7 @@ vips_thread_state_set( VipsObject *object, void *a, void *b )
VipsThreadState *state = (VipsThreadState *) object; VipsThreadState *state = (VipsThreadState *) object;
VipsImage *im = (VipsImage *) a; VipsImage *im = (VipsImage *) a;
VIPS_DEBUG_MSG( "vips_thread_state_set:\n" ); VIPS_DEBUG_MSG( "vips_thread_state_set: image %p\n", im );
state->im = im; state->im = im;
state->a = b; state->a = b;
@ -452,13 +461,13 @@ vips_thread_state_set( VipsObject *object, void *a, void *b )
VipsThreadState * VipsThreadState *
vips_thread_state_new( VipsImage *im, void *a ) vips_thread_state_new( VipsImage *im, void *a )
{ {
VIPS_DEBUG_MSG( "vips_thread_state_new:\n" ); VIPS_DEBUG_MSG( "vips_thread_state_new: image %p\n", im );
return( VIPS_THREAD_STATE( vips_object_new( return( VIPS_THREAD_STATE( vips_object_new(
VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) ); VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
} }
/* What we track for a task within the #VipsThreadPool. /* A VipsTask is the state of one call to vips_threadpool_run().
*/ */
typedef struct _VipsTask { typedef struct _VipsTask {
/* All private. /* All private.
@ -493,8 +502,7 @@ typedef struct _VipsTask {
gboolean stop; gboolean stop;
} VipsTask; } VipsTask;
/* Run this once per main loop. Get some work (single-threaded), then do it /* Allocate some work (single-threaded), then do it (many-threaded).
* (many-threaded).
* *
* The very first workunit is also executed single-threaded. This gives * The very first workunit is also executed single-threaded. This gives
* loaders a change to seek to the correct spot, see vips_sequential(). * loaders a change to seek to the correct spot, see vips_sequential().
@ -511,7 +519,7 @@ vips_task_work_unit( VipsTask *task, VipsThreadState *state )
VIPS_GATE_STOP( "vips_task_work_unit: wait" ); VIPS_GATE_STOP( "vips_task_work_unit: wait" );
/* Has another worker signaled stop while we've been working? /* Has another worker signaled stop while we've been waiting?
*/ */
if( task->stop ) { if( task->stop ) {
g_mutex_unlock( task->allocate_lock ); g_mutex_unlock( task->allocate_lock );
@ -550,7 +558,7 @@ vips_task_work_unit( VipsTask *task, VipsThreadState *state )
task->error = TRUE; task->error = TRUE;
} }
/* What runs as a thread ... loop, waiting to be told to do stuff. /* What runs as a pipeline thread ... loop, waiting to be told to do stuff.
*/ */
static void static void
vips_task_run( gpointer data, gpointer user_data ) vips_task_run( gpointer data, gpointer user_data )
@ -594,7 +602,8 @@ vips__threadpool_shutdown( void )
/* We may come here without having inited. /* We may come here without having inited.
*/ */
if( vips__pool ) { if( vips__pool ) {
VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", vips__pool ); VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n",
vips__pool );
g_thread_pool_free( vips__pool, TRUE, TRUE ); g_thread_pool_free( vips__pool, TRUE, TRUE );
vips__pool = NULL; vips__pool = NULL;
@ -610,8 +619,6 @@ vips_task_new( VipsImage *im, int *n_tasks )
gint64 n_tiles; gint64 n_tiles;
int n_lines; int n_lines;
/* Allocate and init a new task.
*/
if( !(task = VIPS_NEW( NULL, VipsTask )) ) if( !(task = VIPS_NEW( NULL, VipsTask )) )
return( NULL ); return( NULL );
task->im = im; task->im = im;
@ -632,7 +639,7 @@ vips_task_new( VipsImage *im, int *n_tasks )
vips_get_tile_size( im, &tile_width, &tile_height, &n_lines ); vips_get_tile_size( im, &tile_width, &tile_height, &n_lines );
n_tiles = (1 + (gint64) im->Xsize / tile_width) * n_tiles = (1 + (gint64) im->Xsize / tile_width) *
(1 + (gint64) im->Ysize / tile_height); (1 + (gint64) im->Ysize / tile_height);
n_tiles = VIPS_MAX( 0, n_tiles ); n_tiles = VIPS_MAX( 1, n_tiles );
*n_tasks = VIPS_MIN( *n_tasks, n_tiles ); *n_tasks = VIPS_MIN( *n_tasks, n_tiles );
VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n", VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n",
@ -654,21 +661,15 @@ vips_task_free( VipsTask *task )
} }
/** /**
* vips_threadpool_push. * vips_threadpool_push:
* @name an name for the thread * @name an name for the thread
* @func a function to execute in the thread pool * @func a function to execute in the thread pool
* @data an argument to supply to @func * @data an argument to supply to @func
* *
* Inserts a new function into the list of tasks to be executed by the * Add a new function to the list of tasks to be executed by the
* #VipsThreadPool. A newly created or reused thread will execute * #VipsThreadPool. A newly created or reused thread will execute
* @func with with the argument data. * @func with with the argument data.
* *
* When the number of currently running threads is lower than the
* maximal allowed number of threads set by vips_concurrency_set(),
* a new thread is started (or reused).
* Otherwise, it stays in the queue until a thread in this pool
* finishes its previous task and processes the @func.
*
* See also: vips_concurrency_set(). * See also: vips_concurrency_set().
* *
* Returns: 0 on success, -1 on error. * Returns: 0 on success, -1 on error.
@ -691,6 +692,9 @@ vips_threadpool_push( const char *name, GFunc func, gpointer data )
return( -1 ); return( -1 );
} }
VIPS_DEBUG_MSG( "vips_threadpool_push: %u threads in pool\n",
g_thread_pool_get_num_threads( vips__pool ) );
return( result ? 0 : -1 ); return( result ? 0 : -1 );
} }
@ -818,7 +822,7 @@ vips_threadpool_run( VipsImage *im,
task->work = work; task->work = work;
task->a = a; task->a = a;
/* Put the tasks to work. /* Create a set of workers for this pipeline.
*/ */
for( i = 0; i < n_tasks; i++ ) for( i = 0; i < n_tasks; i++ )
if( vips_threadpool_push( "worker", vips_task_run, task ) ) if( vips_threadpool_push( "worker", vips_task_run, task ) )
@ -859,7 +863,7 @@ vips_threadpool_run( VipsImage *im,
return( result ); return( result );
} }
/* Create a new threadpool. This is called during vips_init. /* Create the vips threadpool. This is called during vips_init.
*/ */
void void
vips__threadpool_init( void ) vips__threadpool_init( void )
@ -874,8 +878,14 @@ vips__threadpool_init( void )
if( vips__concurrency == 0 ) if( vips__concurrency == 0 )
vips__concurrency = vips__concurrency_get_default(); vips__concurrency = vips__concurrency_get_default();
/* We can have many more than vips__concurrency threads -- each active
* pipeline will make vips__concurrency more, see
* vips_threadpool_run().
*/
vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL, vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL,
-1, FALSE, NULL ); -1, FALSE, NULL );
VIPS_DEBUG_MSG( "vips__threadpool_init: (%p)\n", vips__pool );
} }
/** /**