added wbuffer2

This commit is contained in:
John Cupitt 2010-03-20 23:56:35 +00:00
parent dedbe06ea4
commit 8d7dcfe5f6
10 changed files with 274 additions and 371 deletions

View File

@ -36,6 +36,8 @@
set >0 can override) on linux and win32 set >0 can override) on linux and win32
- better nprocs guesser - better nprocs guesser
- im_render() fixes to help the paintbox, some speedups too - im_render() fixes to help the paintbox, some speedups too
- added im_wbuffer2(), a new distributed threading system, and --vips-wbuffer2
to enable it
15/1/10 started 7.21.1 15/1/10 started 7.21.1
- added "written" callbacks, used to implement write to non-vips formats - added "written" callbacks, used to implement write to non-vips formats

7
TODO
View File

@ -1,3 +1,10 @@
- do we need to wait for the last bg write to end?
- make VipsThread opaque ... have a separate VipsThreadState struct holding
stuff and only expose that
- nip2 image display does not work with threading disabled
- try a distributed worker model for image generation - try a distributed worker model for image generation
at the moment threadpools have an idle list and a manager ... in fact, what at the moment threadpools have an idle list and a manager ... in fact, what

View File

@ -50,18 +50,24 @@ typedef struct {
/*< private >*/ /*< private >*/
struct _VipsThreadpool *pool; /* Pool we are part of */ struct _VipsThreadpool *pool; /* Pool we are part of */
REGION *reg; /* Region this thread operates on */
GThread *thread; /* Thread for this region */ GThread *thread; /* Thread for this region */
gboolean kill; /* Set this to make thread kill itself */
gboolean stop; /* Set this to make thread stop work */
gboolean error; /* Set by thread if work fn fails */
REGION *oreg; /* If part of an inplace pool, */ /* Set this to ask the thread to exit.
Rect pos; /* Where this thread should write */ */
int x, y; /* Its result */ gboolean exit;
void *a, *b, *c; /* User arguments to work fns */ /* Set by the thread if work or allocate return an error.
*/
gboolean error;
/* Thread state for the worker and allocate. Handy for communication.
* The region is created and destroyed by the threadpool for the
* worker.
*/
REGION *reg;
Rect pos;
int x, y;
void *a, *b, *c;
#ifdef TIME_THREAD #ifdef TIME_THREAD
double *btime, *etime; double *btime, *etime;
@ -69,16 +75,17 @@ typedef struct {
#endif /*TIME_THREAD*/ #endif /*TIME_THREAD*/
} VipsThread; } VipsThread;
/* A work function. This does a unit of work (eg. processing a tile or
* whatever).
*/
typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg,
void *a, void *b, void *c );
/* A work allocate function. This is run single-threaded by a worker to /* A work allocate function. This is run single-threaded by a worker to
* set up a new work unit. * set up a new work unit.
* Return non-zero for errors. Set *stop for "no more work to do".
*/ */
typedef int (*VipsThreadpoolAllocate)( VipsThread *thr, typedef int (*VipsThreadpoolAllocate)( VipsThread *thr,
void *a, void *b, void *c, gboolean *stop );
/* A work function. This does a unit of work (eg. processing a tile or
* whatever). Return non-zero for errors.
*/
typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg,
void *a, void *b, void *c ); void *a, void *b, void *c );
/* What we track for a group of threads working together. /* What we track for a group of threads working together.
@ -104,9 +111,13 @@ typedef struct _VipsThreadpool {
*/ */
im_semaphore_t finish; im_semaphore_t finish;
gboolean kill; /* Set to stop eval early */ /* Set this to abort evaluation early with an error.
gboolean stop; /* Set on normal end of computation */ */
gboolean progress; /* Set this to get eval progress feedback */ gboolean kill;
/* Set by Allocate (via an arg) to indicate normal end of computation.
*/
gboolean stop;
/* Set this if the pool has been shut down. We sometimes need to allow /* Set this if the pool has been shut down. We sometimes need to allow
* double-frees. * double-frees.
@ -114,12 +125,17 @@ typedef struct _VipsThreadpool {
gboolean zombie; gboolean zombie;
} VipsThreadpool; } VipsThreadpool;
int vips_threadpool_run( VipsImage *im, int vips_threadpool_run( VipsImage *im,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work,
void *a, void *b, void *c ); void *a, void *b, void *c );
void vips_get_tile_size( VipsImage *im, void vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *nlines ); int *tile_width, int *tile_height, int *nlines );
extern int im__wbuffer2;
int im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b );
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif /*__cplusplus*/ #endif /*__cplusplus*/

