diff --git a/libvips/include/vips/threadpool.h b/libvips/include/vips/threadpool.h index f7f91c86..776d4859 100644 --- a/libvips/include/vips/threadpool.h +++ b/libvips/include/vips/threadpool.h @@ -72,13 +72,14 @@ typedef struct { /* A work function. This does a unit of work (eg. processing a tile or * whatever). */ -typedef int (*VipsThreadpoolWork)( VipsThread *thr, - REGION *, void *, void *, void * ); +typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, + void *a, void *b, void *c ); /* A work allocate function. This is run single-threaded by a worker to - * set up a new work unit. Return TRUE if computation is all done. + * set up a new work unit. */ -typedef gboolean (*VipsThreadpoolAllocate)( VipsThread *thr ); +typedef int (*VipsThreadpoolAllocate)( VipsThread *thr, + void *a, void *b, void *c ); /* What we track for a group of threads working together. */ @@ -86,9 +87,7 @@ typedef struct _VipsThreadpool { /* All private. */ /*< private >*/ - IMAGE *im; /* Image we are calculating */ - int pw, ph; /* Tile size */ - int nlines; /* Scanlines-at-once we prefer for iteration */ + VipsImage *im; /* Image we are calculating */ /* Do a unit of work (runs in parallel) and allocate a unit of work * (serial). Plus the mutex we use to serialize work allocation. @@ -118,6 +117,8 @@ typedef struct _VipsThreadpool { int vips_threadpool_run( VipsImage *im, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, void *a, void *b, void *c ); +void vips_get_tile_size( VipsImage *im, + int *tile_width, int *tile_height, int *nlines ); #ifdef __cplusplus } diff --git a/libvips/iofuncs/Makefile.am b/libvips/iofuncs/Makefile.am index 88977823..ae8750fc 100644 --- a/libvips/iofuncs/Makefile.am +++ b/libvips/iofuncs/Makefile.am @@ -39,7 +39,6 @@ libiofuncs_la_SOURCES = \ rect.c \ semaphore.c \ threadgroup.c \ - threadpool.c \ util.c \ im_init_world.c \ buf.c \ diff --git a/libvips/iofuncs/im_wbuffer2.c b/libvips/iofuncs/im_wbuffer2.c new file mode 100644 index 00000000..e4786eb3 --- /dev/null +++ b/libvips/iofuncs/im_wbuffer2.c @@ -0,0 +1,535 @@ +/* Double-buffered write. + * + * 19/3/10 + * - from im_wbuffer.c + * - move on top of VipsThreadpool, instead of im_threadgroup_t + */ + +/* + + This file is part of VIPS. + + VIPS is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + */ + +/* + + These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk + + */ + +/* + */ +#define DEBUG + +#ifdef HAVE_CONFIG_H +#include +#endif /*HAVE_CONFIG_H*/ +#include + +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif /*HAVE_UNISTD_H*/ + +#include +#include +#include +#include + +#ifdef WITH_DMALLOC +#include +#endif /*WITH_DMALLOC*/ + +/* A buffer we are going to write to disc in a background thread. + */ +typedef struct _WriteBuffer { + struct _Write *write; + + REGION *region; /* Pixels */ + Rect area; /* Part of image this region covers */ + im_semaphore_t go; /* Start bg thread loop */ + im_semaphore_t nwrite; /* Number of threads writing to region */ + im_semaphore_t done; /* Bg thread has done write */ + int write_errno; /* Save write errors here */ + GThread *thread; /* BG writer thread */ + gboolean kill; /* Set to ask thread to exit */ + im_wbuffer_fn write_fn; /* BG write with this */ + void *a; /* Client data */ + void *b; +} WriteBuffer; + +/* Per-call state. + */ +typedef struct _Write { + VipsImage *im; + + /* We are current writing tiles to buf, buf_back is in the hands of + * the bg write thread. + */ + WriteBuffer *buf; + WriteBuffer *buf_back; + + /* The position we're at in buf. + */ + int x; + int y; + + /* The tilesize we've picked. + */ + int tile_width; + int tile_height; + int nlines; +} Write; + +static void +wbuffer_free( WriteBuffer *wbuffer ) +{ + /* Is there a thread running this region? Kill it! + */ + if( wbuffer->thread ) { + wbuffer->kill = TRUE; + im_semaphore_up( &wbuffer->go ); + + /* Return value is always NULL (see wbuffer_write_thread). + */ + (void) g_thread_join( wbuffer->thread ); +#ifdef DEBUG_CREATE + printf( "wbuffer_free: g_thread_join()\n" ); +#endif /*DEBUG_CREATE*/ + + wbuffer->thread = NULL; + } + + IM_FREEF( im_region_free, wbuffer->region ); + im_semaphore_destroy( &wbuffer->go ); + im_semaphore_destroy( &wbuffer->nwrite ); + im_semaphore_destroy( &wbuffer->done ); + im_free( wbuffer ); +} + +static void +wbuffer_write( WriteBuffer *wbuffer ) +{ + wbuffer->write_errno = wbuffer->write_fn( wbuffer->region, + &wbuffer->area, wbuffer->a, wbuffer->b ); + +#ifdef DEBUG + printf( "wbuffer_write: %d bytes from wbuffer %p\n", + wbuffer->region->bpl * wbuffer->area.height, wbuffer ); +#endif /*DEBUG*/ +} + +#ifdef HAVE_THREADS +/* Run this as a thread to do a BG write. + */ +static void * +wbuffer_write_thread( void *data ) +{ + WriteBuffer *wbuffer = (WriteBuffer *) data; + + for(;;) { + im_semaphore_down( &wbuffer->go ); + + if( wbuffer->kill ) + break; + + /* Wait for all writer threads to leave this wbuffer. + */ + im_semaphore_downn( &wbuffer->nwrite, 0 ); + + wbuffer_write( wbuffer ); + + /* Signal write complete. + */ + im_semaphore_up( &wbuffer->done ); + } + + return( NULL ); +} +#endif /*HAVE_THREADS*/ + +static WriteBuffer * +wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b ) +{ + WriteBuffer *wbuffer; + + if( !(wbuffer = IM_NEW( NULL, WriteBuffer )) ) + return( NULL ); + wbuffer->write = write; + wbuffer->region = NULL; + im_semaphore_init( &wbuffer->go, 0, "go" ); + im_semaphore_init( &wbuffer->nwrite, 0, "nwrite" ); + im_semaphore_init( &wbuffer->done, 0, "done" ); + wbuffer->write_errno = 0; + wbuffer->thread = NULL; + wbuffer->kill = FALSE; + wbuffer->write_fn = write_fn; + wbuffer->a = a; + wbuffer->b = b; + + if( !(wbuffer->region = im_region_create( write->im )) ) { + wbuffer_free( wbuffer ); + return( NULL ); + } + +#ifdef HAVE_THREADS + /* Make this last (picks up parts of wbuffer on startup). + */ + if( !(wbuffer->thread = g_thread_create( wbuffer_write_thread, wbuffer, + TRUE, NULL )) ) { + im_error( "wbuffer_new", "%s", _( "unable to create thread" ) ); + wbuffer_free( wbuffer ); + return( NULL ); + } +#endif /*HAVE_THREADS*/ + + return( wbuffer ); +} + +/* Our VipsThreadpoolWork function ... generate a tile and tell the wbuffer + * write thread that we're done. + */ +static int +wbuffer_work_fn( VipsThread *thr, + REGION *reg, void *a, void *b, void *c ) +{ + WriteBuffer *wbuffer = (WriteBuffer *) a; + Write *write = wbuffer->write; + + if( im_prepare_to( reg, wbuffer->reg, + &thr->pos, thr->pos.left, thr->pos.top ) ) + return( -1 ); + + im_semaphore_up( &wbuffer->nwrite ); + + return( 0 ); +} + +/* Move a wbuffer to a position. + */ +static int +wbuffer_position( WriteBuffer *wbuffer, int top, int height ) +{ + Rect image, area; + + image.left = 0; + image.top = 0; + image.width = wbuffer->write->im->Xsize; + image.height = wbuffer->write->im->Ysize; + + area.left = 0; + area.top = top; + area.width = wbuffer->write->im->Xsize; + area.height = height; + + im_rect_intersectrect( &area, &image, &wbuffer->area ); + if( im_region_buffer( wbuffer->region, &wbuffer->area ) ) + return( -1 ); + + /* This should be an exclusive buffer, hopefully. + */ + g_assert( !wbuffer->region->buffer->done ); + + return( 0 ); +} + +/* Our VipsThreadpoolAllocate function ... move the thread to the next tile + * that needs doing. If no buffer is available (the bg writer hasn't yet + * finished with it), we block. If all tiles are done, we return FALSE to end + * iteration. + */ +static gboolean +wbuffer_allocate_fn( VipsThread *thr, void *a, void *b, void *c ) +{ + WriteBuffer *wbuffer = (WriteBuffer *) a; + Write *write = wbuffer->write; + + Rect image; + Rect tile; + + /* Is the current x/y position OK? New line or maybe new buffer or + * maybe all done. + */ + if( write->x > write->buf->area.width ) { + write->x = 0; + write->y += write->tile_height; + + if( write->y > IM_RECT_BOTTOM( &write->buf->area ) ) { + /* Buffer full. Block until the other buffer has been + * written. + */ + if( write->buf->area.top > 0 ) { + im_semaphore_down( &write->buf_back->done ); + + /* Previous write suceeded? + */ + if( write->buf_back->write_errno ) { + thr->err = write->buf_back->write_errno; + return( -1 ); + } + } + + /* Start writing this buffer. + */ + im_semaphore_up( &write->buf->go ); + + /* Swap buffers. + */ + { + WriteBuffer *t; + + t = write->buf; + write->buf = write->buf_back; + write->buf_back = t; + } + + /* End of image? + */ + if( write->buf_back->area.top + write->nlines > + write->im->Ysize ) + return( -1 ); + + /* Position buf below buf_back. + */ + if( wbuffer_position( write->buf, + write->buf_back->area.top + write->nlines, + write->nlines ) ) + return( -1 ); + } + } + + image.left = 0; + image.top = 0; + image.width = write->im->Xsize; + image.height = write->im->Ysize; + tile.left = write->x; + tile.top = write->y; + tile.width = write->tile_width; + tile.height = write->tile_height; + im_rect_intersectrect( &image, &tile, &thr->pos ); + + write->x += write->tile_width; + + return( 0 ); +} + +/* Loop over a wbuffer filling it threadily. + */ +static int +wbuffer_fill( WriteBuffer *wbuffer ) +{ + Rect *area = &wbuffer->area; + im_threadgroup_t *tg = wbuffer->tg; + IMAGE *im = tg->im; + Rect image; + + int x, y; + +#ifdef DEBUG + printf( "wbuffer_fill: starting for wbuffer %p at line %d\n", + wbuffer, area->top ); +#endif /*DEBUG*/ + + image.left = 0; + image.top = 0; + image.width = im->Xsize; + image.height = im->Ysize; + + /* Loop over area, sparking threads for all sub-parts in turn. + */ + for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph ) + for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) { + im_thread_t *thr; + Rect pos; + Rect clipped; + + /* thrs appear on idle when the child thread does + * threadgroup_idle_add and hits the 'go' semaphore. + */ + thr = im_threadgroup_get( tg ); + + /* Set the position we want to generate with this + * thread. Clip against the size of the image and the + * space available in or. + */ + pos.left = x; + pos.top = y; + pos.width = tg->pw; + pos.height = tg->ph; + im_rect_intersectrect( &pos, &image, &clipped ); + im_rect_intersectrect( &clipped, area, &clipped ); + + /* Note params. + */ + thr->oreg = wbuffer->region; + thr->pos = clipped; + thr->x = clipped.left; + thr->y = clipped.top; + thr->a = wbuffer; + +#ifdef DEBUG + printf( "wbuffer_fill: starting for tile at %d x %d\n", + x, y ); +#endif /*DEBUG*/ + + /* Add writer to n of writers on wbuffer, set it going. + */ + im_semaphore_upn( &wbuffer->nwrite, -1 ); + im_threadgroup_trigger( thr ); + + /* Trigger any eval callbacks on our source image and + * check for errors. + */ + if( im__handle_eval( tg->im, tg->pw, tg->ph ) || + im_threadgroup_iserror( tg ) ) { + /* Don't kill threads yet ... we may want to + * get some error stuff out of them. + */ + im_threadgroup_wait( tg ); + return( -1 ); + } + } + + return( 0 ); +} + +/* Eval to file. + */ +static int +wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) +{ + im_threadgroup_t *tg = b1->tg; + IMAGE *im = tg->im; + int y; + + assert( b1->tg == b2->tg ); + +#ifdef DEBUG + int nstrips; + + nstrips = 0; + printf( "wbuffer_eval_to_file: partial image output to file\n" ); +#endif /*DEBUG*/ + + /* What threads do at the end of each tile ... decrement the nwrite + * semaphore. + */ + tg->work = wbuffer_work_fn; + + /* Fill to in steps, write each to the output. + */ + for( y = 0; y < im->Ysize; y += tg->nlines ) { + /* Attach to this position in image. + */ + if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) ) + return( -1 ); + + /* Spark off threads to fill with data. + */ + if( wbuffer_fill( b1 ) ) + return( -1 ); + + /* We have to keep the ordering on wbuffer writes, so we can't + * have more than one background write going at once. Plus we + * want to make sure write()s don't get interleaved. Wait for + * the previous BG write (if any) to finish. + */ + if( y > 0 ) { + im_semaphore_down( &b2->done ); + + /* Previous write suceeded? + */ + if( b2->write_errno ) { + im_error_system( b2->write_errno, + "im__eval_to_file", + "%s", _( "write failed" ) ); + return( -1 ); + } + } + + /* b1 write can go. + */ + im_semaphore_up( &b1->go ); + +#ifndef HAVE_THREADS + /* No threading ... just write. + */ + wbuffer_write( b1 ); +#endif /*HAVE_THREADS*/ + + /* Rotate wbuffers. + */ + { + WriteBuffer *t; + + t = b1; b1 = b2; b2 = t; + } + +#ifdef DEBUG + nstrips++; +#endif /*DEBUG*/ + } + + /* Wait for all threads to finish, check for any errors. + */ + im_threadgroup_wait( tg ); + im_semaphore_down( &b2->done ); + if( im_threadgroup_iserror( tg ) ) + return( -1 ); + if( b1->write_errno || b2->write_errno ) { + im_error_system( + b1->write_errno ? b1->write_errno : b2->write_errno, + "im__eval_to_file", "%s", _( "write failed" ) ); + return( -1 ); + } + +#ifdef DEBUG + printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips ); +#endif /*DEBUG*/ + + return( 0 ); +} + +int +im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) +{ + WriteBuffer *b1, *b2; + int result; + + if( im__start_eval( tg->im ) ) + return( -1 ); + + result = 0; + + b1 = wbuffer_new( tg, write_fn, a, b ); + b2 = wbuffer_new( tg, write_fn, a, b ); + + if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) ) + result = -1; + + im__end_eval( tg->im ); + wbuffer_free( b1 ); + wbuffer_free( b2 ); + + return( result ); +} diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index 9677ff78..de91473d 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -173,7 +173,8 @@ thread_main_loop( void *a ) if( !pool->stop ) { gboolean morework; - morework = pool->allocate( thr ); + morework = pool->allocate( thr, + pool->a, pool->b, pool->c ); if( !morework ) pool->stop = TRUE; } @@ -356,8 +357,7 @@ vips_threadpool_new( VipsImage *im ) pool->allocate = NULL; pool->work = NULL; pool->allocate_lock = g_mutex_new(); - if( (pool->nthr = im_concurrency_get()) < 0 ) - return( NULL ); + pool->nthr = im_concurrency_get(); pool->thr = NULL; im_semaphore_init( &pool->finish, 0, "finish" ); pool->kill = FALSE; @@ -365,37 +365,6 @@ vips_threadpool_new( VipsImage *im ) pool->progress = FALSE; pool->zombie = FALSE; - /* Pick a render geometry. - */ - switch( pool->im->dhint ) { - case IM_SMALLTILE: - pool->pw = im__tile_width; - pool->ph = im__tile_height; - - /* Enough lines of tiles that we can expect to be able to keep - * nthr busy. - */ - pool->nlines = pool->ph * (1 + pool->nthr / - IM_MAX( 1, pool->im->Xsize / pool->pw )); - break; - - case IM_FATSTRIP: - pool->pw = pool->im->Xsize; - pool->ph = im__fatstrip_height; - pool->nlines = pool->ph * pool->nthr * 2; - break; - - case IM_ANY: - case IM_THINSTRIP: - pool->pw = pool->im->Xsize; - pool->ph = im__thinstrip_height; - pool->nlines = pool->ph * pool->nthr * 2; - break; - - default: - g_assert( 0 ); - } - /* Attach tidy-up callback. */ if( im_add_close_callback( im, @@ -404,11 +373,6 @@ vips_threadpool_new( VipsImage *im ) return( NULL ); } -#ifdef DEBUG_IO - printf( "vips_threadpool_new: %d by %d patches, " - "groups of %d scanlines\n", pool->pw, pool->ph, pool->nlines ); -#endif /*DEBUG_IO*/ - #ifdef DEBUG_IO printf( "vips_threadpool_new: \"%s\" (%p), with %d threads\n", im->filename, pool, pool->nthr ); @@ -515,3 +479,49 @@ vips_threadpool_run( VipsImage *im, return( result ); } +/* Pick a tile size and buffer size. + */ +void +vips_get_tile_size( VipsImage *im, + int *tile_width, int *tile_height, int *nlines ) +{ + const int nthr = im_get_concurrency(); + + /* Pick a render geometry. + */ + switch( pool->im->dhint ) { + case IM_SMALLTILE: + *tile_width = im__tile_width; + *tile_height = im__tile_height; + + /* Enough lines of tiles that we can expect to be able to keep + * nthr busy. + */ + *nlines = *tile_height * + (1 + nthr / IM_MAX( 1, im->Xsize / *tile_width )); + break; + + case IM_FATSTRIP: + *tile_width = im->Xsize; + *tile_height = im__fatstrip_height; + *nlines = *tile_height * nthr * 2; + break; + + case IM_ANY: + case IM_THINSTRIP: + *tile_width = im->Xsize; + *tile_height = im__thinstrip_height; + *nlines = *tile_height * nthr * 2; + break; + + default: + g_assert( 0 ); + } + +#ifdef DEBUG_IO + printf( "vips_get_tile_size: %d by %d patches, " + "groups of %d scanlines\n", + *tile_width, *tile_height, *nlines ); +#endif /*DEBUG_IO*/ +} +