Merge pull request #2235 from kleisauke/reuse-threads

Reuse threads by using the thread pool of GLib (#2038)
This commit is contained in:
John Cupitt 2021-05-08 18:52:04 +01:00 committed by GitHub
commit 98946e5e15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 520 additions and 438 deletions

View File

@ -103,6 +103,7 @@ jobs:
echo "ASAN_DSO=$ASAN_DSO" >> $GITHUB_ENV
echo "ASAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/asan.supp" >> $GITHUB_ENV
echo "LSAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/lsan.supp" >> $GITHUB_ENV
echo "TSAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/tsan.supp" >> $GITHUB_ENV
echo "UBSAN_OPTIONS=suppressions=${{ github.workspace }}/suppressions/ubsan.supp:print_stacktrace=1" >> $GITHUB_ENV
echo "LD_LIBRARY_PATH=$LLVM_PREFIX/lib:`dirname $ASAN_DSO`" >> $GITHUB_ENV
echo "$LLVM_PREFIX/bin" >> $GITHUB_PATH

View File

@ -5694,7 +5694,7 @@ vips_get_option_group( void )
}
/* We used to use this for system() back in the day. But it's awkward to make
* it work properly on win32, so this is nonw deprecated.
* it work properly on win32, so this is now deprecated.
*/
FILE *
vips_popenf( const char *fmt, const char *mode, ... )
@ -5703,3 +5703,16 @@ vips_popenf( const char *fmt, const char *mode, ... )
return( NULL );
}
GThread *
vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data )
{
vips_error( "vips_g_thread_new", "%s", _( "deprecated" ) );
return( NULL );
}
void *
vips_g_thread_join( GThread *thread )
{
vips_error( "vips_g_thread_join", "%s", _( "deprecated" ) );
return( NULL );
}

View File

@ -108,9 +108,8 @@ extern char *vips__disc_threshold;
extern gboolean vips__cache_dump;
extern gboolean vips__cache_trace;
extern int vips__n_active_threads;
void vips__threadpool_init( void );
void vips__threadpool_shutdown( void );
void vips__cache_init( void );
@ -141,6 +140,7 @@ int vips_mapfilerw( VipsImage * );
int vips_remapfilerw( VipsImage * );
void vips__buffer_init( void );
void vips__buffer_shutdown( void );
void vips__copy_4byte( int swap, unsigned char *to, unsigned char *from );
void vips__copy_2byte( gboolean swap, unsigned char *to, unsigned char *from );

View File

@ -47,11 +47,6 @@ void vips_g_mutex_free( GMutex * );
GCond *vips_g_cond_new( void );
void vips_g_cond_free( GCond * );
/* ... and for GThread.
*/
GThread *vips_g_thread_new( const char *, GThreadFunc, gpointer );
void *vips_g_thread_join( GThread *thread );
gboolean vips_thread_isworker( void );
#ifdef __cplusplus

View File

@ -5,6 +5,8 @@
* 17/3/10
* - from threadgroup
* - rework with a simpler distributed work allocation model
* 02/02/20 kleisauke
* - reuse threads by using GLib's threadpool
*/
/*
@ -130,6 +132,7 @@ typedef int (*VipsThreadpoolWorkFn)( VipsThreadState *state, void *a );
*/
typedef int (*VipsThreadpoolProgressFn)( void *a );
int vips_threadpool_push( const char *name, GFunc func, gpointer data );
int vips_threadpool_run( VipsImage *im,
VipsThreadStartFn start,
VipsThreadpoolAllocateFn allocate,

View File

@ -1224,6 +1224,11 @@ VipsWindow *vips_window_ref( VipsImage *im, int top, int height );
FILE *vips_popenf( const char *fmt, const char *mode, ... )
__attribute__((format(printf, 1, 3)));
/* old GThread API.
*/
GThread *vips_g_thread_new( const char *, GThreadFunc, gpointer );
void *vips_g_thread_join( GThread *thread );
/* This stuff is very, very old and should not be used by anyone now.
*/
#ifdef VIPS_ENABLE_ANCIENT

View File

@ -675,3 +675,13 @@ vips__buffer_init( void )
#endif /*DEBUG_CREATE*/
}
void
vips__buffer_shutdown( void )
{
VipsBufferThread *buffer_thread;
if( (buffer_thread = g_private_get( buffer_thread_key )) ) {
buffer_thread_free( buffer_thread );
g_private_set( buffer_thread_key, NULL );
}
}

View File

@ -245,15 +245,13 @@ vips__thread_profile_attach( const char *thread_name )
VIPS_DEBUG_MSG( "vips__thread_profile_attach: %s\n", thread_name );
g_assert( !g_private_get( vips_thread_profile_key ) );
profile = g_new( VipsThreadProfile, 1 );
profile->name = thread_name;
profile->gates = g_hash_table_new_full(
g_direct_hash, g_str_equal,
NULL, (GDestroyNotify) vips_thread_gate_free );
profile->memory = vips_thread_gate_new( "memory" );
g_private_set( vips_thread_profile_key, profile );
g_private_replace( vips_thread_profile_key, profile );
}
static VipsThreadProfile *

View File

@ -641,12 +641,6 @@ vips_leak( void )
n_leaks += strlen( vips_error_buffer() );
}
if( vips__n_active_threads > 0 ) {
vips_buf_appendf( &buf, "threads: %d not joined\n",
vips__n_active_threads );
n_leaks += vips__n_active_threads;
}
fprintf( stderr, "%s", vips_buf_all( &buf ) );
n_leaks += vips__print_renders();
@ -665,7 +659,7 @@ vips_leak( void )
*
* This function needs to be called when a thread that has been using vips
* exits. It is called for you by vips_shutdown() and for any threads created
* by vips_g_thread_new().
* within the #VipsThreadPool.
*
* You will need to call it from threads created in
* other ways or there will be memory leaks. If you do not call it, vips
@ -678,6 +672,7 @@ void
vips_thread_shutdown( void )
{
vips__thread_profile_detach();
vips__buffer_shutdown();
}
/**
@ -719,6 +714,8 @@ vips_shutdown( void )
vips__thread_profile_stop();
vips__threadpool_shutdown();
#ifdef HAVE_GSF
gsf_shutdown();
#endif /*HAVE_GSF*/

