From 80e0cc3d127bdcb2add84bd1fd659d4795cb4f2e Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Thu, 18 Mar 2021 16:53:00 +0100 Subject: [PATCH 1/7] Reuse threads by using GLib's threadpool --- libvips/include/vips/internal.h | 1 + libvips/include/vips/threadpool.h | 3 + libvips/iofuncs/init.c | 4 +- libvips/iofuncs/threadpool.c | 539 +++++++++++++++--------------- 4 files changed, 270 insertions(+), 277 deletions(-) diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index d3436472..20c41e8d 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -111,6 +111,7 @@ extern gboolean vips__cache_trace; extern int vips__n_active_threads; void vips__threadpool_init( void ); +void vips__threadpool_shutdown( void ); void vips__cache_init( void ); diff --git a/libvips/include/vips/threadpool.h b/libvips/include/vips/threadpool.h index 8adb97b2..38310526 100644 --- a/libvips/include/vips/threadpool.h +++ b/libvips/include/vips/threadpool.h @@ -5,6 +5,8 @@ * 17/3/10 * - from threadgroup * - rework with a simpler distributed work allocation model + * 02/02/20 kleisauke + * - reuse threads by using GLib's threadpool */ /* @@ -130,6 +132,7 @@ typedef int (*VipsThreadpoolWorkFn)( VipsThreadState *state, void *a ); */ typedef int (*VipsThreadpoolProgressFn)( void *a ); +int vips_threadpool_push( const char *name, GFunc func, gpointer data ); int vips_threadpool_run( VipsImage *im, VipsThreadStartFn start, VipsThreadpoolAllocateFn allocate, diff --git a/libvips/iofuncs/init.c b/libvips/iofuncs/init.c index be4dced3..f9399491 100644 --- a/libvips/iofuncs/init.c +++ b/libvips/iofuncs/init.c @@ -665,7 +665,7 @@ vips_leak( void ) * * This function needs to be called when a thread that has been using vips * exits. It is called for you by vips_shutdown() and for any threads created - * by vips_g_thread_new(). + * within the #VipsThreadPool. * * You will need to call it from threads created in * other ways or there will be memory leaks. If you do not call it, vips @@ -719,6 +719,8 @@ vips_shutdown( void ) vips__thread_profile_stop(); + vips__threadpool_shutdown(); + #ifdef HAVE_GSF gsf_shutdown(); #endif /*HAVE_GSF*/ diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index fccc3adb..93f13d47 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -23,6 +23,8 @@ * - don't depend on image width when setting n_lines * 27/2/19 jtorresfabra * - free threadpool earlier + * 02/02/20 kleisauke + * - reuse threads by using GLib's threadpool */ /* @@ -87,6 +89,15 @@ * @include: vips/vips.h * @title: VipsThreadpool * + * A threadpool which allows reusing already started threads. Implementing + * this can be tedious and error-prone. Therefore we use the GLib + * provided threadpool for our convenience. An added advantage is, that + * the threads can be shared between the different subsystems, when they + * are using GLib. + * + * The threadpool is created during vips_init() and is destroyed by + * vips_shutdown(). + * * vips_threadpool_run() loops a set of threads over an image. Threads take it * in turns to allocate units of work (a unit might be a tile in an image), * then run in parallel to process those units. An optional progress function @@ -121,6 +132,10 @@ static GPrivate *is_worker_key = NULL; */ static gboolean vips__stall = FALSE; +/* The thread pool we'll use. + */ +static GThreadPool *vips__pool = NULL; + /* Glib 2.32 revised the thread API. We need some compat functions. */ @@ -273,23 +288,38 @@ vips_g_thread_join( GThread *thread ) return( result ); } -/** - * vips_concurrency_set: - * @concurrency: number of threads to run - * - * Sets the number of worker threads that vips should use when running a - * #VipsThreadPool. - * - * The special value 0 means "default". In this case, the number of threads is - * set by the environment variable VIPS_CONCURRENCY, or if that is not set, the - * number of threads availble on the host machine. - * - * See also: vips_concurrency_get(). - */ -void -vips_concurrency_set( int concurrency ) +typedef struct { + /* An name for this thread. + */ + const char *name; + + /* The function to execute within the #VipsThreadPool. + */ + GFunc func; + + /* User data that is handed over to func when it is called. + */ + gpointer data; +} VipsThreadExec; + +static void +vips_thread_main_loop( gpointer thread_data, gpointer pool_data ) { - vips__concurrency = concurrency; + VipsThreadExec *exec = (VipsThreadExec *) thread_data; + + /* Set this to something (anything) to tag this thread as a vips + * worker. + */ + g_private_set( is_worker_key, thread_data ); + + if( vips__thread_profile ) + vips__thread_profile_attach( exec->name ); + + exec->func( exec->data, pool_data ); + + g_free( exec ); + + vips_thread_shutdown(); } static int @@ -371,6 +401,67 @@ get_num_processors( void ) #endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/ } +/* The default concurrency, set by the environment variable VIPS_CONCURRENCY, + * or if that is not set, the number of threads available on the host machine. + */ +static int +vips__concurrency_get_default( void ) +{ + const char *str; + int nthr; + int x; + + /* Tell the threads system how much concurrency we expect. + */ + if( vips__concurrency > 0 ) + nthr = vips__concurrency; + else if( ((str = g_getenv( "VIPS_CONCURRENCY" )) +#if ENABLE_DEPRECATED + || (str = g_getenv( "IM_CONCURRENCY" )) +#endif + ) && (x = atoi( str )) > 0 ) + nthr = x; + else + nthr = get_num_processors(); + + if( nthr < 1 || nthr > MAX_THREADS ) { + nthr = VIPS_CLIP( 1, nthr, MAX_THREADS ); + + g_warning( _( "threads clipped to %d" ), nthr ); + } + + return nthr; +} + +/** + * vips_concurrency_set: + * @concurrency: number of threads to run + * + * Sets the number of worker threads that vips should use when running a + * #VipsThreadPool. + * + * The special value 0 means "default". In this case, the number of threads is + * set by the environment variable VIPS_CONCURRENCY, or if that is not set, the + * number of threads available on the host machine. + * + * See also: vips_concurrency_get(). + */ +void +vips_concurrency_set( int concurrency ) +{ + /* Tell the threads system how much concurrency we expect. + */ + if( concurrency < 1 ) + concurrency = vips__concurrency_get_default(); + else if( concurrency > MAX_THREADS ) { + concurrency = MAX_THREADS; + + g_warning( _( "threads clipped to %d" ), MAX_THREADS ); + } + + vips__concurrency = concurrency; +} + /** * vips_concurrency_get: * @@ -398,34 +489,7 @@ get_num_processors( void ) int vips_concurrency_get( void ) { - const char *str; - int nthr; - int x; - - /* Tell the threads system how much concurrency we expect. - */ - if( vips__concurrency > 0 ) - nthr = vips__concurrency; - else if( ((str = g_getenv( "VIPS_CONCURRENCY" )) -#if ENABLE_DEPRECATED - || (str = g_getenv( "IM_CONCURRENCY" )) -#endif - ) && (x = atoi( str )) > 0 ) - nthr = x; - else - nthr = get_num_processors(); - - if( nthr < 1 || nthr > MAX_THREADS ) { - nthr = VIPS_CLIP( 1, nthr, MAX_THREADS ); - - g_warning( _( "threads clipped to %d" ), nthr ); - } - - /* Save for next time around. - */ - vips_concurrency_set( nthr ); - - return( nthr ); + return( vips__concurrency ); } G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT ); @@ -500,41 +564,17 @@ vips_thread_state_new( VipsImage *im, void *a ) VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) ); } -/* What we track for each thread in the pool. +/* What we track for a task within the #VipsThreadPool. */ -typedef struct { - /* All private. - */ - /*< private >*/ - struct _VipsThreadpool *pool; /* Pool we are part of */ - - VipsThreadState *state; - - /* Thread we are running. - */ - GThread *thread; - - /* Set this to ask the thread to exit. - */ - gboolean exit; - - /* Set by the thread if work or allocate return an error. - */ - gboolean error; - -} VipsThread; - -/* What we track for a group of threads working together. - */ -typedef struct _VipsThreadpool { +typedef struct _VipsTask { /* All private. */ /*< private >*/ VipsImage *im; /* Image we are calculating */ - /* Start a thread, do a unit of work (runs in parallel) and allocate - * a unit of work (serial). Plus the mutex we use to serialize work - * allocation. + /* Start or reuse a thread, do a unit of work (runs in parallel) + * and allocate a unit of work (serial). Plus the mutex we use to + * serialize work allocation. */ VipsThreadStartFn start; VipsThreadpoolAllocateFn allocate; @@ -542,10 +582,7 @@ typedef struct _VipsThreadpool { GMutex *allocate_lock; void *a; /* User argument to start / allocate / etc. */ - int nthr; /* Number of threads in pool */ - VipsThread **thr; /* Threads */ - - /* The caller blocks here until all threads finish. + /* The caller blocks here until all tasks finish. */ VipsSemaphore finish; @@ -560,46 +597,7 @@ typedef struct _VipsThreadpool { /* Set by Allocate (via an arg) to indicate normal end of computation. */ gboolean stop; -} VipsThreadpool; - -/* Junk a thread. - */ -static void -vips_thread_free( VipsThread *thr ) -{ - /* Is there a thread running this region? Kill it! - */ - if( thr->thread ) { - thr->exit = 1; - - /* Return value is always NULL (see thread_main_loop). - */ - (void) vips_g_thread_join( thr->thread ); - thr->thread = NULL; - } - - VIPS_FREEF( g_object_unref, thr->state ); - thr->pool = NULL; - - VIPS_FREE( thr ); -} - -static int -vips_thread_allocate( VipsThread *thr ) -{ - VipsThreadpool *pool = thr->pool; - - g_assert( !pool->stop ); - - if( !thr->state && - !(thr->state = pool->start( pool->im, pool->a )) ) - return( -1 ); - - if( pool->allocate( thr->state, pool->a, &pool->stop ) ) - return( -1 ); - - return( 0 ); -} +} VipsTask; /* Run this once per main loop. Get some work (single-threaded), then do it * (many-threaded). @@ -608,221 +606,203 @@ vips_thread_allocate( VipsThread *thr ) * loaders a change to seek to the correct spot, see vips_sequential(). */ static void -vips_thread_work_unit( VipsThread *thr ) +vips_task_work_unit( VipsTask *task, VipsThreadState *state ) { - VipsThreadpool *pool = thr->pool; - - if( thr->error ) + if( task->error ) return; - VIPS_GATE_START( "vips_thread_work_unit: wait" ); + VIPS_GATE_START( "vips_task_work_unit: wait" ); - g_mutex_lock( pool->allocate_lock ); + g_mutex_lock( task->allocate_lock ); - VIPS_GATE_STOP( "vips_thread_work_unit: wait" ); + VIPS_GATE_STOP( "vips_task_work_unit: wait" ); /* Has another worker signaled stop while we've been working? */ - if( pool->stop ) { - g_mutex_unlock( pool->allocate_lock ); + if( task->stop ) { + g_mutex_unlock( task->allocate_lock ); return; } - if( vips_thread_allocate( thr ) ) { - thr->error = TRUE; - pool->error = TRUE; - g_mutex_unlock( pool->allocate_lock ); + if( task->allocate( state, task->a, &task->stop ) ) { + task->error = TRUE; + g_mutex_unlock( task->allocate_lock ); return; } /* Have we just signalled stop? */ - if( pool->stop ) { - g_mutex_unlock( pool->allocate_lock ); + if( task->stop ) { + g_mutex_unlock( task->allocate_lock ); return; } - g_mutex_unlock( pool->allocate_lock ); + g_mutex_unlock( task->allocate_lock ); - if( thr->state->stall && + if( state->stall && vips__stall ) { /* Sleep for 0.5s. Handy for stressing the seq system. Stall * is set by allocate funcs in various places. */ g_usleep( 500000 ); - thr->state->stall = FALSE; - printf( "vips_thread_work_unit: " - "stall done, releasing y = %d ...\n", thr->state->y ); + state->stall = FALSE; + printf( "vips_task_work_unit: " + "stall done, releasing y = %d ...\n", state->y ); } /* Process a work unit. */ - if( pool->work( thr->state, pool->a ) ) { - thr->error = TRUE; - pool->error = TRUE; - } + if( task->work( state, task->a ) ) + task->error = TRUE; } /* What runs as a thread ... loop, waiting to be told to do stuff. */ -static void * -vips_thread_main_loop( void *a ) +static void +vips_task_run( gpointer data, gpointer user_data ) { - VipsThread *thr = (VipsThread *) a; - VipsThreadpool *pool = thr->pool; + VipsTask *task = (VipsTask *) data; + VipsThreadState *state; - g_assert( pool == thr->pool ); + VIPS_GATE_START( "vips_task_run: thread" ); - VIPS_GATE_START( "vips_thread_main_loop: thread" ); + // TODO: Could we move this to vips_task_work_unit()? + g_mutex_lock( task->allocate_lock ); + if( !(state = task->start( task->im, task->a )) ) + task->error = TRUE; + g_mutex_unlock( task->allocate_lock ); /* Process work units! Always tick, even if we are stopping, so the * main thread will wake up for exit. */ for(;;) { - VIPS_GATE_START( "vips_thread_work_unit: u" ); - vips_thread_work_unit( thr ); - VIPS_GATE_STOP( "vips_thread_work_unit: u" ); - vips_semaphore_up( &pool->tick ); + VIPS_GATE_START( "vips_task_work_unit: u" ); + vips_task_work_unit( task, state ); + VIPS_GATE_STOP( "vips_task_work_unit: u" ); + vips_semaphore_up( &task->tick ); - if( pool->stop || - pool->error ) + if( task->stop || + task->error ) break; } + g_mutex_lock( task->allocate_lock ); + VIPS_FREEF( g_object_unref, state ); + g_mutex_unlock( task->allocate_lock ); + /* We are exiting: tell the main thread. */ - vips_semaphore_up( &pool->finish ); + vips_semaphore_up( &task->finish ); - VIPS_GATE_STOP( "vips_thread_main_loop: thread" ); - - return( NULL ); + VIPS_GATE_STOP( "vips_task_run: thread" ); } -/* Attach another thread to a threadpool. +/* Called from vips_shutdown(). */ -static VipsThread * -vips_thread_new( VipsThreadpool *pool ) +void +vips__threadpool_shutdown( void ) { - VipsThread *thr; - - if( !(thr = VIPS_NEW( NULL, VipsThread )) ) - return( NULL ); - thr->pool = pool; - thr->state = NULL; - thr->thread = NULL; - thr->exit = 0; - thr->error = 0; - - /* We can't build the state here, it has to be done by the worker - * itself the first time that allocate runs so that any regions are - * owned by the correct thread. + /* We may come here without having inited. */ + if( vips__pool ) { + VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", vips__pool ); - if( !(thr->thread = vips_g_thread_new( "worker", - vips_thread_main_loop, thr )) ) { - vips_thread_free( thr ); - return( NULL ); - } - - return( thr ); -} - -/* Kill all threads in a threadpool, if there are any. Can be called multiple - * times. - */ -static void -vips_threadpool_kill_threads( VipsThreadpool *pool ) -{ - if( pool->thr ) { - int i; - - for( i = 0; i < pool->nthr; i++ ) - VIPS_FREEF( vips_thread_free, pool->thr[i] ); - - VIPS_DEBUG_MSG( "vips_threadpool_kill_threads: " - "killed %d threads\n", pool->nthr ); + g_thread_pool_free( vips__pool, TRUE, TRUE ); + vips__pool = NULL; } } -static void -vips_threadpool_free( VipsThreadpool *pool ) +static VipsTask * +vips_task_new( VipsImage *im, int *n_tasks ) { - VIPS_DEBUG_MSG( "vips_threadpool_free: \"%s\" (%p)\n", - pool->im->filename, pool ); - - vips_threadpool_kill_threads( pool ); - VIPS_FREEF( vips_g_mutex_free, pool->allocate_lock ); - vips_semaphore_destroy( &pool->finish ); - vips_semaphore_destroy( &pool->tick ); - VIPS_FREE( pool->thr ); - VIPS_FREE( pool ); -} - -static VipsThreadpool * -vips_threadpool_new( VipsImage *im ) -{ - VipsThreadpool *pool; + VipsTask *task; int tile_width; int tile_height; gint64 n_tiles; int n_lines; - /* Allocate and init new thread block. + /* Allocate and init a new task. */ - if( !(pool = VIPS_NEW( NULL, VipsThreadpool )) ) + if( !(task = VIPS_NEW( NULL, VipsTask )) ) return( NULL ); - pool->im = im; - pool->allocate = NULL; - pool->work = NULL; - pool->allocate_lock = vips_g_mutex_new(); - pool->nthr = vips_concurrency_get(); - pool->thr = NULL; - vips_semaphore_init( &pool->finish, 0, "finish" ); - vips_semaphore_init( &pool->tick, 0, "tick" ); - pool->error = FALSE; - pool->stop = FALSE; + task->im = im; + task->allocate = NULL; + task->work = NULL; + task->allocate_lock = vips_g_mutex_new(); + vips_semaphore_init( &task->finish, 0, "finish" ); + vips_semaphore_init( &task->tick, 0, "tick" ); + task->error = FALSE; + task->stop = FALSE; - /* If this is a tiny image, we won't need all nthr threads. Guess how + *n_tasks = vips_concurrency_get(); + + /* If this is a tiny image, we won't need all n_tasks. Guess how * many tiles we might need to cover the image and use that to limit - * the number of threads we create. + * the number of tasks we create. */ vips_get_tile_size( im, &tile_width, &tile_height, &n_lines ); n_tiles = (1 + (gint64) im->Xsize / tile_width) * (1 + (gint64) im->Ysize / tile_height); - n_tiles = VIPS_CLIP( 0, n_tiles, MAX_THREADS ); - pool->nthr = VIPS_MIN( pool->nthr, n_tiles ); + n_tiles = VIPS_MAX( 0, n_tiles ); + *n_tasks = VIPS_MIN( *n_tasks, n_tiles ); - VIPS_DEBUG_MSG( "vips_threadpool_new: \"%s\" (%p), with %d threads\n", - im->filename, pool, pool->nthr ); + VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n", + im->filename, task, *n_tasks ); - return( pool ); + return( task ); } -/* Attach a set of threads. - */ -static int -vips_threadpool_create_threads( VipsThreadpool *pool ) +static void +vips_task_free( VipsTask *task ) { - int i; + VIPS_DEBUG_MSG( "vips_task_free: \"%s\" (%p)\n", + task->im->filename, task ); - g_assert( !pool->thr ); + VIPS_FREEF( vips_g_mutex_free, task->allocate_lock ); + vips_semaphore_destroy( &task->finish ); + vips_semaphore_destroy( &task->tick ); + VIPS_FREE( task ); +} - /* Make thread array. - */ - if( !(pool->thr = VIPS_ARRAY( NULL, pool->nthr, VipsThread * )) ) +/** + * vips_threadpool_push. + * @name an name for the thread + * @func a function to execute in the thread pool + * @data an argument to supply to @func + * + * Inserts a new function into the list of tasks to be executed by the + * #VipsThreadPool. A newly created or reused thread will execute + * @func with with the argument data. + * + * When the number of currently running threads is lower than the + * maximal allowed number of threads set by vips_concurrency_set(), + * a new thread is started (or reused). + * Otherwise, it stays in the queue until a thread in this pool + * finishes its previous task and processes the @func. + * + * See also: vips_concurrency_set(). + * + * Returns: 0 on success, -1 on error. + */ +int +vips_threadpool_push( const char *name, GFunc func, gpointer data ) +{ + VipsThreadExec *exec; + GError *error = NULL; + gboolean result; + + exec = g_new( VipsThreadExec, 1 ); + exec->name = name; + exec->func = func; + exec->data = data; + + result = g_thread_pool_push( vips__pool, exec, &error ); + if( error ) { + vips_g_error( &error ); return( -1 ); - for( i = 0; i < pool->nthr; i++ ) - pool->thr[i] = NULL; + } - /* Attach threads and start them working. - */ - for( i = 0; i < pool->nthr; i++ ) - if( !(pool->thr[i] = vips_thread_new( pool )) ) { - vips_threadpool_kill_threads( pool ); - return( -1 ); - } - - return( 0 ); + return( result ? 0 : -1 ); } /** @@ -908,7 +888,8 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) * @progress: give progress feedback about a work unit, or %NULL * @a: client data * - * This function runs a set of threads over an image. Each thread first calls + * This function runs a set of threads over an image. It will use a newly + * created or reused thread within the #VipsThreadPool. Each thread first calls * @start to create new per-thread state, then runs * @allocate to set up a new work unit (perhaps the next tile in an image, for * example), then @work to process that work unit. After each unit is @@ -935,60 +916,61 @@ vips_threadpool_run( VipsImage *im, VipsThreadpoolProgressFn progress, void *a ) { - VipsThreadpool *pool; + VipsTask *task; + int n_tasks; + int i; int result; - if( !(pool = vips_threadpool_new( im )) ) + if( !(task = vips_task_new( im, &n_tasks )) ) return( -1 ); - pool->start = start; - pool->allocate = allocate; - pool->work = work; - pool->a = a; + task->start = start; + task->allocate = allocate; + task->work = work; + task->a = a; - /* Attach workers and set them going. + /* Put the tasks to work. */ - if( vips_threadpool_create_threads( pool ) ) { - vips_threadpool_free( pool ); - return( -1 ); - } + for( i = 0; i < n_tasks; i++ ) + if( vips_threadpool_push( "worker", vips_task_run, task ) ) + return( -1 ); for(;;) { /* Wait for a tick from a worker. */ - vips_semaphore_down( &pool->tick ); + vips_semaphore_down( &task->tick ); VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" ); - if( pool->stop || - pool->error ) + if( task->stop || + task->error ) break; if( progress && - progress( pool->a ) ) - pool->error = TRUE; + progress( task->a ) ) + task->error = TRUE; - if( pool->stop || - pool->error ) + if( task->stop || + task->error ) break; } /* Wait for them all to hit finish. */ - vips_semaphore_downn( &pool->finish, pool->nthr ); + vips_semaphore_downn( &task->finish, n_tasks ); /* Return 0 for success. */ - result = pool->error ? -1 : 0; + result = task->error ? -1 : 0; - vips_threadpool_free( pool ); + vips_task_free( task ); vips_image_minimise_all( im ); return( result ); } -/* Start up threadpools. This is called during vips_init. +/* Create a new threadpool. This is called during vips_init. */ void vips__threadpool_init( void ) @@ -999,6 +981,12 @@ vips__threadpool_init( void ) if( g_getenv( "VIPS_STALL" ) ) vips__stall = TRUE; + + if( vips__concurrency == 0 ) + vips__concurrency = vips__concurrency_get_default(); + + vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL, + -1, FALSE, NULL ); } /** @@ -1076,4 +1064,3 @@ vips_get_tile_size( VipsImage *im, "groups of %d scanlines\n", *tile_width, *tile_height, *n_lines ); } - From 414404917441dde60ca206524ca25cdfb7ec9780 Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Mon, 3 Feb 2020 14:54:01 +0100 Subject: [PATCH 2/7] Remove mutex lock for VipsThreadStartFn vips_{avg,deviate,hough,max,min,stats} are the only arithmetic functions that do not require a mutex on the _start and/or _stop function. All other arithmetic functions still needs this, so move it to sink instead. --- libvips/iofuncs/sink.c | 30 ++++++++++++++++++++++++++++-- libvips/iofuncs/threadpool.c | 16 ++++++---------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/libvips/iofuncs/sink.c b/libvips/iofuncs/sink.c index 17454eda..bd3cb433 100644 --- a/libvips/iofuncs/sink.c +++ b/libvips/iofuncs/sink.c @@ -73,6 +73,10 @@ typedef struct _Sink { */ VipsImage *t; + /* Mutex for serialising calls to VipsStartFn and VipsStopFn. + */ + GMutex *sslock; + /* Call params. */ VipsStartFn start_fn; @@ -249,9 +253,21 @@ static int sink_call_stop( Sink *sink, SinkThreadState *state ) { if( state->seq && sink->stop_fn ) { + int result; + VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state ); - if( sink->stop_fn( state->seq, sink->a, sink->b ) ) { + VIPS_GATE_START( "sink_call_stop: wait" ); + + g_mutex_lock( sink->sslock ); + + VIPS_GATE_STOP( "sink_call_stop: wait" ); + + result = sink->stop_fn( state->seq, sink->a, sink->b ); + + g_mutex_unlock( sink->sslock ); + + if( result ) { SinkBase *sink_base = (SinkBase *) sink; vips_error( "vips_sink", @@ -286,7 +302,15 @@ sink_call_start( Sink *sink, SinkThreadState *state ) if( !state->seq && sink->start_fn ) { VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state ); - state->seq = sink->start_fn( sink->t, sink->a, sink->b ); + VIPS_GATE_START( "sink_call_start: wait" ); + + g_mutex_lock( sink->sslock ); + + VIPS_GATE_STOP( "sink_call_start: wait" ); + + state->seq = sink->start_fn( sink->t, sink->a, sink->b ); + + g_mutex_unlock( sink->sslock ); if( !state->seq ) { SinkBase *sink_base = (SinkBase *) sink; @@ -346,6 +370,7 @@ vips_sink_thread_state_new( VipsImage *im, void *a ) static void sink_free( Sink *sink ) { + VIPS_FREEF( vips_g_mutex_free, sink->sslock ); VIPS_FREEF( sink_area_free, sink->area ); VIPS_FREEF( sink_area_free, sink->old_area ); VIPS_FREEF( g_object_unref, sink->t ); @@ -381,6 +406,7 @@ sink_init( Sink *sink, vips_sink_base_init( &sink->sink_base, image ); sink->t = NULL; + sink->sslock = vips_g_mutex_new(); sink->start_fn = start_fn; sink->generate_fn = generate_fn; sink->stop_fn = stop_fn; diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index 93f13d47..26416473 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -25,6 +25,7 @@ * - free threadpool earlier * 02/02/20 kleisauke * - reuse threads by using GLib's threadpool + * - remove mutex lock for VipsThreadStartFn */ /* @@ -666,11 +667,8 @@ vips_task_run( gpointer data, gpointer user_data ) VIPS_GATE_START( "vips_task_run: thread" ); - // TODO: Could we move this to vips_task_work_unit()? - g_mutex_lock( task->allocate_lock ); if( !(state = task->start( task->im, task->a )) ) task->error = TRUE; - g_mutex_unlock( task->allocate_lock ); /* Process work units! Always tick, even if we are stopping, so the * main thread will wake up for exit. @@ -686,9 +684,7 @@ vips_task_run( gpointer data, gpointer user_data ) break; } - g_mutex_lock( task->allocate_lock ); VIPS_FREEF( g_object_unref, state ); - g_mutex_unlock( task->allocate_lock ); /* We are exiting: tell the main thread. */ @@ -815,8 +811,8 @@ vips_threadpool_push( const char *name, GFunc func, gpointer data ) * is allocated to it to build the per-thread state. Per-thread state is used * by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate. * - * #VipsThreadState is a subclass of #VipsObject. Start functions are called - * from allocate, that is, they are single-threaded. + * #VipsThreadState is a subclass of #VipsObject. Start functions can be + * executed concurrently. * * See also: vips_threadpool_run(). * @@ -899,9 +895,9 @@ vips_threadpool_push( const char *name, GFunc func, gpointer data ) * The object returned by @start must be an instance of a subclass of * #VipsThreadState. Use this to communicate between @allocate and @work. * - * @allocate and @start are always single-threaded (so they can write to the - * per-pool state), whereas @work can be executed concurrently. @progress is - * always called by + * @allocate is always single-threaded (so it can write to the + * per-pool state), whereas @start and @work can be executed concurrently. + * @progress is always called by * the main thread (ie. the thread which called vips_threadpool_run()). * * See also: vips_concurrency_set(). From d1b4d12388d29de6f201fd22cc81a0f97c36e10c Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Tue, 11 Feb 2020 12:44:33 +0100 Subject: [PATCH 3/7] Move sink_disc and sink_screen threads to threadpool And deprecate the vips_g_thread_new and vips_g_thread_join functions. --- libvips/deprecated/vips7compat.c | 15 +++- libvips/include/vips/internal.h | 2 - libvips/include/vips/thread.h | 5 -- libvips/include/vips/vips7compat.h | 5 ++ libvips/iofuncs/init.c | 6 -- libvips/iofuncs/sink.c | 4 +- libvips/iofuncs/sinkdisc.c | 32 +++++---- libvips/iofuncs/sinkmemory.c | 8 +-- libvips/iofuncs/sinkscreen.c | 53 ++++++++------ libvips/iofuncs/threadpool.c | 110 +---------------------------- 10 files changed, 78 insertions(+), 162 deletions(-) diff --git a/libvips/deprecated/vips7compat.c b/libvips/deprecated/vips7compat.c index 94208559..546111d1 100644 --- a/libvips/deprecated/vips7compat.c +++ b/libvips/deprecated/vips7compat.c @@ -5694,7 +5694,7 @@ vips_get_option_group( void ) } /* We used to use this for system() back in the day. But it's awkward to make - * it work properly on win32, so this is nonw deprecated. + * it work properly on win32, so this is now deprecated. */ FILE * vips_popenf( const char *fmt, const char *mode, ... ) @@ -5703,3 +5703,16 @@ vips_popenf( const char *fmt, const char *mode, ... ) return( NULL ); } +GThread * +vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data ) +{ + vips_error( "vips_g_thread_new", "%s", _( "deprecated" ) ); + return( NULL ); +} + +void * +vips_g_thread_join( GThread *thread ) +{ + vips_error( "vips_g_thread_join", "%s", _( "deprecated" ) ); + return( NULL ); +} diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index 20c41e8d..7ce89b50 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -108,8 +108,6 @@ extern char *vips__disc_threshold; extern gboolean vips__cache_dump; extern gboolean vips__cache_trace; -extern int vips__n_active_threads; - void vips__threadpool_init( void ); void vips__threadpool_shutdown( void ); diff --git a/libvips/include/vips/thread.h b/libvips/include/vips/thread.h index 63ba23c4..ae51b652 100644 --- a/libvips/include/vips/thread.h +++ b/libvips/include/vips/thread.h @@ -47,11 +47,6 @@ void vips_g_mutex_free( GMutex * ); GCond *vips_g_cond_new( void ); void vips_g_cond_free( GCond * ); -/* ... and for GThread. - */ -GThread *vips_g_thread_new( const char *, GThreadFunc, gpointer ); -void *vips_g_thread_join( GThread *thread ); - gboolean vips_thread_isworker( void ); #ifdef __cplusplus diff --git a/libvips/include/vips/vips7compat.h b/libvips/include/vips/vips7compat.h index ac34a710..a591a84c 100644 --- a/libvips/include/vips/vips7compat.h +++ b/libvips/include/vips/vips7compat.h @@ -1224,6 +1224,11 @@ VipsWindow *vips_window_ref( VipsImage *im, int top, int height ); FILE *vips_popenf( const char *fmt, const char *mode, ... ) __attribute__((format(printf, 1, 3))); +/* old GThread API. + */ +GThread *vips_g_thread_new( const char *, GThreadFunc, gpointer ); +void *vips_g_thread_join( GThread *thread ); + /* This stuff is very, very old and should not be used by anyone now. */ #ifdef VIPS_ENABLE_ANCIENT diff --git a/libvips/iofuncs/init.c b/libvips/iofuncs/init.c index f9399491..735c907e 100644 --- a/libvips/iofuncs/init.c +++ b/libvips/iofuncs/init.c @@ -641,12 +641,6 @@ vips_leak( void ) n_leaks += strlen( vips_error_buffer() ); } - if( vips__n_active_threads > 0 ) { - vips_buf_appendf( &buf, "threads: %d not joined\n", - vips__n_active_threads ); - n_leaks += vips__n_active_threads; - } - fprintf( stderr, "%s", vips_buf_all( &buf ) ); n_leaks += vips__print_renders(); diff --git a/libvips/iofuncs/sink.c b/libvips/iofuncs/sink.c index bd3cb433..bd6b10ae 100644 --- a/libvips/iofuncs/sink.c +++ b/libvips/iofuncs/sink.c @@ -171,7 +171,7 @@ sink_area_position( SinkArea *area, int top, int height ) static gboolean sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) { - SinkThreadState *wstate = (SinkThreadState *) state; + SinkThreadState *sstate = (SinkThreadState *) state; Sink *sink = (Sink *) a; SinkBase *sink_base = (SinkBase *) sink; @@ -227,7 +227,7 @@ sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) /* The thread needs to know which area it's writing to. */ - wstate->area = sink->area; + sstate->area = sink->area; VIPS_DEBUG_MSG( " %p allocated %d x %d:\n", g_thread_self(), state->pos.left, state->pos.top ); diff --git a/libvips/iofuncs/sinkdisc.c b/libvips/iofuncs/sinkdisc.c index 8cd62be6..c9d4267a 100644 --- a/libvips/iofuncs/sinkdisc.c +++ b/libvips/iofuncs/sinkdisc.c @@ -75,8 +75,9 @@ typedef struct _WriteBuffer { VipsSemaphore go; /* Start bg thread loop */ VipsSemaphore nwrite; /* Number of threads writing to region */ VipsSemaphore done; /* Bg thread has done write */ + VipsSemaphore finish; /* Bg thread has finished */ int write_errno; /* Save write errors here */ - GThread *thread; /* BG writer thread */ + gboolean running; /* Whether the bg writer thread is running */ gboolean kill; /* Set to ask thread to exit */ } WriteBuffer; @@ -141,22 +142,22 @@ wbuffer_free( WriteBuffer *wbuffer ) { /* Is there a thread running this region? Kill it! */ - if( wbuffer->thread ) { + if( wbuffer->running ) { wbuffer->kill = TRUE; vips_semaphore_up( &wbuffer->go ); - /* Return value is always NULL (see wbuffer_write_thread). - */ - (void) vips_g_thread_join( wbuffer->thread ); - VIPS_DEBUG_MSG( "wbuffer_free: vips_g_thread_join()\n" ); + vips_semaphore_down( &wbuffer->finish ); - wbuffer->thread = NULL; + VIPS_DEBUG_MSG( "wbuffer_free:\n" ); + + wbuffer->running = FALSE; } VIPS_UNREF( wbuffer->region ); vips_semaphore_destroy( &wbuffer->go ); vips_semaphore_destroy( &wbuffer->nwrite ); vips_semaphore_destroy( &wbuffer->done ); + vips_semaphore_destroy( &wbuffer->finish ); g_free( wbuffer ); } @@ -178,8 +179,8 @@ wbuffer_write( WriteBuffer *wbuffer ) /* Run this as a thread to do a BG write. */ -static void * -wbuffer_write_thread( void *data ) +static void +wbuffer_write_thread( void *data, void *user_data ) { WriteBuffer *wbuffer = (WriteBuffer *) data; @@ -202,7 +203,9 @@ wbuffer_write_thread( void *data ) vips_semaphore_up( &wbuffer->done ); } - return( NULL ); + /* We are exiting: tell the main thread. + */ + vips_semaphore_up( &wbuffer->finish ); } static WriteBuffer * @@ -217,8 +220,9 @@ wbuffer_new( Write *write ) vips_semaphore_init( &wbuffer->go, 0, "go" ); vips_semaphore_init( &wbuffer->nwrite, 0, "nwrite" ); vips_semaphore_init( &wbuffer->done, 0, "done" ); + vips_semaphore_init( &wbuffer->finish, 0, "finish" ); wbuffer->write_errno = 0; - wbuffer->thread = NULL; + wbuffer->running = FALSE; wbuffer->kill = FALSE; if( !(wbuffer->region = vips_region_new( write->sink_base.im )) ) { @@ -232,12 +236,14 @@ wbuffer_new( Write *write ) /* Make this last (picks up parts of wbuffer on startup). */ - if( !(wbuffer->thread = vips_g_thread_new( "wbuffer", - wbuffer_write_thread, wbuffer )) ) { + if( vips_threadpool_push( "wbuffer", wbuffer_write_thread, + wbuffer ) ) { wbuffer_free( wbuffer ); return( NULL ); } + wbuffer->running = TRUE; + return( wbuffer ); } diff --git a/libvips/iofuncs/sinkmemory.c b/libvips/iofuncs/sinkmemory.c index 0441312f..8a1dff52 100644 --- a/libvips/iofuncs/sinkmemory.c +++ b/libvips/iofuncs/sinkmemory.c @@ -173,7 +173,7 @@ sink_memory_area_position( SinkMemoryArea *area, int top, int height ) static gboolean sink_memory_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) { - SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state; + SinkMemoryThreadState *smstate = (SinkMemoryThreadState *) state; SinkMemory *memory = (SinkMemory *) a; SinkBase *sink_base = (SinkBase *) memory; @@ -229,7 +229,7 @@ sink_memory_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) /* The thread needs to know which area it's writing to. */ - wstate->area = memory->area; + smstate->area = memory->area; VIPS_DEBUG_MSG( " %p allocated %d x %d:\n", g_thread_self(), state->pos.left, state->pos.top ); @@ -255,8 +255,8 @@ static int sink_memory_area_work_fn( VipsThreadState *state, void *a ) { SinkMemory *memory = (SinkMemory *) a; - SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state; - SinkMemoryArea *area = wstate->area; + SinkMemoryThreadState *smstate = (SinkMemoryThreadState *) state; + SinkMemoryArea *area = smstate->area; int result; diff --git a/libvips/iofuncs/sinkscreen.c b/libvips/iofuncs/sinkscreen.c index c6e5ead1..b208b9f1 100644 --- a/libvips/iofuncs/sinkscreen.c +++ b/libvips/iofuncs/sinkscreen.c @@ -162,22 +162,29 @@ typedef struct _RenderThreadStateClass { G_DEFINE_TYPE( RenderThreadState, render_thread_state, VIPS_TYPE_THREAD_STATE ); -/* The BG thread which sits waiting to do some calculations, and the semaphore - * it waits on holding the number of renders with dirty tiles. +/* A boolean indicating if the bg render thread is running. */ -static GThread *render_thread = NULL; +static gboolean render_running = FALSE; /* Set this to ask the render thread to quit. */ static gboolean render_kill = FALSE; -/* All the renders with dirty tiles, and a semaphore that the bg render thread - * waits on. +/* All the renders with dirty tiles. */ static GMutex *render_dirty_lock = NULL; static GSList *render_dirty_all = NULL; + +/* A semaphore where the bg render thread waits on holding the number of + * renders with dirty tiles + */ static VipsSemaphore n_render_dirty_sem; +/* A semaphore where the main thread waits for when the bg render thread + * is shutdown. + */ +static VipsSemaphore render_finish; + /* Set this to make the bg thread stop and reschedule. */ static gboolean render_reschedule = FALSE; @@ -435,10 +442,7 @@ vips__render_shutdown( void ) if( render_dirty_lock ) { g_mutex_lock( render_dirty_lock ); - if( render_thread ) { - GThread *thread; - - thread = render_thread; + if( render_running ) { render_reschedule = TRUE; render_kill = TRUE; @@ -446,13 +450,16 @@ vips__render_shutdown( void ) vips_semaphore_up( &n_render_dirty_sem ); - (void) vips_g_thread_join( thread ); + vips_semaphore_down( &render_finish ); + + render_running = FALSE; } else g_mutex_unlock( render_dirty_lock ); VIPS_FREEF( vips_g_mutex_free, render_dirty_lock ); vips_semaphore_destroy( &n_render_dirty_sem ); + vips_semaphore_destroy( &render_finish ); } } @@ -985,10 +992,10 @@ render_dirty_get( void ) return( render ); } -/* Loop for the background render mananger thread. +/* Loop for the background render manager thread. */ -static void * -render_thread_main( void *client ) +static void +render_thread_main( void *data, void *user_data ) { Render *render; @@ -1023,23 +1030,29 @@ render_thread_main( void *client ) } } - /* We are exiting, so render_thread must now be NULL. + /* We are exiting: tell the main thread. */ - render_thread = NULL; - - return( NULL ); + vips_semaphore_up( &render_finish ); } static void * vips__sink_screen_init( void *data ) { - g_assert( !render_thread ); + g_assert( !render_running ); g_assert( !render_dirty_lock ); render_dirty_lock = vips_g_mutex_new(); vips_semaphore_init( &n_render_dirty_sem, 0, "n_render_dirty" ); - render_thread = vips_g_thread_new( "sink_screen", - render_thread_main, NULL ); + vips_semaphore_init( &render_finish, 0, "render_finish" ); + + if ( vips_threadpool_push( "sink_screen", render_thread_main, + NULL ) ) { + vips_error("vips_sink_screen_init", "%s", + _("unable to init render thread")); + return( NULL ); + } + + render_running = TRUE; return( NULL ); } diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index 26416473..b22879e7 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -55,8 +55,7 @@ */ -/* -#define DEBUG_OUT_OF_THREADS +/* #define VIPS_DEBUG #define VIPS_DEBUG_RED */ @@ -121,10 +120,6 @@ int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT; */ int vips__concurrency = 0; -/* Count the number of threads we have active and report on leak test. - */ -int vips__n_active_threads = 0; - /* Set this GPrivate to indicate that this is a vips worker. */ static GPrivate *is_worker_key = NULL; @@ -186,109 +181,6 @@ vips_thread_isworker( void ) return( g_private_get( is_worker_key ) != NULL ); } -typedef struct { - const char *domain; - GThreadFunc func; - gpointer data; -} VipsThreadInfo; - -static void * -vips_thread_run( gpointer data ) -{ - VipsThreadInfo *info = (VipsThreadInfo *) data; - - void *result; - - /* Set this to something (anything) to tag this thread as a vips - * worker. - */ - g_private_set( is_worker_key, data ); - - if( vips__thread_profile ) - vips__thread_profile_attach( info->domain ); - - result = info->func( info->data ); - - g_free( info ); - - vips_thread_shutdown(); - - return( result ); -} - -GThread * -vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data ) -{ -#ifdef DEBUG_OUT_OF_THREADS - static int n_threads = 0; -#endif /*DEBUG_OUT_OF_THREADS*/ - - GThread *thread; - VipsThreadInfo *info; - GError *error = NULL; - - info = g_new( VipsThreadInfo, 1 ); - info->domain = domain; - info->func = func; - info->data = data; - -#ifdef DEBUG_OUT_OF_THREADS - n_threads += 1; - if( n_threads > 10 ) - thread = NULL; - else { -#endif /*DEBUG_OUT_OF_THREADS*/ - - thread = g_thread_try_new( domain, vips_thread_run, info, &error ); - - VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n", - domain, thread ); - -#ifdef DEBUG_OUT_OF_THREADS - } -#endif /*DEBUG_OUT_OF_THREADS*/ - - if( !thread ) { - if( error ) - vips_g_error( &error ); - else - vips_error( domain, - "%s", _( "unable to create thread" ) ); - } - - if( thread && - vips__leak ) { - g_mutex_lock( vips__global_lock ); - - vips__n_active_threads += 1; - - g_mutex_unlock( vips__global_lock ); - } - - return( thread ); -} - -void * -vips_g_thread_join( GThread *thread ) -{ - void *result; - - result = g_thread_join( thread ); - - VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n", - thread ); - - if( vips__leak ) { - g_mutex_lock( vips__global_lock ); - - vips__n_active_threads -= 1; - - g_mutex_unlock( vips__global_lock ); - } - - return( result ); -} - typedef struct { /* An name for this thread. */ From 8b01104c67b93b72f20b3d078d3542f7f07d548a Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Fri, 24 Apr 2020 11:45:44 +0200 Subject: [PATCH 4/7] Add suppressions file for ThreadSanitizer --- .github/workflows/ci.yml | 1 + suppressions/tsan.supp | 115 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 suppressions/tsan.supp diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a57f8b97..4098d0dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -103,6 +103,7 @@ jobs: echo "ASAN_DSO=$ASAN_DSO" >> $GITHUB_ENV echo "ASAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/asan.supp" >> $GITHUB_ENV echo "LSAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/lsan.supp" >> $GITHUB_ENV + echo "TSAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/tsan.supp" >> $GITHUB_ENV echo "UBSAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/ubsan.supp:print_stacktrace=1" >> $GITHUB_ENV echo "LD_LIBRARY_PATH=$LLVM_PREFIX/lib:`dirname $ASAN_DSO`" >> $GITHUB_ENV echo "$LLVM_PREFIX/bin" >> $GITHUB_PATH diff --git a/suppressions/tsan.supp b/suppressions/tsan.supp new file mode 100644 index 00000000..1a71eeb5 --- /dev/null +++ b/suppressions/tsan.supp @@ -0,0 +1,115 @@ +# an unlocked FALSE/TRUE assignment, which is fine +race:vips_region_prepare_to +race:render_kill +race:render_reschedule + +# an unlocked NULL assignment, which is fine +race:render_thread + +# unlocked read of pixels-processed-so-far, which is fine +race:vips_sink_base_progress +race:wbuffer_allocate_fn +race:sink_memory_area_allocate_fn + +# use of *stop from generate funcs is unlocked, but fine +race:vips_threadpool_run + +# guarded with vips_tracked_mutex, so it should be fine +race:vips_tracked_mem +race:vips_tracked_allocs +race:vips_tracked_files + +# guarded with task->allocate_lock, so it should be fine +race:vips_thread_allocate +race:vips_task_work_unit + +# guarded with vips_cache_lock, so it should be fine +race:vips_cache_table +race:vips_cache_drop_all +race:vips_cache_get_first +race:vips_cache_get_lru_cb +race:vips_cache_remove + +# guarded with vips__global_lock, so it should be fine +race:vips_error_freeze_count +race:vips__link_make +race:vips__link_map +race:vips__link_break_all +race:tile_name + +# the double-buffered output and write-behind thread are non-racy +race:wbuffer_new +race:wbuffer_write +race:wbuffer_work_fn +race:wbuffer_free +race:write_thread_state_new + +# thread-local variables (i.e. GPrivate) are harmless +race:vips_thread_profile_key +race:buffer_thread_key + +# glib signals are probably non-racy +race:vips_image_preeval +race:vips_image_eval +race:vips_image_posteval +race:vips_image_written +race:vips_image_real_written +race:vips_image_save_cb +race:render_close_cb +race:readjpeg_close_cb + +# semaphores are probably non-racy +race:vips_semaphore_* + +# guarded with sink->sslock, so it should be fine +race:sink_call_start +race:sink_call_stop +race:vips_hist_find_start +race:vips_hist_find_stop +race:vips_statistic_scan_start +race:vips_statistic_scan_stop +race:vips_max_stop +race:vips_values_add +race:vips_deviate_stop +race:vips_arithmetic_start +race:histogram_new + +# per-thread state allocate/dispose functions are non-racy +race:sink_thread_state_class_init +race:vips_thread_state_init +race:vips_thread_state_build +race:vips_thread_state_set +race:vips_thread_state_dispose +race:write_thread_state_new +race:vips_sink_base_init +race:vips_sink_thread_state_new +race:sink_memory_init +race:sink_memory_thread_state_new +race:sink_memory_free +race:sink_thread_state_build +race:sink_thread_state_dispose +race:buffer_cache_free +race:sink_init +race:sink_free +race:vips_sequential_dispose +race:vips_block_cache_dispose +race:vips_image_init +race:vips_image_build +race:vips_image_finalize +race:vips_image_dispose + +# guarded with image->sslock, so it should be fine +race:vips__region_start +race:vips__region_stop +race:vips_region_dispose +race:vips__region_take_ownership + +# is fine now, see: https://github.com/libvips/libvips/pull/1211 +race:vips_image_temp_name + +# is fine now, see: https://github.com/libvips/libvips/pull/1483 +race:meta_new +race:meta_cp +race:vips_image_set +race:vips__image_copy_fields_array +race:vips__image_meta_copy From 91a143e5c916de6104f42b7eeaa8d6b51a157e24 Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Mon, 8 Jun 2020 16:46:46 +0200 Subject: [PATCH 5/7] Swap g_private_set with g_private_replace where possible There's a possibility that the old GDestroyNotify handler is not called when threads are being reused. --- libvips/iofuncs/gate.c | 4 +--- libvips/iofuncs/threadpool.c | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/libvips/iofuncs/gate.c b/libvips/iofuncs/gate.c index 3e42028c..163e1d94 100644 --- a/libvips/iofuncs/gate.c +++ b/libvips/iofuncs/gate.c @@ -245,15 +245,13 @@ vips__thread_profile_attach( const char *thread_name ) VIPS_DEBUG_MSG( "vips__thread_profile_attach: %s\n", thread_name ); - g_assert( !g_private_get( vips_thread_profile_key ) ); - profile = g_new( VipsThreadProfile, 1 ); profile->name = thread_name; profile->gates = g_hash_table_new_full( g_direct_hash, g_str_equal, NULL, (GDestroyNotify) vips_thread_gate_free ); profile->memory = vips_thread_gate_new( "memory" ); - g_private_set( vips_thread_profile_key, profile ); + g_private_replace( vips_thread_profile_key, profile ); } static VipsThreadProfile * diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index b22879e7..1e63ceb2 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -201,7 +201,8 @@ vips_thread_main_loop( gpointer thread_data, gpointer pool_data ) VipsThreadExec *exec = (VipsThreadExec *) thread_data; /* Set this to something (anything) to tag this thread as a vips - * worker. + * worker. No need to call g_private_replace as there is no + * GDestroyNotify handler associated with a worker. */ g_private_set( is_worker_key, thread_data ); From 060ab7c7a214e1b6e479225109b752eaac157ebf Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Mon, 8 Jun 2020 17:37:40 +0200 Subject: [PATCH 6/7] Free the calculated pixel buffer cache early Since threads can be reused, we need to free the calculated pixel buffer cache early (i.e. during vips_thread_shutdown). This (partially) reverts commit 1a915db. --- libvips/include/vips/internal.h | 1 + libvips/iofuncs/buffer.c | 10 ++++++++++ libvips/iofuncs/init.c | 1 + 3 files changed, 12 insertions(+) diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index 7ce89b50..c5daee0c 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -140,6 +140,7 @@ int vips_mapfilerw( VipsImage * ); int vips_remapfilerw( VipsImage * ); void vips__buffer_init( void ); +void vips__buffer_shutdown( void ); void vips__copy_4byte( int swap, unsigned char *to, unsigned char *from ); void vips__copy_2byte( gboolean swap, unsigned char *to, unsigned char *from ); diff --git a/libvips/iofuncs/buffer.c b/libvips/iofuncs/buffer.c index 9b584af1..e55f3081 100644 --- a/libvips/iofuncs/buffer.c +++ b/libvips/iofuncs/buffer.c @@ -675,3 +675,13 @@ vips__buffer_init( void ) #endif /*DEBUG_CREATE*/ } +void +vips__buffer_shutdown( void ) +{ + VipsBufferThread *buffer_thread; + + if( (buffer_thread = g_private_get( buffer_thread_key )) ) { + buffer_thread_free( buffer_thread ); + g_private_set( buffer_thread_key, NULL ); + } +} diff --git a/libvips/iofuncs/init.c b/libvips/iofuncs/init.c index 735c907e..51471ed3 100644 --- a/libvips/iofuncs/init.c +++ b/libvips/iofuncs/init.c @@ -672,6 +672,7 @@ void vips_thread_shutdown( void ) { vips__thread_profile_detach(); + vips__buffer_shutdown(); } /** From 83575e347a22c35d58f1f2ab16dab9cfb7dfae20 Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Sat, 8 Aug 2020 14:21:06 +0200 Subject: [PATCH 7/7] Use gatomicrefcount in sinkscreen --- libvips/iofuncs/sinkscreen.c | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/libvips/iofuncs/sinkscreen.c b/libvips/iofuncs/sinkscreen.c index b208b9f1..c4091e5a 100644 --- a/libvips/iofuncs/sinkscreen.c +++ b/libvips/iofuncs/sinkscreen.c @@ -106,8 +106,12 @@ typedef struct _Render { * threads. We can't easily use the gobject ref count system since we * need a lock around operations. */ +#if GLIB_CHECK_VERSION( 2, 58, 0 ) + gatomicrefcount ref_count; +#else int ref_count; - GMutex *ref_count_lock; + GMutex *ref_count_lock; +#endif /* Parameters. */ @@ -228,7 +232,11 @@ render_free( Render *render ) { VIPS_DEBUG_MSG_AMBER( "render_free: %p\n", render ); +#if GLIB_CHECK_VERSION( 2, 58, 0 ) + g_assert ( g_atomic_ref_count_compare( &render->ref_count, 0 ) ); +#else g_assert( render->ref_count == 0 ); +#endif g_mutex_lock( render_dirty_lock ); if( g_slist_find( render_dirty_all, render ) ) { @@ -241,7 +249,9 @@ render_free( Render *render ) } g_mutex_unlock( render_dirty_lock ); +#if !GLIB_CHECK_VERSION( 2, 58, 0 ) vips_g_mutex_free( render->ref_count_lock ); +#endif vips_g_mutex_free( render->lock ); vips_slist_map2( render->all, (VipsSListMap2Fn) tile_free, NULL, NULL ); @@ -266,10 +276,15 @@ render_free( Render *render ) static int render_ref( Render *render ) { +#if GLIB_CHECK_VERSION( 2, 58, 0 ) + g_assert( !g_atomic_ref_count_compare( &render->ref_count, 0 ) ); + g_atomic_ref_count_inc( &render->ref_count ); +#else g_mutex_lock( render->ref_count_lock ); g_assert( render->ref_count != 0 ); render->ref_count += 1; g_mutex_unlock( render->ref_count_lock ); +#endif return( 0 ); } @@ -279,11 +294,16 @@ render_unref( Render *render ) { int kill; +#if GLIB_CHECK_VERSION( 2, 58, 0 ) + g_assert( !g_atomic_ref_count_compare( &render->ref_count, 0 ) ); + kill = g_atomic_ref_count_dec( &render->ref_count ); +#else g_mutex_lock( render->ref_count_lock ); g_assert( render->ref_count > 0 ); render->ref_count -= 1; kill = render->ref_count == 0; g_mutex_unlock( render->ref_count_lock ); +#endif if( kill ) render_free( render ); @@ -554,8 +574,12 @@ render_new( VipsImage *in, VipsImage *out, VipsImage *mask, */ g_object_ref( in ); +#if GLIB_CHECK_VERSION( 2, 58, 0 ) + g_atomic_ref_count_init( &render->ref_count ); +#else render->ref_count = 1; render->ref_count_lock = vips_g_mutex_new(); +#endif render->in = in; render->out = out;