From eef3f9568d4aa1768603536c18bbfc90491bbdd4 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Tue, 11 May 2021 10:23:05 +0100 Subject: [PATCH] revise threadpool comments and docs --- libvips/iofuncs/threadpool.c | 66 +++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index 1e63ceb2..ad57ec13 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -56,9 +56,9 @@ */ /* + */ #define VIPS_DEBUG #define VIPS_DEBUG_RED - */ #ifdef HAVE_CONFIG_H #include @@ -104,8 +104,8 @@ * can be used to give feedback. */ -/* Maximum number of concurrent threads we allow. No reason for the limit, - * it's just there to stop mad values for VIPS_CONCURRENCY killing the system. +/* Maximum number of concurrent threads we allow. It prevents huge values of + * VIPS_CONCURRENCY killing the system. */ #define MAX_THREADS (1024) @@ -120,7 +120,8 @@ int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT; */ int vips__concurrency = 0; -/* Set this GPrivate to indicate that this is a vips worker. +/* Set this GPrivate to indicate that this thread is a worker inside + * the vips threadpool. */ static GPrivate *is_worker_key = NULL; @@ -213,6 +214,9 @@ vips_thread_main_loop( gpointer thread_data, gpointer pool_data ) g_free( exec ); + /* Free all thread-private caches, since they probably won't be valid + * for the next task this thread is given. + */ vips_thread_shutdown(); } @@ -318,13 +322,14 @@ vips__concurrency_get_default( void ) else nthr = get_num_processors(); - if( nthr < 1 || nthr > MAX_THREADS ) { + if( nthr < 1 || + nthr > MAX_THREADS ) { nthr = VIPS_CLIP( 1, nthr, MAX_THREADS ); g_warning( _( "threads clipped to %d" ), nthr ); } - return nthr; + return( nthr ); } /** @@ -371,7 +376,7 @@ vips_concurrency_set( int concurrency ) * If vips_concurrency_set() has not been called and no command-line argument * was used, vips uses the value of the environment variable VIPS_CONCURRENCY, * - * If VIPS_CONCURRENCY has not been set, vips find the number of hardware + * If VIPS_CONCURRENCY has not been set, vips finds the number of hardware * threads that the host machine can run in parallel and uses that value. * * The final value is clipped to the range 1 - 1024. @@ -386,6 +391,10 @@ vips_concurrency_get( void ) return( vips__concurrency ); } +/* The VipsThreadStartFn arg to vips_threadpool_run() is called once for each + * thread to make one of these things to hold the thread state. + */ + G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT ); static void @@ -441,7 +450,7 @@ vips_thread_state_set( VipsObject *object, void *a, void *b ) VipsThreadState *state = (VipsThreadState *) object; VipsImage *im = (VipsImage *) a; - VIPS_DEBUG_MSG( "vips_thread_state_set:\n" ); + VIPS_DEBUG_MSG( "vips_thread_state_set: image %p\n", im ); state->im = im; state->a = b; @@ -452,13 +461,13 @@ vips_thread_state_set( VipsObject *object, void *a, void *b ) VipsThreadState * vips_thread_state_new( VipsImage *im, void *a ) { - VIPS_DEBUG_MSG( "vips_thread_state_new:\n" ); + VIPS_DEBUG_MSG( "vips_thread_state_new: image %p\n", im ); return( VIPS_THREAD_STATE( vips_object_new( VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) ); } -/* What we track for a task within the #VipsThreadPool. +/* A VipsTask is the state of one call to vips_threadpool_run(). */ typedef struct _VipsTask { /* All private. @@ -493,8 +502,7 @@ typedef struct _VipsTask { gboolean stop; } VipsTask; -/* Run this once per main loop. Get some work (single-threaded), then do it - * (many-threaded). +/* Allocate some work (single-threaded), then do it (many-threaded). * * The very first workunit is also executed single-threaded. This gives * loaders a change to seek to the correct spot, see vips_sequential(). @@ -511,7 +519,7 @@ vips_task_work_unit( VipsTask *task, VipsThreadState *state ) VIPS_GATE_STOP( "vips_task_work_unit: wait" ); - /* Has another worker signaled stop while we've been working? + /* Has another worker signaled stop while we've been waiting? */ if( task->stop ) { g_mutex_unlock( task->allocate_lock ); @@ -550,7 +558,7 @@ vips_task_work_unit( VipsTask *task, VipsThreadState *state ) task->error = TRUE; } -/* What runs as a thread ... loop, waiting to be told to do stuff. +/* What runs as a pipeline thread ... loop, waiting to be told to do stuff. */ static void vips_task_run( gpointer data, gpointer user_data ) @@ -594,7 +602,8 @@ vips__threadpool_shutdown( void ) /* We may come here without having inited. */ if( vips__pool ) { - VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", vips__pool ); + VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", + vips__pool ); g_thread_pool_free( vips__pool, TRUE, TRUE ); vips__pool = NULL; @@ -610,8 +619,6 @@ vips_task_new( VipsImage *im, int *n_tasks ) gint64 n_tiles; int n_lines; - /* Allocate and init a new task. - */ if( !(task = VIPS_NEW( NULL, VipsTask )) ) return( NULL ); task->im = im; @@ -632,7 +639,7 @@ vips_task_new( VipsImage *im, int *n_tasks ) vips_get_tile_size( im, &tile_width, &tile_height, &n_lines ); n_tiles = (1 + (gint64) im->Xsize / tile_width) * (1 + (gint64) im->Ysize / tile_height); - n_tiles = VIPS_MAX( 0, n_tiles ); + n_tiles = VIPS_MAX( 1, n_tiles ); *n_tasks = VIPS_MIN( *n_tasks, n_tiles ); VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n", @@ -654,21 +661,15 @@ vips_task_free( VipsTask *task ) } /** - * vips_threadpool_push. + * vips_threadpool_push: * @name an name for the thread * @func a function to execute in the thread pool * @data an argument to supply to @func * - * Inserts a new function into the list of tasks to be executed by the + * Add a new function to the list of tasks to be executed by the * #VipsThreadPool. A newly created or reused thread will execute * @func with with the argument data. * - * When the number of currently running threads is lower than the - * maximal allowed number of threads set by vips_concurrency_set(), - * a new thread is started (or reused). - * Otherwise, it stays in the queue until a thread in this pool - * finishes its previous task and processes the @func. - * * See also: vips_concurrency_set(). * * Returns: 0 on success, -1 on error. @@ -691,6 +692,9 @@ vips_threadpool_push( const char *name, GFunc func, gpointer data ) return( -1 ); } + VIPS_DEBUG_MSG( "vips_threadpool_push: %u threads in pool\n", + g_thread_pool_get_num_threads( vips__pool ) ); + return( result ? 0 : -1 ); } @@ -818,7 +822,7 @@ vips_threadpool_run( VipsImage *im, task->work = work; task->a = a; - /* Put the tasks to work. + /* Create a set of workers for this pipeline. */ for( i = 0; i < n_tasks; i++ ) if( vips_threadpool_push( "worker", vips_task_run, task ) ) @@ -859,7 +863,7 @@ vips_threadpool_run( VipsImage *im, return( result ); } -/* Create a new threadpool. This is called during vips_init. +/* Create the vips threadpool. This is called during vips_init. */ void vips__threadpool_init( void ) @@ -874,8 +878,14 @@ vips__threadpool_init( void ) if( vips__concurrency == 0 ) vips__concurrency = vips__concurrency_get_default(); + /* We can have many more than vips__concurrency threads -- each active + * pipeline will make vips__concurrency more, see + * vips_threadpool_run(). + */ vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL, -1, FALSE, NULL ); + + VIPS_DEBUG_MSG( "vips__threadpool_init: (%p)\n", vips__pool ); } /**