libvips/libvips/iofuncs/threadpool.c

678 lines
16 KiB
C
Raw Normal View History

2010-03-18 16:05:24 +01:00
/* Support for thread pools ... like threadgroups, but lighter.
*
* 18/3/10
* - from threadgroup.c
2010-03-21 01:49:30 +01:00
* - distributed work allocation idea from Christian Blenia, thank you
* very much
2010-03-21 23:39:47 +01:00
* 21/3/10
* - progress feedback
* - only expose VipsThreadState
2010-03-18 16:05:24 +01:00
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
#define TIME_THREAD
#define DEBUG_CREATE
#define DEBUG_IO
2010-03-21 00:56:35 +01:00
*/
2010-03-18 16:05:24 +01:00
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <errno.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#ifdef WITH_DMALLOC
#include <dmalloc.h>
#endif /*WITH_DMALLOC*/
/**
* SECTION: threadpool
* @short_description: pools of worker threads ... a lighter version of
* threadgroups
* @stability: Stable
* @see_also: <link linkend="libvips-generate">generate</link>
* @include: vips/vips.h
*
2010-03-21 23:39:47 +01:00
* vips_threadpool_run() loops a set of threads over an image. Threads take it
* in turns to allocate units of work (a unit might be a tile in an image),
* then run in parallel to process those units. An optional progress function
* can be used to give feedback.
2010-03-18 16:05:24 +01:00
*
2010-03-21 23:39:47 +01:00
* This is like threadgroup, but workers allocate work units themselves. This
* reduces the synchronisation overhead and improves scalability.
*/
/* What we track for each thread in the pool.
2010-03-18 16:05:24 +01:00
*/
2010-03-21 23:39:47 +01:00
typedef struct {
/* All private.
*/
/*< private >*/
VipsThreadState state;
struct _VipsThreadpool *pool; /* Pool we are part of */
/* Thread we are running.
*/
GThread *thread;
/* Set this to ask the thread to exit.
*/
gboolean exit;
/* Set by the thread if work or allocate return an error.
*/
gboolean error;
#ifdef TIME_THREAD
double *btime, *etime;
int tpos;
#endif /*TIME_THREAD*/
} VipsThread;
/* What we track for a group of threads working together.
*/
typedef struct _VipsThreadpool {
/* All private.
*/
/*< private >*/
VipsImage *im; /* Image we are calculating */
/* Do a unit of work (runs in parallel) and allocate a unit of work
* (serial). Plus the mutex we use to serialize work allocation.
*/
VipsThreadpoolAllocate allocate;
VipsThreadpoolWork work;
GMutex *allocate_lock;
void *a, *b, *c; /* User arguments to work / allocate */
int nthr; /* Number of threads in pool */
VipsThread **thr; /* Threads */
/* The caller blocks here until all threads finish.
*/
im_semaphore_t finish;
/* Workers up this for every loop to make the main thread tick.
*/
im_semaphore_t tick;
/* Set this to abort evaluation early with an error.
*/
gboolean error;
/* Set by Allocate (via an arg) to indicate normal end of computation.
*/
gboolean stop;
} VipsThreadpool;
2010-03-18 16:05:24 +01:00
#ifdef TIME_THREAD
/* Size of time buffers.
*/
#define IM_TBUF_SIZE (20000)
2010-03-18 23:08:07 +01:00
static GTimer *thread_timer = NULL;
2010-03-18 16:05:24 +01:00
#endif /*TIME_THREAD*/
#ifdef TIME_THREAD
/* Save time buffers.
*/
static int
2010-03-21 23:39:47 +01:00
vips_thread_save_time_buffers( VipsThread *thr )
2010-03-18 16:05:24 +01:00
{
int i;
static int rn = 1;
FILE *fp;
2010-03-18 23:08:07 +01:00
char name[256];
2010-03-18 16:05:24 +01:00
im_snprintf( name, 256, "time%d", rn++ );
2010-03-21 23:39:47 +01:00
printf( "vips_thread_save_time_buffers: "
"saving buffer to \"%s\"\n", name );
2010-03-18 16:05:24 +01:00
if( !(fp = fopen( name, "w" )) )
error_exit( "unable to write to \"%s\"", name );
2010-03-18 23:08:07 +01:00
for( i = 0; i < thr->tpos; i++ )
fprintf( fp, "%g, %g\n", thr->btime[i], thr->etime[i] );
2010-03-18 16:05:24 +01:00
fclose( fp );
return( 0 );
}
#endif /*TIME_THREAD*/
/* Junk a thread.
*/
static void
2010-03-21 23:39:47 +01:00
vips_thread_free( VipsThread *thr )
2010-03-18 16:05:24 +01:00
{
/* Is there a thread running this region? Kill it!
*/
if( thr->thread ) {
2010-03-21 00:56:35 +01:00
thr->exit = 1;
2010-03-18 16:05:24 +01:00
/* Return value is always NULL (see thread_main_loop).
*/
(void) g_thread_join( thr->thread );
#ifdef DEBUG_CREATE
printf( "thread_free: g_thread_join()\n" );
#endif /*DEBUG_CREATE*/
thr->thread = NULL;
}
2010-03-21 23:39:47 +01:00
IM_FREEF( im_region_free, thr->state.reg );
2010-03-18 23:08:07 +01:00
thr->pool = NULL;
2010-03-18 16:05:24 +01:00
#ifdef TIME_THREAD
if( thr->btime )
2010-03-21 23:39:47 +01:00
(void) vips_thread_save_time_buffers( thr );
2010-03-18 16:05:24 +01:00
#endif /*TIME_THREAD*/
}
2010-03-21 00:56:35 +01:00
/* The main loop: get some work, do it! Can run from many worker threads, or
2010-03-21 14:54:56 +01:00
* from the main thread if threading is off.
2010-03-18 16:05:24 +01:00
*/
static void
2010-03-21 23:39:47 +01:00
vips_thread_work_unit( VipsThread *thr )
2010-03-18 16:05:24 +01:00
{
VipsThreadpool *pool = thr->pool;
2010-03-21 14:54:56 +01:00
/* Ask for a work unit.
*/
g_mutex_lock( pool->allocate_lock );
2010-03-21 23:39:47 +01:00
2010-03-21 14:54:56 +01:00
if( !pool->stop ) {
2010-03-21 23:39:47 +01:00
if( pool->allocate( &thr->state,
2010-03-21 14:54:56 +01:00
pool->a, pool->b, pool->c, &pool->stop ) ) {
thr->error = TRUE;
pool->error = TRUE;
2010-03-18 23:08:07 +01:00
}
2010-03-21 14:54:56 +01:00
}
2010-03-21 23:39:47 +01:00
2010-03-21 14:54:56 +01:00
g_mutex_unlock( pool->allocate_lock );
2010-03-18 16:05:24 +01:00
2010-03-21 14:54:56 +01:00
if( pool->stop || pool->error )
return;
2010-03-18 16:05:24 +01:00
#ifdef TIME_THREAD
2010-03-21 14:54:56 +01:00
/* Note start time.
*/
if( thr->btime && thr->tpos < IM_TBUF_SIZE )
thr->btime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
2010-03-18 16:05:24 +01:00
#endif /*TIME_THREAD*/
2010-03-21 14:54:56 +01:00
/* Process a work unit.
*/
2010-03-21 23:39:47 +01:00
if( pool->work( &thr->state, pool->a, pool->b, pool->c ) ) {
2010-03-21 14:54:56 +01:00
thr->error = TRUE;
pool->error = TRUE;
}
2010-03-18 16:05:24 +01:00
#ifdef TIME_THREAD
2010-03-21 14:54:56 +01:00
/* Note stop time.
*/
if( thr->etime && thr->tpos < IM_TBUF_SIZE ) {
thr->etime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
thr->tpos += 1;
2010-03-18 16:05:24 +01:00
}
2010-03-21 14:54:56 +01:00
#endif /*TIME_THREAD*/
2010-03-21 00:56:35 +01:00
}
#ifdef HAVE_THREADS
/* What runs as a thread ... loop, waiting to be told to do stuff.
*/
static void *
2010-03-21 23:39:47 +01:00
vips_thread_main_loop( void *a )
2010-03-21 00:56:35 +01:00
{
VipsThread *thr = (VipsThread *) a;
VipsThreadpool *pool = thr->pool;
g_assert( pool == thr->pool );
/* We now control the region (it was created by pool when we
* were built).
*/
2010-03-21 23:39:47 +01:00
im__region_take_ownership( thr->state.reg );
2010-03-21 00:56:35 +01:00
2010-03-21 14:54:56 +01:00
/* Process work units! Always tick, even if we are stopping, so the
* main thread will wake up for exit.
2010-03-21 00:56:35 +01:00
*/
2010-03-21 14:54:56 +01:00
for(;;) {
2010-03-21 23:39:47 +01:00
vips_thread_work_unit( thr );
2010-03-21 14:54:56 +01:00
im_semaphore_up( &pool->tick );
2010-03-18 16:05:24 +01:00
2010-03-21 14:54:56 +01:00
if( pool->stop || pool->error )
break;
}
/* We are exiting: tell the main thread.
2010-03-18 23:08:07 +01:00
*/
im_semaphore_up( &pool->finish );
2010-03-18 16:05:24 +01:00
return( NULL );
}
#endif /*HAVE_THREADS*/
/* Attach another thread to a threadgroup.
*/
static VipsThread *
vips_thread_new( VipsThreadpool *pool )
{
VipsThread *thr;
if( !(thr = IM_NEW( pool->im, VipsThread )) )
return( NULL );
thr->pool = pool;
2010-03-21 23:39:47 +01:00
thr->state.reg = NULL;
2010-03-18 16:05:24 +01:00
thr->thread = NULL;
2010-03-21 00:56:35 +01:00
thr->exit = 0;
2010-03-18 16:05:24 +01:00
thr->error = 0;
#ifdef TIME_THREAD
thr->btime = NULL;
thr->etime = NULL;
thr->tpos = 0;
#endif /*TIME_THREAD*/
/* Attach stuff.
*/
2010-03-21 23:39:47 +01:00
if( !(thr->state.reg = im_region_create( pool->im )) ) {
vips_thread_free( thr );
2010-03-18 16:05:24 +01:00
return( NULL );
}
/* Get ready to hand the region over to the thread.
*/
2010-03-21 23:39:47 +01:00
im__region_no_ownership( thr->state.reg );
2010-03-18 16:05:24 +01:00
#ifdef TIME_THREAD
2010-03-18 23:08:07 +01:00
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
thr->etime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
2010-03-18 16:05:24 +01:00
if( !thr->btime || !thr->etime ) {
thread_free( thr );
return( NULL );
}
#endif /*TIME_THREAD*/
#ifdef HAVE_THREADS
/* Make a worker thread. We have to use g_thread_create_full() because
* we need to insist on a non-tiny stack. Some platforms default to
* very small values (eg. various BSDs).
*/
2010-03-21 23:39:47 +01:00
if( !(thr->thread = g_thread_create_full( vips_thread_main_loop, thr,
2010-03-18 16:05:24 +01:00
IM__DEFAULT_STACK_SIZE, TRUE, FALSE,
G_THREAD_PRIORITY_NORMAL, NULL )) ) {
im_error( "threadgroup_thread_new",
"%s", _( "unable to create thread" ) );
2010-03-21 23:39:47 +01:00
vips_thread_free( thr );
2010-03-18 16:05:24 +01:00
return( NULL );
}
#ifdef DEBUG_CREATE
2010-03-21 23:39:47 +01:00
printf( "vips_thread_new: g_thread_create_full()\n" );
2010-03-18 16:05:24 +01:00
#endif /*DEBUG_CREATE*/
#endif /*HAVE_THREADS*/
return( thr );
}
/* Kill all threads in a threadgroup, if there are any.
*/
static void
2010-03-21 23:39:47 +01:00
vips_threadpool_kill_threads( VipsThreadpool *pool )
2010-03-18 16:05:24 +01:00
{
if( pool->thr ) {
int i;
for( i = 0; i < pool->nthr; i++ )
2010-03-21 23:39:47 +01:00
vips_thread_free( pool->thr[i] );
2010-03-18 16:05:24 +01:00
pool->thr = NULL;
#ifdef DEBUG_IO
2010-03-21 23:39:47 +01:00
printf( "vips_threadpool_kill_threads: killed %d threads\n",
2010-03-18 16:05:24 +01:00
pool->nthr );
#endif /*DEBUG_IO*/
}
}
2010-03-21 23:39:47 +01:00
static int
2010-03-18 16:05:24 +01:00
vips_threadpool_free( VipsThreadpool *pool )
{
#ifdef DEBUG_IO
printf( "vips_threadpool_free: \"%s\" (%p)\n",
pool->im->filename, pool );
#endif /*DEBUG_IO*/
2010-03-21 23:39:47 +01:00
vips_threadpool_kill_threads( pool );
2010-03-18 16:05:24 +01:00
IM_FREEF( g_mutex_free, pool->allocate_lock );
2010-03-21 14:54:56 +01:00
im_semaphore_destroy( &pool->finish );
im_semaphore_destroy( &pool->tick );
2010-03-18 16:05:24 +01:00
return( 0 );
}
2010-03-21 23:39:47 +01:00
static VipsThreadpool *
2010-03-18 16:05:24 +01:00
vips_threadpool_new( VipsImage *im )
{
VipsThreadpool *pool;
/* Allocate and init new thread block.
*/
if( !(pool = IM_NEW( im, VipsThreadpool )) )
return( NULL );
pool->im = im;
pool->allocate = NULL;
pool->work = NULL;
pool->allocate_lock = g_mutex_new();
2010-03-19 15:54:12 +01:00
pool->nthr = im_concurrency_get();
2010-03-18 16:05:24 +01:00
pool->thr = NULL;
im_semaphore_init( &pool->finish, 0, "finish" );
2010-03-21 14:54:56 +01:00
im_semaphore_init( &pool->tick, 0, "tick" );
2010-03-18 16:05:24 +01:00
pool->stop = FALSE;
2010-03-21 14:54:56 +01:00
pool->error = FALSE;
2010-03-18 16:05:24 +01:00
/* Attach tidy-up callback.
*/
if( im_add_close_callback( im,
(im_callback_fn) vips_threadpool_free, pool, NULL ) ) {
(void) vips_threadpool_free( pool );
return( NULL );
}
#ifdef DEBUG_IO
printf( "vips_threadpool_new: \"%s\" (%p), with %d threads\n",
im->filename, pool, pool->nthr );
#endif /*DEBUG_IO*/
return( pool );
}
/* Attach a set of threads.
*/
static int
2010-03-21 23:39:47 +01:00
vips_threadpool_create_threads( VipsThreadpool *pool )
2010-03-18 16:05:24 +01:00
{
int i;
g_assert( !pool->thr );
/* Make thread array.
*/
2010-03-18 23:08:07 +01:00
if( !(pool->thr = IM_ARRAY( pool->im, pool->nthr, VipsThread * )) )
2010-03-18 16:05:24 +01:00
return( -1 );
for( i = 0; i < pool->nthr; i++ )
pool->thr[i] = NULL;
/* Attach threads and start them working.
*/
for( i = 0; i < pool->nthr; i++ )
if( !(pool->thr[i] = vips_thread_new( pool )) ) {
2010-03-21 23:39:47 +01:00
vips_threadpool_kill_threads( pool );
2010-03-18 16:05:24 +01:00
return( -1 );
}
return( 0 );
}
2010-03-21 23:39:47 +01:00
/**
* VipsThreadState:
* @reg: a #REGION
* @pos: a #Rect
* @x: an int
* @y: an int
* @d: client data
* @e: client data
* @f: client data
*
* These per-thread values are carried around for your use by
* vips_threadpool_run(). They are private to each thread, so they are handy
* for VipsThreadpoolAllocate and VipsThreadpoolWork to communicate.
*
* @reg is created for you at the start of processing and freed at the end,
* but you can do what you like with it.
*/
/**
* VipsThreadpoolAllocate:
* @state: per-thread state
* @a: client data
* @b: client data
* @c: client data
* @stop: set this to signal end of computation
*
* This function is called to allocate a new work unit for the thread. It is
* always single-threaded, so it can modify per-call state (such as a
* counter).
*
* @a, @b, @c are the values supplied to the call to
* vips_threadpool_run().
*
* It should set @stop to %TRUE to indicate that no work could be allocated
* because the job is done.
*
* It should return 0 for a normal return, or non-zero to indicate an error.
*
* See also: vips_threadpool_run().
*/
/**
* VipsThreadpoolWork:
* @state: per-thread state
* @a: client data
* @b: client data
* @c: client data
*
* This function is called to process a work unit. Many copies of this can run
* at once, so it should not write to the per-call state. It can write to
* per-thread state.
*
* @a, @b, @c are the values supplied to the call to
* vips_threadpool_run().
*
* It should return 0 for a normal return, or non-zero to indicate an error.
*
* See also: vips_threadpool_run().
*/
/**
* VipsThreadpoolProgress:
* @a: client data
* @b: client data
* @c: client data
*
* This function is called by the main thread once for every work unit
* processed. It can be used to give the user progress feedback.
*
* It should return 0 for a normal return, or non-zero to indicate an error.
*
* See also: vips_threadpool_run().
*/
/**
* vips_threadpool_run:
* @im: image to loop over
* @allocate: allocate a work unit
* @work: process a work unit
* @progress: give progress feedback about a work unit, or %NULL
* @a: client data
* @b: client data
* @c: client data
*
* This function runs a set of threads over an image. Each thread run
* @allocate to set up a new work unit (perhaps the next tile in an image, for
* example), then @work to process that work unit. After each unit is
* processed, @progress is called, so that the operation can give
* progress feedback.@progress may be %NULL.
*
* Each thread has a small amount of private state that vips_threadpool_run()
* maintains for it. The @allocate and @work functions can use this to
* communicate.
*
* @allocate is always single-threaded (so it can write to the per-call state),
* whereas @work can be executed concurrently. @progress is always called by
* the main thread (ie. the thread which called vips_thtreadpool_run()).
*
* See also: im_wbuffer2(), im_concurrency_set().
*
* Returns: 0 on success, -1 on error.
*/
2010-03-18 16:05:24 +01:00
int
2010-03-18 23:08:07 +01:00
vips_threadpool_run( VipsImage *im,
2010-03-21 14:54:56 +01:00
VipsThreadpoolAllocate allocate,
VipsThreadpoolWork work,
VipsThreadpoolProgress progress,
2010-03-18 23:08:07 +01:00
void *a, void *b, void *c )
2010-03-18 16:05:24 +01:00
{
2010-03-18 23:08:07 +01:00
VipsThreadpool *pool;
2010-03-18 16:05:24 +01:00
int result;
2010-03-18 23:08:07 +01:00
#ifdef TIME_THREAD
if( !thread_timer )
thread_timer = g_timer_new();
#endif /*TIME_THREAD*/
if( !(pool = vips_threadpool_new( im )) )
return( -1 );
2010-03-18 16:05:24 +01:00
pool->allocate = allocate;
pool->work = work;
2010-03-18 23:08:07 +01:00
pool->a = a;
pool->b = b;
pool->c = c;
2010-03-18 16:05:24 +01:00
/* Attach workers and set them going.
*/
2010-03-21 23:39:47 +01:00
if( vips_threadpool_create_threads( pool ) ) {
2010-03-18 23:08:07 +01:00
vips_threadpool_free( pool );
2010-03-18 16:05:24 +01:00
return( -1 );
2010-03-18 23:08:07 +01:00
}
2010-03-18 16:05:24 +01:00
2010-03-21 14:54:56 +01:00
for(;;) {
2010-03-18 16:05:24 +01:00
#ifdef HAVE_THREADS
2010-03-21 14:54:56 +01:00
/* Wait for a tick from a worker.
*/
im_semaphore_down( &pool->tick );
2010-03-18 16:05:24 +01:00
#else
2010-03-21 14:54:56 +01:00
/* No threads, do the work ourselves in the main thread.
*/
2010-03-21 23:39:47 +01:00
vips_thread_work_unit( pool->thr[0] );
2010-03-18 16:05:24 +01:00
#endif /*HAVE_THREADS*/
2010-03-21 14:54:56 +01:00
if( pool->stop || pool->error )
break;
2010-03-18 23:08:07 +01:00
2010-03-21 14:54:56 +01:00
if( progress &&
progress( pool->a, pool->b, pool->c ) )
pool->error = TRUE;
if( pool->stop || pool->error )
break;
2010-03-18 16:05:24 +01:00
}
2010-03-21 14:54:56 +01:00
/* Wait for them all to hit finish.
*/
im_semaphore_downn( &pool->finish, pool->nthr );
/* Return 0 for success.
*/
result = pool->error ? -1 : 0;
2010-03-18 23:08:07 +01:00
vips_threadpool_free( pool );
2010-03-18 16:05:24 +01:00
2010-03-18 23:08:07 +01:00
return( result );
2010-03-18 16:05:24 +01:00
}
2010-03-18 23:08:07 +01:00
2010-03-21 23:39:47 +01:00
/**
* vips_get_tile_size:
* @im: image to guess for
* @tile_width: return selected tile width
* @tile_height: return selected tile height
* @nlines: return buffer height in scanlines
*
* Pick a tile size and a buffer height for this image and the current
* value of im_concurrency_get(). The buffer height
* will always be a multiple of tile_height.
2010-03-19 15:54:12 +01:00
*/
void
vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *nlines )
{
2010-03-21 00:56:35 +01:00
const int nthr = im_concurrency_get();
2010-03-19 15:54:12 +01:00
/* Pick a render geometry.
*/
2010-03-21 00:56:35 +01:00
switch( im->dhint ) {
2010-03-19 15:54:12 +01:00
case IM_SMALLTILE:
*tile_width = im__tile_width;
*tile_height = im__tile_height;
/* Enough lines of tiles that we can expect to be able to keep
* nthr busy.
*/
*nlines = *tile_height *
(1 + nthr / IM_MAX( 1, im->Xsize / *tile_width ));
break;
case IM_FATSTRIP:
*tile_width = im->Xsize;
*tile_height = im__fatstrip_height;
*nlines = *tile_height * nthr * 2;
break;
case IM_ANY:
case IM_THINSTRIP:
*tile_width = im->Xsize;
*tile_height = im__thinstrip_height;
*nlines = *tile_height * nthr * 2;
break;
default:
g_assert( 0 );
}
2010-03-21 00:56:35 +01:00
/* We make this assumption in several places.
*/
g_assert( *nlines % *tile_height == 0 );
2010-03-19 15:54:12 +01:00
#ifdef DEBUG_IO
printf( "vips_get_tile_size: %d by %d patches, "
"groups of %d scanlines\n",
*tile_width, *tile_height, *nlines );
#endif /*DEBUG_IO*/
}