View File

@ -117,6 +117,7 @@ extern "C" {
#include <vips/interpolate.h> #include <vips/interpolate.h>
#include <vips/semaphore.h> #include <vips/semaphore.h>
#include <vips/threadgroup.h> #include <vips/threadgroup.h>
#include <vips/threadpool.h>
#include <vips/meta.h> #include <vips/meta.h>
#include <vips/header.h> #include <vips/header.h>

View File

@ -29,6 +29,7 @@ libiofuncs_la_SOURCES = \
im_unmapfile.c \ im_unmapfile.c \
im_guess_prefix.c \ im_guess_prefix.c \
im_wbuffer.c \ im_wbuffer.c \
im_wbuffer2.c \
im_wrapmany.c \ im_wrapmany.c \
im_writeline.c \ im_writeline.c \
memory.c \ memory.c \
@ -39,6 +40,7 @@ libiofuncs_la_SOURCES = \
rect.c \ rect.c \
semaphore.c \ semaphore.c \
threadgroup.c \ threadgroup.c \
threadpool.c \
util.c \ util.c \
im_init_world.c \ im_init_world.c \
buf.c \ buf.c \

View File

@ -605,8 +605,12 @@ im_generate( IMAGE *im,
im_region_free( or ); im_region_free( or );
return( -1 ); return( -1 );
} }
if( im->dtype == IM_OPENOUT ) if( im->dtype == IM_OPENOUT ) {
if( im__wbuffer2 )
res = im_wbuffer2( im, write_vips, NULL, NULL );
else
res = im_wbuffer( tg, write_vips, NULL, NULL ); res = im_wbuffer( tg, write_vips, NULL, NULL );
}
else else
res = eval_to_memory( tg, or ); res = eval_to_memory( tg, or );

View File

@ -268,6 +268,8 @@ static GOptionEntry option_entries[] = {
N_( "set fatstrip height to N (DEBUG)" ), "N" }, N_( "set fatstrip height to N (DEBUG)" ), "N" },
{ "vips-progress", 'p', 0, G_OPTION_ARG_NONE, &im__progress, { "vips-progress", 'p', 0, G_OPTION_ARG_NONE, &im__progress,
N_( "show progress feedback" ), NULL }, N_( "show progress feedback" ), NULL },
{ "vips-wbuffer2", 'd', 0, G_OPTION_ARG_NONE, &im__wbuffer2,
N_( "use distributed work allocation" ), NULL },
{ NULL } { NULL }
}; };

View File

