diff --git a/TODO b/TODO index 0012effc..a4665cdd 100644 --- a/TODO +++ b/TODO @@ -1,10 +1,7 @@ -- make VipsThread opaque ... have a separate VipsThreadState struct holding - stuff and only expose that +- make VipsRegionWrite the real fun, im_wbuffer_fn the typedef - try with im_render(), ouch, that will be painful -- switch other file sinks over, maybe have a shim layer? - - switch im_iterate() over? - do we have other uses of threadgroup? diff --git a/doc/reference/libvips-docs.sgml.in b/doc/reference/libvips-docs.sgml.in index a48bbd2f..21cea6af 100644 --- a/doc/reference/libvips-docs.sgml.in +++ b/doc/reference/libvips-docs.sgml.in @@ -24,6 +24,7 @@ + diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index 345579c1..c7c693a0 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -70,6 +70,10 @@ extern int im__concurrency; */ extern int im__progress; +/* Use wbuffer2. + */ +extern int im__wbuffer2; + typedef int (*im__fftproc_fn)( IMAGE *, IMAGE *, IMAGE * ); /* iofuncs diff --git a/libvips/include/vips/threadpool.h b/libvips/include/vips/threadpool.h index a004c9ad..7fb1473d 100644 --- a/libvips/include/vips/threadpool.h +++ b/libvips/include/vips/threadpool.h @@ -42,50 +42,33 @@ extern "C" { #include -/* What we track for each thread in the pool. +/* The per-thread state we expose. Allocate functions can use these members to + * communicate with work functions. */ -typedef struct { - /* All private. - */ - /*< private >*/ - struct _VipsThreadpool *pool; /* Pool we are part of */ - - GThread *thread; /* Thread for this region */ - - /* Set this to ask the thread to exit. - */ - gboolean exit; - - /* Set by the thread if work or allocate return an error. - */ - gboolean error; - - /* Thread state for the worker and allocate. Handy for communication. - * The region is created and destroyed by the threadpool for the - * worker. 
+typedef struct _VipsThreadState { + /* This region is created and destroyed by the threadpool for the + * worker. */ REGION *reg; + + /* The rest are neither used nor set, do what you like with them. + */ Rect pos; int x, y; - void *a, *b, *c; - -#ifdef TIME_THREAD - double *btime, *etime; - int tpos; -#endif /*TIME_THREAD*/ -} VipsThread; + void *d, *e, *f; +} VipsThreadState; /* A work allocate function. This is run single-threaded by a worker to * set up a new work unit. * Return non-zero for errors. Set *stop for "no more work to do" */ -typedef int (*VipsThreadpoolAllocate)( VipsThread *thr, +typedef int (*VipsThreadpoolAllocate)( VipsThreadState *state, void *a, void *b, void *c, gboolean *stop ); /* A work function. This does a unit of work (eg. processing a tile or * whatever). Return non-zero for errors. */ -typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, +typedef int (*VipsThreadpoolWork)( VipsThreadState *state, void *a, void *b, void *c ); /* A progress function. This is run by the main thread once for every @@ -93,47 +76,6 @@ typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, */ typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c ); -/* What we track for a group of threads working together. - */ -typedef struct _VipsThreadpool { - /* All private. - */ - /*< private >*/ - VipsImage *im; /* Image we are calculating */ - - /* Do a unit of work (runs in parallel) and allocate a unit of work - * (serial). Plus the mutex we use to serialize work allocation. - */ - VipsThreadpoolAllocate allocate; - VipsThreadpoolWork work; - GMutex *allocate_lock; - void *a, *b, *c; /* User arguments to work / allocate */ - - int nthr; /* Number of threads in pool */ - VipsThread **thr; /* Threads */ - - /* The caller blocks here until all threads finish. - */ - im_semaphore_t finish; - - /* Workers up this for every loop to make the main thread tick. 
- */ - im_semaphore_t tick; - - /* Set this to abort evaluation early with an error. - */ - gboolean error; - - /* Set by Allocate (via an arg) to indicate normal end of computation. - */ - gboolean stop; - - /* Set this if the pool has been shut down. We sometimes need to allow - * double-frees. - */ - gboolean zombie; -} VipsThreadpool; - int vips_threadpool_run( VipsImage *im, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, @@ -142,9 +84,9 @@ int vips_threadpool_run( VipsImage *im, void vips_get_tile_size( VipsImage *im, int *tile_width, int *tile_height, int *nlines ); -extern int im__wbuffer2; - -int im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ); +typedef im_wbuffer_fn VipsRegionWrite; +int vips_discsink( VipsImage *im, + VipsRegionWrite write_fn, void *a, void *b ); #ifdef __cplusplus } diff --git a/libvips/iofuncs/Makefile.am b/libvips/iofuncs/Makefile.am index a06440d3..68ed9fa2 100644 --- a/libvips/iofuncs/Makefile.am +++ b/libvips/iofuncs/Makefile.am @@ -29,7 +29,7 @@ libiofuncs_la_SOURCES = \ im_unmapfile.c \ im_guess_prefix.c \ im_wbuffer.c \ - im_wbuffer2.c \ + discsink.c \ im_wrapmany.c \ im_writeline.c \ memory.c \ diff --git a/libvips/iofuncs/im_wbuffer2.c b/libvips/iofuncs/discsink.c similarity index 87% rename from libvips/iofuncs/im_wbuffer2.c rename to libvips/iofuncs/discsink.c index 946f829a..a7f2cfb0 100644 --- a/libvips/iofuncs/im_wbuffer2.c +++ b/libvips/iofuncs/discsink.c @@ -1,4 +1,4 @@ -/* Double-buffered write. +/* Write an image. * * 19/3/10 * - from im_wbuffer.c @@ -99,7 +99,7 @@ typedef struct _Write { /* The file format write operation. */ - im_wbuffer_fn write_fn; + VipsRegionWrite write_fn; void *a; void *b; } Write; @@ -148,7 +148,6 @@ wbuffer_write( WriteBuffer *wbuffer ) wbuffer->write_errno = write->write_fn( wbuffer->region, &wbuffer->area, write->a, write->b ); - } #ifdef HAVE_THREADS @@ -309,7 +308,7 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height ) * iteration. 
*/ static gboolean -wbuffer_allocate_fn( VipsThread *thr, +wbuffer_allocate_fn( VipsThreadState *state, void *a, void *b, void *c, gboolean *stop ) { Write *write = (Write *) a; @@ -359,8 +358,8 @@ wbuffer_allocate_fn( VipsThread *thr, tile.top = write->y; tile.width = write->tile_width; tile.height = write->tile_height; - im_rect_intersectrect( &image, &tile, &thr->pos ); - thr->a = write->buf; + im_rect_intersectrect( &image, &tile, &state->pos ); + state->d = write->buf; /* Add to the number of writers on the buffer. */ @@ -376,17 +375,17 @@ wbuffer_allocate_fn( VipsThread *thr, /* Our VipsThreadpoolWork function ... generate a tile! */ static int -wbuffer_work_fn( VipsThread *thr, - REGION *reg, void *a, void *b, void *c ) +wbuffer_work_fn( VipsThreadState *state, + void *a, void *b, void *c ) { - WriteBuffer *wbuffer = (WriteBuffer *) thr->a; + WriteBuffer *wbuffer = (WriteBuffer *) state->d; #ifdef DEBUG printf( "wbuffer_work_fn\n" ); #endif /*DEBUG*/ - if( im_prepare_to( reg, wbuffer->region, - &thr->pos, thr->pos.left, thr->pos.top ) ) + if( im_prepare_to( state->reg, wbuffer->region, + &state->pos, state->pos.left, state->pos.top ) ) return( -1 ); /* Tell the bg write thread we've left. @@ -415,7 +414,7 @@ wbuffer_progress_fn( void *a, void *b, void *c ) static void write_init( Write *write, - VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) + VipsImage *im, VipsRegionWrite write_fn, void *a, void *b ) { write->im = im; write->buf = wbuffer_new( write ); @@ -437,8 +436,42 @@ write_free( Write *write ) IM_FREEF( wbuffer_free, write->buf_back ); } +/** + * VipsRegionWrite: + * @region: pixels to write + * @a: client data + * @b: client data + * + * The function should write the pixels in @region. @a and @b are the values + * passed into vips_discsink(). + * + * See also: vips_discsink(). + * + * Returns: 0 on success, -1 on error. 
+ */ + +/** + * vips_discsink: + * @im: image to process + * @write_fn: called for every batch of pixels + * @a: client data + * @b: client data + * + * vips_discsink() loops over @im, top-to-bottom, generating it in sections. + * As each section is produced, @write_fn is called. + * + * @write_fn is always called single-threaded, it's always given image + * sections in top-to-bottom order, and there are never any gaps. + * + * This operation is handy for making image sinks which output to things like + * disc files. + * + * See also: im_concurrency_set(). + * + * Returns: 0 on success, -1 on error. + */ int -im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) +vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b ) { Write write; int result; diff --git a/libvips/iofuncs/im_wbuffer.c b/libvips/iofuncs/im_wbuffer.c index 369e5b53..732f57d7 100644 --- a/libvips/iofuncs/im_wbuffer.c +++ b/libvips/iofuncs/im_wbuffer.c @@ -56,6 +56,7 @@ #include #include +#include #include #ifdef WITH_DMALLOC @@ -429,7 +430,7 @@ im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) /* Optionally use the newer one. */ if( im__wbuffer2 ) - return( im_wbuffer2( tg->im, write_fn, a, b ) ); + return( vips_discsink( tg->im, write_fn, a, b ) ); if( im__start_eval( tg->im ) ) return( -1 ); diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index 4de23b55..8fba6ef1 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -4,6 +4,9 @@ * - from threadgroup.c * - distributed work allocation idea from Christian Blenia, thank you * very much + * 21/3/10 + * - progress feedback + * - only expose VipsThreadState */ /* @@ -66,13 +69,79 @@ * @see_also: generate * @include: vips/vips.h * - * This is like threadgroup, but the work allocation is distributed. This - * reduces the synchronisation overhead and improves scalability. + * vips_threadpool_run() loops a set of threads over an image. 
Threads take it + * in turns to allocate units of work (a unit might be a tile in an image), + * then run in parallel to process those units. An optional progress function + * can be used to give feedback. * - * Most of this is internal to VIPS and does not need to be documented. You - * should only need vips_threadpool_new() and vips_threadpool_free(). + * This is like threadgroup, but workers allocate work units themselves. This + * reduces the synchronisation overhead and improves scalability. */ +/* What we track for each thread in the pool. + */ +typedef struct { + /* All private. + */ + /*< private >*/ + VipsThreadState state; + + struct _VipsThreadpool *pool; /* Pool we are part of */ + + /* Thread we are running. + */ + GThread *thread; + + /* Set this to ask the thread to exit. + */ + gboolean exit; + + /* Set by the thread if work or allocate return an error. + */ + gboolean error; + +#ifdef TIME_THREAD + double *btime, *etime; + int tpos; +#endif /*TIME_THREAD*/ +} VipsThread; + +/* What we track for a group of threads working together. + */ +typedef struct _VipsThreadpool { + /* All private. + */ + /*< private >*/ + VipsImage *im; /* Image we are calculating */ + + /* Do a unit of work (runs in parallel) and allocate a unit of work + * (serial). Plus the mutex we use to serialize work allocation. + */ + VipsThreadpoolAllocate allocate; + VipsThreadpoolWork work; + GMutex *allocate_lock; + void *a, *b, *c; /* User arguments to work / allocate */ + + int nthr; /* Number of threads in pool */ + VipsThread **thr; /* Threads */ + + /* The caller blocks here until all threads finish. + */ + im_semaphore_t finish; + + /* Workers up this for every loop to make the main thread tick. + */ + im_semaphore_t tick; + + /* Set this to abort evaluation early with an error. + */ + gboolean error; + + /* Set by Allocate (via an arg) to indicate normal end of computation. + */ + gboolean stop; +} VipsThreadpool; + #ifdef TIME_THREAD /* Size of time buffers. 
*/ @@ -84,7 +153,7 @@ static GTimer *thread_timer = NULL; /* Save time buffers. */ static int -save_time_buffers( VipsThread *thr ) +vips_thread_save_time_buffers( VipsThread *thr ) { int i; static int rn = 1; @@ -92,7 +161,8 @@ save_time_buffers( VipsThread *thr ) char name[256]; im_snprintf( name, 256, "time%d", rn++ ); - printf( "saving time buffer to \"%s\"\n", name ); + printf( "vips_thread_save_time_buffers: " + "saving buffer to \"%s\"\n", name ); if( !(fp = fopen( name, "w" )) ) error_exit( "unable to write to \"%s\"", name ); for( i = 0; i < thr->tpos; i++ ) @@ -106,7 +176,7 @@ save_time_buffers( VipsThread *thr ) /* Junk a thread. */ static void -thread_free( VipsThread *thr ) +vips_thread_free( VipsThread *thr ) { /* Is there a thread running this region? Kill it! */ @@ -123,12 +193,12 @@ thread_free( VipsThread *thr ) thr->thread = NULL; } - IM_FREEF( im_region_free, thr->reg ); + IM_FREEF( im_region_free, thr->state.reg ); thr->pool = NULL; #ifdef TIME_THREAD if( thr->btime ) - (void) save_time_buffers( thr ); + (void) vips_thread_save_time_buffers( thr ); #endif /*TIME_THREAD*/ } @@ -136,20 +206,22 @@ thread_free( VipsThread *thr ) * from the main thread if threading is off. */ static void -thread_work_unit( VipsThread *thr ) +vips_thread_work_unit( VipsThread *thr ) { VipsThreadpool *pool = thr->pool; /* Ask for a work unit. */ g_mutex_lock( pool->allocate_lock ); + if( !pool->stop ) { - if( pool->allocate( thr, + if( pool->allocate( &thr->state, pool->a, pool->b, pool->c, &pool->stop ) ) { thr->error = TRUE; pool->error = TRUE; } } + g_mutex_unlock( pool->allocate_lock ); if( pool->stop || pool->error ) @@ -165,7 +237,7 @@ thread_work_unit( VipsThread *thr ) /* Process a work unit. */ - if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) ) { + if( pool->work( &thr->state, pool->a, pool->b, pool->c ) ) { thr->error = TRUE; pool->error = TRUE; } @@ -185,7 +257,7 @@ thread_work_unit( VipsThread *thr ) /* What runs as a thread ... 
loop, waiting to be told to do stuff. */ static void * -thread_main_loop( void *a ) +vips_thread_main_loop( void *a ) { VipsThread *thr = (VipsThread *) a; VipsThreadpool *pool = thr->pool; @@ -195,13 +267,13 @@ thread_main_loop( void *a ) /* We now control the region (it was created by pool when we * were built). */ - im__region_take_ownership( thr->reg ); + im__region_take_ownership( thr->state.reg ); /* Process work units! Always tick, even if we are stopping, so the * main thread will wake up for exit. */ for(;;) { - thread_work_unit( thr ); + vips_thread_work_unit( thr ); im_semaphore_up( &pool->tick ); if( pool->stop || pool->error ) @@ -226,7 +298,7 @@ vips_thread_new( VipsThreadpool *pool ) if( !(thr = IM_NEW( pool->im, VipsThread )) ) return( NULL ); thr->pool = pool; - thr->reg = NULL; + thr->state.reg = NULL; thr->thread = NULL; thr->exit = 0; thr->error = 0; @@ -238,14 +310,14 @@ vips_thread_new( VipsThreadpool *pool ) /* Attach stuff. */ - if( !(thr->reg = im_region_create( pool->im )) ) { - thread_free( thr ); + if( !(thr->state.reg = im_region_create( pool->im )) ) { + vips_thread_free( thr ); return( NULL ); } /* Get ready to hand the region over to the thread. */ - im__region_no_ownership( thr->reg ); + im__region_no_ownership( thr->state.reg ); #ifdef TIME_THREAD thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double ); @@ -261,17 +333,17 @@ vips_thread_new( VipsThreadpool *pool ) * we need to insist on a non-tiny stack. Some platforms default to * very small values (eg. various BSDs). 
*/ - if( !(thr->thread = g_thread_create_full( thread_main_loop, thr, + if( !(thr->thread = g_thread_create_full( vips_thread_main_loop, thr, IM__DEFAULT_STACK_SIZE, TRUE, FALSE, G_THREAD_PRIORITY_NORMAL, NULL )) ) { im_error( "threadgroup_thread_new", "%s", _( "unable to create thread" ) ); - thread_free( thr ); + vips_thread_free( thr ); return( NULL ); } #ifdef DEBUG_CREATE - printf( "threadgroup_thread_new: g_thread_create_full()\n" ); + printf( "vips_thread_new: g_thread_create_full()\n" ); #endif /*DEBUG_CREATE*/ #endif /*HAVE_THREADS*/ @@ -281,36 +353,23 @@ vips_thread_new( VipsThreadpool *pool ) /* Kill all threads in a threadgroup, if there are any. */ static void -threadpool_kill_threads( VipsThreadpool *pool ) +vips_threadpool_kill_threads( VipsThreadpool *pool ) { if( pool->thr ) { int i; for( i = 0; i < pool->nthr; i++ ) - thread_free( pool->thr[i] ); + vips_thread_free( pool->thr[i] ); pool->thr = NULL; #ifdef DEBUG_IO - printf( "threadpool_kill_threads: killed %d threads\n", + printf( "vips_threadpool_kill_threads: killed %d threads\n", pool->nthr ); #endif /*DEBUG_IO*/ } } -/** - * vips_threadpool_free: - * @pool: pool to free - * - * Frees a VipsThreadpool. This function can be called multiple times, though - * only the first call will have any effect. - * - * All worker threads are terminated and all resources freed. - * - * See also: vips_threadpool_new(). - * - * Returns: 0. 
- */ -int +static int vips_threadpool_free( VipsThreadpool *pool ) { #ifdef DEBUG_IO @@ -318,31 +377,15 @@ vips_threadpool_free( VipsThreadpool *pool ) pool->im->filename, pool ); #endif /*DEBUG_IO*/ - if( !pool || pool->zombie ) - return( 0 ); - - threadpool_kill_threads( pool ); + vips_threadpool_kill_threads( pool ); IM_FREEF( g_mutex_free, pool->allocate_lock ); im_semaphore_destroy( &pool->finish ); im_semaphore_destroy( &pool->tick ); - pool->zombie = 1; return( 0 ); } -/** - * vips_threadpool_new: - * @im: image to create the threadgroup on - * - * Makes a threadpool attached to the image. The threadgroup will be freed - * for you if the image is closed, but you can free it yourself with - * vips_threadpool_free() if you wish. - * - * See also: vips_threadpool_free(). - * - * Returns: an #VipsThreadpool on success, %NULL on error. - */ -VipsThreadpool * +static VipsThreadpool * vips_threadpool_new( VipsImage *im ) { VipsThreadpool *pool; @@ -361,7 +404,6 @@ vips_threadpool_new( VipsImage *im ) im_semaphore_init( &pool->tick, 0, "tick" ); pool->stop = FALSE; pool->error = FALSE; - pool->zombie = FALSE; /* Attach tidy-up callback. */ @@ -382,7 +424,7 @@ vips_threadpool_new( VipsImage *im ) /* Attach a set of threads. */ static int -threadpool_create_threads( VipsThreadpool *pool ) +vips_threadpool_create_threads( VipsThreadpool *pool ) { int i; @@ -399,13 +441,115 @@ threadpool_create_threads( VipsThreadpool *pool ) */ for( i = 0; i < pool->nthr; i++ ) if( !(pool->thr[i] = vips_thread_new( pool )) ) { - threadpool_kill_threads( pool ); + vips_threadpool_kill_threads( pool ); return( -1 ); } return( 0 ); } +/** + * VipsThreadState: + * @reg: a #REGION + * @pos: a #Rect + * @x: an int + * @y: an int + * @d: client data + * @e: client data + * @f: client data + * + * These per-thread values are carried around for your use by + * vips_threadpool_run(). 
They are private to each thread, so they are handy + * for VipsThreadpoolAllocate and VipsThreadpoolWork to communicate. + * + * @reg is created for you at the start of processing and freed at the end, + * but you can do what you like with it. + */ + +/** + * VipsThreadpoolAllocate: + * @state: per-thread state + * @a: client data + * @b: client data + * @c: client data + * @stop: set this to signal end of computation + * + * This function is called to allocate a new work unit for the thread. It is + * always single-threaded, so it can modify per-call state (such as a + * counter). + * + * @a, @b, @c are the values supplied to the call to + * vips_threadpool_run(). + * + * It should set @stop to %TRUE to indicate that no work could be allocated + * because the job is done. + * + * It should return 0 for a normal return, or non-zero to indicate an error. + * + * See also: vips_threadpool_run(). + */ + +/** + * VipsThreadpoolWork: + * @state: per-thread state + * @a: client data + * @b: client data + * @c: client data + * + * This function is called to process a work unit. Many copies of this can run + * at once, so it should not write to the per-call state. It can write to + * per-thread state. + * + * @a, @b, @c are the values supplied to the call to + * vips_threadpool_run(). + * + * It should return 0 for a normal return, or non-zero to indicate an error. + * + * See also: vips_threadpool_run(). + */ + +/** + * VipsThreadpoolProgress: + * @a: client data + * @b: client data + * @c: client data + * + * This function is called by the main thread once for every work unit + * processed. It can be used to give the user progress feedback. + * + * It should return 0 for a normal return, or non-zero to indicate an error. + * + * See also: vips_threadpool_run(). 
+ */ + +/** + * vips_threadpool_run: + * @im: image to loop over + * @allocate: allocate a work unit + * @work: process a work unit + * @progress: give progress feedback about a work unit, or %NULL + * @a: client data + * @b: client data + * @c: client data + * + * This function runs a set of threads over an image. Each thread runs + * @allocate to set up a new work unit (perhaps the next tile in an image, for + * example), then @work to process that work unit. After each unit is + * processed, @progress is called, so that the operation can give + * progress feedback. @progress may be %NULL. + * + * Each thread has a small amount of private state that vips_threadpool_run() + * maintains for it. The @allocate and @work functions can use this to + * communicate. + * + * @allocate is always single-threaded (so it can write to the per-call state), + * whereas @work can be executed concurrently. @progress is always called by + * the main thread (ie. the thread which called vips_threadpool_run()). + * + * See also: vips_discsink(), im_concurrency_set(). + * + * Returns: 0 on success, -1 on error. + */ int vips_threadpool_run( VipsImage *im, VipsThreadpoolAllocate allocate, @@ -432,7 +576,7 @@ vips_threadpool_run( VipsImage *im, /* Attach workers and set them going. */ - if( threadpool_create_threads( pool ) ) { + if( vips_threadpool_create_threads( pool ) ) { vips_threadpool_free( pool ); return( -1 ); } @@ -445,7 +589,7 @@ vips_threadpool_run( VipsImage *im, #else /* No threads, do the work ourselves in the main thread. */ - thread_work_unit( pool->thr[0] ); + vips_thread_work_unit( pool->thr[0] ); #endif /*HAVE_THREADS*/ if( pool->stop || pool->error ) @@ -472,8 +616,16 @@ vips_threadpool_run( VipsImage *im, return( result ); } -/* Pick a tile size and a buffer height. The buffer height must be a multiple - * of tile_height. 
+/** + * vips_get_tile_size: + * @im: image to guess for + * @tile_width: return selected tile width + * @tile_height: return selected tile height + * @nlines: return buffer height in scanlines + * + * Pick a tile size and a buffer height for this image and the current + * value of im_concurrency_get(). The buffer height + * will always be a multiple of tile_height. */ void vips_get_tile_size( VipsImage *im,