From 56c1c9c036a5a22e4be1692379af91854d27e9e0 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Fri, 9 Apr 2010 16:51:45 +0000 Subject: [PATCH] im_iterate() has an optional threadpool backend too --- ChangeLog | 1 + TODO | 7 +- libvips/include/vips/debug.h | 16 +- libvips/include/vips/generate.h | 6 - libvips/include/vips/threadpool.h | 76 ++++++-- libvips/iofuncs/Makefile.am | 3 +- libvips/iofuncs/im_iterate.c | 3 + libvips/iofuncs/im_wbuffer.c | 3 +- libvips/iofuncs/sink.c | 197 +++++++++++++------ libvips/iofuncs/{discsink.c => sinkdisc.c} | 86 ++++++--- libvips/iofuncs/threadpool.c | 214 +++++++++++++++------ libvips/resample/interpolate.c | 1 + 12 files changed, 435 insertions(+), 178 deletions(-) rename libvips/iofuncs/{discsink.c => sinkdisc.c} (84%) diff --git a/ChangeLog b/ChangeLog index 3e35979a..72f88875 100644 --- a/ChangeLog +++ b/ChangeLog @@ -11,6 +11,7 @@ - fixed a couple of /0 problems with scale == 0 masks - set G_LOG_DOMAIN to VIPS so we can use g_warning etc. - added VIPS_DEBUG_MSG() macro +- --vips-wbuffer2 turns on threadpool for im_iterate as well 16/1/10 started 7.21.2 - "invalidate" is careful to keep images alive, so invalidate callbacks can do diff --git a/TODO b/TODO index 7d986d5c..827a8584 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,8 @@ -- how about +- vips_threadpool() has three clientdata, do we need all of them? maube one is + enough - #define VIPS_DEBUG_MSG( STR ) printf( ...) - - nicer than lots of #ifdef DEBUG ... + rename vips_discsink() / discsink.c as vips_sink_disc / sinkdisc - vips_sink() needs to be able to supply a thread start / stop function diff --git a/libvips/include/vips/debug.h b/libvips/include/vips/debug.h index cc285537..d16b4339 100644 --- a/libvips/include/vips/debug.h +++ b/libvips/include/vips/debug.h @@ -35,27 +35,27 @@ extern "C" { #endif /*__cplusplus*/ #ifdef VIPS_DEBUG -#define VIPS_DEBUG_MSG( FMT, ... ) printf( FMT, __VA_ARGS__ ) +#define VIPS_DEBUG_MSG( ... ) printf( __VA_ARGS__ ) #else -#define VIPS_DEBUG_MSG( FMT, ... ) +#define VIPS_DEBUG_MSG( ... ) #endif /*VIPS_DEBUG*/ #ifdef VIPS_DEBUG_RED -#define VIPS_DEBUG_MSG_RED( FMT, ... ) printf( "red: " FMT, __VA_ARGS__ ) +#define VIPS_DEBUG_MSG_RED( ... ) printf( "red: " __VA_ARGS__ ) #else -#define VIPS_DEBUG_MSG_RED( FMT, ... ) +#define VIPS_DEBUG_MSG_RED( ... ) #endif /*VIPS_DEBUG_RED*/ #ifdef VIPS_DEBUG_AMBER -#define VIPS_DEBUG_MSG_AMBER( FMT, ... ) printf( "amber: " FMT, __VA_ARGS__ ) +#define VIPS_DEBUG_MSG_AMBER( ... ) printf( "amber: " __VA_ARGS__ ) #else -#define VIPS_DEBUG_MSG_AMBER( FMT, ... ) +#define VIPS_DEBUG_MSG_AMBER( ... ) #endif /*VIPS_DEBUG_AMBER*/ #ifdef VIPS_DEBUG_GREEN -#define VIPS_DEBUG_MSG_GREEN( FMT, ... ) printf( "green: " FMT, __VA_ARGS__ ) +#define VIPS_DEBUG_MSG_GREEN( ... ) printf( "green: " __VA_ARGS__ ) #else -#define VIPS_DEBUG_MSG_GREEN( FMT, ... ) +#define VIPS_DEBUG_MSG_GREEN( ... ) #endif /*VIPS_DEBUG_GREEN*/ /* All open image descriptors ... see im_init() and im_close(). diff --git a/libvips/include/vips/generate.h b/libvips/include/vips/generate.h index dff8ff92..85ded08e 100644 --- a/libvips/include/vips/generate.h +++ b/libvips/include/vips/generate.h @@ -90,12 +90,6 @@ int im_render_priority( IMAGE *in, IMAGE *out, IMAGE *mask, void (*notify)( IMAGE *, Rect *, void * ), void *client ); int im_cache( IMAGE *in, IMAGE *out, int width, int height, int max ); -typedef void (*VipsSinkNotify)( VipsImage *im, Rect *rect, void *client ); -int vips_sink_screen( VipsImage *in, VipsImage *out, VipsImage *mask, - int width, int height, int max, - int priority, - VipsSinkNotify notify, void *client ); - /* WIO. */ int im_setupout( IMAGE *im ); diff --git a/libvips/include/vips/threadpool.h b/libvips/include/vips/threadpool.h index 08c2571d..58c8e0fc 100644 --- a/libvips/include/vips/threadpool.h +++ b/libvips/include/vips/threadpool.h @@ -42,51 +42,103 @@ extern "C" { #include -/* The per-thread state we expose. Allocate functions can use these members to +/* Per-thread state. Allocate functions can use these members to * communicate with work functions. */ + +#define VIPS_TYPE_THREAD_STATE (vips_thread_state_get_type()) +#define VIPS_THREAD_STATE( obj ) \ + (G_TYPE_CHECK_INSTANCE_CAST( (obj), \ + VIPS_TYPE_THREAD_STATE, VipsThreadState )) +#define VIPS_THREAD_STATE_CLASS( klass ) \ + (G_TYPE_CHECK_CLASS_CAST( (klass), \ + VIPS_TYPE_THREAD_STATE, VipsThreadStateClass)) +#define VIPS_IS_THREAD_STATE( obj ) \ + (G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_THREAD_STATE )) +#define VIPS_IS_THREAD_STATE_CLASS( klass ) \ + (G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_THREAD_STATE )) +#define VIPS_THREAD_STATE_GET_CLASS( obj ) \ + (G_TYPE_INSTANCE_GET_CLASS( (obj), \ + VIPS_TYPE_THREAD_STATE, VipsThreadStateClass )) + typedef struct _VipsThreadState { + VipsObject parent_object; + + /* Image we run on. + */ + VipsImage *im; + /* This region is created and destroyed by the threadpool for the - * worker. + * use of the worker. */ REGION *reg; - /* The rest are neither used nor set, do what you like with them. + /* Neither used nor set, do what you like with them. */ Rect pos; int x, y; - void *d, *e, *f; + + /* The client data passed to the enclosing vips_threadpool_run(). + */ + void *a; + } VipsThreadState; +typedef struct _VipsThreadStateClass { + VipsObjectClass parent_class; + +} VipsThreadStateClass; + +void *vips_thread_state_set( VipsObject *object, void *a, void *b ); +GType vips_thread_state_get_type( void ); + +VipsThreadState *vips_thread_state_new( VipsImage *im, void *a ); + +/* Constructor for per-thread state. + */ +typedef VipsThreadState *(*VipsThreadStart)( VipsImage *im, void *a ); + /* A work allocate function. This is run single-threaded by a worker to * set up a new work unit. * Return non-zero for errors. Set *stop for "no more work to do" */ typedef int (*VipsThreadpoolAllocate)( VipsThreadState *state, - void *a, void *b, void *c, gboolean *stop ); + void *a, gboolean *stop ); /* A work function. This does a unit of work (eg. processing a tile or * whatever). Return non-zero for errors. */ -typedef int (*VipsThreadpoolWork)( VipsThreadState *state, - void *a, void *b, void *c ); +typedef int (*VipsThreadpoolWork)( VipsThreadState *state, void *a ); /* A progress function. This is run by the main thread once for every * allocation. Return an error to kill computation early. */ -typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c ); +typedef int (*VipsThreadpoolProgress)( void *a ); int vips_threadpool_run( VipsImage *im, + VipsThreadStart start, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, VipsThreadpoolProgress progress, - void *a, void *b, void *c ); + void *a ); void vips_get_tile_size( VipsImage *im, int *tile_width, int *tile_height, int *nlines ); -typedef int (*VipsRegionWrite)( REGION *region, Rect *area, void *a, void *b ); -int vips_discsink( VipsImage *im, - VipsRegionWrite write_fn, void *a, void *b ); +typedef int (*VipsRegionWrite)( REGION *region, Rect *area, void *a ); +int vips_sink_disc( VipsImage *im, VipsRegionWrite write_fn, void *a ); + +typedef void *(*VipsStart)( VipsImage *out, void *a, void *b ); +typedef int (*VipsGenerate)( REGION *out, void *seq, void *a, void *b ); +typedef int (*VipsStop)( void *seq, void *a, void *b ); +int vips_sink( VipsImage *im, + VipsStart start, VipsGenerate generate, VipsStop stop, + void *a, void *b ); + +typedef void (*VipsSinkNotify)( VipsImage *im, Rect *rect, void *client ); +int vips_sink_screen( VipsImage *in, VipsImage *out, VipsImage *mask, + int width, int height, int max, + int priority, + VipsSinkNotify notify, void *client ); #ifdef __cplusplus } diff --git a/libvips/iofuncs/Makefile.am b/libvips/iofuncs/Makefile.am index 68ed9fa2..9607b040 100644 --- a/libvips/iofuncs/Makefile.am +++ b/libvips/iofuncs/Makefile.am @@ -29,7 +29,8 @@ libiofuncs_la_SOURCES = \ im_unmapfile.c \ im_guess_prefix.c \ im_wbuffer.c \ - discsink.c \ + sinkdisc.c \ + sink.c \ im_wrapmany.c \ im_writeline.c \ memory.c \ diff --git a/libvips/iofuncs/im_iterate.c b/libvips/iofuncs/im_iterate.c index 6f600376..68cc09f6 100644 --- a/libvips/iofuncs/im_iterate.c +++ b/libvips/iofuncs/im_iterate.c @@ -379,6 +379,9 @@ im_iterate( IMAGE *im, Iterate iter; int result; + if( im__wbuffer2 ) + return( vips_sink( im, start, generate, stop, b, c ) ); + g_assert( !im_image_sanity( im ) ); /* We don't use this, but make sure it's set in case any old binaries diff --git a/libvips/iofuncs/im_wbuffer.c b/libvips/iofuncs/im_wbuffer.c index 732f57d7..209e2876 100644 --- a/libvips/iofuncs/im_wbuffer.c +++ b/libvips/iofuncs/im_wbuffer.c @@ -430,7 +430,8 @@ im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) /* Optionally use the newer one. */ if( im__wbuffer2 ) - return( vips_discsink( tg->im, write_fn, a, b ) ); + return( vips_sink_disc( tg->im, + (VipsRegionWrite) write_fn, a ) ); if( im__start_eval( tg->im ) ) return( -1 ); diff --git a/libvips/iofuncs/sink.c b/libvips/iofuncs/sink.c index d6d05562..36cdab6d 100644 --- a/libvips/iofuncs/sink.c +++ b/libvips/iofuncs/sink.c @@ -31,8 +31,8 @@ */ /* -#define DEBUG */ +#define VIPS_DEBUG #ifdef HAVE_CONFIG_H #include @@ -72,7 +72,7 @@ typedef struct _Sink { int tile_height; int nlines; - /* Keep the sequence value in VipsThreadState->d. + /* Call params. */ im_start_fn start; im_generate_fn generate; @@ -81,6 +81,124 @@ typedef struct _Sink { void *b; } Sink; +/* Our per-thread state. + */ +typedef struct _SinkThreadState { + VipsThreadState parent_object; + + /* Sequence value for this thread. + */ + void *seq; + + /* The region we walk over sink.t copy. We can't use + * parent_object.reg, it's defined on the outer image. + */ + REGION *reg; +} SinkThreadState; + +typedef struct _SinkThreadStateClass { + VipsThreadStateClass parent_class; + +} SinkThreadStateClass; + +G_DEFINE_TYPE( SinkThreadState, sink_thread_state, VIPS_TYPE_THREAD_STATE ); + +/* Call a thread's stop function. + */ +static int +sink_call_stop( Sink *sink, SinkThreadState *state ) +{ + if( state->seq && sink->stop ) { + VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state ); + + if( sink->stop( state->seq, sink->a, sink->b ) ) { + im_error( "vips_sink", + _( "stop function failed for image \"%s\"" ), + sink->im->filename ); + return( -1 ); + } + + state->seq = NULL; + } + + return( 0 ); +} + +static void +sink_thread_state_dispose( GObject *gobject ) +{ + SinkThreadState *state = (SinkThreadState *) gobject; + Sink *sink = (Sink *) ((VipsThreadState *) state)->a; + + sink_call_stop( sink, state ); + IM_FREEF( im_region_free, state->reg ); + + G_OBJECT_CLASS( sink_thread_state_parent_class )->dispose( gobject ); +} + +/* Call the start function for this thread, if necessary. + */ +static int +sink_call_start( Sink *sink, SinkThreadState *state ) +{ + if( !state->seq && sink->start ) { + VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state ); + + state->seq = sink->start( sink->t, sink->a, sink->b ); + + if( !state->seq ) { + im_error( "vips_sink", + _( "start function failed for image \"%s\"" ), + sink->im->filename ); + return( -1 ); + } + } + + return( 0 ); +} + +static int +sink_thread_state_build( VipsObject *object ) +{ + SinkThreadState *state = (SinkThreadState *) object; + Sink *sink = (Sink *) ((VipsThreadState *) state)->a; + + if( !(state->reg = im_region_create( sink->t )) || + sink_call_start( sink, state ) ) + return( -1 ); + + return( VIPS_OBJECT_CLASS( + sink_thread_state_parent_class )->build( object ) ); +} + +static void +sink_thread_state_class_init( SinkThreadStateClass *class ) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS( class ); + VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class ); + + gobject_class->dispose = sink_thread_state_dispose; + + object_class->build = sink_thread_state_build; + object_class->nickname = "sinkthreadstate"; + object_class->description = _( "per-thread state for sink" ); +} + +static void +sink_thread_state_init( SinkThreadState *state ) +{ + state->seq = NULL; + state->reg = NULL; +} + +static VipsThreadState * +sink_thread_state_new( VipsImage *im, void *a ) +{ + return( VIPS_THREAD_STATE( vips_object_new( + sink_thread_state_get_type(), + vips_thread_state_set, im, a ) ) ); +} + static void sink_free( Sink *sink ) { @@ -95,6 +213,8 @@ sink_init( Sink *sink, { sink->im = im; sink->t = NULL; + sink->x = 0; + sink->y = 0; sink->start = start; sink->generate = generate; sink->stop = stop; @@ -113,54 +233,8 @@ sink_init( Sink *sink, return( 0 ); } -/* Call the start function for this thread, if necessary. - */ -static int -sink_call_start( Sink *sink, VipsThreadState *state ) -{ - if( !state->d && sink->start ) { -#ifdef DEBUG - printf( "sink_call_start: state = %p\n", state ); -#endif /*DEBUG*/ - state->d = sink->start( sink->t, sink->a, sink->b ); - - if( !state->d ) { - im_error( "vips_sink", - _( "start function failed for image \"%s\"" ), - sink->im->filename ); - return( -1 ); - } - } - - return( 0 ); -} - -/* Call a thread's stop function. - */ -static int -sink_call_stop( Sink *sink, VipsThreadState *state ) -{ - if( state->d && sink->stop ) { -#ifdef DEBUG - printf( "sink_call_stop: state = %p\n", state ); -#endif /*DEBUG*/ - - if( sink->stop( state->d, sink->a, sink->b ) ) { - im_error( "vips_sink", - _( "stop function failed for image \"%s\"" ), - sink->im->filename ); - return( -1 ); - } - - state->d = NULL; - } - - return( 0 ); -} - static int -sink_allocate( VipsThreadState *state, - void *a, void *b, void *c, gboolean *stop ) +sink_allocate( VipsThreadState *state, void *a, gboolean *stop ) { Sink *sink = (Sink *) a; @@ -173,14 +247,12 @@ sink_allocate( VipsThreadState *state, sink->y += sink->tile_height; if( sink->y >= sink->im->Ysize ) { - sink_call_stop( sink, state ); *stop = TRUE; + return( 0 ); } } - sink_call_start( sink, state ); - /* x, y and buf are good: save params for thread. */ image.left = 0; @@ -195,27 +267,26 @@ sink_allocate( VipsThreadState *state, /* Move state on. */ - write->x += write->tile_width; + sink->x += sink->tile_width; return( 0 ); } static int -sink_work( VipsThreadState *state, void *a, void *b, void *c ) +sink_work( VipsThreadState *state, void *a ) { + SinkThreadState *sstate = (SinkThreadState *) state; Sink *sink = (Sink *) a; - if( im_prepare( state->reg, &state->pos ) || - sink->generate( state->reg, state->d, sink->a, sink->b ) ) { - sink_call_stop( sink, state ); + if( im_prepare( sstate->reg, &state->pos ) || + sink->generate( sstate->reg, sstate->seq, sink->a, sink->b ) ) return( -1 ); - } return( 0 ); } static int -sink_progress( void *a, void *b, void *c ) +sink_progress( void *a ) { Sink *sink = (Sink *) a; @@ -239,7 +310,7 @@ sink_progress( void *a, void *b, void *c ) * @b: user data * * Loops over an image. @generate is called for every pixel in the image, with - * the @reg argument being a region of pixels for processing. im_iterate() is + * the @reg argument being a region of pixels for processing. vips_sink() is * used to implement operations like im_avg() which have no image output. * * See also: im_generate(), im_open(). @@ -270,7 +341,11 @@ vips_sink( VipsImage *im, } result = vips_threadpool_run( im, - sink_allocate, sink_work, sink_progress, &sink, NULL ); + sink_thread_state_new, + sink_allocate, + sink_work, + sink_progress, + &sink ); im__end_eval( sink.t ); diff --git a/libvips/iofuncs/discsink.c b/libvips/iofuncs/sinkdisc.c similarity index 84% rename from libvips/iofuncs/discsink.c rename to libvips/iofuncs/sinkdisc.c index a7f2cfb0..ea1dedd8 100644 --- a/libvips/iofuncs/discsink.c +++ b/libvips/iofuncs/sinkdisc.c @@ -1,4 +1,4 @@ -/* Write an image. +/* Write an image to a disc file. * * 19/3/10 * - from im_wbuffer.c @@ -101,13 +101,51 @@ typedef struct _Write { */ VipsRegionWrite write_fn; void *a; - void *b; } Write; /* Enable im_wbuffer2 ... set from the cmd line, tested by our users. */ int im__wbuffer2 = 0; +/* Our per-thread state ... we need to also track the buffer that pos is + * supposed to write to. + */ +typedef struct _WriteThreadState { + VipsThreadState parent_object; + + WriteBuffer *buf; +} WriteThreadState; + +typedef struct _WriteThreadStateClass { + VipsThreadStateClass parent_class; + +} WriteThreadStateClass; + +G_DEFINE_TYPE( WriteThreadState, write_thread_state, VIPS_TYPE_THREAD_STATE ); + +static void +write_thread_state_class_init( WriteThreadStateClass *class ) +{ + VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class ); + + object_class->nickname = "writethreadstate"; + object_class->description = _( "per-thread state for sinkdisc" ); +} + +static void +write_thread_state_init( WriteThreadState *state ) +{ + state->buf = NULL; +} + +static VipsThreadState * +write_thread_state_new( VipsImage *im, void *a ) +{ + return( VIPS_THREAD_STATE( vips_object_new( + write_thread_state_get_type(), + vips_thread_state_set, im, a ) ) ); +} + static void wbuffer_free( WriteBuffer *wbuffer ) { @@ -147,7 +185,7 @@ wbuffer_write( WriteBuffer *wbuffer ) #endif /*DEBUG*/ wbuffer->write_errno = write->write_fn( wbuffer->region, - &wbuffer->area, write->a, write->b ); + &wbuffer->area, write->a ); } #ifdef HAVE_THREADS @@ -308,9 +346,9 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height ) * iteration. */ static gboolean -wbuffer_allocate_fn( VipsThreadState *state, - void *a, void *b, void *c, gboolean *stop ) +wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) { + WriteThreadState *wstate = (WriteThreadState *) state; Write *write = (Write *) a; Rect image; @@ -359,7 +397,7 @@ wbuffer_allocate_fn( VipsThreadState *state, tile.width = write->tile_width; tile.height = write->tile_height; im_rect_intersectrect( &image, &tile, &state->pos ); - state->d = write->buf; + wstate->buf = write->buf; /* Add to the number of writers on the buffer. */ @@ -375,22 +413,21 @@ wbuffer_allocate_fn( VipsThreadState *state, /* Our VipsThreadpoolWork function ... generate a tile! */ static int -wbuffer_work_fn( VipsThreadState *state, - void *a, void *b, void *c ) +wbuffer_work_fn( VipsThreadState *state, void *a ) { - WriteBuffer *wbuffer = (WriteBuffer *) state->d; + WriteThreadState *wstate = (WriteThreadState *) state; #ifdef DEBUG printf( "wbuffer_work_fn\n" ); #endif /*DEBUG*/ - if( im_prepare_to( state->reg, wbuffer->region, + if( im_prepare_to( state->reg, wstate->buf->region, &state->pos, state->pos.left, state->pos.top ) ) return( -1 ); /* Tell the bg write thread we've left. */ - im_semaphore_upn( &wbuffer->nwrite, 1 ); + im_semaphore_upn( &wstate->buf->nwrite, 1 ); return( 0 ); } @@ -398,7 +435,7 @@ wbuffer_work_fn( VipsThreadState *state, /* Our VipsThreadpoolProgress function ... send some eval progress feedback. */ static int -wbuffer_progress_fn( void *a, void *b, void *c ) +wbuffer_progress_fn( void *a ) { Write *write = (Write *) a; @@ -414,7 +451,7 @@ wbuffer_progress_fn( void *a, void *b, void *c ) static void write_init( Write *write, - VipsImage *im, VipsRegionWrite write_fn, void *a, void *b ) + VipsImage *im, VipsRegionWrite write_fn, void *a ) { write->im = im; write->buf = wbuffer_new( write ); @@ -423,7 +460,6 @@ write_init( Write *write, write->y = 0; write->write_fn = write_fn; write->a = a; - write->b = b; vips_get_tile_size( im, &write->tile_width, &write->tile_height, &write->nlines ); @@ -440,7 +476,6 @@ write_free( Write *write ) * VipsRegionWrite: * @region: pixels to write * @a: client data - * @b: client data * * The function should write the pixels in @region. @a and @b are the values * passed into vips_discsink(). @@ -451,16 +486,16 @@ write_free( Write *write ) */ /** - * vips_discsink: + * vips_sink_disc: * @im: image to process * @write_fn: called for every batch of pixels * @a: client data - * @b: client data * - * vips_discsink() loops over @im, top-to-bottom, generating it in sections. + * vips_sink_disc() loops over @im, top-to-bottom, generating it in sections. * As each section is produced, @write_fn is called. * - * @write_fn is always called single-threaded, it's always given image + * @write_fn is always called single-threaded (though not always from the same + * thread), it's always given image * sections in top-to-bottom order, and there are never any gaps. * * This operation is handy for making image sinks which output to things like @@ -471,7 +506,7 @@ write_free( Write *write ) * Returns: 0 on success, -1 on error. */ int -vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b ) +vips_sink_disc( VipsImage *im, VipsRegionWrite write_fn, void *a ) { Write write; int result; @@ -479,7 +514,13 @@ vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b ) if( im__start_eval( im ) ) return( -1 ); - write_init( &write, im, write_fn, a, b ); + write_init( &write, im, write_fn, a ); + + printf( "initing thread state\n" ); + + vips_thread_state_get_type(); + + printf( "made thread state\n" ); result = 0; @@ -487,10 +528,11 @@ vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b ) !write.buf_back || wbuffer_position( write.buf, 0, write.nlines ) || vips_threadpool_run( im, + write_thread_state_new, wbuffer_allocate_fn, wbuffer_work_fn, wbuffer_progress_fn, - &write, NULL, NULL ) ) + &write ) ) result = -1; /* We've set all the buffers writing, but not waited for the BG diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index e8273575..bdb1bd98 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -38,8 +38,8 @@ /* #define TIME_THREAD #define VIPS_DEBUG_RED -#define VIPS_DEBUG */ +#define VIPS_DEBUG #ifdef HAVE_CONFIG_H #include @@ -76,19 +76,106 @@ * can be used to give feedback. * * This is like threadgroup, but workers allocate work units themselves. This - * reduces the synchronisation overhead and improves scalability. + * reduces synchronisation overhead and improves scalability. */ +/** + * VipsThreadState: + * @reg: a #REGION + * @pos: a #Rect + * @x: an int + * @y: an int + * @a: client data + * + * These per-thread values are carried around for your use by + * vips_threadpool_run(). They are private to each thread, so they are a + * useful place + * for #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate. + * + * @reg is created for you at the start of processing and freed at the end, + * but you can do what you like with it. + */ + +G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT ); + +static void +vips_thread_state_dispose( GObject *gobject ) +{ + VipsThreadState *state = (VipsThreadState *) gobject; + + VIPS_DEBUG_MSG( "vips_thread_state_dispose:\n" ); + + IM_FREEF( im_region_free, state->reg ); + + G_OBJECT_CLASS( vips_thread_state_parent_class )->dispose( gobject ); +} + +static int +vips_thread_state_build( VipsObject *object ) +{ + VipsThreadState *state = (VipsThreadState *) object; + + if( !(state->reg = im_region_create( state->im )) ) + return( -1 ); + + return( VIPS_OBJECT_CLASS( + vips_thread_state_parent_class )->build( object ) ); +} + +static void +vips_thread_state_class_init( VipsThreadStateClass *class ) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS( class ); + VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class ); + + gobject_class->dispose = vips_thread_state_dispose; + + object_class->build = vips_thread_state_build; + object_class->nickname = "threadstate"; + object_class->description = _( "per-thread state for vipsthreadpool" ); +} + +static void +vips_thread_state_init( VipsThreadState *state ) +{ + VIPS_DEBUG_MSG( "vips_thread_state_init:\n" ); + + state->reg = NULL; +} + +void * +vips_thread_state_set( VipsObject *object, void *a, void *b ) +{ + VipsThreadState *state = (VipsThreadState *) object; + VipsImage *im = (VipsImage *) a; + + VIPS_DEBUG_MSG( "vips_thread_state_set:\n" ); + + state->im = im; + state->a = b; + + return( NULL ); +} + +VipsThreadState * +vips_thread_state_new( VipsImage *im, void *a ) +{ + VIPS_DEBUG_MSG( "vips_thread_state_new:\n" ); + + return( VIPS_THREAD_STATE( vips_object_new( + VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) ); +} + /* What we track for each thread in the pool. */ typedef struct { /* All private. */ /*< private >*/ - VipsThreadState state; - struct _VipsThreadpool *pool; /* Pool we are part of */ + VipsThreadState *state; + /* Thread we are running. */ GThread *thread; @@ -115,13 +202,15 @@ typedef struct _VipsThreadpool { /*< private >*/ VipsImage *im; /* Image we are calculating */ - /* Do a unit of work (runs in parallel) and allocate a unit of work - * (serial). Plus the mutex we use to serialize work allocation. + /* STart a thread, do a unit of work (runs in parallel) and allocate + * a unit of work (serial). Plus the mutex we use to serialize work + * allocation. */ + VipsThreadStart start; VipsThreadpoolAllocate allocate; VipsThreadpoolWork work; GMutex *allocate_lock; - void *a, *b, *c; /* User arguments to work / allocate */ + void *a; /* User argument to start / allocate / etc. */ int nthr; /* Number of threads in pool */ VipsThread **thr; /* Threads */ @@ -191,7 +280,7 @@ vips_thread_free( VipsThread *thr ) thr->thread = NULL; } - IM_FREEF( im_region_free, thr->state.reg ); + IM_FREEF( g_object_unref, thr->state ); thr->pool = NULL; #ifdef TIME_THREAD @@ -212,9 +301,14 @@ vips_thread_work_unit( VipsThread *thr ) */ g_mutex_lock( pool->allocate_lock ); - if( !pool->stop ) { - if( pool->allocate( &thr->state, - pool->a, pool->b, pool->c, &pool->stop ) ) { + if( !thr->state ) + if( !(thr->state = pool->start( pool->im, pool->a )) ) { + thr->error = TRUE; + pool->error = TRUE; + } + + if( !pool->stop && !pool->error ) { + if( pool->allocate( thr->state, pool->a, &pool->stop ) ) { thr->error = TRUE; pool->error = TRUE; } @@ -235,7 +329,7 @@ vips_thread_work_unit( VipsThread *thr ) /* Process a work unit. */ - if( pool->work( &thr->state, pool->a, pool->b, pool->c ) ) { + if( pool->work( thr->state, pool->a ) ) { thr->error = TRUE; pool->error = TRUE; } @@ -262,11 +356,6 @@ vips_thread_main_loop( void *a ) g_assert( pool == thr->pool ); - /* We now control the region (it was created by pool when we - * were built). - */ - im__region_take_ownership( thr->state.reg ); - /* Process work units! Always tick, even if we are stopping, so the * main thread will wake up for exit. */ @@ -296,7 +385,7 @@ vips_thread_new( VipsThreadpool *pool ) if( !(thr = IM_NEW( pool->im, VipsThread )) ) return( NULL ); thr->pool = pool; - thr->state.reg = NULL; + thr->state = NULL; thr->thread = NULL; thr->exit = 0; thr->error = 0; @@ -306,16 +395,10 @@ vips_thread_new( VipsThreadpool *pool ) thr->tpos = 0; #endif /*TIME_THREAD*/ - /* Attach stuff. + /* We can't build the state here, it has to be done by the worker + * itself the first time that allocate runs so that any regions are + * owned by the correct thread. */ - if( !(thr->state.reg = im_region_create( pool->im )) ) { - vips_thread_free( thr ); - return( NULL ); - } - - /* Get ready to hand the region over to the thread. - */ - im__region_no_ownership( thr->state.reg ); #ifdef TIME_THREAD thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double ); @@ -363,6 +446,8 @@ vips_threadpool_kill_threads( VipsThreadpool *pool ) } } +/* This can be called multiple times, careful. + */ static int vips_threadpool_free( VipsThreadpool *pool ) { @@ -439,21 +524,21 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) } /** - * VipsThreadState: - * @reg: a #REGION - * @pos: a #Rect - * @x: an int - * @y: an int - * @d: client data - * @e: client data - * @f: client data + * VipsThreadpoolStart: + * @a: client data + * @b: client data + * @c: client data * - * These per-thread values are carried around for your use by - * vips_threadpool_run(). They are private to each thread, so they are handy - * for VipsThreadpoolAllocate and VipsThreadpoolWork to communicate. + * This function is called once by each worker just before the first time work + * is allocated to it to build the per-thread state. Per-thread state is used + * by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate. * - * @reg is created for you at the start of processing and freed at the end, - * but you can do what you like with it. + * #VipsThreadState is a subclass of #VipsObject. Start functions are called + * from allocate, that is, they are single-threaded. + * + * See also: vips_threadpool_run(). + * + * Returns: a new #VipsThreadState object, or NULL on error */ /** @@ -465,7 +550,7 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) * @stop: set this to signal end of computation * * This function is called to allocate a new work unit for the thread. It is - * always single-threaded, so it can modify per-call state (such as a + * always single-threaded, so it can modify per-pool state (such as a * counter). * * @a, @b, @c are the values supplied to the call to @@ -474,9 +559,9 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) * It should set @stop to %TRUE to indicate that no work could be allocated * because the job is done. * - * It should return 0 for a normal return, or non-zero to indicate an error. - * * See also: vips_threadpool_run(). + * + * Returns: 0 on success, or -1 on error */ /** @@ -487,15 +572,15 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) * @c: client data * * This function is called to process a work unit. Many copies of this can run - * at once, so it should not write to the per-call state. It can write to + * at once, so it should not write to the per-pool state. It can write to * per-thread state. * * @a, @b, @c are the values supplied to the call to * vips_threadpool_run(). * - * It should return 0 for a normal return, or non-zero to indicate an error. - * * See also: vips_threadpool_run(). + * + * Returns: 0 on success, or -1 on error */ /** @@ -507,14 +592,15 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) * This function is called by the main thread once for every work unit * processed. It can be used to give the user progress feedback. * - * It should return 0 for a normal return, or non-zero to indicate an error. - * * See also: vips_threadpool_run(). + * + * Returns: 0 on success, or -1 on error */ /** * vips_threadpool_run: * @im: image to loop over + * @start: allocate per-thread state * @allocate: allocate a work unit * @work: process a work unit * @progress: give progress feedback about a work unit, or %NULL @@ -522,30 +608,33 @@ vips_threadpool_create_threads( VipsThreadpool *pool ) * @b: client data * @c: client data * - * This function runs a set of threads over an image. Each thread run + * This function runs a set of threads over an image. Each thread first calls + * @start to create new per-thread state, then runs * @allocate to set up a new work unit (perhaps the next tile in an image, for * example), then @work to process that work unit. After each unit is * processed, @progress is called, so that the operation can give - * progress feedback.@progress may be %NULL. + * progress feedback. @progress may be %NULL. * - * Each thread has a small amount of private state that vips_threadpool_run() - * maintains for it. The @allocate and @work functions can use this to - * communicate. + * Each thread has private state that the @allocate and @work functions can + * use to communicate. This state is created by each worker as it starts using + * @start. Use the state destructor to clean up. * - * @allocate is always single-threaded (so it can write to the per-call state), - * whereas @work can be executed concurrently. @progress is always called by - * the main thread (ie. the thread which called vips_thtreadpool_run()). + * @allocate and @start are always single-threaded (so they can write to the + * per-pool state), whereas @work can be executed concurrently. @progress is + * always called by + * the main thread (ie. the thread which called vips_threadpool_run()). * * See also: im_wbuffer2(), im_concurrency_set(). * - * Returns: 0 on success, -1 on error. + * Returns: 0 on success, or -1 on error. */ int vips_threadpool_run( VipsImage *im, + VipsThreadStart start, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, VipsThreadpoolProgress progress, - void *a, void *b, void *c ) + void *a ) { VipsThreadpool *pool; int result; @@ -558,11 +647,10 @@ vips_threadpool_run( VipsImage *im, if( !(pool = vips_threadpool_new( im )) ) return( -1 ); + pool->start = start; pool->allocate = allocate; pool->work = work; pool->a = a; - pool->b = b; - pool->c = c; /* Attach workers and set them going. */ @@ -586,7 +674,7 @@ vips_threadpool_run( VipsImage *im, break; if( progress && - progress( pool->a, pool->b, pool->c ) ) + progress( pool->a ) ) pool->error = TRUE; if( pool->stop || pool->error ) @@ -631,10 +719,10 @@ vips_get_tile_size( VipsImage *im, *tile_height = im__tile_height; /* Enough lines of tiles that we can expect to be able to keep - * nthr busy. + * nthr busy. Then double it. */ *nlines = *tile_height * - (1 + nthr / IM_MAX( 1, im->Xsize / *tile_width )); + (1 + nthr / IM_MAX( 1, im->Xsize / *tile_width )) * 2; break; case IM_FATSTRIP: diff --git a/libvips/resample/interpolate.c b/libvips/resample/interpolate.c index 7f65b9d8..2325ed9c 100644 --- a/libvips/resample/interpolate.c +++ b/libvips/resample/interpolate.c @@ -147,6 +147,7 @@ vips_interpolate( VipsInterpolate *interpolate, VipsInterpolateClass *class = VIPS_INTERPOLATE_GET_CLASS( interpolate ); g_assert( class->interpolate ); + class->interpolate( interpolate, out, in, x, y ); }