@ -32,8 +32,8 @@
*/ */
/* /*
*/
#define DEBUG #define DEBUG
*/
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include <config.h> #include <config.h>
@ -73,9 +73,6 @@ typedef struct _WriteBuffer {
int write_errno; /* Save write errors here */ int write_errno; /* Save write errors here */
GThread *thread; /* BG writer thread */ GThread *thread; /* BG writer thread */
gboolean kill; /* Set to ask thread to exit */ gboolean kill; /* Set to ask thread to exit */
im_wbuffer_fn write_fn; /* BG write with this */
void *a; /* Client data */
void *b;
} WriteBuffer; } WriteBuffer;
/* Per-call state. /* Per-call state.
@ -99,8 +96,18 @@ typedef struct _Write {
int tile_width; int tile_width;
int tile_height; int tile_height;
int nlines; int nlines;
/* The file format write operation.
*/
im_wbuffer_fn write_fn;
void *a;
void *b;
} Write; } Write;
/* Enable im_wbuffer2 ... set from the cmd line, tested by our users.
*/
int im__wbuffer2 = 0;
static void static void
wbuffer_free( WriteBuffer *wbuffer ) wbuffer_free( WriteBuffer *wbuffer )
{ {
@ -113,9 +120,9 @@ wbuffer_free( WriteBuffer *wbuffer )
/* Return value is always NULL (see wbuffer_write_thread). /* Return value is always NULL (see wbuffer_write_thread).
*/ */
(void) g_thread_join( wbuffer->thread ); (void) g_thread_join( wbuffer->thread );
#ifdef DEBUG_CREATE #ifdef DEBUG
printf( "wbuffer_free: g_thread_join()\n" ); printf( "wbuffer_free: g_thread_join()\n" );
#endif /*DEBUG_CREATE*/ #endif /*DEBUG*/
wbuffer->thread = NULL; wbuffer->thread = NULL;
} }
@ -130,13 +137,18 @@ wbuffer_free( WriteBuffer *wbuffer )
static void static void
wbuffer_write( WriteBuffer *wbuffer ) wbuffer_write( WriteBuffer *wbuffer )
{ {
wbuffer->write_errno = wbuffer->write_fn( wbuffer->region, Write *write = wbuffer->write;
&wbuffer->area, wbuffer->a, wbuffer->b );
#ifdef DEBUG #ifdef DEBUG
printf( "wbuffer_write: %d bytes from wbuffer %p\n", static int n = 1;
wbuffer->region->bpl * wbuffer->area.height, wbuffer );
printf( "wbuffer_write: %d, %d bytes from wbuffer %p\n",
n++, wbuffer->region->bpl * wbuffer->area.height, wbuffer );
#endif /*DEBUG*/ #endif /*DEBUG*/
wbuffer->write_errno = write->write_fn( wbuffer->region,
&wbuffer->area, write->a, write->b );
} }
#ifdef HAVE_THREADS #ifdef HAVE_THREADS
@ -148,12 +160,14 @@ wbuffer_write_thread( void *data )
WriteBuffer *wbuffer = (WriteBuffer *) data; WriteBuffer *wbuffer = (WriteBuffer *) data;
for(;;) { for(;;) {
/* Wait to be told to write.
*/
im_semaphore_down( &wbuffer->go ); im_semaphore_down( &wbuffer->go );
if( wbuffer->kill ) if( wbuffer->kill )
break; break;
/* Wait for all writer threads to leave this wbuffer. /* Now block until the last worker finishes on this buffer.
*/ */
im_semaphore_downn( &wbuffer->nwrite, 0 ); im_semaphore_downn( &wbuffer->nwrite, 0 );
@ -169,7 +183,7 @@ wbuffer_write_thread( void *data )
#endif /*HAVE_THREADS*/ #endif /*HAVE_THREADS*/
static WriteBuffer * static WriteBuffer *
wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b ) wbuffer_new( Write *write )
{ {
WriteBuffer *wbuffer; WriteBuffer *wbuffer;
@ -183,15 +197,16 @@ wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b )
wbuffer->write_errno = 0; wbuffer->write_errno = 0;
wbuffer->thread = NULL; wbuffer->thread = NULL;
wbuffer->kill = FALSE; wbuffer->kill = FALSE;
wbuffer->write_fn = write_fn;
wbuffer->a = a;
wbuffer->b = b;
if( !(wbuffer->region = im_region_create( write->im )) ) { if( !(wbuffer->region = im_region_create( write->im )) ) {
wbuffer_free( wbuffer ); wbuffer_free( wbuffer );
return( NULL ); return( NULL );
} }
/* The worker threads need to be able to move the buffers around.
*/
im__region_no_ownership( wbuffer->region );
#ifdef HAVE_THREADS #ifdef HAVE_THREADS
/* Make this last (picks up parts of wbuffer on startup). /* Make this last (picks up parts of wbuffer on startup).
*/ */
@ -206,21 +221,49 @@ wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b )
return( wbuffer ); return( wbuffer );
} }
/* Our VipsThreadpoolWork function ... generate a tile and tell the wbuffer /* Write and swap buffers.
* write thread that we're done.
*/ */
static int static int
wbuffer_work_fn( VipsThread *thr, wbuffer_flush( Write *write )
REGION *reg, void *a, void *b, void *c )
{ {
WriteBuffer *wbuffer = (WriteBuffer *) a; WriteBuffer *t;
Write *write = wbuffer->write;
if( im_prepare_to( reg, wbuffer->reg, #ifdef DEBUG
&thr->pos, thr->pos.left, thr->pos.top ) ) static int n = 1;
printf( "wbuffer_flush: %d\n", n++ );
#endif /*DEBUG*/
/* Block until the other buffer has been written. We have to do this
* before we can set this buffer writing or we'll lose output ordering.
*/
if( write->buf->area.top > 0 ) {
im_semaphore_down( &write->buf_back->done );
/* Previous write succeeded?
*/
if( write->buf_back->write_errno ) {
im_error_system( write->buf_back->write_errno,
"wbuffer_write", "%s", _( "write failed" ) );
return( -1 ); return( -1 );
}
}
im_semaphore_up( &wbuffer->nwrite ); /* Set the background writer going for this buffer.
*/
#ifdef HAVE_THREADS
im_semaphore_up( &write->buf->go );
#else
/* No threads? Write ourselves synchronously.
*/
wbuffer_write( write->buf );
#endif /*HAVE_THREADS*/
/* Swap buffers.
*/
t = write->buf;
write->buf = write->buf_back;
write->buf_back = t;
return( 0 ); return( 0 );
} }
@ -231,6 +274,7 @@ static int
wbuffer_position( WriteBuffer *wbuffer, int top, int height ) wbuffer_position( WriteBuffer *wbuffer, int top, int height )
{ {
Rect image, area; Rect image, area;
int result;
image.left = 0; image.left = 0;
image.top = 0; image.top = 0;
@ -243,14 +287,20 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height )
area.height = height; area.height = height;
im_rect_intersectrect( &area, &image, &wbuffer->area ); im_rect_intersectrect( &area, &image, &wbuffer->area );
if( im_region_buffer( wbuffer->region, &wbuffer->area ) )
return( -1 ); /* The workers take turns to move the buffers.
*/
im__region_take_ownership( wbuffer->region );
result = im_region_buffer( wbuffer->region, &wbuffer->area );
im__region_no_ownership( wbuffer->region );
/* This should be an exclusive buffer, hopefully. /* This should be an exclusive buffer, hopefully.
*/ */
g_assert( !wbuffer->region->buffer->done ); g_assert( !wbuffer->region->buffer->done );
return( 0 ); return( result );
} }
/* Our VipsThreadpoolAllocate function ... move the thread to the next tile /* Our VipsThreadpoolAllocate function ... move the thread to the next tile
@ -259,65 +309,48 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height )
* iteration. * iteration.
*/ */
static gboolean static gboolean
wbuffer_allocate_fn( VipsThread *thr, void *a, void *b, void *c ) wbuffer_allocate_fn( VipsThread *thr,
void *a, void *b, void *c, gboolean *stop )
{ {
WriteBuffer *wbuffer = (WriteBuffer *) a; Write *write = (Write *) a;
Write *write = wbuffer->write;
Rect image; Rect image;
Rect tile; Rect tile;
/* Is the current x/y position OK? New line or maybe new buffer or #ifdef DEBUG
* maybe all done. printf( "wbuffer_allocate_fn\n" );
#endif /*DEBUG*/
/* Is the state x/y OK? New line or maybe new buffer or maybe even
* all done.
*/ */
if( write->x > write->buf->area.width ) { if( write->x >= write->buf->area.width ) {
write->x = 0; write->x = 0;
write->y += write->tile_height; write->y += write->tile_height;
if( write->y > IM_RECT_BOTTOM( &write->buf->area ) ) { if( write->y >= IM_RECT_BOTTOM( &write->buf->area ) ) {
/* Buffer full. Block until the other buffer has been /* Write and swap buffers.
* written.
*/ */
if( write->buf->area.top > 0 ) { if( wbuffer_flush( write ) )
im_semaphore_down( &write->buf_back->done );
/* Previous write succeeded?
*/
if( write->buf_back->write_errno ) {
thr->err = write->buf_back->write_errno;
return( -1 ); return( -1 );
}
}
/* Start writing this buffer.
*/
im_semaphore_up( &write->buf->go );
/* Swap buffers.
*/
{
WriteBuffer *t;
t = write->buf;
write->buf = write->buf_back;
write->buf_back = t;
}
/* End of image? /* End of image?
*/ */
if( write->buf_back->area.top + write->nlines > if( write->y >= write->im->Ysize ) {
write->im->Ysize ) *stop = TRUE;
return( -1 ); return( 0 );
}
/* Position buf below buf_back. /* Position buf at the new y.
*/ */
if( wbuffer_position( write->buf, if( wbuffer_position( write->buf,
write->buf_back->area.top + write->nlines, write->y, write->nlines ) )
write->nlines ) )
return( -1 ); return( -1 );
} }
} }
/* x, y and buf are good: save params for thread.
*/
image.left = 0; image.left = 0;
image.top = 0; image.top = 0;
image.width = write->im->Xsize; image.width = write->im->Xsize;
@ -327,209 +360,90 @@ wbuffer_allocate_fn( VipsThread *thr, void *a, void *b, void *c )
tile.width = write->tile_width; tile.width = write->tile_width;
tile.height = write->tile_height; tile.height = write->tile_height;
im_rect_intersectrect( &image, &tile, &thr->pos ); im_rect_intersectrect( &image, &tile, &thr->pos );
thr->a = write->buf;
/* Add to the number of writers on the buffer.
*/
im_semaphore_upn( &write->buf->nwrite, -1 );
/* Move state on.
*/
write->x += write->tile_width; write->x += write->tile_width;
return( 0 ); return( 0 );
} }
/* Loop over a wbuffer filling it threadily. /* Our VipsThreadpoolWork function ... generate a tile!
*/ */
static int static int
wbuffer_fill( WriteBuffer *wbuffer ) wbuffer_work_fn( VipsThread *thr,
REGION *reg, void *a, void *b, void *c )
{ {
Rect *area = &wbuffer->area; WriteBuffer *wbuffer = (WriteBuffer *) thr->a;
im_threadgroup_t *tg = wbuffer->tg;
IMAGE *im = tg->im;
Rect image;
int x, y;
#ifdef DEBUG #ifdef DEBUG
printf( "wbuffer_fill: starting for wbuffer %p at line %d\n", printf( "wbuffer_work_fn\n" );
wbuffer, area->top );
#endif /*DEBUG*/ #endif /*DEBUG*/
image.left = 0; if( im_prepare_to( reg, wbuffer->region,
image.top = 0; &thr->pos, thr->pos.left, thr->pos.top ) )
image.width = im->Xsize;
image.height = im->Ysize;
/* Loop over area, sparking threads for all sub-parts in turn.
*/
for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph )
for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) {
im_thread_t *thr;
Rect pos;
Rect clipped;
/* thrs appear on idle when the child thread does
* threadgroup_idle_add and hits the 'go' semaphore.
*/
thr = im_threadgroup_get( tg );
/* Set the position we want to generate with this
* thread. Clip against the size of the image and the
* space available in or.
*/
pos.left = x;
pos.top = y;
pos.width = tg->pw;
pos.height = tg->ph;
im_rect_intersectrect( &pos, &image, &clipped );
im_rect_intersectrect( &clipped, area, &clipped );
/* Note params.
*/
thr->oreg = wbuffer->region;
thr->pos = clipped;
thr->x = clipped.left;
thr->y = clipped.top;
thr->a = wbuffer;
#ifdef DEBUG
printf( "wbuffer_fill: starting for tile at %d x %d\n",
x, y );
#endif /*DEBUG*/
/* Add writer to n of writers on wbuffer, set it going.
*/
im_semaphore_upn( &wbuffer->nwrite, -1 );
im_threadgroup_trigger( thr );
/* Trigger any eval callbacks on our source image and
* check for errors.
*/
if( im__handle_eval( tg->im, tg->pw, tg->ph ) ||
im_threadgroup_iserror( tg ) ) {
/* Don't kill threads yet ... we may want to
* get some error stuff out of them.
*/
im_threadgroup_wait( tg );
return( -1 );
}
}
return( 0 );
}
/* Eval to file.
*/
static int
wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 )
{
im_threadgroup_t *tg = b1->tg;
IMAGE *im = tg->im;
int y;
assert( b1->tg == b2->tg );
#ifdef DEBUG
int nstrips;
nstrips = 0;
printf( "wbuffer_eval_to_file: partial image output to file\n" );
#endif /*DEBUG*/
/* What threads do at the end of each tile ... decrement the nwrite
* semaphore.
*/
tg->work = wbuffer_work_fn;
/* Fill to in steps, write each to the output.
*/
for( y = 0; y < im->Ysize; y += tg->nlines ) {
/* Attach to this position in image.
*/
if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) )
return( -1 ); return( -1 );
/* Spark off threads to fill with data. /* Tell the bg write thread we've left.
*/ */
if( wbuffer_fill( b1 ) ) im_semaphore_upn( &wbuffer->nwrite, 1 );
return( -1 );
/* We have to keep the ordering on wbuffer writes, so we can't
* have more than one background write going at once. Plus we
* want to make sure write()s don't get interleaved. Wait for
* the previous BG write (if any) to finish.
*/
if( y > 0 ) {
im_semaphore_down( &b2->done );
/* Previous write succeeded?
*/
if( b2->write_errno ) {
im_error_system( b2->write_errno,
"im__eval_to_file",
"%s", _( "write failed" ) );
return( -1 );
}
}
/* b1 write can go.
*/
im_semaphore_up( &b1->go );
#ifndef HAVE_THREADS
/* No threading ... just write.
*/
wbuffer_write( b1 );
#endif /*HAVE_THREADS*/
/* Rotate wbuffers.
*/
{
WriteBuffer *t;
t = b1; b1 = b2; b2 = t;
}
#ifdef DEBUG
nstrips++;
#endif /*DEBUG*/
}
/* Wait for all threads to finish, check for any errors.
*/
im_threadgroup_wait( tg );
im_semaphore_down( &b2->done );
if( im_threadgroup_iserror( tg ) )
return( -1 );
if( b1->write_errno || b2->write_errno ) {
im_error_system(
b1->write_errno ? b1->write_errno : b2->write_errno,
"im__eval_to_file", "%s", _( "write failed" ) );
return( -1 );
}
#ifdef DEBUG
printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips );
#endif /*DEBUG*/
return( 0 ); return( 0 );
} }
int int
im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b )
{ {
WriteBuffer *b1, *b2; Write write;
int result; int result;
if( im__start_eval( tg->im ) ) write.im = im;
write.buf = wbuffer_new( &write );
write.buf_back = wbuffer_new( &write );
write.x = 0;
write.y = 0;
write.write_fn = write_fn;
write.a = a;
write.b = b;
vips_get_tile_size( im,
&write.tile_width, &write.tile_height, &write.nlines );
if( im__start_eval( im ) )
return( -1 ); return( -1 );
result = 0; result = 0;
b1 = wbuffer_new( tg, write_fn, a, b ); if( !write.buf ||
b2 = wbuffer_new( tg, write_fn, a, b ); !write.buf_back ||
wbuffer_position( write.buf, 0, write.nlines ) ||
if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) ) vips_threadpool_run( im,
wbuffer_allocate_fn, wbuffer_work_fn,
&write, NULL, NULL ) )
result = -1; result = -1;
im__end_eval( tg->im ); /* We've set all the buffers writing, but not waited for the BG
wbuffer_free( b1 ); * writer to finish. This can take a while: it has to wait for the
wbuffer_free( b2 ); * last worker to make the last tile.
*
* We can't just free the buffers (which will wait for the bg threads
* to finish), since the bg thread might see the kill before it gets a
* chance to write.
*/
if( write.buf->area.top > 0 )
im_semaphore_down( &write.buf_back->done );
im__end_eval( im );
/* Free buffers ... this will wait for the final write to finish too.
*/
wbuffer_free( write.buf );
wbuffer_free( write.buf_back );
return( result ); return( result );
} }

