From 8d7dcfe5f652e48b450637f078b13958e91d27b9 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Sat, 20 Mar 2010 23:56:35 +0000 Subject: [PATCH] added wbuffer2 --- ChangeLog | 2 + TODO | 7 + libvips/include/vips/threadpool.h | 52 ++-- libvips/include/vips/vips.h | 1 + libvips/iofuncs/Makefile.am | 2 + libvips/iofuncs/im_generate.c | 8 +- libvips/iofuncs/im_init_world.c | 2 + libvips/iofuncs/im_wbuffer2.c | 396 ++++++++++++------------------ libvips/iofuncs/threadgroup.c | 32 +-- libvips/iofuncs/threadpool.c | 143 +++++------ 10 files changed, 274 insertions(+), 371 deletions(-) diff --git a/ChangeLog b/ChangeLog index 4316ac57..38d64a84 100644 --- a/ChangeLog +++ b/ChangeLog @@ -36,6 +36,8 @@ set >0 can override) on linux and win32 - better nprocs guesser - im_render() fixes to help the paintbox, some speedups too +- added im_wbuffer2(), a new distributed threading system, and --vips-wbuffer2 + to enable it 15/1/10 started 7.21.1 - added "written" callbacks, used to implement write to non-vips formats diff --git a/TODO b/TODO index f5645351..26679df8 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,10 @@ +- do we need to wait for the last bg write to end? + +- make VipsThread opaque ... have a separate VipsThreadState struct holding + stuff and only expose that + +- nip2 image display does not work with threading disabled + - try a distributed worker model for image generation at the moment threadpools have an idle list and a manager ... in fact, what diff --git a/libvips/include/vips/threadpool.h b/libvips/include/vips/threadpool.h index 776d4859..2f9faacd 100644 --- a/libvips/include/vips/threadpool.h +++ b/libvips/include/vips/threadpool.h @@ -50,18 +50,24 @@ typedef struct { /*< private >*/ struct _VipsThreadpool *pool; /* Pool we are part of */ - REGION *reg; /* Region this thread operates on */ - GThread *thread; /* Thread for this region */ - gboolean kill; /* Set this to make thread kill itself */ - gboolean stop; /* Set this to make thread stop work */ - gboolean error; /* Set by thread if work fn fails */ - REGION *oreg; /* If part of an inplace pool, */ - Rect pos; /* Where this thread should write */ - int x, y; /* Its result */ + /* Set this to ask the thread to exit. + */ + gboolean exit; - void *a, *b, *c; /* User arguments to work fns */ + /* Set by the thread if work or allocate return an error. + */ + gboolean error; + + /* Thread state for the worker and allocate. Handy for communication. + * The region is created and destroyed by the threadpool for the + * worker. + */ + REGION *reg; + Rect pos; + int x, y; + void *a, *b, *c; #ifdef TIME_THREAD double *btime, *etime; @@ -69,16 +75,17 @@ typedef struct { #endif /*TIME_THREAD*/ } VipsThread; -/* A work function. This does a unit of work (eg. processing a tile or - * whatever). - */ -typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, - void *a, void *b, void *c ); - /* A work allocate function. This is run single-threaded by a worker to * set up a new work unit. + * Return non-zero for errors. Set *stop for "no more work to do" */ typedef int (*VipsThreadpoolAllocate)( VipsThread *thr, + void *a, void *b, void *c, gboolean *stop ); + +/* A work function. This does a unit of work (eg. processing a tile or + * whatever). Return non-zero for errors. + */ +typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, void *a, void *b, void *c ); /* What we track for a group of threads working together. @@ -104,9 +111,13 @@ typedef struct _VipsThreadpool { */ im_semaphore_t finish; - gboolean kill; /* Set to stop eval early */ - gboolean stop; /* Set on normal end of computation */ - gboolean progress; /* Set this to get eval progress feedback */ + /* Set this to abort evaluation early with an error. + */ + gboolean kill; + + /* Set by Allocate (via an arg) to indicate normal end of computation. + */ + gboolean stop; /* Set this if the pool has been shut down. We sometimes need to allow * double-frees. @@ -114,12 +125,17 @@ typedef struct _VipsThreadpool { gboolean zombie; } VipsThreadpool; + int vips_threadpool_run( VipsImage *im, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, void *a, void *b, void *c ); void vips_get_tile_size( VipsImage *im, int *tile_width, int *tile_height, int *nlines ); +extern int im__wbuffer2; + +int im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ); + #ifdef __cplusplus } #endif /*__cplusplus*/ diff --git a/libvips/include/vips/vips.h b/libvips/include/vips/vips.h index 1abb78c5..3d110d82 100644 --- a/libvips/include/vips/vips.h +++ b/libvips/include/vips/vips.h @@ -117,6 +117,7 @@ extern "C" { #include #include #include +#include #include #include diff --git a/libvips/iofuncs/Makefile.am b/libvips/iofuncs/Makefile.am index ae8750fc..a06440d3 100644 --- a/libvips/iofuncs/Makefile.am +++ b/libvips/iofuncs/Makefile.am @@ -29,6 +29,7 @@ libiofuncs_la_SOURCES = \ im_unmapfile.c \ im_guess_prefix.c \ im_wbuffer.c \ + im_wbuffer2.c \ im_wrapmany.c \ im_writeline.c \ memory.c \ @@ -39,6 +40,7 @@ libiofuncs_la_SOURCES = \ rect.c \ semaphore.c \ threadgroup.c \ + threadpool.c \ util.c \ im_init_world.c \ buf.c \ diff --git a/libvips/iofuncs/im_generate.c b/libvips/iofuncs/im_generate.c index e5c10842..bf146bac 100644 --- a/libvips/iofuncs/im_generate.c +++ b/libvips/iofuncs/im_generate.c @@ -605,8 +605,12 @@ im_generate( IMAGE *im, im_region_free( or ); return( -1 ); } - if( im->dtype == IM_OPENOUT ) - res = im_wbuffer( tg, write_vips, NULL, NULL ); + if( im->dtype == IM_OPENOUT ) { + if( im__wbuffer2 ) + res = im_wbuffer2( im, write_vips, NULL, NULL ); + else + res = im_wbuffer( tg, write_vips, NULL, NULL ); + } else res = eval_to_memory( tg, or ); diff --git a/libvips/iofuncs/im_init_world.c b/libvips/iofuncs/im_init_world.c index 2eb59cc0..95769908 100644 --- a/libvips/iofuncs/im_init_world.c +++ b/libvips/iofuncs/im_init_world.c @@ -268,6 +268,8 @@ static GOptionEntry option_entries[] = { N_( "set fatstrip height to N (DEBUG)" ), "N" }, { "vips-progress", 'p', 0, G_OPTION_ARG_NONE, &im__progress, N_( "show progress feedback" ), NULL }, + { "vips-wbuffer2", 'd', 0, G_OPTION_ARG_NONE, &im__wbuffer2, + N_( "use distributed work allocation" ), NULL }, { NULL } }; diff --git a/libvips/iofuncs/im_wbuffer2.c b/libvips/iofuncs/im_wbuffer2.c index e4786eb3..aa312bdd 100644 --- a/libvips/iofuncs/im_wbuffer2.c +++ b/libvips/iofuncs/im_wbuffer2.c @@ -32,8 +32,8 @@ */ /* - */ #define DEBUG + */ #ifdef HAVE_CONFIG_H #include @@ -73,9 +73,6 @@ typedef struct _WriteBuffer { int write_errno; /* Save write errors here */ GThread *thread; /* BG writer thread */ gboolean kill; /* Set to ask thread to exit */ - im_wbuffer_fn write_fn; /* BG write with this */ - void *a; /* Client data */ - void *b; } WriteBuffer; /* Per-call state. @@ -99,8 +96,18 @@ typedef struct _Write { int tile_width; int tile_height; int nlines; + + /* The file format write operation. + */ + im_wbuffer_fn write_fn; + void *a; + void *b; } Write; +/* Enable im_wbuffer2 ... set from the cmd line, tested by our users. + */ +int im__wbuffer2 = 0; + static void wbuffer_free( WriteBuffer *wbuffer ) { @@ -113,9 +120,9 @@ wbuffer_free( WriteBuffer *wbuffer ) /* Return value is always NULL (see wbuffer_write_thread). */ (void) g_thread_join( wbuffer->thread ); -#ifdef DEBUG_CREATE +#ifdef DEBUG printf( "wbuffer_free: g_thread_join()\n" ); -#endif /*DEBUG_CREATE*/ +#endif /*DEBUG*/ wbuffer->thread = NULL; } @@ -130,13 +137,18 @@ wbuffer_free( WriteBuffer *wbuffer ) static void wbuffer_write( WriteBuffer *wbuffer ) { - wbuffer->write_errno = wbuffer->write_fn( wbuffer->region, - &wbuffer->area, wbuffer->a, wbuffer->b ); + Write *write = wbuffer->write; #ifdef DEBUG - printf( "wbuffer_write: %d bytes from wbuffer %p\n", - wbuffer->region->bpl * wbuffer->area.height, wbuffer ); + static int n = 1; + + printf( "wbuffer_write: %d, %d bytes from wbuffer %p\n", + n++, wbuffer->region->bpl * wbuffer->area.height, wbuffer ); #endif /*DEBUG*/ + + wbuffer->write_errno = write->write_fn( wbuffer->region, + &wbuffer->area, write->a, write->b ); + } #ifdef HAVE_THREADS @@ -148,12 +160,14 @@ wbuffer_write_thread( void *data ) WriteBuffer *wbuffer = (WriteBuffer *) data; for(;;) { + /* Wait to be told to write. + */ im_semaphore_down( &wbuffer->go ); if( wbuffer->kill ) break; - /* Wait for all writer threads to leave this wbuffer. + /* Now block until the last worker finishes on this buffer. */ im_semaphore_downn( &wbuffer->nwrite, 0 ); @@ -169,7 +183,7 @@ wbuffer_write_thread( void *data ) #endif /*HAVE_THREADS*/ static WriteBuffer * -wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b ) +wbuffer_new( Write *write ) { WriteBuffer *wbuffer; @@ -183,15 +197,16 @@ wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b ) wbuffer->write_errno = 0; wbuffer->thread = NULL; wbuffer->kill = FALSE; - wbuffer->write_fn = write_fn; - wbuffer->a = a; - wbuffer->b = b; if( !(wbuffer->region = im_region_create( write->im )) ) { wbuffer_free( wbuffer ); return( NULL ); } + /* The worker threads need to be able to move the buffers around. + */ + im__region_no_ownership( wbuffer->region ); + #ifdef HAVE_THREADS /* Make this last (picks up parts of wbuffer on startup). */ @@ -206,21 +221,49 @@ wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b ) return( wbuffer ); } -/* Our VipsThreadpoolWork function ... generate a tile and tell the wbuffer - * write thread that we're done. +/* Write and swap buffers. */ static int -wbuffer_work_fn( VipsThread *thr, - REGION *reg, void *a, void *b, void *c ) +wbuffer_flush( Write *write ) { - WriteBuffer *wbuffer = (WriteBuffer *) a; - Write *write = wbuffer->write; + WriteBuffer *t; - if( im_prepare_to( reg, wbuffer->reg, - &thr->pos, thr->pos.left, thr->pos.top ) ) - return( -1 ); +#ifdef DEBUG + static int n = 1; - im_semaphore_up( &wbuffer->nwrite ); + printf( "wbuffer_flush: %d\n", n++ ); +#endif /*DEBUG*/ + + /* Block until the other buffer has been written. We have to do this + * before we can set this buffer writing or we'll lose output ordering. + */ + if( write->buf->area.top > 0 ) { + im_semaphore_down( &write->buf_back->done ); + + /* Previous write suceeded? + */ + if( write->buf_back->write_errno ) { + im_error_system( write->buf_back->write_errno, + "wbuffer_write", "%s", _( "write failed" ) ); + return( -1 ); + } + } + + /* Set the background writer going for this buffer. + */ +#ifdef HAVE_THREADS + im_semaphore_up( &write->buf->go ); +#else + /* No threads? Write ourselves synchronously. + */ + wbuffer_write( write->buf ); +#endif /*HAVE_THREADS*/ + + /* Swap buffers. + */ + t = write->buf; + write->buf = write->buf_back; + write->buf_back = t; return( 0 ); } @@ -231,6 +274,7 @@ static int wbuffer_position( WriteBuffer *wbuffer, int top, int height ) { Rect image, area; + int result; image.left = 0; image.top = 0; @@ -243,14 +287,20 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height ) area.height = height; im_rect_intersectrect( &area, &image, &wbuffer->area ); - if( im_region_buffer( wbuffer->region, &wbuffer->area ) ) - return( -1 ); + + /* The workers take turns to move the buffers. + */ + im__region_take_ownership( wbuffer->region ); + + result = im_region_buffer( wbuffer->region, &wbuffer->area ); + + im__region_no_ownership( wbuffer->region ); /* This should be an exclusive buffer, hopefully. */ g_assert( !wbuffer->region->buffer->done ); - return( 0 ); + return( result ); } /* Our VipsThreadpoolAllocate function ... move the thread to the next tile @@ -259,65 +309,48 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height ) * iteration. */ static gboolean -wbuffer_allocate_fn( VipsThread *thr, void *a, void *b, void *c ) +wbuffer_allocate_fn( VipsThread *thr, + void *a, void *b, void *c, gboolean *stop ) { - WriteBuffer *wbuffer = (WriteBuffer *) a; - Write *write = wbuffer->write; + Write *write = (Write *) a; Rect image; Rect tile; - /* Is the current x/y position OK? New line or maybe new buffer or - * maybe all done. +#ifdef DEBUG + printf( "wbuffer_allocate_fn\n" ); +#endif /*DEBUG*/ + + /* Is the state x/y OK? New line or maybe new buffer or maybe even + * all done. */ - if( write->x > write->buf->area.width ) { + if( write->x >= write->buf->area.width ) { write->x = 0; write->y += write->tile_height; - if( write->y > IM_RECT_BOTTOM( &write->buf->area ) ) { - /* Buffer full. Block until the other buffer has been - * written. + if( write->y >= IM_RECT_BOTTOM( &write->buf->area ) ) { + /* Write and swap buffers. */ - if( write->buf->area.top > 0 ) { - im_semaphore_down( &write->buf_back->done ); - - /* Previous write suceeded? - */ - if( write->buf_back->write_errno ) { - thr->err = write->buf_back->write_errno; - return( -1 ); - } - } - - /* Start writing this buffer. - */ - im_semaphore_up( &write->buf->go ); - - /* Swap buffers. - */ - { - WriteBuffer *t; - - t = write->buf; - write->buf = write->buf_back; - write->buf_back = t; - } + if( wbuffer_flush( write ) ) + return( -1 ); /* End of image? */ - if( write->buf_back->area.top + write->nlines > - write->im->Ysize ) - return( -1 ); + if( write->y >= write->im->Ysize ) { + *stop = TRUE; + return( 0 ); + } - /* Position buf below buf_back. + /* Position buf at the new y. */ if( wbuffer_position( write->buf, - write->buf_back->area.top + write->nlines, - write->nlines ) ) + write->y, write->nlines ) ) return( -1 ); } } + /* x, y and buf are good: save params for thread. + */ image.left = 0; image.top = 0; image.width = write->im->Xsize; @@ -327,209 +360,90 @@ wbuffer_allocate_fn( VipsThread *thr, void *a, void *b, void *c ) tile.width = write->tile_width; tile.height = write->tile_height; im_rect_intersectrect( &image, &tile, &thr->pos ); + thr->a = write->buf; + /* Add to the number of writers on the buffer. + */ + im_semaphore_upn( &write->buf->nwrite, -1 ); + + /* Move state on. + */ write->x += write->tile_width; return( 0 ); } -/* Loop over a wbuffer filling it threadily. +/* Our VipsThreadpoolWork function ... generate a tile! */ static int -wbuffer_fill( WriteBuffer *wbuffer ) +wbuffer_work_fn( VipsThread *thr, + REGION *reg, void *a, void *b, void *c ) { - Rect *area = &wbuffer->area; - im_threadgroup_t *tg = wbuffer->tg; - IMAGE *im = tg->im; - Rect image; - - int x, y; + WriteBuffer *wbuffer = (WriteBuffer *) thr->a; #ifdef DEBUG - printf( "wbuffer_fill: starting for wbuffer %p at line %d\n", - wbuffer, area->top ); + printf( "wbuffer_work_fn\n" ); #endif /*DEBUG*/ - image.left = 0; - image.top = 0; - image.width = im->Xsize; - image.height = im->Ysize; + if( im_prepare_to( reg, wbuffer->region, + &thr->pos, thr->pos.left, thr->pos.top ) ) + return( -1 ); - /* Loop over area, sparking threads for all sub-parts in turn. + /* Tell the bg write thread we've left. */ - for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph ) - for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) { - im_thread_t *thr; - Rect pos; - Rect clipped; - - /* thrs appear on idle when the child thread does - * threadgroup_idle_add and hits the 'go' semaphore. - */ - thr = im_threadgroup_get( tg ); - - /* Set the position we want to generate with this - * thread. Clip against the size of the image and the - * space available in or. - */ - pos.left = x; - pos.top = y; - pos.width = tg->pw; - pos.height = tg->ph; - im_rect_intersectrect( &pos, &image, &clipped ); - im_rect_intersectrect( &clipped, area, &clipped ); - - /* Note params. - */ - thr->oreg = wbuffer->region; - thr->pos = clipped; - thr->x = clipped.left; - thr->y = clipped.top; - thr->a = wbuffer; - -#ifdef DEBUG - printf( "wbuffer_fill: starting for tile at %d x %d\n", - x, y ); -#endif /*DEBUG*/ - - /* Add writer to n of writers on wbuffer, set it going. - */ - im_semaphore_upn( &wbuffer->nwrite, -1 ); - im_threadgroup_trigger( thr ); - - /* Trigger any eval callbacks on our source image and - * check for errors. - */ - if( im__handle_eval( tg->im, tg->pw, tg->ph ) || - im_threadgroup_iserror( tg ) ) { - /* Don't kill threads yet ... we may want to - * get some error stuff out of them. - */ - im_threadgroup_wait( tg ); - return( -1 ); - } - } + im_semaphore_upn( &wbuffer->nwrite, 1 ); return( 0 ); } -/* Eval to file. - */ -static int -wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) -{ - im_threadgroup_t *tg = b1->tg; - IMAGE *im = tg->im; - int y; - - assert( b1->tg == b2->tg ); - -#ifdef DEBUG - int nstrips; - - nstrips = 0; - printf( "wbuffer_eval_to_file: partial image output to file\n" ); -#endif /*DEBUG*/ - - /* What threads do at the end of each tile ... decrement the nwrite - * semaphore. - */ - tg->work = wbuffer_work_fn; - - /* Fill to in steps, write each to the output. - */ - for( y = 0; y < im->Ysize; y += tg->nlines ) { - /* Attach to this position in image. - */ - if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) ) - return( -1 ); - - /* Spark off threads to fill with data. - */ - if( wbuffer_fill( b1 ) ) - return( -1 ); - - /* We have to keep the ordering on wbuffer writes, so we can't - * have more than one background write going at once. Plus we - * want to make sure write()s don't get interleaved. Wait for - * the previous BG write (if any) to finish. - */ - if( y > 0 ) { - im_semaphore_down( &b2->done ); - - /* Previous write suceeded? - */ - if( b2->write_errno ) { - im_error_system( b2->write_errno, - "im__eval_to_file", - "%s", _( "write failed" ) ); - return( -1 ); - } - } - - /* b1 write can go. - */ - im_semaphore_up( &b1->go ); - -#ifndef HAVE_THREADS - /* No threading ... just write. - */ - wbuffer_write( b1 ); -#endif /*HAVE_THREADS*/ - - /* Rotate wbuffers. - */ - { - WriteBuffer *t; - - t = b1; b1 = b2; b2 = t; - } - -#ifdef DEBUG - nstrips++; -#endif /*DEBUG*/ - } - - /* Wait for all threads to finish, check for any errors. - */ - im_threadgroup_wait( tg ); - im_semaphore_down( &b2->done ); - if( im_threadgroup_iserror( tg ) ) - return( -1 ); - if( b1->write_errno || b2->write_errno ) { - im_error_system( - b1->write_errno ? b1->write_errno : b2->write_errno, - "im__eval_to_file", "%s", _( "write failed" ) ); - return( -1 ); - } - -#ifdef DEBUG - printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips ); -#endif /*DEBUG*/ - - return( 0 ); -} - int -im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) +im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) { - WriteBuffer *b1, *b2; + Write write; int result; - if( im__start_eval( tg->im ) ) + write.im = im; + write.buf = wbuffer_new( &write ); + write.buf_back = wbuffer_new( &write ); + write.x = 0; + write.y = 0; + write.write_fn = write_fn; + write.a = a; + write.b = b; + + vips_get_tile_size( im, + &write.tile_width, &write.tile_height, &write.nlines ); + + if( im__start_eval( im ) ) return( -1 ); result = 0; - b1 = wbuffer_new( tg, write_fn, a, b ); - b2 = wbuffer_new( tg, write_fn, a, b ); - - if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) ) + if( !write.buf || + !write.buf_back || + wbuffer_position( write.buf, 0, write.nlines ) || + vips_threadpool_run( im, + wbuffer_allocate_fn, wbuffer_work_fn, + &write, NULL, NULL ) ) result = -1; - im__end_eval( tg->im ); - wbuffer_free( b1 ); - wbuffer_free( b2 ); + /* We've set all the buffers writing, but not waited for the BG + * writer to finish. This can take a while: it has to wait for the + * last worker to make the last tile. + * + * We can't just free the buffers (which will wait for the bg threads + * to finish), since the bg thread might see the kill before it gets a + * chance to write. + */ + if( write.buf->area.top > 0 ) + im_semaphore_down( &write.buf_back->done ); + + im__end_eval( im ); + + /* Free buffers ... this will wait for the final write to finish too. + */ + wbuffer_free( write.buf ); + wbuffer_free( write.buf_back ); return( result ); } diff --git a/libvips/iofuncs/threadgroup.c b/libvips/iofuncs/threadgroup.c index c78abb2b..982f551b 100644 --- a/libvips/iofuncs/threadgroup.c +++ b/libvips/iofuncs/threadgroup.c @@ -618,41 +618,13 @@ im_threadgroup_create( IMAGE *im ) tg->zombie = 0; tg->im = im; tg->work = NULL; - if( (tg->nthr = im_concurrency_get()) < 0 ) - return( NULL ); + tg->nthr = im_concurrency_get(); tg->thr = NULL; tg->kill = 0; /* Pick a render geometry. */ - switch( tg->im->dhint ) { - case IM_SMALLTILE: - tg->pw = im__tile_width; - tg->ph = im__tile_height; - - /* Enough lines of tiles that we can expect to be able to keep - * nthr busy. - */ - tg->nlines = tg->ph * (1 + tg->nthr / - IM_MAX( 1, tg->im->Xsize / tg->pw )); - break; - - case IM_FATSTRIP: - tg->pw = tg->im->Xsize; - tg->ph = im__fatstrip_height; - tg->nlines = tg->ph * tg->nthr * 2; - break; - - case IM_ANY: - case IM_THINSTRIP: - tg->pw = tg->im->Xsize; - tg->ph = im__thinstrip_height; - tg->nlines = tg->ph * tg->nthr * 2; - break; - - default: - error_exit( "panic: internal error #98i245983425" ); - } + vips_get_tile_size( tg->im, &tg->pw, &tg->ph, &tg->nlines ); /* Attach tidy-up callback. */ diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index de91473d..5c744d91 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -31,10 +31,10 @@ */ /* - */ #define TIME_THREAD #define DEBUG_CREATE #define DEBUG_IO + */ #ifdef HAVE_CONFIG_H #include @@ -51,7 +51,6 @@ #include #include #include -#include #ifdef WITH_DMALLOC #include @@ -91,6 +90,7 @@ save_time_buffers( VipsThread *thr ) char name[256]; im_snprintf( name, 256, "time%d", rn++ ); + printf( "saving time buffer to \"%s\"\n", name ); if( !(fp = fopen( name, "w" )) ) error_exit( "unable to write to \"%s\"", name ); for( i = 0; i < thr->tpos; i++ ) @@ -109,7 +109,7 @@ thread_free( VipsThread *thr ) /* Is there a thread running this region? Kill it! */ if( thr->thread ) { - thr->kill = 1; + thr->exit = 1; /* Return value is always NULL (see thread_main_loop). */ @@ -122,7 +122,6 @@ thread_free( VipsThread *thr ) } IM_FREEF( im_region_free, thr->reg ); - thr->oreg = NULL; thr->pool = NULL; #ifdef TIME_THREAD @@ -131,23 +130,58 @@ thread_free( VipsThread *thr ) #endif /*TIME_THREAD*/ } -/* The work we do in one loop. This can run from the main thread in a loop if - * we're unthreaded, or in parallel if we are threaded. +/* The main loop: get some work, do it! Can run from many worker threads, or + * from the main thread if threading is off. */ static void -work_fn( VipsThread *thr ) +thread_loop( VipsThread *thr ) { - /* Doublecheck only one thread per region. - */ - g_assert( thr->thread == g_thread_self() ); + VipsThreadpool *pool = thr->pool; - g_assert( thr->pool->work ); + for(;;) { + /* Ask for a work unit. + */ + g_mutex_lock( pool->allocate_lock ); + if( !pool->stop ) { + if( pool->allocate( thr, + pool->a, pool->b, pool->c, &pool->stop ) ) + thr->error = -1; + } + g_mutex_unlock( pool->allocate_lock ); - /* Call our work function. - */ - if( !thr->error && - thr->pool->work( thr, thr->reg, thr->a, thr->b, thr->c ) ) - thr->error = 1; + if( pool->stop || pool->kill ) + break; + if( thr->exit || thr->error ) + break; + +#ifdef TIME_THREAD + /* Note start time. + */ + if( thr->btime && thr->tpos < IM_TBUF_SIZE ) + thr->btime[thr->tpos] = + g_timer_elapsed( thread_timer, NULL ); +#endif /*TIME_THREAD*/ + + /* Loop once. + */ + if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) ) + thr->error = 1; + + if( pool->stop || pool->kill ) + break; + if( thr->exit || thr->error ) + break; + +#ifdef TIME_THREAD + /* Note stop time. + */ + if( thr->etime && thr->tpos < IM_TBUF_SIZE ) { + thr->etime[thr->tpos] = + g_timer_elapsed( thread_timer, NULL ); + thr->tpos += 1; + } +#endif /*TIME_THREAD*/ + } } #ifdef HAVE_THREADS @@ -166,48 +200,9 @@ thread_main_loop( void *a ) */ im__region_take_ownership( thr->reg ); - for(;;) { - /* Ask for a work unit. - */ - g_mutex_lock( pool->allocate_lock ); - if( !pool->stop ) { - gboolean morework; - - morework = pool->allocate( thr, - pool->a, pool->b, pool->c ); - if( !morework ) - pool->stop = TRUE; - } - g_mutex_unlock( pool->allocate_lock ); - - /* Asked to stop work? - */ - if( thr->stop || thr->kill || - pool->stop || pool->kill ) - break; - -#ifdef TIME_THREAD - /* Note start time. - */ - if( thr->btime ) - thr->btime[thr->tpos] = - g_timer_elapsed( thread_timer, NULL ); -#endif /*TIME_THREAD*/ - - /* Loop once. - */ - work_fn( thr ); - -#ifdef TIME_THREAD - /* Note stop time. - */ - if( thr->etime ) { - thr->etime[thr->tpos] = - g_timer_elapsed( thread_timer, NULL ); - thr->tpos++; - } -#endif /*TIME_THREAD*/ - } + /* Do all the work we can. + */ + thread_loop( thr ); /* We are exiting: tell the main thread. */ @@ -229,11 +224,8 @@ vips_thread_new( VipsThreadpool *pool ) thr->pool = pool; thr->reg = NULL; thr->thread = NULL; - thr->kill = 0; - thr->stop = 0; + thr->exit = 0; thr->error = 0; - thr->oreg = NULL; - thr->a = thr->b = thr->c = NULL; #ifdef TIME_THREAD thr->btime = NULL; thr->etime = NULL; @@ -362,7 +354,6 @@ vips_threadpool_new( VipsImage *im ) im_semaphore_init( &pool->finish, 0, "finish" ); pool->kill = FALSE; pool->stop = FALSE; - pool->progress = FALSE; pool->zombie = FALSE; /* Attach tidy-up callback. @@ -444,20 +435,7 @@ vips_threadpool_run( VipsImage *im, #else /* No threads, do the work ourselves in the main thread. */ - for(;;) { - gboolean morework; - - morework = pool->allocate( pool->thr[0] ); - if( !morework && !pool->stop ) - pool->stop = TRUE; - if( pool->thr[0]->error ) - break; - if( pool->thr[0]->stop || pool->thr[0]->kill || - pool->stop || pool->kill ) - break; - - work_fn( pool->thr[0] ); - } + thread_loop( pool->thr[0] ); #endif /*HAVE_THREADS*/ /* Test for error. @@ -479,17 +457,18 @@ vips_threadpool_run( VipsImage *im, return( result ); } -/* Pick a tile size and buffer size. +/* Pick a tile size and a buffer height. The buffer height must be a multiple + * of tile_height. */ void vips_get_tile_size( VipsImage *im, int *tile_width, int *tile_height, int *nlines ) { - const int nthr = im_get_concurrency(); + const int nthr = im_concurrency_get(); /* Pick a render geometry. */ - switch( pool->im->dhint ) { + switch( im->dhint ) { case IM_SMALLTILE: *tile_width = im__tile_width; *tile_height = im__tile_height; @@ -518,6 +497,10 @@ vips_get_tile_size( VipsImage *im, g_assert( 0 ); } + /* We make this assumption in several places. + */ + g_assert( *nlines % *tile_height == 0 ); + #ifdef DEBUG_IO printf( "vips_get_tile_size: %d by %d patches, " "groups of %d scanlines\n",