From 01eed391abd9c5e1fdacabb3dc986941f0883fad Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Wed, 31 Jan 2018 16:25:06 +0000 Subject: [PATCH] add sync locks to vips_sink() vips_sink() used to just fire off threads willy nilly -- this could cause problems with sequential images if a worker fell too far behind the pack copy over the area locking mechanism from vips_sink_memory(): new threads are now stalled if an old thread has not finished --- ChangeLog | 1 + libvips/iofuncs/sink.c | 230 +++++++++++++++++++++++++++++++---------- 2 files changed, 174 insertions(+), 57 deletions(-) diff --git a/ChangeLog b/ChangeLog index 8a6c756b..59f5baae 100644 --- a/ChangeLog +++ b/ChangeLog @@ -5,6 +5,7 @@ - fix a C++ warning in composite.cpp [lovell] - remove number of images limit in composite - composite allows 1 mode ... reused for all joins +- vips_sink() has locks to prevent any threads falling too far behind 10/12/17 started 8.6.1 - fix mmap window new/free cycling diff --git a/libvips/iofuncs/sink.c b/libvips/iofuncs/sink.c index f8e44a46..d8cad9b3 100644 --- a/libvips/iofuncs/sink.c +++ b/libvips/iofuncs/sink.c @@ -50,6 +50,19 @@ #include "sink.h" +/* A part of the image we are scanning. + * + * We can't let any threads fall too far behind as that would mess up seq + * image sources. Keep track of two areas moving down the image, and stall if + * the previous area still has active threads. + */ +typedef struct _SinkArea { + struct _Sink *sink; + + VipsRect rect; /* Part of image this area covers */ + VipsSemaphore n_thread; /* Number of threads scanning this area */ +} SinkArea; + /* Per-call state. */ typedef struct _Sink { @@ -67,6 +80,13 @@ typedef struct _Sink { VipsStopFn stop_fn; void *a; void *b; + + /* We are current scanning area, we'll delay starting a new + * area if old_area (the previous position) hasn't completed. + */ + SinkArea *area; + SinkArea *old_area; + } Sink; /* Our per-thread state. @@ -82,6 +102,11 @@ typedef struct _SinkThreadState { * parent_object.reg, it's defined on the outer image. */ VipsRegion *reg; + + /* The area we were allocated from. + */ + SinkArea *area; + } SinkThreadState; typedef struct _SinkThreadStateClass { @@ -91,6 +116,133 @@ typedef struct _SinkThreadStateClass { G_DEFINE_TYPE( SinkThreadState, sink_thread_state, VIPS_TYPE_THREAD_STATE ); +static void +sink_area_free( SinkArea *area ) +{ + vips_semaphore_destroy( &area->n_thread ); + vips_free( area ); +} + +static SinkArea * +sink_area_new( Sink *sink ) +{ + SinkArea *area; + + if( !(area = VIPS_NEW( NULL, SinkArea )) ) + return( NULL ); + area->sink = sink; + vips_semaphore_init( &area->n_thread, 0, "n_thread" ); + + return( area ); +} + +/* Move an area to a position. + */ +static void +sink_area_position( SinkArea *area, int top, int height ) +{ + Sink *sink = area->sink; + + VipsRect all, rect; + + all.left = 0; + all.top = 0; + all.width = sink->sink_base.im->Xsize; + all.height = sink->sink_base.im->Ysize; + + rect.left = 0; + rect.top = top; + rect.width = sink->sink_base.im->Xsize; + rect.height = height; + + vips_rect_intersectrect( &all, &rect, &area->rect ); +} + +/* Our VipsThreadpoolAllocate function ... move the thread to the next tile + * that needs doing. If we fill the current area, we block until the previous + * area is finished, then swap areas. + * + * If all tiles are done, we return FALSE to end iteration. + */ +static gboolean +sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) +{ + SinkThreadState *wstate = (SinkThreadState *) state; + Sink *sink = (Sink *) a; + SinkBase *sink_base = (SinkBase *) sink; + + VipsRect image; + VipsRect tile; + + VIPS_DEBUG_MSG( "sink_area_allocate_fn: %p\n", g_thread_self() ); + + /* Is the state x/y OK? New line or maybe new buffer or maybe even + * all done. + */ + if( sink_base->x >= sink->area->rect.width ) { + sink_base->x = 0; + sink_base->y += sink_base->tile_height; + + if( sink_base->y >= VIPS_RECT_BOTTOM( &sink->area->rect ) ) { + /* Block until the previous area is done. + */ + if( sink->area->rect.top > 0 ) + vips_semaphore_downn( + &sink->old_area->n_thread, 0 ); + + /* End of image? + */ + if( sink_base->y >= sink_base->im->Ysize ) { + *stop = TRUE; + return( 0 ); + } + + /* Swap buffers. + */ + VIPS_SWAP( SinkArea *, + sink->area, sink->old_area ); + + /* Position buf at the new y. + */ + sink_area_position( sink->area, + sink_base->y, sink_base->n_lines ); + } + } + + /* x, y and buf are good: save params for thread. + */ + image.left = 0; + image.top = 0; + image.width = sink_base->im->Xsize; + image.height = sink_base->im->Ysize; + tile.left = sink_base->x; + tile.top = sink_base->y; + tile.width = sink_base->tile_width; + tile.height = sink_base->tile_height; + vips_rect_intersectrect( &image, &tile, &state->pos ); + + /* The thread needs to know which area it's writing to. + */ + wstate->area = sink->area; + + VIPS_DEBUG_MSG( " %p allocated %d x %d:\n", + g_thread_self(), state->pos.left, state->pos.top ); + + /* Add to the number of writers on the area. + */ + vips_semaphore_upn( &sink->area->n_thread, -1 ); + + /* Move state on. + */ + sink_base->x += sink_base->tile_width; + + /* Add the number of pixels we've just allocated to progress. + */ + sink_base->processed += state->pos.width * state->pos.height; + + return( 0 ); +} + /* Call a thread's stop function. */ static int @@ -194,6 +346,8 @@ vips_sink_thread_state_new( VipsImage *im, void *a ) static void sink_free( Sink *sink ) { + VIPS_FREEF( sink_area_free, sink->area ); + VIPS_FREEF( sink_area_free, sink->old_area ); VIPS_FREEF( g_object_unref, sink->t ); } @@ -233,7 +387,12 @@ sink_init( Sink *sink, sink->a = a; sink->b = b; + sink->area = NULL; + sink->old_area = NULL; + if( !(sink->t = vips_image_new()) || + !(sink->area = sink_area_new( sink )) || + !(sink->old_area = sink_area_new( sink )) || vips_image_write( sink->sink_base.im, sink->t ) ) { sink_free( sink ); return( -1 ); @@ -242,69 +401,25 @@ sink_init( Sink *sink, return( 0 ); } -int -vips_sink_base_allocate( VipsThreadState *state, void *a, gboolean *stop ) -{ - SinkBase *sink_base = (SinkBase *) a; - - VipsRect image, tile; - - /* Has work requested early termination? - */ - if( state->stop ) { - *stop = TRUE; - - return( 0 ); - } - - /* Is the state x/y OK? New line or maybe all done. - */ - if( sink_base->x >= sink_base->im->Xsize ) { - sink_base->x = 0; - sink_base->y += sink_base->tile_height; - - if( sink_base->y >= sink_base->im->Ysize ) { - *stop = TRUE; - - return( 0 ); - } - } - - /* x, y and buf are good: save params for thread. - */ - image.left = 0; - image.top = 0; - image.width = sink_base->im->Xsize; - image.height = sink_base->im->Ysize; - tile.left = sink_base->x; - tile.top = sink_base->y; - tile.width = sink_base->tile_width; - tile.height = sink_base->tile_height; - vips_rect_intersectrect( &image, &tile, &state->pos ); - - /* Move state on. - */ - sink_base->x += sink_base->tile_width; - - /* Add the number of pixels we've just allocated to progress. - */ - sink_base->processed += state->pos.width * state->pos.height; - - return( 0 ); -} - static int sink_work( VipsThreadState *state, void *a ) { SinkThreadState *sstate = (SinkThreadState *) state; Sink *sink = (Sink *) a; + SinkArea *area = sstate->area; - if( vips_region_prepare( sstate->reg, &state->pos ) || - sink->generate_fn( sstate->reg, sstate->seq, - sink->a, sink->b, &state->stop ) ) - return( -1 ); + int result; - return( 0 ); + result = vips_region_prepare( sstate->reg, &state->pos ); + if( !result ) + result = sink->generate_fn( sstate->reg, sstate->seq, + sink->a, sink->b, &state->stop ); + + /* Tell the allocator we're done. + */ + vips_semaphore_upn( &area->n_thread, 1 ); + + return( result ); } int @@ -376,9 +491,10 @@ vips_sink_tile( VipsImage *im, */ vips_image_preeval( im ); + sink_area_position( sink.area, 0, sink.sink_base.n_lines ); result = vips_threadpool_run( im, vips_sink_thread_state_new, - vips_sink_base_allocate, + sink_area_allocate_fn, sink_work, vips_sink_base_progress, &sink );