View File

@ -618,41 +618,13 @@ im_threadgroup_create( IMAGE *im )
tg->zombie = 0; tg->zombie = 0;
tg->im = im; tg->im = im;
tg->work = NULL; tg->work = NULL;
if( (tg->nthr = im_concurrency_get()) < 0 ) tg->nthr = im_concurrency_get();
return( NULL );
tg->thr = NULL; tg->thr = NULL;
tg->kill = 0; tg->kill = 0;
/* Pick a render geometry. /* Pick a render geometry.
*/ */
switch( tg->im->dhint ) { vips_get_tile_size( tg->im, &tg->pw, &tg->ph, &tg->nlines );
case IM_SMALLTILE:
tg->pw = im__tile_width;
tg->ph = im__tile_height;
/* Enough lines of tiles that we can expect to be able to keep
* nthr busy.
*/
tg->nlines = tg->ph * (1 + tg->nthr /
IM_MAX( 1, tg->im->Xsize / tg->pw ));
break;
case IM_FATSTRIP:
tg->pw = tg->im->Xsize;
tg->ph = im__fatstrip_height;
tg->nlines = tg->ph * tg->nthr * 2;
break;
case IM_ANY:
case IM_THINSTRIP:
tg->pw = tg->im->Xsize;
tg->ph = im__thinstrip_height;
tg->nlines = tg->ph * tg->nthr * 2;
break;
default:
error_exit( "panic: internal error #98i245983425" );
}
/* Attach tidy-up callback. /* Attach tidy-up callback.
*/ */