View File

@ -73,6 +73,10 @@ typedef struct _Sink {
*/
VipsImage *t;
/* Mutex for serialising calls to VipsStartFn and VipsStopFn.
*/
GMutex *sslock;
/* Call params.
*/
VipsStartFn start_fn;
@ -167,7 +171,7 @@ sink_area_position( SinkArea *area, int top, int height )
static gboolean
sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
{
SinkThreadState *wstate = (SinkThreadState *) state;
SinkThreadState *sstate = (SinkThreadState *) state;
Sink *sink = (Sink *) a;
SinkBase *sink_base = (SinkBase *) sink;
@ -223,7 +227,7 @@ sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
/* The thread needs to know which area it's writing to.
*/
wstate->area = sink->area;
sstate->area = sink->area;
VIPS_DEBUG_MSG( " %p allocated %d x %d:\n",
g_thread_self(), state->pos.left, state->pos.top );
@ -249,9 +253,21 @@ static int
sink_call_stop( Sink *sink, SinkThreadState *state )
{
if( state->seq && sink->stop_fn ) {
int result;
VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state );
if( sink->stop_fn( state->seq, sink->a, sink->b ) ) {
VIPS_GATE_START( "sink_call_stop: wait" );
g_mutex_lock( sink->sslock );
VIPS_GATE_STOP( "sink_call_stop: wait" );
result = sink->stop_fn( state->seq, sink->a, sink->b );
g_mutex_unlock( sink->sslock );
if( result ) {
SinkBase *sink_base = (SinkBase *) sink;
vips_error( "vips_sink",
@ -286,8 +302,16 @@ sink_call_start( Sink *sink, SinkThreadState *state )
if( !state->seq && sink->start_fn ) {
VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state );
VIPS_GATE_START( "sink_call_start: wait" );
g_mutex_lock( sink->sslock );
VIPS_GATE_STOP( "sink_call_start: wait" );
state->seq = sink->start_fn( sink->t, sink->a, sink->b );
g_mutex_unlock( sink->sslock );
if( !state->seq ) {
SinkBase *sink_base = (SinkBase *) sink;
@ -346,6 +370,7 @@ vips_sink_thread_state_new( VipsImage *im, void *a )
static void
sink_free( Sink *sink )
{
VIPS_FREEF( vips_g_mutex_free, sink->sslock );
VIPS_FREEF( sink_area_free, sink->area );
VIPS_FREEF( sink_area_free, sink->old_area );
VIPS_FREEF( g_object_unref, sink->t );
@ -381,6 +406,7 @@ sink_init( Sink *sink,
vips_sink_base_init( &sink->sink_base, image );
sink->t = NULL;
sink->sslock = vips_g_mutex_new();
sink->start_fn = start_fn;
sink->generate_fn = generate_fn;
sink->stop_fn = stop_fn;

View File

@ -75,8 +75,9 @@ typedef struct _WriteBuffer {
VipsSemaphore go; /* Start bg thread loop */
VipsSemaphore nwrite; /* Number of threads writing to region */
VipsSemaphore done; /* Bg thread has done write */
VipsSemaphore finish; /* Bg thread has finished */
int write_errno; /* Save write errors here */
GThread *thread; /* BG writer thread */
gboolean running; /* Whether the bg writer thread is running */
gboolean kill; /* Set to ask thread to exit */
} WriteBuffer;
@ -141,22 +142,22 @@ wbuffer_free( WriteBuffer *wbuffer )
{
/* Is there a thread running this region? Kill it!
*/
if( wbuffer->thread ) {
if( wbuffer->running ) {
wbuffer->kill = TRUE;
vips_semaphore_up( &wbuffer->go );
/* Return value is always NULL (see wbuffer_write_thread).
*/
(void) vips_g_thread_join( wbuffer->thread );
VIPS_DEBUG_MSG( "wbuffer_free: vips_g_thread_join()\n" );
vips_semaphore_down( &wbuffer->finish );
wbuffer->thread = NULL;
VIPS_DEBUG_MSG( "wbuffer_free:\n" );
wbuffer->running = FALSE;
}
VIPS_UNREF( wbuffer->region );
vips_semaphore_destroy( &wbuffer->go );
vips_semaphore_destroy( &wbuffer->nwrite );
vips_semaphore_destroy( &wbuffer->done );
vips_semaphore_destroy( &wbuffer->finish );
g_free( wbuffer );
}
@ -178,8 +179,8 @@ wbuffer_write( WriteBuffer *wbuffer )
/* Run this as a thread to do a BG write.
*/
static void *
wbuffer_write_thread( void *data )
static void
wbuffer_write_thread( void *data, void *user_data )
{
WriteBuffer *wbuffer = (WriteBuffer *) data;
@ -202,7 +203,9 @@ wbuffer_write_thread( void *data )
vips_semaphore_up( &wbuffer->done );
}
return( NULL );
/* We are exiting: tell the main thread.
*/
vips_semaphore_up( &wbuffer->finish );
}
static WriteBuffer *
@ -217,8 +220,9 @@ wbuffer_new( Write *write )
vips_semaphore_init( &wbuffer->go, 0, "go" );
vips_semaphore_init( &wbuffer->nwrite, 0, "nwrite" );
vips_semaphore_init( &wbuffer->done, 0, "done" );
vips_semaphore_init( &wbuffer->finish, 0, "finish" );
wbuffer->write_errno = 0;
wbuffer->thread = NULL;
wbuffer->running = FALSE;
wbuffer->kill = FALSE;
if( !(wbuffer->region = vips_region_new( write->sink_base.im )) ) {
@ -232,12 +236,14 @@ wbuffer_new( Write *write )
/* Make this last (picks up parts of wbuffer on startup).
*/
if( !(wbuffer->thread = vips_g_thread_new( "wbuffer",
wbuffer_write_thread, wbuffer )) ) {
if( vips_threadpool_push( "wbuffer", wbuffer_write_thread,
wbuffer ) ) {
wbuffer_free( wbuffer );
return( NULL );
}
wbuffer->running = TRUE;
return( wbuffer );
}

View File

@ -173,7 +173,7 @@ sink_memory_area_position( SinkMemoryArea *area, int top, int height )
static gboolean
sink_memory_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
{
SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state;
SinkMemoryThreadState *smstate = (SinkMemoryThreadState *) state;
SinkMemory *memory = (SinkMemory *) a;
SinkBase *sink_base = (SinkBase *) memory;
@ -229,7 +229,7 @@ sink_memory_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
/* The thread needs to know which area it's writing to.
*/
wstate->area = memory->area;
smstate->area = memory->area;
VIPS_DEBUG_MSG( " %p allocated %d x %d:\n",
g_thread_self(), state->pos.left, state->pos.top );
@ -255,8 +255,8 @@ static int
sink_memory_area_work_fn( VipsThreadState *state, void *a )
{
SinkMemory *memory = (SinkMemory *) a;
SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state;
SinkMemoryArea *area = wstate->area;
SinkMemoryThreadState *smstate = (SinkMemoryThreadState *) state;
SinkMemoryArea *area = smstate->area;
int result;

View File

@ -106,8 +106,12 @@ typedef struct _Render {
* threads. We can't easily use the gobject ref count system since we
* need a lock around operations.
*/
#if GLIB_CHECK_VERSION( 2, 58, 0 )
gatomicrefcount ref_count;
#else
int ref_count;
GMutex *ref_count_lock;
#endif
/* Parameters.
*/
@ -162,22 +166,29 @@ typedef struct _RenderThreadStateClass {
G_DEFINE_TYPE( RenderThreadState, render_thread_state, VIPS_TYPE_THREAD_STATE );
/* The BG thread which sits waiting to do some calculations, and the semaphore
* it waits on holding the number of renders with dirty tiles.
/* A boolean indicating if the bg render thread is running.
*/
static GThread *render_thread = NULL;
static gboolean render_running = FALSE;
/* Set this to ask the render thread to quit.
*/
static gboolean render_kill = FALSE;
/* All the renders with dirty tiles, and a semaphore that the bg render thread
* waits on.
/* All the renders with dirty tiles.
*/
static GMutex *render_dirty_lock = NULL;
static GSList *render_dirty_all = NULL;
/* A semaphore where the bg render thread waits on holding the number of
* renders with dirty tiles
*/
static VipsSemaphore n_render_dirty_sem;
/* A semaphore where the main thread waits for when the bg render thread
* is shutdown.
*/
static VipsSemaphore render_finish;
/* Set this to make the bg thread stop and reschedule.
*/
static gboolean render_reschedule = FALSE;
@ -221,7 +232,11 @@ render_free( Render *render )
{
VIPS_DEBUG_MSG_AMBER( "render_free: %p\n", render );
#if GLIB_CHECK_VERSION( 2, 58, 0 )
g_assert ( g_atomic_ref_count_compare( &render->ref_count, 0 ) );
#else
g_assert( render->ref_count == 0 );
#endif
g_mutex_lock( render_dirty_lock );
if( g_slist_find( render_dirty_all, render ) ) {
@ -234,7 +249,9 @@ render_free( Render *render )
}
g_mutex_unlock( render_dirty_lock );
#if !GLIB_CHECK_VERSION( 2, 58, 0 )
vips_g_mutex_free( render->ref_count_lock );
#endif
vips_g_mutex_free( render->lock );
vips_slist_map2( render->all, (VipsSListMap2Fn) tile_free, NULL, NULL );
@ -259,10 +276,15 @@ render_free( Render *render )
static int
render_ref( Render *render )
{
#if GLIB_CHECK_VERSION( 2, 58, 0 )
g_assert( !g_atomic_ref_count_compare( &render->ref_count, 0 ) );
g_atomic_ref_count_inc( &render->ref_count );
#else
g_mutex_lock( render->ref_count_lock );
g_assert( render->ref_count != 0 );
render->ref_count += 1;
g_mutex_unlock( render->ref_count_lock );
#endif
return( 0 );
}
@ -272,11 +294,16 @@ render_unref( Render *render )
{
int kill;
#if GLIB_CHECK_VERSION( 2, 58, 0 )
g_assert( !g_atomic_ref_count_compare( &render->ref_count, 0 ) );
kill = g_atomic_ref_count_dec( &render->ref_count );
#else
g_mutex_lock( render->ref_count_lock );
g_assert( render->ref_count > 0 );
render->ref_count -= 1;
kill = render->ref_count == 0;
g_mutex_unlock( render->ref_count_lock );
#endif
if( kill )
render_free( render );
@ -435,10 +462,7 @@ vips__render_shutdown( void )
if( render_dirty_lock ) {
g_mutex_lock( render_dirty_lock );
if( render_thread ) {
GThread *thread;
thread = render_thread;
if( render_running ) {
render_reschedule = TRUE;
render_kill = TRUE;
@ -446,13 +470,16 @@ vips__render_shutdown( void )
vips_semaphore_up( &n_render_dirty_sem );
(void) vips_g_thread_join( thread );
vips_semaphore_down( &render_finish );
render_running = FALSE;
}
else
g_mutex_unlock( render_dirty_lock );
VIPS_FREEF( vips_g_mutex_free, render_dirty_lock );
vips_semaphore_destroy( &n_render_dirty_sem );
vips_semaphore_destroy( &render_finish );
}
}
@ -547,8 +574,12 @@ render_new( VipsImage *in, VipsImage *out, VipsImage *mask,
*/
g_object_ref( in );
#if GLIB_CHECK_VERSION( 2, 58, 0 )
g_atomic_ref_count_init( &render->ref_count );
#else
render->ref_count = 1;
render->ref_count_lock = vips_g_mutex_new();
#endif
render->in = in;
render->out = out;
@ -985,10 +1016,10 @@ render_dirty_get( void )
return( render );
}
/* Loop for the background render mananger thread.
/* Loop for the background render manager thread.
*/
static void *
render_thread_main( void *client )
static void
render_thread_main( void *data, void *user_data )
{
Render *render;
@ -1023,23 +1054,29 @@ render_thread_main( void *client )
}
}
/* We are exiting, so render_thread must now be NULL.
/* We are exiting: tell the main thread.
*/
render_thread = NULL;
return( NULL );
vips_semaphore_up( &render_finish );
}
static void *
vips__sink_screen_init( void *data )
{
g_assert( !render_thread );
g_assert( !render_running );
g_assert( !render_dirty_lock );
render_dirty_lock = vips_g_mutex_new();
vips_semaphore_init( &n_render_dirty_sem, 0, "n_render_dirty" );
render_thread = vips_g_thread_new( "sink_screen",
render_thread_main, NULL );
vips_semaphore_init( &render_finish, 0, "render_finish" );
if ( vips_threadpool_push( "sink_screen", render_thread_main,
NULL ) ) {
vips_error("vips_sink_screen_init", "%s",
_("unable to init render thread"));
return( NULL );
}
render_running = TRUE;
return( NULL );
}

View File

@ -23,6 +23,9 @@
* - don't depend on image width when setting n_lines
* 27/2/19 jtorresfabra
* - free threadpool earlier
* 02/02/20 kleisauke
* - reuse threads by using GLib's threadpool
* - remove mutex lock for VipsThreadStartFn
*/
/*
@ -53,7 +56,6 @@
*/
/*
#define DEBUG_OUT_OF_THREADS
#define VIPS_DEBUG
#define VIPS_DEBUG_RED
*/
@ -87,6 +89,15 @@
* @include: vips/vips.h
* @title: VipsThreadpool
*
* A threadpool which allows reusing already started threads. Implementing
* this can be tedious and error-prone. Therefore we use the GLib
* provided threadpool for our convenience. An added advantage is, that
* the threads can be shared between the different subsystems, when they
* are using GLib.
*
* The threadpool is created during vips_init() and is destroyed by
* vips_shutdown().
*
* vips_threadpool_run() loops a set of threads over an image. Threads take it
* in turns to allocate units of work (a unit might be a tile in an image),
* then run in parallel to process those units. An optional progress function
@ -109,10 +120,6 @@ int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT;
*/
int vips__concurrency = 0;
/* Count the number of threads we have active and report on leak test.
*/
int vips__n_active_threads = 0;
/* Set this GPrivate to indicate that this is a vips worker.
*/
static GPrivate *is_worker_key = NULL;
@ -121,6 +128,10 @@ static GPrivate *is_worker_key = NULL;
*/
static gboolean vips__stall = FALSE;
/* The thread pool we'll use.
*/
static GThreadPool *vips__pool = NULL;
/* Glib 2.32 revised the thread API. We need some compat functions.
*/
@ -171,125 +182,38 @@ vips_thread_isworker( void )
}
typedef struct {
const char *domain;
GThreadFunc func;
/* An name for this thread.
*/
const char *name;
/* The function to execute within the #VipsThreadPool.
*/
GFunc func;
/* User data that is handed over to func when it is called.
*/
gpointer data;
} VipsThreadInfo;
} VipsThreadExec;
static void *
vips_thread_run( gpointer data )
static void
vips_thread_main_loop( gpointer thread_data, gpointer pool_data )
{
VipsThreadInfo *info = (VipsThreadInfo *) data;
void *result;
VipsThreadExec *exec = (VipsThreadExec *) thread_data;
/* Set this to something (anything) to tag this thread as a vips
* worker.
* worker. No need to call g_private_replace as there is no
* GDestroyNotify handler associated with a worker.
*/
g_private_set( is_worker_key, data );
g_private_set( is_worker_key, thread_data );
if( vips__thread_profile )
vips__thread_profile_attach( info->domain );
vips__thread_profile_attach( exec->name );
result = info->func( info->data );
exec->func( exec->data, pool_data );
g_free( info );
g_free( exec );
vips_thread_shutdown();
return( result );
}
GThread *
vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data )
{
#ifdef DEBUG_OUT_OF_THREADS
static int n_threads = 0;
#endif /*DEBUG_OUT_OF_THREADS*/
GThread *thread;
VipsThreadInfo *info;
GError *error = NULL;
info = g_new( VipsThreadInfo, 1 );
info->domain = domain;
info->func = func;
info->data = data;
#ifdef DEBUG_OUT_OF_THREADS
n_threads += 1;
if( n_threads > 10 )
thread = NULL;
else {
#endif /*DEBUG_OUT_OF_THREADS*/
thread = g_thread_try_new( domain, vips_thread_run, info, &error );
VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n",
domain, thread );
#ifdef DEBUG_OUT_OF_THREADS
}
#endif /*DEBUG_OUT_OF_THREADS*/
if( !thread ) {
if( error )
vips_g_error( &error );
else
vips_error( domain,
"%s", _( "unable to create thread" ) );
}
if( thread &&
vips__leak ) {
g_mutex_lock( vips__global_lock );
vips__n_active_threads += 1;
g_mutex_unlock( vips__global_lock );
}
return( thread );
}
void *
vips_g_thread_join( GThread *thread )
{
void *result;
result = g_thread_join( thread );
VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n",
thread );
if( vips__leak ) {
g_mutex_lock( vips__global_lock );
vips__n_active_threads -= 1;
g_mutex_unlock( vips__global_lock );
}
return( result );
}
/**
* vips_concurrency_set:
* @concurrency: number of threads to run
*
* Sets the number of worker threads that vips should use when running a
* #VipsThreadPool.
*
* The special value 0 means "default". In this case, the number of threads is
* set by the environment variable VIPS_CONCURRENCY, or if that is not set, the
* number of threads availble on the host machine.
*
* See also: vips_concurrency_get().
*/
void
vips_concurrency_set( int concurrency )
{
vips__concurrency = concurrency;
}
static int
@ -371,6 +295,67 @@ get_num_processors( void )
#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/
}
/* The default concurrency, set by the environment variable VIPS_CONCURRENCY,
* or if that is not set, the number of threads available on the host machine.
*/
static int
vips__concurrency_get_default( void )
{
const char *str;
int nthr;
int x;
/* Tell the threads system how much concurrency we expect.
*/
if( vips__concurrency > 0 )
nthr = vips__concurrency;
else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
#if ENABLE_DEPRECATED
|| (str = g_getenv( "IM_CONCURRENCY" ))
#endif
) && (x = atoi( str )) > 0 )
nthr = x;
else
nthr = get_num_processors();
if( nthr < 1 || nthr > MAX_THREADS ) {
nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );
g_warning( _( "threads clipped to %d" ), nthr );
}
return nthr;
}
/**
* vips_concurrency_set:
* @concurrency: number of threads to run
*
* Sets the number of worker threads that vips should use when running a
* #VipsThreadPool.
*
* The special value 0 means "default". In this case, the number of threads is
* set by the environment variable VIPS_CONCURRENCY, or if that is not set, the
* number of threads available on the host machine.
*
* See also: vips_concurrency_get().
*/
void
vips_concurrency_set( int concurrency )
{
/* Tell the threads system how much concurrency we expect.
*/
if( concurrency < 1 )
concurrency = vips__concurrency_get_default();
else if( concurrency > MAX_THREADS ) {
concurrency = MAX_THREADS;
g_warning( _( "threads clipped to %d" ), MAX_THREADS );
}
vips__concurrency = concurrency;
}
/**
* vips_concurrency_get:
*
@ -398,34 +383,7 @@ get_num_processors( void )
int
vips_concurrency_get( void )
{
const char *str;
int nthr;
int x;
/* Tell the threads system how much concurrency we expect.
*/
if( vips__concurrency > 0 )
nthr = vips__concurrency;
else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
#if ENABLE_DEPRECATED
|| (str = g_getenv( "IM_CONCURRENCY" ))
#endif
) && (x = atoi( str )) > 0 )
nthr = x;
else
nthr = get_num_processors();
if( nthr < 1 || nthr > MAX_THREADS ) {
nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );
g_warning( _( "threads clipped to %d" ), nthr );
}
/* Save for next time around.
*/
vips_concurrency_set( nthr );
return( nthr );
return( vips__concurrency );
}
G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
@ -500,41 +458,17 @@ vips_thread_state_new( VipsImage *im, void *a )
VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
}
/* What we track for each thread in the pool.
/* What we track for a task within the #VipsThreadPool.
*/
typedef struct {
/* All private.
*/
/*< private >*/
struct _VipsThreadpool *pool; /* Pool we are part of */
VipsThreadState *state;
/* Thread we are running.
*/
GThread *thread;
/* Set this to ask the thread to exit.
*/
gboolean exit;
/* Set by the thread if work or allocate return an error.
*/
gboolean error;
} VipsThread;
/* What we track for a group of threads working together.
*/
typedef struct _VipsThreadpool {
typedef struct _VipsTask {
/* All private.
*/
/*< private >*/
VipsImage *im; /* Image we are calculating */
/* Start a thread, do a unit of work (runs in parallel) and allocate
* a unit of work (serial). Plus the mutex we use to serialize work
* allocation.
/* Start or reuse a thread, do a unit of work (runs in parallel)
* and allocate a unit of work (serial). Plus the mutex we use to
* serialize work allocation.
*/
VipsThreadStartFn start;
VipsThreadpoolAllocateFn allocate;
@ -542,10 +476,7 @@ typedef struct _VipsThreadpool {
GMutex *allocate_lock;
void *a; /* User argument to start / allocate / etc. */
int nthr; /* Number of threads in pool */
VipsThread **thr; /* Threads */
/* The caller blocks here until all threads finish.
/* The caller blocks here until all tasks finish.
*/
VipsSemaphore finish;
@ -560,46 +491,7 @@ typedef struct _VipsThreadpool {
/* Set by Allocate (via an arg) to indicate normal end of computation.
*/
gboolean stop;
} VipsThreadpool;
/* Junk a thread.
*/
static void
vips_thread_free( VipsThread *thr )
{
/* Is there a thread running this region? Kill it!
*/
if( thr->thread ) {
thr->exit = 1;
/* Return value is always NULL (see thread_main_loop).
*/
(void) vips_g_thread_join( thr->thread );
thr->thread = NULL;
}
VIPS_FREEF( g_object_unref, thr->state );
thr->pool = NULL;
VIPS_FREE( thr );
}
static int
vips_thread_allocate( VipsThread *thr )
{
VipsThreadpool *pool = thr->pool;
g_assert( !pool->stop );
if( !thr->state &&
!(thr->state = pool->start( pool->im, pool->a )) )
return( -1 );
if( pool->allocate( thr->state, pool->a, &pool->stop ) )
return( -1 );
return( 0 );
}
} VipsTask;
/* Run this once per main loop. Get some work (single-threaded), then do it
* (many-threaded).
@ -608,221 +500,198 @@ vips_thread_allocate( VipsThread *thr )
* loaders a change to seek to the correct spot, see vips_sequential().
*/
static void
vips_thread_work_unit( VipsThread *thr )
vips_task_work_unit( VipsTask *task, VipsThreadState *state )
{
VipsThreadpool *pool = thr->pool;
if( thr->error )
if( task->error )
return;
VIPS_GATE_START( "vips_thread_work_unit: wait" );
VIPS_GATE_START( "vips_task_work_unit: wait" );
g_mutex_lock( pool->allocate_lock );
g_mutex_lock( task->allocate_lock );
VIPS_GATE_STOP( "vips_thread_work_unit: wait" );
VIPS_GATE_STOP( "vips_task_work_unit: wait" );
/* Has another worker signaled stop while we've been working?
*/
if( pool->stop ) {
g_mutex_unlock( pool->allocate_lock );
if( task->stop ) {
g_mutex_unlock( task->allocate_lock );
return;
}
if( vips_thread_allocate( thr ) ) {
thr->error = TRUE;
pool->error = TRUE;
g_mutex_unlock( pool->allocate_lock );
if( task->allocate( state, task->a, &task->stop ) ) {
task->error = TRUE;
g_mutex_unlock( task->allocate_lock );
return;
}
/* Have we just signalled stop?
*/
if( pool->stop ) {
g_mutex_unlock( pool->allocate_lock );
if( task->stop ) {
g_mutex_unlock( task->allocate_lock );
return;
}
g_mutex_unlock( pool->allocate_lock );
g_mutex_unlock( task->allocate_lock );
if( thr->state->stall &&
if( state->stall &&
vips__stall ) {
/* Sleep for 0.5s. Handy for stressing the seq system. Stall
* is set by allocate funcs in various places.
*/
g_usleep( 500000 );
thr->state->stall = FALSE;
printf( "vips_thread_work_unit: "
"stall done, releasing y = %d ...\n", thr->state->y );
state->stall = FALSE;
printf( "vips_task_work_unit: "
"stall done, releasing y = %d ...\n", state->y );
}
/* Process a work unit.
*/
if( pool->work( thr->state, pool->a ) ) {
thr->error = TRUE;
pool->error = TRUE;
}
if( task->work( state, task->a ) )
task->error = TRUE;
}
/* What runs as a thread ... loop, waiting to be told to do stuff.
*/
static void *
vips_thread_main_loop( void *a )
static void
vips_task_run( gpointer data, gpointer user_data )
{
VipsThread *thr = (VipsThread *) a;
VipsThreadpool *pool = thr->pool;
VipsTask *task = (VipsTask *) data;
VipsThreadState *state;
g_assert( pool == thr->pool );
VIPS_GATE_START( "vips_task_run: thread" );
VIPS_GATE_START( "vips_thread_main_loop: thread" );
if( !(state = task->start( task->im, task->a )) )
task->error = TRUE;
/* Process work units! Always tick, even if we are stopping, so the
* main thread will wake up for exit.
*/
for(;;) {
VIPS_GATE_START( "vips_thread_work_unit: u" );
vips_thread_work_unit( thr );
VIPS_GATE_STOP( "vips_thread_work_unit: u" );
vips_semaphore_up( &pool->tick );
VIPS_GATE_START( "vips_task_work_unit: u" );
vips_task_work_unit( task, state );
VIPS_GATE_STOP( "vips_task_work_unit: u" );
vips_semaphore_up( &task->tick );
if( pool->stop ||
pool->error )
if( task->stop ||
task->error )
break;
}
VIPS_FREEF( g_object_unref, state );
/* We are exiting: tell the main thread.
*/
vips_semaphore_up( &pool->finish );
vips_semaphore_up( &task->finish );
VIPS_GATE_STOP( "vips_thread_main_loop: thread" );
return( NULL );
VIPS_GATE_STOP( "vips_task_run: thread" );
}
/* Attach another thread to a threadpool.
/* Called from vips_shutdown().
*/
static VipsThread *
vips_thread_new( VipsThreadpool *pool )
void
vips__threadpool_shutdown( void )
{
VipsThread *thr;
if( !(thr = VIPS_NEW( NULL, VipsThread )) )
return( NULL );
thr->pool = pool;
thr->state = NULL;
thr->thread = NULL;
thr->exit = 0;
thr->error = 0;
/* We can't build the state here, it has to be done by the worker
* itself the first time that allocate runs so that any regions are
* owned by the correct thread.
/* We may come here without having inited.
*/
if( vips__pool ) {
VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", vips__pool );
if( !(thr->thread = vips_g_thread_new( "worker",
vips_thread_main_loop, thr )) ) {
vips_thread_free( thr );
return( NULL );
}
return( thr );
}
/* Kill all threads in a threadpool, if there are any. Can be called multiple
* times.
*/
static void
vips_threadpool_kill_threads( VipsThreadpool *pool )
{
if( pool->thr ) {
int i;
for( i = 0; i < pool->nthr; i++ )
VIPS_FREEF( vips_thread_free, pool->thr[i] );
VIPS_DEBUG_MSG( "vips_threadpool_kill_threads: "
"killed %d threads\n", pool->nthr );
g_thread_pool_free( vips__pool, TRUE, TRUE );
vips__pool = NULL;
}
}
static void
vips_threadpool_free( VipsThreadpool *pool )
static VipsTask *
vips_task_new( VipsImage *im, int *n_tasks )
{
VIPS_DEBUG_MSG( "vips_threadpool_free: \"%s\" (%p)\n",
pool->im->filename, pool );
vips_threadpool_kill_threads( pool );
VIPS_FREEF( vips_g_mutex_free, pool->allocate_lock );
vips_semaphore_destroy( &pool->finish );
vips_semaphore_destroy( &pool->tick );
VIPS_FREE( pool->thr );
VIPS_FREE( pool );
}
static VipsThreadpool *
vips_threadpool_new( VipsImage *im )
{
VipsThreadpool *pool;
VipsTask *task;
int tile_width;
int tile_height;
gint64 n_tiles;
int n_lines;
/* Allocate and init new thread block.
/* Allocate and init a new task.
*/
if( !(pool = VIPS_NEW( NULL, VipsThreadpool )) )
if( !(task = VIPS_NEW( NULL, VipsTask )) )
return( NULL );
pool->im = im;
pool->allocate = NULL;
pool->work = NULL;
pool->allocate_lock = vips_g_mutex_new();
pool->nthr = vips_concurrency_get();
pool->thr = NULL;
vips_semaphore_init( &pool->finish, 0, "finish" );
vips_semaphore_init( &pool->tick, 0, "tick" );
pool->error = FALSE;
pool->stop = FALSE;
task->im = im;
task->allocate = NULL;
task->work = NULL;
task->allocate_lock = vips_g_mutex_new();
vips_semaphore_init( &task->finish, 0, "finish" );
vips_semaphore_init( &task->tick, 0, "tick" );
task->error = FALSE;
task->stop = FALSE;
/* If this is a tiny image, we won't need all nthr threads. Guess how
*n_tasks = vips_concurrency_get();
/* If this is a tiny image, we won't need all n_tasks. Guess how
* many tiles we might need to cover the image and use that to limit
* the number of threads we create.
* the number of tasks we create.
*/
vips_get_tile_size( im, &tile_width, &tile_height, &n_lines );
n_tiles = (1 + (gint64) im->Xsize / tile_width) *
(1 + (gint64) im->Ysize / tile_height);
n_tiles = VIPS_CLIP( 0, n_tiles, MAX_THREADS );
pool->nthr = VIPS_MIN( pool->nthr, n_tiles );
n_tiles = VIPS_MAX( 0, n_tiles );
*n_tasks = VIPS_MIN( *n_tasks, n_tiles );
VIPS_DEBUG_MSG( "vips_threadpool_new: \"%s\" (%p), with %d threads\n",
im->filename, pool, pool->nthr );
VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n",
im->filename, task, *n_tasks );
return( pool );
return( task );
}
/* Attach a set of threads.
*/
static int
vips_threadpool_create_threads( VipsThreadpool *pool )
static void
vips_task_free( VipsTask *task )
{
int i;
VIPS_DEBUG_MSG( "vips_task_free: \"%s\" (%p)\n",
task->im->filename, task );
g_assert( !pool->thr );
VIPS_FREEF( vips_g_mutex_free, task->allocate_lock );
vips_semaphore_destroy( &task->finish );
vips_semaphore_destroy( &task->tick );
VIPS_FREE( task );
}
/* Make thread array.
/**
* vips_threadpool_push.
* @name an name for the thread
* @func a function to execute in the thread pool
* @data an argument to supply to @func
*
* Inserts a new function into the list of tasks to be executed by the
* #VipsThreadPool. A newly created or reused thread will execute
* @func with with the argument data.
*
* When the number of currently running threads is lower than the
* maximal allowed number of threads set by vips_concurrency_set(),
* a new thread is started (or reused).
* Otherwise, it stays in the queue until a thread in this pool
* finishes its previous task and processes the @func.
*
* See also: vips_concurrency_set().
*
* Returns: 0 on success, -1 on error.
*/
if( !(pool->thr = VIPS_ARRAY( NULL, pool->nthr, VipsThread * )) )
return( -1 );
for( i = 0; i < pool->nthr; i++ )
pool->thr[i] = NULL;
int
vips_threadpool_push( const char *name, GFunc func, gpointer data )
{
VipsThreadExec *exec;
GError *error = NULL;
gboolean result;
/* Attach threads and start them working.
*/
for( i = 0; i < pool->nthr; i++ )
if( !(pool->thr[i] = vips_thread_new( pool )) ) {
vips_threadpool_kill_threads( pool );
exec = g_new( VipsThreadExec, 1 );
exec->name = name;
exec->func = func;
exec->data = data;
result = g_thread_pool_push( vips__pool, exec, &error );
if( error ) {
vips_g_error( &error );
return( -1 );
}
return( 0 );
return( result ? 0 : -1 );
}
/**
@ -835,8 +704,8 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
* is allocated to it to build the per-thread state. Per-thread state is used
* by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate.
*
* #VipsThreadState is a subclass of #VipsObject. Start functions are called
* from allocate, that is, they are single-threaded.
* #VipsThreadState is a subclass of #VipsObject. Start functions can be
* executed concurrently.
*
* See also: vips_threadpool_run().
*
@ -908,7 +777,8 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
* @progress: give progress feedback about a work unit, or %NULL
* @a: client data
*
* This function runs a set of threads over an image. Each thread first calls
* This function runs a set of threads over an image. It will use a newly
* created or reused thread within the #VipsThreadPool. Each thread first calls
* @start to create new per-thread state, then runs
* @allocate to set up a new work unit (perhaps the next tile in an image, for
* example), then @work to process that work unit. After each unit is
@ -918,9 +788,9 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
* The object returned by @start must be an instance of a subclass of
* #VipsThreadState. Use this to communicate between @allocate and @work.
*
* @allocate and @start are always single-threaded (so they can write to the
* per-pool state), whereas @work can be executed concurrently. @progress is
* always called by
* @allocate is always single-threaded (so it can write to the
* per-pool state), whereas @start and @work can be executed concurrently.
* @progress is always called by
* the main thread (ie. the thread which called vips_threadpool_run()).
*
* See also: vips_concurrency_set().
@ -935,60 +805,61 @@ vips_threadpool_run( VipsImage *im,
VipsThreadpoolProgressFn progress,
void *a )
{
VipsThreadpool *pool;
VipsTask *task;
int n_tasks;
int i;
int result;
if( !(pool = vips_threadpool_new( im )) )
if( !(task = vips_task_new( im, &n_tasks )) )
return( -1 );
pool->start = start;
pool->allocate = allocate;
pool->work = work;
pool->a = a;
task->start = start;
task->allocate = allocate;
task->work = work;
task->a = a;
/* Attach workers and set them going.
/* Put the tasks to work.
*/
if( vips_threadpool_create_threads( pool ) ) {
vips_threadpool_free( pool );
for( i = 0; i < n_tasks; i++ )
if( vips_threadpool_push( "worker", vips_task_run, task ) )
return( -1 );
}
for(;;) {
/* Wait for a tick from a worker.
*/
vips_semaphore_down( &pool->tick );
vips_semaphore_down( &task->tick );
VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" );
if( pool->stop ||
pool->error )
if( task->stop ||
task->error )
break;
if( progress &&
progress( pool->a ) )
pool->error = TRUE;
progress( task->a ) )
task->error = TRUE;
if( pool->stop ||
pool->error )
if( task->stop ||
task->error )
break;
}
/* Wait for them all to hit finish.
*/
vips_semaphore_downn( &pool->finish, pool->nthr );
vips_semaphore_downn( &task->finish, n_tasks );
/* Return 0 for success.
*/
result = pool->error ? -1 : 0;
result = task->error ? -1 : 0;
vips_threadpool_free( pool );
vips_task_free( task );
vips_image_minimise_all( im );
return( result );
}
/* Start up threadpools. This is called during vips_init.
/* Create a new threadpool. This is called during vips_init.
*/
void
vips__threadpool_init( void )
@ -999,6 +870,12 @@ vips__threadpool_init( void )
if( g_getenv( "VIPS_STALL" ) )
vips__stall = TRUE;
if( vips__concurrency == 0 )
vips__concurrency = vips__concurrency_get_default();
vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL,
-1, FALSE, NULL );
}
/**
@ -1076,4 +953,3 @@ vips_get_tile_size( VipsImage *im,
"groups of %d scanlines\n",
*tile_width, *tile_height, *n_lines );
}

115
suppressions/tsan.supp Normal file
View File

@ -0,0 +1,115 @@
# an unlocked FALSE/TRUE assignment, which is fine
race:vips_region_prepare_to
race:render_kill
race:render_reschedule
# an unlocked NULL assignment, which is fine
race:render_thread
# unlocked read of pixels-processed-so-far, which is fine
race:vips_sink_base_progress
race:wbuffer_allocate_fn
race:sink_memory_area_allocate_fn
# use of *stop from generate funcs is unlocked, but fine
race:vips_threadpool_run
# guarded with vips_tracked_mutex, so it should be fine
race:vips_tracked_mem
race:vips_tracked_allocs
race:vips_tracked_files
# guarded with task->allocate_lock, so it should be fine
race:vips_thread_allocate
race:vips_task_work_unit
# guarded with vips_cache_lock, so it should be fine
race:vips_cache_table
race:vips_cache_drop_all
race:vips_cache_get_first
race:vips_cache_get_lru_cb
race:vips_cache_remove
# guarded with vips__global_lock, so it should be fine
race:vips_error_freeze_count
race:vips__link_make
race:vips__link_map
race:vips__link_break_all
race:tile_name
# the double-buffered output and write-behind thread are non-racy
race:wbuffer_new
race:wbuffer_write
race:wbuffer_work_fn
race:wbuffer_free
race:write_thread_state_new
# thread-local variables (i.e. GPrivate) are harmless
race:vips_thread_profile_key
race:buffer_thread_key
# glib signals are probably non-racy
race:vips_image_preeval
race:vips_image_eval
race:vips_image_posteval
race:vips_image_written
race:vips_image_real_written
race:vips_image_save_cb
race:render_close_cb
race:readjpeg_close_cb
# semaphores are probably non-racy
race:vips_semaphore_*
# guarded with sink->sslock, so it should be fine
race:sink_call_start
race:sink_call_stop
race:vips_hist_find_start
race:vips_hist_find_stop
race:vips_statistic_scan_start
race:vips_statistic_scan_stop
race:vips_max_stop
race:vips_values_add
race:vips_deviate_stop
race:vips_arithmetic_start
race:histogram_new
# per-thread state allocate/dispose functions are non-racy
race:sink_thread_state_class_init
race:vips_thread_state_init
race:vips_thread_state_build
race:vips_thread_state_set
race:vips_thread_state_dispose
race:write_thread_state_new
race:vips_sink_base_init
race:vips_sink_thread_state_new
race:sink_memory_init
race:sink_memory_thread_state_new
race:sink_memory_free
race:sink_thread_state_build
race:sink_thread_state_dispose
race:buffer_cache_free
race:sink_init
race:sink_free
race:vips_sequential_dispose
race:vips_block_cache_dispose
race:vips_image_init
race:vips_image_build
race:vips_image_finalize
race:vips_image_dispose
# guarded with image->sslock, so it should be fine
race:vips__region_start
race:vips__region_stop
race:vips_region_dispose
race:vips__region_take_ownership
# is fine now, see: https://github.com/libvips/libvips/pull/1211
race:vips_image_temp_name
# is fine now, see: https://github.com/libvips/libvips/pull/1483
race:meta_new
race:meta_cp
race:vips_image_set
race:vips__image_copy_fields_array
race:vips__image_meta_copy