Reuse threads by using GLib's threadpool
This commit is contained in:
parent
1423c550cc
commit
80e0cc3d12
@ -111,6 +111,7 @@ extern gboolean vips__cache_trace;
|
|||||||
extern int vips__n_active_threads;
|
extern int vips__n_active_threads;
|
||||||
|
|
||||||
void vips__threadpool_init( void );
|
void vips__threadpool_init( void );
|
||||||
|
void vips__threadpool_shutdown( void );
|
||||||
|
|
||||||
void vips__cache_init( void );
|
void vips__cache_init( void );
|
||||||
|
|
||||||
|
@ -5,6 +5,8 @@
|
|||||||
* 17/3/10
|
* 17/3/10
|
||||||
* - from threadgroup
|
* - from threadgroup
|
||||||
* - rework with a simpler distributed work allocation model
|
* - rework with a simpler distributed work allocation model
|
||||||
|
* 02/02/20 kleisauke
|
||||||
|
* - reuse threads by using GLib's threadpool
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -130,6 +132,7 @@ typedef int (*VipsThreadpoolWorkFn)( VipsThreadState *state, void *a );
|
|||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolProgressFn)( void *a );
|
typedef int (*VipsThreadpoolProgressFn)( void *a );
|
||||||
|
|
||||||
|
int vips_threadpool_push( const char *name, GFunc func, gpointer data );
|
||||||
int vips_threadpool_run( VipsImage *im,
|
int vips_threadpool_run( VipsImage *im,
|
||||||
VipsThreadStartFn start,
|
VipsThreadStartFn start,
|
||||||
VipsThreadpoolAllocateFn allocate,
|
VipsThreadpoolAllocateFn allocate,
|
||||||
|
@ -665,7 +665,7 @@ vips_leak( void )
|
|||||||
*
|
*
|
||||||
* This function needs to be called when a thread that has been using vips
|
* This function needs to be called when a thread that has been using vips
|
||||||
* exits. It is called for you by vips_shutdown() and for any threads created
|
* exits. It is called for you by vips_shutdown() and for any threads created
|
||||||
* by vips_g_thread_new().
|
* within the #VipsThreadPool.
|
||||||
*
|
*
|
||||||
* You will need to call it from threads created in
|
* You will need to call it from threads created in
|
||||||
* other ways or there will be memory leaks. If you do not call it, vips
|
* other ways or there will be memory leaks. If you do not call it, vips
|
||||||
@ -719,6 +719,8 @@ vips_shutdown( void )
|
|||||||
|
|
||||||
vips__thread_profile_stop();
|
vips__thread_profile_stop();
|
||||||
|
|
||||||
|
vips__threadpool_shutdown();
|
||||||
|
|
||||||
#ifdef HAVE_GSF
|
#ifdef HAVE_GSF
|
||||||
gsf_shutdown();
|
gsf_shutdown();
|
||||||
#endif /*HAVE_GSF*/
|
#endif /*HAVE_GSF*/
|
||||||
|
@ -23,6 +23,8 @@
|
|||||||
* - don't depend on image width when setting n_lines
|
* - don't depend on image width when setting n_lines
|
||||||
* 27/2/19 jtorresfabra
|
* 27/2/19 jtorresfabra
|
||||||
* - free threadpool earlier
|
* - free threadpool earlier
|
||||||
|
* 02/02/20 kleisauke
|
||||||
|
* - reuse threads by using GLib's threadpool
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -87,6 +89,15 @@
|
|||||||
* @include: vips/vips.h
|
* @include: vips/vips.h
|
||||||
* @title: VipsThreadpool
|
* @title: VipsThreadpool
|
||||||
*
|
*
|
||||||
|
* A threadpool which allows reusing already started threads. Implementing
|
||||||
|
* this can be tedious and error-prone. Therefore we use the GLib
|
||||||
|
* provided threadpool for our convenience. An added advantage is, that
|
||||||
|
* the threads can be shared between the different subsystems, when they
|
||||||
|
* are using GLib.
|
||||||
|
*
|
||||||
|
* The threadpool is created during vips_init() and is destroyed by
|
||||||
|
* vips_shutdown().
|
||||||
|
*
|
||||||
* vips_threadpool_run() loops a set of threads over an image. Threads take it
|
* vips_threadpool_run() loops a set of threads over an image. Threads take it
|
||||||
* in turns to allocate units of work (a unit might be a tile in an image),
|
* in turns to allocate units of work (a unit might be a tile in an image),
|
||||||
* then run in parallel to process those units. An optional progress function
|
* then run in parallel to process those units. An optional progress function
|
||||||
@ -121,6 +132,10 @@ static GPrivate *is_worker_key = NULL;
|
|||||||
*/
|
*/
|
||||||
static gboolean vips__stall = FALSE;
|
static gboolean vips__stall = FALSE;
|
||||||
|
|
||||||
|
/* The thread pool we'll use.
|
||||||
|
*/
|
||||||
|
static GThreadPool *vips__pool = NULL;
|
||||||
|
|
||||||
/* Glib 2.32 revised the thread API. We need some compat functions.
|
/* Glib 2.32 revised the thread API. We need some compat functions.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -273,23 +288,38 @@ vips_g_thread_join( GThread *thread )
|
|||||||
return( result );
|
return( result );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
typedef struct {
|
||||||
* vips_concurrency_set:
|
/* An name for this thread.
|
||||||
* @concurrency: number of threads to run
|
|
||||||
*
|
|
||||||
* Sets the number of worker threads that vips should use when running a
|
|
||||||
* #VipsThreadPool.
|
|
||||||
*
|
|
||||||
* The special value 0 means "default". In this case, the number of threads is
|
|
||||||
* set by the environment variable VIPS_CONCURRENCY, or if that is not set, the
|
|
||||||
* number of threads availble on the host machine.
|
|
||||||
*
|
|
||||||
* See also: vips_concurrency_get().
|
|
||||||
*/
|
*/
|
||||||
void
|
const char *name;
|
||||||
vips_concurrency_set( int concurrency )
|
|
||||||
|
/* The function to execute within the #VipsThreadPool.
|
||||||
|
*/
|
||||||
|
GFunc func;
|
||||||
|
|
||||||
|
/* User data that is handed over to func when it is called.
|
||||||
|
*/
|
||||||
|
gpointer data;
|
||||||
|
} VipsThreadExec;
|
||||||
|
|
||||||
|
static void
|
||||||
|
vips_thread_main_loop( gpointer thread_data, gpointer pool_data )
|
||||||
{
|
{
|
||||||
vips__concurrency = concurrency;
|
VipsThreadExec *exec = (VipsThreadExec *) thread_data;
|
||||||
|
|
||||||
|
/* Set this to something (anything) to tag this thread as a vips
|
||||||
|
* worker.
|
||||||
|
*/
|
||||||
|
g_private_set( is_worker_key, thread_data );
|
||||||
|
|
||||||
|
if( vips__thread_profile )
|
||||||
|
vips__thread_profile_attach( exec->name );
|
||||||
|
|
||||||
|
exec->func( exec->data, pool_data );
|
||||||
|
|
||||||
|
g_free( exec );
|
||||||
|
|
||||||
|
vips_thread_shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
@ -371,6 +401,67 @@ get_num_processors( void )
|
|||||||
#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/
|
#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* The default concurrency, set by the environment variable VIPS_CONCURRENCY,
|
||||||
|
* or if that is not set, the number of threads available on the host machine.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
vips__concurrency_get_default( void )
|
||||||
|
{
|
||||||
|
const char *str;
|
||||||
|
int nthr;
|
||||||
|
int x;
|
||||||
|
|
||||||
|
/* Tell the threads system how much concurrency we expect.
|
||||||
|
*/
|
||||||
|
if( vips__concurrency > 0 )
|
||||||
|
nthr = vips__concurrency;
|
||||||
|
else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
|
||||||
|
#if ENABLE_DEPRECATED
|
||||||
|
|| (str = g_getenv( "IM_CONCURRENCY" ))
|
||||||
|
#endif
|
||||||
|
) && (x = atoi( str )) > 0 )
|
||||||
|
nthr = x;
|
||||||
|
else
|
||||||
|
nthr = get_num_processors();
|
||||||
|
|
||||||
|
if( nthr < 1 || nthr > MAX_THREADS ) {
|
||||||
|
nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );
|
||||||
|
|
||||||
|
g_warning( _( "threads clipped to %d" ), nthr );
|
||||||
|
}
|
||||||
|
|
||||||
|
return nthr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* vips_concurrency_set:
|
||||||
|
* @concurrency: number of threads to run
|
||||||
|
*
|
||||||
|
* Sets the number of worker threads that vips should use when running a
|
||||||
|
* #VipsThreadPool.
|
||||||
|
*
|
||||||
|
* The special value 0 means "default". In this case, the number of threads is
|
||||||
|
* set by the environment variable VIPS_CONCURRENCY, or if that is not set, the
|
||||||
|
* number of threads available on the host machine.
|
||||||
|
*
|
||||||
|
* See also: vips_concurrency_get().
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
vips_concurrency_set( int concurrency )
|
||||||
|
{
|
||||||
|
/* Tell the threads system how much concurrency we expect.
|
||||||
|
*/
|
||||||
|
if( concurrency < 1 )
|
||||||
|
concurrency = vips__concurrency_get_default();
|
||||||
|
else if( concurrency > MAX_THREADS ) {
|
||||||
|
concurrency = MAX_THREADS;
|
||||||
|
|
||||||
|
g_warning( _( "threads clipped to %d" ), MAX_THREADS );
|
||||||
|
}
|
||||||
|
|
||||||
|
vips__concurrency = concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* vips_concurrency_get:
|
* vips_concurrency_get:
|
||||||
*
|
*
|
||||||
@ -398,34 +489,7 @@ get_num_processors( void )
|
|||||||
int
|
int
|
||||||
vips_concurrency_get( void )
|
vips_concurrency_get( void )
|
||||||
{
|
{
|
||||||
const char *str;
|
return( vips__concurrency );
|
||||||
int nthr;
|
|
||||||
int x;
|
|
||||||
|
|
||||||
/* Tell the threads system how much concurrency we expect.
|
|
||||||
*/
|
|
||||||
if( vips__concurrency > 0 )
|
|
||||||
nthr = vips__concurrency;
|
|
||||||
else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
|
|
||||||
#if ENABLE_DEPRECATED
|
|
||||||
|| (str = g_getenv( "IM_CONCURRENCY" ))
|
|
||||||
#endif
|
|
||||||
) && (x = atoi( str )) > 0 )
|
|
||||||
nthr = x;
|
|
||||||
else
|
|
||||||
nthr = get_num_processors();
|
|
||||||
|
|
||||||
if( nthr < 1 || nthr > MAX_THREADS ) {
|
|
||||||
nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );
|
|
||||||
|
|
||||||
g_warning( _( "threads clipped to %d" ), nthr );
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Save for next time around.
|
|
||||||
*/
|
|
||||||
vips_concurrency_set( nthr );
|
|
||||||
|
|
||||||
return( nthr );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
|
G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
|
||||||
@ -500,41 +564,17 @@ vips_thread_state_new( VipsImage *im, void *a )
|
|||||||
VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
|
VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* What we track for each thread in the pool.
|
/* What we track for a task within the #VipsThreadPool.
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct _VipsTask {
|
||||||
/* All private.
|
|
||||||
*/
|
|
||||||
/*< private >*/
|
|
||||||
struct _VipsThreadpool *pool; /* Pool we are part of */
|
|
||||||
|
|
||||||
VipsThreadState *state;
|
|
||||||
|
|
||||||
/* Thread we are running.
|
|
||||||
*/
|
|
||||||
GThread *thread;
|
|
||||||
|
|
||||||
/* Set this to ask the thread to exit.
|
|
||||||
*/
|
|
||||||
gboolean exit;
|
|
||||||
|
|
||||||
/* Set by the thread if work or allocate return an error.
|
|
||||||
*/
|
|
||||||
gboolean error;
|
|
||||||
|
|
||||||
} VipsThread;
|
|
||||||
|
|
||||||
/* What we track for a group of threads working together.
|
|
||||||
*/
|
|
||||||
typedef struct _VipsThreadpool {
|
|
||||||
/* All private.
|
/* All private.
|
||||||
*/
|
*/
|
||||||
/*< private >*/
|
/*< private >*/
|
||||||
VipsImage *im; /* Image we are calculating */
|
VipsImage *im; /* Image we are calculating */
|
||||||
|
|
||||||
/* Start a thread, do a unit of work (runs in parallel) and allocate
|
/* Start or reuse a thread, do a unit of work (runs in parallel)
|
||||||
* a unit of work (serial). Plus the mutex we use to serialize work
|
* and allocate a unit of work (serial). Plus the mutex we use to
|
||||||
* allocation.
|
* serialize work allocation.
|
||||||
*/
|
*/
|
||||||
VipsThreadStartFn start;
|
VipsThreadStartFn start;
|
||||||
VipsThreadpoolAllocateFn allocate;
|
VipsThreadpoolAllocateFn allocate;
|
||||||
@ -542,10 +582,7 @@ typedef struct _VipsThreadpool {
|
|||||||
GMutex *allocate_lock;
|
GMutex *allocate_lock;
|
||||||
void *a; /* User argument to start / allocate / etc. */
|
void *a; /* User argument to start / allocate / etc. */
|
||||||
|
|
||||||
int nthr; /* Number of threads in pool */
|
/* The caller blocks here until all tasks finish.
|
||||||
VipsThread **thr; /* Threads */
|
|
||||||
|
|
||||||
/* The caller blocks here until all threads finish.
|
|
||||||
*/
|
*/
|
||||||
VipsSemaphore finish;
|
VipsSemaphore finish;
|
||||||
|
|
||||||
@ -560,46 +597,7 @@ typedef struct _VipsThreadpool {
|
|||||||
/* Set by Allocate (via an arg) to indicate normal end of computation.
|
/* Set by Allocate (via an arg) to indicate normal end of computation.
|
||||||
*/
|
*/
|
||||||
gboolean stop;
|
gboolean stop;
|
||||||
} VipsThreadpool;
|
} VipsTask;
|
||||||
|
|
||||||
/* Junk a thread.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
vips_thread_free( VipsThread *thr )
|
|
||||||
{
|
|
||||||
/* Is there a thread running this region? Kill it!
|
|
||||||
*/
|
|
||||||
if( thr->thread ) {
|
|
||||||
thr->exit = 1;
|
|
||||||
|
|
||||||
/* Return value is always NULL (see thread_main_loop).
|
|
||||||
*/
|
|
||||||
(void) vips_g_thread_join( thr->thread );
|
|
||||||
thr->thread = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
VIPS_FREEF( g_object_unref, thr->state );
|
|
||||||
thr->pool = NULL;
|
|
||||||
|
|
||||||
VIPS_FREE( thr );
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
vips_thread_allocate( VipsThread *thr )
|
|
||||||
{
|
|
||||||
VipsThreadpool *pool = thr->pool;
|
|
||||||
|
|
||||||
g_assert( !pool->stop );
|
|
||||||
|
|
||||||
if( !thr->state &&
|
|
||||||
!(thr->state = pool->start( pool->im, pool->a )) )
|
|
||||||
return( -1 );
|
|
||||||
|
|
||||||
if( pool->allocate( thr->state, pool->a, &pool->stop ) )
|
|
||||||
return( -1 );
|
|
||||||
|
|
||||||
return( 0 );
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Run this once per main loop. Get some work (single-threaded), then do it
|
/* Run this once per main loop. Get some work (single-threaded), then do it
|
||||||
* (many-threaded).
|
* (many-threaded).
|
||||||
@ -608,221 +606,203 @@ vips_thread_allocate( VipsThread *thr )
|
|||||||
* loaders a change to seek to the correct spot, see vips_sequential().
|
* loaders a change to seek to the correct spot, see vips_sequential().
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
vips_thread_work_unit( VipsThread *thr )
|
vips_task_work_unit( VipsTask *task, VipsThreadState *state )
|
||||||
{
|
{
|
||||||
VipsThreadpool *pool = thr->pool;
|
if( task->error )
|
||||||
|
|
||||||
if( thr->error )
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
VIPS_GATE_START( "vips_thread_work_unit: wait" );
|
VIPS_GATE_START( "vips_task_work_unit: wait" );
|
||||||
|
|
||||||
g_mutex_lock( pool->allocate_lock );
|
g_mutex_lock( task->allocate_lock );
|
||||||
|
|
||||||
VIPS_GATE_STOP( "vips_thread_work_unit: wait" );
|
VIPS_GATE_STOP( "vips_task_work_unit: wait" );
|
||||||
|
|
||||||
/* Has another worker signaled stop while we've been working?
|
/* Has another worker signaled stop while we've been working?
|
||||||
*/
|
*/
|
||||||
if( pool->stop ) {
|
if( task->stop ) {
|
||||||
g_mutex_unlock( pool->allocate_lock );
|
g_mutex_unlock( task->allocate_lock );
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if( vips_thread_allocate( thr ) ) {
|
if( task->allocate( state, task->a, &task->stop ) ) {
|
||||||
thr->error = TRUE;
|
task->error = TRUE;
|
||||||
pool->error = TRUE;
|
g_mutex_unlock( task->allocate_lock );
|
||||||
g_mutex_unlock( pool->allocate_lock );
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Have we just signalled stop?
|
/* Have we just signalled stop?
|
||||||
*/
|
*/
|
||||||
if( pool->stop ) {
|
if( task->stop ) {
|
||||||
g_mutex_unlock( pool->allocate_lock );
|
g_mutex_unlock( task->allocate_lock );
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
g_mutex_unlock( pool->allocate_lock );
|
g_mutex_unlock( task->allocate_lock );
|
||||||
|
|
||||||
if( thr->state->stall &&
|
if( state->stall &&
|
||||||
vips__stall ) {
|
vips__stall ) {
|
||||||
/* Sleep for 0.5s. Handy for stressing the seq system. Stall
|
/* Sleep for 0.5s. Handy for stressing the seq system. Stall
|
||||||
* is set by allocate funcs in various places.
|
* is set by allocate funcs in various places.
|
||||||
*/
|
*/
|
||||||
g_usleep( 500000 );
|
g_usleep( 500000 );
|
||||||
thr->state->stall = FALSE;
|
state->stall = FALSE;
|
||||||
printf( "vips_thread_work_unit: "
|
printf( "vips_task_work_unit: "
|
||||||
"stall done, releasing y = %d ...\n", thr->state->y );
|
"stall done, releasing y = %d ...\n", state->y );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process a work unit.
|
/* Process a work unit.
|
||||||
*/
|
*/
|
||||||
if( pool->work( thr->state, pool->a ) ) {
|
if( task->work( state, task->a ) )
|
||||||
thr->error = TRUE;
|
task->error = TRUE;
|
||||||
pool->error = TRUE;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* What runs as a thread ... loop, waiting to be told to do stuff.
|
/* What runs as a thread ... loop, waiting to be told to do stuff.
|
||||||
*/
|
*/
|
||||||
static void *
|
static void
|
||||||
vips_thread_main_loop( void *a )
|
vips_task_run( gpointer data, gpointer user_data )
|
||||||
{
|
{
|
||||||
VipsThread *thr = (VipsThread *) a;
|
VipsTask *task = (VipsTask *) data;
|
||||||
VipsThreadpool *pool = thr->pool;
|
VipsThreadState *state;
|
||||||
|
|
||||||
g_assert( pool == thr->pool );
|
VIPS_GATE_START( "vips_task_run: thread" );
|
||||||
|
|
||||||
VIPS_GATE_START( "vips_thread_main_loop: thread" );
|
// TODO: Could we move this to vips_task_work_unit()?
|
||||||
|
g_mutex_lock( task->allocate_lock );
|
||||||
|
if( !(state = task->start( task->im, task->a )) )
|
||||||
|
task->error = TRUE;
|
||||||
|
g_mutex_unlock( task->allocate_lock );
|
||||||
|
|
||||||
/* Process work units! Always tick, even if we are stopping, so the
|
/* Process work units! Always tick, even if we are stopping, so the
|
||||||
* main thread will wake up for exit.
|
* main thread will wake up for exit.
|
||||||
*/
|
*/
|
||||||
for(;;) {
|
for(;;) {
|
||||||
VIPS_GATE_START( "vips_thread_work_unit: u" );
|
VIPS_GATE_START( "vips_task_work_unit: u" );
|
||||||
vips_thread_work_unit( thr );
|
vips_task_work_unit( task, state );
|
||||||
VIPS_GATE_STOP( "vips_thread_work_unit: u" );
|
VIPS_GATE_STOP( "vips_task_work_unit: u" );
|
||||||
vips_semaphore_up( &pool->tick );
|
vips_semaphore_up( &task->tick );
|
||||||
|
|
||||||
if( pool->stop ||
|
if( task->stop ||
|
||||||
pool->error )
|
task->error )
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
g_mutex_lock( task->allocate_lock );
|
||||||
|
VIPS_FREEF( g_object_unref, state );
|
||||||
|
g_mutex_unlock( task->allocate_lock );
|
||||||
|
|
||||||
/* We are exiting: tell the main thread.
|
/* We are exiting: tell the main thread.
|
||||||
*/
|
*/
|
||||||
vips_semaphore_up( &pool->finish );
|
vips_semaphore_up( &task->finish );
|
||||||
|
|
||||||
VIPS_GATE_STOP( "vips_thread_main_loop: thread" );
|
VIPS_GATE_STOP( "vips_task_run: thread" );
|
||||||
|
|
||||||
return( NULL );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Attach another thread to a threadpool.
|
/* Called from vips_shutdown().
|
||||||
*/
|
*/
|
||||||
static VipsThread *
|
void
|
||||||
vips_thread_new( VipsThreadpool *pool )
|
vips__threadpool_shutdown( void )
|
||||||
{
|
{
|
||||||
VipsThread *thr;
|
/* We may come here without having inited.
|
||||||
|
|
||||||
if( !(thr = VIPS_NEW( NULL, VipsThread )) )
|
|
||||||
return( NULL );
|
|
||||||
thr->pool = pool;
|
|
||||||
thr->state = NULL;
|
|
||||||
thr->thread = NULL;
|
|
||||||
thr->exit = 0;
|
|
||||||
thr->error = 0;
|
|
||||||
|
|
||||||
/* We can't build the state here, it has to be done by the worker
|
|
||||||
* itself the first time that allocate runs so that any regions are
|
|
||||||
* owned by the correct thread.
|
|
||||||
*/
|
*/
|
||||||
|
if( vips__pool ) {
|
||||||
|
VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", vips__pool );
|
||||||
|
|
||||||
if( !(thr->thread = vips_g_thread_new( "worker",
|
g_thread_pool_free( vips__pool, TRUE, TRUE );
|
||||||
vips_thread_main_loop, thr )) ) {
|
vips__pool = NULL;
|
||||||
vips_thread_free( thr );
|
}
|
||||||
return( NULL );
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return( thr );
|
static VipsTask *
|
||||||
}
|
vips_task_new( VipsImage *im, int *n_tasks )
|
||||||
|
|
||||||
/* Kill all threads in a threadpool, if there are any. Can be called multiple
|
|
||||||
* times.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
vips_threadpool_kill_threads( VipsThreadpool *pool )
|
|
||||||
{
|
{
|
||||||
if( pool->thr ) {
|
VipsTask *task;
|
||||||
int i;
|
|
||||||
|
|
||||||
for( i = 0; i < pool->nthr; i++ )
|
|
||||||
VIPS_FREEF( vips_thread_free, pool->thr[i] );
|
|
||||||
|
|
||||||
VIPS_DEBUG_MSG( "vips_threadpool_kill_threads: "
|
|
||||||
"killed %d threads\n", pool->nthr );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
vips_threadpool_free( VipsThreadpool *pool )
|
|
||||||
{
|
|
||||||
VIPS_DEBUG_MSG( "vips_threadpool_free: \"%s\" (%p)\n",
|
|
||||||
pool->im->filename, pool );
|
|
||||||
|
|
||||||
vips_threadpool_kill_threads( pool );
|
|
||||||
VIPS_FREEF( vips_g_mutex_free, pool->allocate_lock );
|
|
||||||
vips_semaphore_destroy( &pool->finish );
|
|
||||||
vips_semaphore_destroy( &pool->tick );
|
|
||||||
VIPS_FREE( pool->thr );
|
|
||||||
VIPS_FREE( pool );
|
|
||||||
}
|
|
||||||
|
|
||||||
static VipsThreadpool *
|
|
||||||
vips_threadpool_new( VipsImage *im )
|
|
||||||
{
|
|
||||||
VipsThreadpool *pool;
|
|
||||||
int tile_width;
|
int tile_width;
|
||||||
int tile_height;
|
int tile_height;
|
||||||
gint64 n_tiles;
|
gint64 n_tiles;
|
||||||
int n_lines;
|
int n_lines;
|
||||||
|
|
||||||
/* Allocate and init new thread block.
|
/* Allocate and init a new task.
|
||||||
*/
|
*/
|
||||||
if( !(pool = VIPS_NEW( NULL, VipsThreadpool )) )
|
if( !(task = VIPS_NEW( NULL, VipsTask )) )
|
||||||
return( NULL );
|
return( NULL );
|
||||||
pool->im = im;
|
task->im = im;
|
||||||
pool->allocate = NULL;
|
task->allocate = NULL;
|
||||||
pool->work = NULL;
|
task->work = NULL;
|
||||||
pool->allocate_lock = vips_g_mutex_new();
|
task->allocate_lock = vips_g_mutex_new();
|
||||||
pool->nthr = vips_concurrency_get();
|
vips_semaphore_init( &task->finish, 0, "finish" );
|
||||||
pool->thr = NULL;
|
vips_semaphore_init( &task->tick, 0, "tick" );
|
||||||
vips_semaphore_init( &pool->finish, 0, "finish" );
|
task->error = FALSE;
|
||||||
vips_semaphore_init( &pool->tick, 0, "tick" );
|
task->stop = FALSE;
|
||||||
pool->error = FALSE;
|
|
||||||
pool->stop = FALSE;
|
|
||||||
|
|
||||||
/* If this is a tiny image, we won't need all nthr threads. Guess how
|
*n_tasks = vips_concurrency_get();
|
||||||
|
|
||||||
|
/* If this is a tiny image, we won't need all n_tasks. Guess how
|
||||||
* many tiles we might need to cover the image and use that to limit
|
* many tiles we might need to cover the image and use that to limit
|
||||||
* the number of threads we create.
|
* the number of tasks we create.
|
||||||
*/
|
*/
|
||||||
vips_get_tile_size( im, &tile_width, &tile_height, &n_lines );
|
vips_get_tile_size( im, &tile_width, &tile_height, &n_lines );
|
||||||
n_tiles = (1 + (gint64) im->Xsize / tile_width) *
|
n_tiles = (1 + (gint64) im->Xsize / tile_width) *
|
||||||
(1 + (gint64) im->Ysize / tile_height);
|
(1 + (gint64) im->Ysize / tile_height);
|
||||||
n_tiles = VIPS_CLIP( 0, n_tiles, MAX_THREADS );
|
n_tiles = VIPS_MAX( 0, n_tiles );
|
||||||
pool->nthr = VIPS_MIN( pool->nthr, n_tiles );
|
*n_tasks = VIPS_MIN( *n_tasks, n_tiles );
|
||||||
|
|
||||||
VIPS_DEBUG_MSG( "vips_threadpool_new: \"%s\" (%p), with %d threads\n",
|
VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n",
|
||||||
im->filename, pool, pool->nthr );
|
im->filename, task, *n_tasks );
|
||||||
|
|
||||||
return( pool );
|
return( task );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Attach a set of threads.
|
static void
|
||||||
*/
|
vips_task_free( VipsTask *task )
|
||||||
static int
|
|
||||||
vips_threadpool_create_threads( VipsThreadpool *pool )
|
|
||||||
{
|
{
|
||||||
int i;
|
VIPS_DEBUG_MSG( "vips_task_free: \"%s\" (%p)\n",
|
||||||
|
task->im->filename, task );
|
||||||
|
|
||||||
g_assert( !pool->thr );
|
VIPS_FREEF( vips_g_mutex_free, task->allocate_lock );
|
||||||
|
vips_semaphore_destroy( &task->finish );
|
||||||
|
vips_semaphore_destroy( &task->tick );
|
||||||
|
VIPS_FREE( task );
|
||||||
|
}
|
||||||
|
|
||||||
/* Make thread array.
|
/**
|
||||||
|
* vips_threadpool_push.
|
||||||
|
* @name an name for the thread
|
||||||
|
* @func a function to execute in the thread pool
|
||||||
|
* @data an argument to supply to @func
|
||||||
|
*
|
||||||
|
* Inserts a new function into the list of tasks to be executed by the
|
||||||
|
* #VipsThreadPool. A newly created or reused thread will execute
|
||||||
|
* @func with with the argument data.
|
||||||
|
*
|
||||||
|
* When the number of currently running threads is lower than the
|
||||||
|
* maximal allowed number of threads set by vips_concurrency_set(),
|
||||||
|
* a new thread is started (or reused).
|
||||||
|
* Otherwise, it stays in the queue until a thread in this pool
|
||||||
|
* finishes its previous task and processes the @func.
|
||||||
|
*
|
||||||
|
* See also: vips_concurrency_set().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, -1 on error.
|
||||||
*/
|
*/
|
||||||
if( !(pool->thr = VIPS_ARRAY( NULL, pool->nthr, VipsThread * )) )
|
int
|
||||||
return( -1 );
|
vips_threadpool_push( const char *name, GFunc func, gpointer data )
|
||||||
for( i = 0; i < pool->nthr; i++ )
|
{
|
||||||
pool->thr[i] = NULL;
|
VipsThreadExec *exec;
|
||||||
|
GError *error = NULL;
|
||||||
|
gboolean result;
|
||||||
|
|
||||||
/* Attach threads and start them working.
|
exec = g_new( VipsThreadExec, 1 );
|
||||||
*/
|
exec->name = name;
|
||||||
for( i = 0; i < pool->nthr; i++ )
|
exec->func = func;
|
||||||
if( !(pool->thr[i] = vips_thread_new( pool )) ) {
|
exec->data = data;
|
||||||
vips_threadpool_kill_threads( pool );
|
|
||||||
|
result = g_thread_pool_push( vips__pool, exec, &error );
|
||||||
|
if( error ) {
|
||||||
|
vips_g_error( &error );
|
||||||
return( -1 );
|
return( -1 );
|
||||||
}
|
}
|
||||||
|
|
||||||
return( 0 );
|
return( result ? 0 : -1 );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -908,7 +888,8 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
* @progress: give progress feedback about a work unit, or %NULL
|
* @progress: give progress feedback about a work unit, or %NULL
|
||||||
* @a: client data
|
* @a: client data
|
||||||
*
|
*
|
||||||
* This function runs a set of threads over an image. Each thread first calls
|
* This function runs a set of threads over an image. It will use a newly
|
||||||
|
* created or reused thread within the #VipsThreadPool. Each thread first calls
|
||||||
* @start to create new per-thread state, then runs
|
* @start to create new per-thread state, then runs
|
||||||
* @allocate to set up a new work unit (perhaps the next tile in an image, for
|
* @allocate to set up a new work unit (perhaps the next tile in an image, for
|
||||||
* example), then @work to process that work unit. After each unit is
|
* example), then @work to process that work unit. After each unit is
|
||||||
@ -935,60 +916,61 @@ vips_threadpool_run( VipsImage *im,
|
|||||||
VipsThreadpoolProgressFn progress,
|
VipsThreadpoolProgressFn progress,
|
||||||
void *a )
|
void *a )
|
||||||
{
|
{
|
||||||
VipsThreadpool *pool;
|
VipsTask *task;
|
||||||
|
int n_tasks;
|
||||||
|
int i;
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
if( !(pool = vips_threadpool_new( im )) )
|
if( !(task = vips_task_new( im, &n_tasks )) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
|
||||||
pool->start = start;
|
task->start = start;
|
||||||
pool->allocate = allocate;
|
task->allocate = allocate;
|
||||||
pool->work = work;
|
task->work = work;
|
||||||
pool->a = a;
|
task->a = a;
|
||||||
|
|
||||||
/* Attach workers and set them going.
|
/* Put the tasks to work.
|
||||||
*/
|
*/
|
||||||
if( vips_threadpool_create_threads( pool ) ) {
|
for( i = 0; i < n_tasks; i++ )
|
||||||
vips_threadpool_free( pool );
|
if( vips_threadpool_push( "worker", vips_task_run, task ) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
}
|
|
||||||
|
|
||||||
for(;;) {
|
for(;;) {
|
||||||
/* Wait for a tick from a worker.
|
/* Wait for a tick from a worker.
|
||||||
*/
|
*/
|
||||||
vips_semaphore_down( &pool->tick );
|
vips_semaphore_down( &task->tick );
|
||||||
|
|
||||||
VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" );
|
VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" );
|
||||||
|
|
||||||
if( pool->stop ||
|
if( task->stop ||
|
||||||
pool->error )
|
task->error )
|
||||||
break;
|
break;
|
||||||
|
|
||||||
if( progress &&
|
if( progress &&
|
||||||
progress( pool->a ) )
|
progress( task->a ) )
|
||||||
pool->error = TRUE;
|
task->error = TRUE;
|
||||||
|
|
||||||
if( pool->stop ||
|
if( task->stop ||
|
||||||
pool->error )
|
task->error )
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wait for them all to hit finish.
|
/* Wait for them all to hit finish.
|
||||||
*/
|
*/
|
||||||
vips_semaphore_downn( &pool->finish, pool->nthr );
|
vips_semaphore_downn( &task->finish, n_tasks );
|
||||||
|
|
||||||
/* Return 0 for success.
|
/* Return 0 for success.
|
||||||
*/
|
*/
|
||||||
result = pool->error ? -1 : 0;
|
result = task->error ? -1 : 0;
|
||||||
|
|
||||||
vips_threadpool_free( pool );
|
vips_task_free( task );
|
||||||
|
|
||||||
vips_image_minimise_all( im );
|
vips_image_minimise_all( im );
|
||||||
|
|
||||||
return( result );
|
return( result );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Start up threadpools. This is called during vips_init.
|
/* Create a new threadpool. This is called during vips_init.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
vips__threadpool_init( void )
|
vips__threadpool_init( void )
|
||||||
@ -999,6 +981,12 @@ vips__threadpool_init( void )
|
|||||||
|
|
||||||
if( g_getenv( "VIPS_STALL" ) )
|
if( g_getenv( "VIPS_STALL" ) )
|
||||||
vips__stall = TRUE;
|
vips__stall = TRUE;
|
||||||
|
|
||||||
|
if( vips__concurrency == 0 )
|
||||||
|
vips__concurrency = vips__concurrency_get_default();
|
||||||
|
|
||||||
|
vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL,
|
||||||
|
-1, FALSE, NULL );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1076,4 +1064,3 @@ vips_get_tile_size( VipsImage *im,
|
|||||||
"groups of %d scanlines\n",
|
"groups of %d scanlines\n",
|
||||||
*tile_width, *tile_height, *n_lines );
|
*tile_width, *tile_height, *n_lines );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user