View File

@ -31,10 +31,10 @@
*/ */
/* /*
*/
#define TIME_THREAD #define TIME_THREAD
#define DEBUG_CREATE #define DEBUG_CREATE
#define DEBUG_IO #define DEBUG_IO
*/
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include <config.h> #include <config.h>
@ -51,7 +51,6 @@
#include <vips/vips.h> #include <vips/vips.h>
#include <vips/internal.h> #include <vips/internal.h>
#include <vips/thread.h> #include <vips/thread.h>
#include <vips/threadpool.h>
#ifdef WITH_DMALLOC #ifdef WITH_DMALLOC
#include <dmalloc.h> #include <dmalloc.h>
@ -91,6 +90,7 @@ save_time_buffers( VipsThread *thr )
char name[256]; char name[256];
im_snprintf( name, 256, "time%d", rn++ ); im_snprintf( name, 256, "time%d", rn++ );
printf( "saving time buffer to \"%s\"\n", name );
if( !(fp = fopen( name, "w" )) ) if( !(fp = fopen( name, "w" )) )
error_exit( "unable to write to \"%s\"", name ); error_exit( "unable to write to \"%s\"", name );
for( i = 0; i < thr->tpos; i++ ) for( i = 0; i < thr->tpos; i++ )
@ -109,7 +109,7 @@ thread_free( VipsThread *thr )
/* Is there a thread running this region? Kill it! /* Is there a thread running this region? Kill it!
*/ */
if( thr->thread ) { if( thr->thread ) {
thr->kill = 1; thr->exit = 1;
/* Return value is always NULL (see thread_main_loop). /* Return value is always NULL (see thread_main_loop).
*/ */
@ -122,7 +122,6 @@ thread_free( VipsThread *thr )
} }
IM_FREEF( im_region_free, thr->reg ); IM_FREEF( im_region_free, thr->reg );
thr->oreg = NULL;
thr->pool = NULL; thr->pool = NULL;
#ifdef TIME_THREAD #ifdef TIME_THREAD
@ -131,23 +130,58 @@ thread_free( VipsThread *thr )
#endif /*TIME_THREAD*/ #endif /*TIME_THREAD*/
} }
/* The work we do in one loop. This can run from the main thread in a loop if /* The main loop: get some work, do it! Can run from many worker threads, or
* we're unthreaded, or in parallel if we are threaded. * from the main thread if threading is off.
*/ */
static void static void
work_fn( VipsThread *thr ) thread_loop( VipsThread *thr )
{ {
/* Doublecheck only one thread per region. VipsThreadpool *pool = thr->pool;
*/
g_assert( thr->thread == g_thread_self() );
g_assert( thr->pool->work ); for(;;) {
/* Ask for a work unit.
/* Call our work function.
*/ */
if( !thr->error && g_mutex_lock( pool->allocate_lock );
thr->pool->work( thr, thr->reg, thr->a, thr->b, thr->c ) ) if( !pool->stop ) {
if( pool->allocate( thr,
pool->a, pool->b, pool->c, &pool->stop ) )
thr->error = -1;
}
g_mutex_unlock( pool->allocate_lock );
if( pool->stop || pool->kill )
break;
if( thr->exit || thr->error )
break;
#ifdef TIME_THREAD
/* Note start time.
*/
if( thr->btime && thr->tpos < IM_TBUF_SIZE )
thr->btime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
#endif /*TIME_THREAD*/
/* Loop once.
*/
if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) )
thr->error = 1; thr->error = 1;
if( pool->stop || pool->kill )
break;
if( thr->exit || thr->error )
break;
#ifdef TIME_THREAD
/* Note stop time.
*/
if( thr->etime && thr->tpos < IM_TBUF_SIZE ) {
thr->etime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
thr->tpos += 1;
}
#endif /*TIME_THREAD*/
}
} }
#ifdef HAVE_THREADS #ifdef HAVE_THREADS
@ -166,48 +200,9 @@ thread_main_loop( void *a )
*/ */
im__region_take_ownership( thr->reg ); im__region_take_ownership( thr->reg );
for(;;) { /* Do all the work we can.
/* Ask for a work unit.
*/ */
g_mutex_lock( pool->allocate_lock ); thread_loop( thr );
if( !pool->stop ) {
gboolean morework;
morework = pool->allocate( thr,
pool->a, pool->b, pool->c );
if( !morework )
pool->stop = TRUE;
}
g_mutex_unlock( pool->allocate_lock );
/* Asked to stop work?
*/
if( thr->stop || thr->kill ||
pool->stop || pool->kill )
break;
#ifdef TIME_THREAD
/* Note start time.
*/
if( thr->btime )
thr->btime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
#endif /*TIME_THREAD*/
/* Loop once.
*/
work_fn( thr );
#ifdef TIME_THREAD
/* Note stop time.
*/
if( thr->etime ) {
thr->etime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
thr->tpos++;
}
#endif /*TIME_THREAD*/
}
/* We are exiting: tell the main thread. /* We are exiting: tell the main thread.
*/ */
@ -229,11 +224,8 @@ vips_thread_new( VipsThreadpool *pool )
thr->pool = pool; thr->pool = pool;
thr->reg = NULL; thr->reg = NULL;
thr->thread = NULL; thr->thread = NULL;
thr->kill = 0; thr->exit = 0;
thr->stop = 0;
thr->error = 0; thr->error = 0;
thr->oreg = NULL;
thr->a = thr->b = thr->c = NULL;
#ifdef TIME_THREAD #ifdef TIME_THREAD
thr->btime = NULL; thr->btime = NULL;
thr->etime = NULL; thr->etime = NULL;
@ -362,7 +354,6 @@ vips_threadpool_new( VipsImage *im )
im_semaphore_init( &pool->finish, 0, "finish" ); im_semaphore_init( &pool->finish, 0, "finish" );
pool->kill = FALSE; pool->kill = FALSE;
pool->stop = FALSE; pool->stop = FALSE;
pool->progress = FALSE;
pool->zombie = FALSE; pool->zombie = FALSE;
/* Attach tidy-up callback. /* Attach tidy-up callback.
@ -444,20 +435,7 @@ vips_threadpool_run( VipsImage *im,
#else #else
/* No threads, do the work ourselves in the main thread. /* No threads, do the work ourselves in the main thread.
*/ */
for(;;) { thread_loop( pool->thr[0] );
gboolean morework;
morework = pool->allocate( pool->thr[0] );
if( !morework && !pool->stop )
pool->stop = TRUE;
if( pool->thr[0]->error )
break;
if( pool->thr[0]->stop || pool->thr[0]->kill ||
pool->stop || pool->kill )
break;
work_fn( pool->thr[0] );
}
#endif /*HAVE_THREADS*/ #endif /*HAVE_THREADS*/
/* Test for error. /* Test for error.
@ -479,17 +457,18 @@ vips_threadpool_run( VipsImage *im,
return( result ); return( result );
} }
/* Pick a tile size and buffer size. /* Pick a tile size and a buffer height. The buffer height must be a multiple
* of tile_height.
*/ */
void void
vips_get_tile_size( VipsImage *im, vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *nlines ) int *tile_width, int *tile_height, int *nlines )
{ {
const int nthr = im_get_concurrency(); const int nthr = im_concurrency_get();
/* Pick a render geometry. /* Pick a render geometry.
*/ */
switch( pool->im->dhint ) { switch( im->dhint ) {
case IM_SMALLTILE: case IM_SMALLTILE:
*tile_width = im__tile_width; *tile_width = im__tile_width;
*tile_height = im__tile_height; *tile_height = im__tile_height;
@ -518,6 +497,10 @@ vips_get_tile_size( VipsImage *im,
g_assert( 0 ); g_assert( 0 );
} }
/* We make this assumption in several places.
*/
g_assert( *nlines % *tile_height == 0 );
#ifdef DEBUG_IO #ifdef DEBUG_IO
printf( "vips_get_tile_size: %d by %d patches, " printf( "vips_get_tile_size: %d by %d patches, "
"groups of %d scanlines\n", "groups of %d scanlines\n",