diff --git a/ChangeLog b/ChangeLog index d39a3a93..36fd9927 100644 --- a/ChangeLog +++ b/ChangeLog @@ -60,8 +60,12 @@ - im_jpeg2vips.c, set scale_num on shrink (thanks Guido) - heh argh reading history always stopped after the first line (thanks Haida) - added im_histindexed -- new im_iterate() calls start functions from workers so resources they make - are owned by the worker thread +- new im_iterate() calls start and stop functions from workers so resources + they make are owned by the worker thread ... this makes it possible to have + start functions which create multiple regions and therefore allows + im_iterate() to scan more than one image at once +- threadgroup no longer has any default action, you must attach a work + function 25/3/09 started 7.18.0 - revised version numbers diff --git a/TODO b/TODO index c6115943..fef48ee7 100644 --- a/TODO +++ b/TODO @@ -1,18 +1,3 @@ -- need to call stop funcs from the worker as well - - im_generate() does this my having im_region_free() call stop, if necessary - - but we can't use that mechanism, since it relies on im->client1 etc. which - for us will be the copy operation that generated the image - - regions need a call-this-to-free-seq function which is installed by - generate or iterate as appropriate - - no, that won't work, when could we install the callbacks? - - we must call the stop functions from the worker threads, that'll take care of - freeing the regions in the right context for us - - memory.c diff --git a/libvips/include/vips/threadgroup.h b/libvips/include/vips/threadgroup.h index 8912e04a..602b2aa6 100644 --- a/libvips/include/vips/threadgroup.h +++ b/libvips/include/vips/threadgroup.h @@ -47,10 +47,6 @@ extern "C" { */ #define IM__DEFAULT_STACK_SIZE (2 * 1024 * 1024) -/* A work function. - */ -typedef int (*im__work_fn)( REGION *, void *, void *, void * ); - /* What we track for each thread. 
*/ typedef struct { @@ -63,8 +59,8 @@ typedef struct { int error; /* Set by thread if work fn fails */ REGION *oreg; /* If part of an inplace threadgroup, */ - Rect pos; /* where this thread should write */ - int x, y; /* it's result */ + Rect pos; /* Where this thread should write */ + int x, y; /* Its result */ void *a, *b, *c; /* User arguments to work fns */ @@ -74,6 +70,11 @@ typedef struct { #endif /*TIME_THREAD*/ } im_thread_t; +/* A work function. + */ +typedef int (*im__work_fn)( im_thread_t *thr, + REGION *, void *, void *, void * ); + /* What we track for a group of threads working together. */ typedef struct im__threadgroup_t { @@ -84,7 +85,6 @@ typedef struct im__threadgroup_t { int nlines; /* Scanlines-at-once we prefer for iteration */ im__work_fn work; /* Work fn for this threadgroup */ - int inplace; /* Regions should be contiguous */ int nthr; /* Number of threads in group */ im_thread_t **thr; /* Threads */ diff --git a/libvips/iofuncs/im_generate.c b/libvips/iofuncs/im_generate.c index f71b8d4e..567ffb55 100644 --- a/libvips/iofuncs/im_generate.c +++ b/libvips/iofuncs/im_generate.c @@ -294,6 +294,29 @@ im_allocate_input_array( IMAGE *out, ... ) * Returns: 0 on success, -1 on error. */ +static int +generate_work( im_thread_t *thr, + REGION *reg, void *a, void *b, void *c ) +{ + /* thr pos needs to be set before coming here ... check. + */ +{ + Rect image; + + image.left = 0; + image.top = 0; + image.width = thr->tg->im->Xsize; + image.height = thr->tg->im->Ysize; + + g_assert( im_rect_includesrect( &image, &thr->pos ) ); +} + + if( im_prepare_to( reg, thr->oreg, &thr->pos, thr->x, thr->y ) ) + return( -1 ); + + return( 0 ); +} + /* Loop over a big region, filling it in many small pieces with threads. */ static int @@ -309,9 +332,9 @@ eval_to_region( REGION *or, im_threadgroup_t *tg ) image.width = or->im->Xsize; image.height = or->im->Ysize; - /* Note we'll be working to fill a contigious area. + /* Our work function ... an inplace one. 
*/ - tg->inplace = 1; + tg->work = generate_work; /* Loop over or, attaching to all sub-parts in turn. */ diff --git a/libvips/iofuncs/im_iterate.c b/libvips/iofuncs/im_iterate.c index 9762c953..67b27af8 100644 --- a/libvips/iofuncs/im_iterate.c +++ b/libvips/iofuncs/im_iterate.c @@ -25,6 +25,9 @@ * - new eval start/progress/end system * 7/10/09 * - gtkdoc comments + * 15/10/09 + * - call start and stop functions from the worker threads + * - reworked a bit */ /* @@ -54,7 +57,7 @@ */ /* -#define DEBUG_IO +#define DEBUG */ #ifdef HAVE_CONFIG_H @@ -93,35 +96,83 @@ typedef struct _Iterate { im_stop_fn stop; void *b; void *c; + + /* Set this to signal to workers that we want to shut down sequences. + */ + gboolean shutdown; } Iterate; -/* Call all stop functions. +/* Call a thread's stop function. */ static int -iterate_call_all_stop( Iterate *iter, im_threadgroup_t *tg ) +iterate_call_stop( Iterate *iter, im_thread_t *thr ) { - int i; +#ifdef DEBUG + printf( "iterate_call_stop: thr = %p\n", thr ); +#endif /*DEBUG*/ - for( i = 0; i < tg->nthr; i++ ) { - if( tg->thr[i]->a && iter->stop ) { - if( iter->stop( tg->thr[i]->a, iter->b, iter->c ) ) - /* Drastic! - */ - im_error( "im_iterate", - _( "stop function failed " - "for image \"%s\"" ), - iter->im->filename ); - tg->thr[i]->a = NULL; + if( thr->a && iter->stop ) { + if( iter->stop( thr->a, iter->b, iter->c ) ) { + im_error( "im_iterate", + _( "stop function failed " + "for image \"%s\"" ), + iter->im->filename ); + return( -1 ); } + + thr->a = NULL; } return( 0 ); } +static void +iterate_call_all_stop( Iterate *iter ) +{ + im_threadgroup_t *tg = iter->tg; + + int i; + +#ifdef DEBUG + printf( "iterate_call_all_stop: start\n" ); +#endif /*DEBUG*/ + + /* Wait for all threads to hit 'go'. + */ + im_threadgroup_wait( tg ); + + /* Get all threads off idle and waiting for work. 
+ */ + for( i = 0; i < tg->nthr; i++ ) + (void) im_threadgroup_get( tg ); + + /* Now run all threads one more time and ask them to junk their + * sequence value. 'shutdown' changes the behaviour of the work + * function, see below. + */ + iter->shutdown = TRUE; + + for( i = 0; i < tg->nthr; i++ ) { + /* The work fn will need this set if it's not been run + * before. + */ + tg->thr[i]->b = iter; + im_threadgroup_trigger( tg->thr[i] ); + } + + /* And wait for them to idle again. + */ + im_threadgroup_wait( tg ); + +#ifdef DEBUG + printf( "iterate_call_all_stop: done\n" ); +#endif /*DEBUG*/ +} + static void iterate_free( Iterate *iter ) { - /* Check all the stop functions have been called. + /* All threads should have junked their sequence values. */ if( iter->tg ) { int i; @@ -155,22 +206,54 @@ iterate_call_start( Iterate *iter, im_thread_t *thr ) return( 0 ); } -/* Our generate function. We need to call the user's start function from the - * worker thread so that any regions it makes are owned by the thread. +/* Our work function. This is complicated because we need to make sure we + * call all our start and stop functions from the worker thread, so that any + * regions created are owned by the worker and not the main thread. */ static int -iterate_gen( REGION *reg, void *seq, void *a, void *b ) +iterate_work( im_thread_t *thr, REGION *reg, void *seq, void *a, void *b ) { Iterate *iter = (Iterate *) a; - im_thread_t *thr = (im_thread_t *) b; - /* Make sure the start function has run and we have the sequence value - * set. + /* 'shutdown' is set at the end of computation to ask workers to free + * context. */ - iterate_call_start( iter, thr ); - seq = thr->a; + if( iter->shutdown ) { + if( iterate_call_stop( iter, thr ) ) + return( -1 ); + } + else { + /* Make sure the start function has run and we have the + * sequence value set. 
+ */ + iterate_call_start( iter, thr ); + seq = thr->a; - return( iter->generate( reg, seq, iter->b, iter->c ) ); + /* thr pos needs to be set before coming here ... check. + */ +{ + Rect image; + + image.left = 0; + image.top = 0; + image.width = thr->tg->im->Xsize; + image.height = thr->tg->im->Ysize; + + g_assert( im_rect_includesrect( &image, &thr->pos ) ); +} + + /* Generate pixels for the user. + */ + if( im_prepare( reg, &thr->pos ) ) + return( -1 ); + + /* Run the user code on this area. + */ + if( iter->generate( reg, seq, iter->b, iter->c ) ) + return( -1 ); + } + + return( 0 ); } static int @@ -187,6 +270,7 @@ iterate_init( Iterate *iter, iter->stop = stop; iter->b = b; iter->c = c; + iter->shutdown = FALSE; if( !(iter->t = im_open( "iterate", "p" )) || im_copy( iter->im, iter->t ) || @@ -195,14 +279,11 @@ iterate_init( Iterate *iter, return( -1 ); } - iter->tg->work = iterate_gen; - iter->tg->inplace = 0; + iter->tg->work = iterate_work; -#ifdef DEBUG_IO - if( iter->tg->nthr > 1 ) - im_diagnostics( "im_iterate: using %d threads", - iter->tg->nthr ); -#endif /*DEBUG_IO*/ +#ifdef DEBUG + im_diag( "im_iterate", "using %d threads", iter->tg->nthr ); +#endif /*DEBUG*/ return( 0 ); } @@ -215,10 +296,6 @@ iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t ) int x, y; Rect image; - /* Set up. - */ - tg->inplace = 0; - image.left = 0; image.top = 0; image.width = t->Xsize; @@ -248,10 +325,9 @@ iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t ) thr->pos = clipped; - /* Other stuff we want passed to iterate_gen(). + /* Other stuff we want passed to iterate_work(). */ thr->b = iter; - thr->c = thr; /* Start worker going. */ @@ -270,15 +346,10 @@ iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t ) } } - /* Wait for all threads to hit 'go' again. + /* Make sure all processing is done. */ im_threadgroup_wait( tg ); - /* Test for any errors. 
- */ - if( im_threadgroup_iserror( tg ) ) - return( -1 ); - return( 0 ); } @@ -326,7 +397,11 @@ im_iterate( IMAGE *im, /* Signal end of eval. */ result |= im__end_eval( iter.t ); - result |= iterate_call_all_stop( &iter, iter.tg ); + + /* Shut down and test for any errors. + */ + iterate_call_all_stop( &iter ); + result |= im_threadgroup_iserror( iter.tg ); iterate_free( &iter ); return( result ); diff --git a/libvips/iofuncs/im_wbuffer.c b/libvips/iofuncs/im_wbuffer.c index 3bc984dc..d60c16f4 100644 --- a/libvips/iofuncs/im_wbuffer.c +++ b/libvips/iofuncs/im_wbuffer.c @@ -182,11 +182,31 @@ wbuffer_new( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) return( wbuffer ); } -/* At end of work_fn ... need to tell wbuffer write thread that we're done. +/* Our work function ... generate a tile and tell the wbuffer write thread that + * we're done. */ static int -wbuffer_work_fn( REGION *region, WriteBuffer *wbuffer ) +wbuffer_work_fn( im_thread_t *thr, + REGION *reg, void *a, void *b, void *c ) { + WriteBuffer *wbuffer = (WriteBuffer *) a; + + /* thr pos needs to be set before coming here ... check. + */ +{ + Rect image; + + image.left = 0; + image.top = 0; + image.width = thr->tg->im->Xsize; + image.height = thr->tg->im->Ysize; + + g_assert( im_rect_includesrect( &image, &thr->pos ) ); +} + + if( im_prepare_to( reg, thr->oreg, &thr->pos, thr->x, thr->y ) ) + return( -1 ); + im_semaphore_upn( &wbuffer->nwrite, 1 ); return( 0 ); @@ -319,14 +339,10 @@ wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) printf( "wbuffer_eval_to_file: partial image output to file\n" ); #endif /*DEBUG*/ - /* Note we'll be working to fill a contigious area. - */ - tg->inplace = 1; - /* What threads do at the end of each tile ... decrement the nwrite * semaphore. */ - tg->work = (im__work_fn) wbuffer_work_fn; + tg->work = wbuffer_work_fn; /* Fill to in steps, write each to the output. 
*/ @@ -403,8 +419,7 @@ wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) } int -im_wbuffer( im_threadgroup_t *tg, - im_wbuffer_fn write_fn, void *a, void *b ) +im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) { WriteBuffer *b1, *b2; int result; diff --git a/libvips/iofuncs/threadgroup.c b/libvips/iofuncs/threadgroup.c index 4ff19cef..74d6a6eb 100644 --- a/libvips/iofuncs/threadgroup.c +++ b/libvips/iofuncs/threadgroup.c @@ -10,6 +10,9 @@ * - double-buffer file writes * 27/11/06 * - break stuff out to generate / iterate + * 15/10/09 + * - get rid of inplace and default work stuff, you must now always set a + * work function */ /* @@ -52,7 +55,6 @@ #include #include -#include #ifdef HAVE_UNISTD_H #include #endif /*HAVE_UNISTD_H*/ @@ -215,7 +217,7 @@ threadgroup_idle_remove( im_thread_t *thr ) #ifdef DEBUG_HIGHWATER tg->nidle -= 1; - assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); + g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); #endif /*DEBUG_HIGHWATER*/ g_mutex_unlock( tg->idle_lock ); @@ -230,12 +232,12 @@ threadgroup_idle_add( im_thread_t *thr ) g_mutex_lock( tg->idle_lock ); - assert( !g_slist_find( tg->idle, thr ) ); + g_assert( !g_slist_find( tg->idle, thr ) ); tg->idle = g_slist_prepend( tg->idle, thr ); #ifdef DEBUG_HIGHWATER tg->nidle += 1; - assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); + g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); #endif /*DEBUG_HIGHWATER*/ im_semaphore_up( &tg->idle_sem ); @@ -256,14 +258,14 @@ im_threadgroup_get( im_threadgroup_t *tg ) g_mutex_lock( tg->idle_lock ); - assert( tg->idle ); - assert( tg->idle->data ); + g_assert( tg->idle ); + g_assert( tg->idle->data ); thr = (im_thread_t *) tg->idle->data; tg->idle = g_slist_remove( tg->idle, thr ); #ifdef DEBUG_HIGHWATER tg->nidle -= 1; - assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); + g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); if( tg->nidle < tg->min_idle ) tg->min_idle = tg->nidle; #endif /*DEBUG_HIGHWATER*/ @@ 
-336,26 +338,14 @@ work_fn( im_thread_t *thr ) { /* Doublecheck only one thread per region. */ - assert( thr->thread == g_thread_self() ); + g_assert( thr->thread == g_thread_self() ); - /* Prepare this area. - */ - if( thr->tg->inplace ) { - if( im_prepare_to( thr->reg, thr->oreg, - &thr->pos, thr->x, thr->y ) ) - thr->error = -1; - } - else { - /* Attach to this position. - */ - if( im_prepare( thr->reg, &thr->pos ) ) - thr->error = -1; - } + g_assert( thr->tg->work ); /* Call our work function. */ - if( !thr->error && thr->tg->work && - thr->tg->work( thr->reg, thr->a, thr->b, thr->c ) ) + if( !thr->error && + thr->tg->work( thr, thr->reg, thr->a, thr->b, thr->c ) ) thr->error = -1; /* Back on the idle queue again. @@ -364,7 +354,7 @@ work_fn( im_thread_t *thr ) } #ifdef HAVE_THREADS -/* What runs as a thread ... loop, waiting to be told to fill our region. +/* What runs as a thread ... loop, waiting to be told to do stuff. */ static void * thread_main_loop( void *a ) @@ -378,7 +368,7 @@ thread_main_loop( void *a ) im__region_take_ownership( thr->reg ); for(;;) { - assert( tg == thr->tg ); + g_assert( tg == thr->tg ); /* Signal the main thread that we are idle, and block. */ @@ -495,7 +485,7 @@ threadgroup_kill_threads( im_threadgroup_t *tg ) thread_free( tg->thr[i] ); tg->thr = NULL; - assert( !tg->idle ); + g_assert( !tg->idle ); /* Reset the idle semaphore. */ @@ -550,7 +540,6 @@ im_threadgroup_create( IMAGE *im ) tg->zombie = 0; tg->im = im; tg->work = NULL; - tg->inplace = 0; if( (tg->nthr = im_concurrency_get()) < 0 ) return( NULL ); tg->thr = NULL; @@ -632,21 +621,6 @@ im_threadgroup_create( IMAGE *im ) void im_threadgroup_trigger( im_thread_t *thr ) { - im_threadgroup_t *tg = thr->tg; - - /* thr pos needs to be set before coming here ... check. - */ -{ - Rect image; - - image.left = 0; - image.top = 0; - image.width = tg->im->Xsize; - image.height = tg->im->Ysize; - - assert( im_rect_includesrect( &image, &thr->pos ) ); -} - /* Start worker going. 
*/ im_semaphore_up( &thr->go );