add sync locks to vips_sink()

vips_sink() used to just fire off threads willy nilly -- this could
cause problems with sequential images if a worker fell too far behind
the pack

copy over the area locking mechanism from vips_sink_memory(): new
threads are now stalled if an old thread has not finished
This commit is contained in:
John Cupitt 2018-01-31 16:25:06 +00:00
parent 421f659d33
commit 01eed391ab
2 changed files with 174 additions and 57 deletions

View File

@ -5,6 +5,7 @@
- fix a C++ warning in composite.cpp [lovell]
- remove number of images limit in composite
- composite allows 1 mode ... reused for all joins
- vips_sink() has locks to prevent any threads falling too far behind
10/12/17 started 8.6.1
- fix mmap window new/free cycling

View File

@ -50,6 +50,19 @@
#include "sink.h"
/* A part of the image we are scanning.
*
* We can't let any threads fall too far behind as that would mess up seq
* image sources. Keep track of two areas moving down the image, and stall if
* the previous area still has active threads.
*/
typedef struct _SinkArea {
struct _Sink *sink;
VipsRect rect; /* Part of image this area covers */
VipsSemaphore n_thread; /* Number of threads scanning this area */
} SinkArea;
/* Per-call state.
*/
typedef struct _Sink {
@ -67,6 +80,13 @@ typedef struct _Sink {
VipsStopFn stop_fn;
void *a;
void *b;
/* We are current scanning area, we'll delay starting a new
* area if old_area (the previous position) hasn't completed.
*/
SinkArea *area;
SinkArea *old_area;
} Sink;
/* Our per-thread state.
@ -82,6 +102,11 @@ typedef struct _SinkThreadState {
* parent_object.reg, it's defined on the outer image.
*/
VipsRegion *reg;
/* The area we were allocated from.
*/
SinkArea *area;
} SinkThreadState;
typedef struct _SinkThreadStateClass {
@ -91,6 +116,133 @@ typedef struct _SinkThreadStateClass {
G_DEFINE_TYPE( SinkThreadState, sink_thread_state, VIPS_TYPE_THREAD_STATE );
static void
sink_area_free( SinkArea *area )
{
vips_semaphore_destroy( &area->n_thread );
vips_free( area );
}
static SinkArea *
sink_area_new( Sink *sink )
{
SinkArea *area;
if( !(area = VIPS_NEW( NULL, SinkArea )) )
return( NULL );
area->sink = sink;
vips_semaphore_init( &area->n_thread, 0, "n_thread" );
return( area );
}
/* Move an area to a position.
*/
static void
sink_area_position( SinkArea *area, int top, int height )
{
Sink *sink = area->sink;
VipsRect all, rect;
all.left = 0;
all.top = 0;
all.width = sink->sink_base.im->Xsize;
all.height = sink->sink_base.im->Ysize;
rect.left = 0;
rect.top = top;
rect.width = sink->sink_base.im->Xsize;
rect.height = height;
vips_rect_intersectrect( &all, &rect, &area->rect );
}
/* Our VipsThreadpoolAllocate function ... move the thread to the next tile
* that needs doing. If we fill the current area, we block until the previous
* area is finished, then swap areas.
*
* If all tiles are done, we return FALSE to end iteration.
*/
static gboolean
sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
{
SinkThreadState *wstate = (SinkThreadState *) state;
Sink *sink = (Sink *) a;
SinkBase *sink_base = (SinkBase *) sink;
VipsRect image;
VipsRect tile;
VIPS_DEBUG_MSG( "sink_area_allocate_fn: %p\n", g_thread_self() );
/* Is the state x/y OK? New line or maybe new buffer or maybe even
* all done.
*/
if( sink_base->x >= sink->area->rect.width ) {
sink_base->x = 0;
sink_base->y += sink_base->tile_height;
if( sink_base->y >= VIPS_RECT_BOTTOM( &sink->area->rect ) ) {
/* Block until the previous area is done.
*/
if( sink->area->rect.top > 0 )
vips_semaphore_downn(
&sink->old_area->n_thread, 0 );
/* End of image?
*/
if( sink_base->y >= sink_base->im->Ysize ) {
*stop = TRUE;
return( 0 );
}
/* Swap buffers.
*/
VIPS_SWAP( SinkArea *,
sink->area, sink->old_area );
/* Position buf at the new y.
*/
sink_area_position( sink->area,
sink_base->y, sink_base->n_lines );
}
}
/* x, y and buf are good: save params for thread.
*/
image.left = 0;
image.top = 0;
image.width = sink_base->im->Xsize;
image.height = sink_base->im->Ysize;
tile.left = sink_base->x;
tile.top = sink_base->y;
tile.width = sink_base->tile_width;
tile.height = sink_base->tile_height;
vips_rect_intersectrect( &image, &tile, &state->pos );
/* The thread needs to know which area it's writing to.
*/
wstate->area = sink->area;
VIPS_DEBUG_MSG( " %p allocated %d x %d:\n",
g_thread_self(), state->pos.left, state->pos.top );
/* Add to the number of writers on the area.
*/
vips_semaphore_upn( &sink->area->n_thread, -1 );
/* Move state on.
*/
sink_base->x += sink_base->tile_width;
/* Add the number of pixels we've just allocated to progress.
*/
sink_base->processed += state->pos.width * state->pos.height;
return( 0 );
}
/* Call a thread's stop function.
*/
static int
@ -194,6 +346,8 @@ vips_sink_thread_state_new( VipsImage *im, void *a )
static void
sink_free( Sink *sink )
{
VIPS_FREEF( sink_area_free, sink->area );
VIPS_FREEF( sink_area_free, sink->old_area );
VIPS_FREEF( g_object_unref, sink->t );
}
@ -233,7 +387,12 @@ sink_init( Sink *sink,
sink->a = a;
sink->b = b;
sink->area = NULL;
sink->old_area = NULL;
if( !(sink->t = vips_image_new()) ||
!(sink->area = sink_area_new( sink )) ||
!(sink->old_area = sink_area_new( sink )) ||
vips_image_write( sink->sink_base.im, sink->t ) ) {
sink_free( sink );
return( -1 );
@ -242,69 +401,25 @@ sink_init( Sink *sink,
return( 0 );
}
int
vips_sink_base_allocate( VipsThreadState *state, void *a, gboolean *stop )
{
SinkBase *sink_base = (SinkBase *) a;
VipsRect image, tile;
/* Has work requested early termination?
*/
if( state->stop ) {
*stop = TRUE;
return( 0 );
}
/* Is the state x/y OK? New line or maybe all done.
*/
if( sink_base->x >= sink_base->im->Xsize ) {
sink_base->x = 0;
sink_base->y += sink_base->tile_height;
if( sink_base->y >= sink_base->im->Ysize ) {
*stop = TRUE;
return( 0 );
}
}
/* x, y and buf are good: save params for thread.
*/
image.left = 0;
image.top = 0;
image.width = sink_base->im->Xsize;
image.height = sink_base->im->Ysize;
tile.left = sink_base->x;
tile.top = sink_base->y;
tile.width = sink_base->tile_width;
tile.height = sink_base->tile_height;
vips_rect_intersectrect( &image, &tile, &state->pos );
/* Move state on.
*/
sink_base->x += sink_base->tile_width;
/* Add the number of pixels we've just allocated to progress.
*/
sink_base->processed += state->pos.width * state->pos.height;
return( 0 );
}
static int
sink_work( VipsThreadState *state, void *a )
{
SinkThreadState *sstate = (SinkThreadState *) state;
Sink *sink = (Sink *) a;
SinkArea *area = sstate->area;
if( vips_region_prepare( sstate->reg, &state->pos ) ||
sink->generate_fn( sstate->reg, sstate->seq,
sink->a, sink->b, &state->stop ) )
return( -1 );
int result;
return( 0 );
result = vips_region_prepare( sstate->reg, &state->pos );
if( !result )
result = sink->generate_fn( sstate->reg, sstate->seq,
sink->a, sink->b, &state->stop );
/* Tell the allocator we're done.
*/
vips_semaphore_upn( &area->n_thread, 1 );
return( result );
}
int
@ -376,9 +491,10 @@ vips_sink_tile( VipsImage *im,
*/
vips_image_preeval( im );
sink_area_position( sink.area, 0, sink.sink_base.n_lines );
result = vips_threadpool_run( im,
vips_sink_thread_state_new,
vips_sink_base_allocate,
sink_area_allocate_fn,
sink_work,
vips_sink_base_progress,
&sink );