From f5f0dda551e5b4deb889c544d7cc3b0c45de926c Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Tue, 21 Aug 2012 16:18:25 +0100 Subject: [PATCH] experiment with thread stalling --- ChangeLog | 1 + libvips/conversion/sequential.c | 120 ++++++++++++++++++++------------ libvips/conversion/tilecache.c | 90 ++++++++++-------------- libvips/foreign/foreign.c | 29 -------- libvips/foreign/jpeg2vips.c | 7 +- libvips/foreign/tiff2vips.c | 3 +- libvips/foreign/vipspng.c | 5 +- libvips/include/vips/internal.h | 4 -- libvips/iofuncs/operation.c | 2 - libvips/iofuncs/semaphore.c | 8 ++- libvips/iofuncs/sinkdisc.c | 12 ++-- libvips/resample/shrink.c | 2 +- 12 files changed, 133 insertions(+), 150 deletions(-) diff --git a/ChangeLog b/ChangeLog index 9341dcf5..b80275bb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -10,6 +10,7 @@ - better error msgs for enum args - fix compiler warnings in production build (thanks Dmitry) - fix spurious warnings about exif updates +- VipsSequential has an integrated cache and stalls out of order threads 20/7/12 started 7.30.0 - support "rs" mode in vips7 diff --git a/libvips/conversion/sequential.c b/libvips/conversion/sequential.c index 50853cd1..dc4acfdd 100644 --- a/libvips/conversion/sequential.c +++ b/libvips/conversion/sequential.c @@ -9,6 +9,9 @@ * - support skip forwards as well, so we can do extract/insert * 10/8/12 * - add @trace option + * 21/8/12 + * - remove skip forward, instead do thread stalling and have an + * integrated cache */ /* @@ -38,8 +41,8 @@ */ /* -#define VIPS_DEBUG */ +#define VIPS_DEBUG #ifdef HAVE_CONFIG_H #include @@ -61,14 +64,32 @@ typedef struct _VipsSequential { VipsImage *in; - int y_pos; gboolean trace; + + /* Lock access to y_pos with this, use the cond to wake up stalled + * threads. + */ + GMutex *lock; + GCond *ready; + + int y_pos; } VipsSequential; typedef VipsConversionClass VipsSequentialClass; G_DEFINE_TYPE( VipsSequential, vips_sequential, VIPS_TYPE_CONVERSION ); +static void +vips_sequential_dispose( GObject *gobject ) +{ + VipsSequential *sequential = (VipsSequential *) gobject; + + VIPS_FREEF( g_mutex_free, sequential->lock ); + VIPS_FREEF( g_cond_free, sequential->ready ); + + G_OBJECT_CLASS( vips_sequential_parent_class )->dispose( gobject ); +} + static int vips_sequential_generate( VipsRegion *or, void *seq, void *a, void *b, gboolean *stop ) @@ -77,55 +98,53 @@ vips_sequential_generate( VipsRegion *or, VipsRect *r = &or->valid; VipsRegion *ir = (VipsRegion *) seq; + printf( "vips_sequential_generate: thread %p " + "request for %d lines, starting at line %d\n", + g_thread_self(), + r->height, r->top ); + if( sequential->trace ) vips_diag( "VipsSequential", - "%d lines, starting at line %d", r->height, r->top ); + "request for %d lines, starting at line %d", + r->height, r->top ); +retry: - /* We can't go backwards, but we can skip forwards. + g_mutex_lock( sequential->lock ); + + if( r->top > sequential->y_pos ) { + /* This is for stuff in the future, stall. + */ + printf( "thread %p stalling ...\n", g_thread_self() ); + g_cond_wait( sequential->ready, sequential->lock ); + printf( "thread %p awake again, retrying ...\n", + g_thread_self() ); + goto retry; + } + + /* This is a request for old or present pixels -- serve from cache. + * This may trigger further, sequential reads. */ - if( r->top < sequential->y_pos ) { - vips_error( "VipsSequential", - _( "at line %d in file, but line %d requested" ), - sequential->y_pos, r->top ); + printf( "thread %p reading ...\n", g_thread_self() ); + if( vips_region_prepare( ir, r ) || + vips_region_region( or, ir, r, r->left, r->top ) ) { + g_mutex_unlock( sequential->lock ); return( -1 ); } - /* We're inside a tilecache where tiles are the full image width, so - * this should always be true. - */ - g_assert( r->left == 0 ); - g_assert( r->width == or->im->Xsize ); - g_assert( VIPS_RECT_BOTTOM( r ) <= or->im->Ysize ); + if( VIPS_RECT_BOTTOM( r ) >= sequential->y_pos ) { + /* This request has straddled or continued on from the read + * point. Update it, and wake up all stalled threads for a + * retry. + */ + sequential->y_pos = VIPS_RECT_BOTTOM( r ); - /* Skip forwards, if necessary. - */ - while( sequential->y_pos < r->top ) { - VipsRect rect; + printf( "thread %p updating y_pos and waking stalled\n", + g_thread_self() ); - rect.top = sequential->y_pos; - rect.left = 0; - rect.width = or->im->Xsize; - rect.height = VIPS_MIN( r->top - sequential->y_pos, - VIPS__FATSTRIP_HEIGHT ); - if( vips_region_prepare( ir, &rect ) ) - return( -1 ); - - sequential->y_pos += rect.height; - - if( sequential->trace ) - vips_diag( "VipsSequential", - "skipping %d lines", rect.height ); + g_cond_broadcast( sequential->ready ); } - g_assert( sequential->y_pos == r->top ); - - /* Pointer copy. - */ - if( vips_region_prepare( ir, r ) || - vips_region_region( or, ir, r, r->left, r->top ) ) - return( -1 ); - - sequential->y_pos += r->height; + g_mutex_unlock( sequential->lock ); return( 0 ); } @@ -136,6 +155,8 @@ vips_sequential_build( VipsObject *object ) VipsConversion *conversion = VIPS_CONVERSION( object ); VipsSequential *sequential = (VipsSequential *) object; + VipsImage *t; + VIPS_DEBUG_MSG( "vips_sequential_build\n" ); if( VIPS_OBJECT_CLASS( vips_sequential_parent_class )->build( object ) ) @@ -144,14 +165,22 @@ vips_sequential_build( VipsObject *object ) if( vips_image_pio_input( sequential->in ) ) return( -1 ); - if( vips_image_copy_fields( conversion->out, sequential->in ) ) + if( vips_tilecache( sequential->in, &t, + "tile_width", sequential->in->Xsize, + "tile_height", 1, + "max_tiles", 500, + "strategy", VIPS_CACHE_SEQUENTIAL, + NULL ) ) return( -1 ); - vips_demand_hint( conversion->out, - VIPS_DEMAND_STYLE_FATSTRIP, sequential->in, NULL ); + vips_object_local( object, t ); + if( vips_image_copy_fields( conversion->out, t ) ) + return( -1 ); + vips_demand_hint( conversion->out, + VIPS_DEMAND_STYLE_FATSTRIP, t, NULL ); if( vips_image_generate( conversion->out, vips_start_one, vips_sequential_generate, vips_stop_one, - sequential->in, sequential ) ) + t, sequential ) ) return( -1 ); return( 0 ); @@ -165,6 +194,7 @@ vips_sequential_class_init( VipsSequentialClass *class ) VIPS_DEBUG_MSG( "vips_sequential_class_init\n" ); + gobject_class->dispose = vips_sequential_dispose; gobject_class->set_property = vips_object_set_property; gobject_class->get_property = vips_object_get_property; @@ -190,6 +220,8 @@ static void vips_sequential_init( VipsSequential *sequential ) { sequential->trace = FALSE; + sequential->lock = g_mutex_new(); + sequential->ready = g_cond_new(); } /** diff --git a/libvips/conversion/tilecache.c b/libvips/conversion/tilecache.c index dc0ea4ad..44865229 100644 --- a/libvips/conversion/tilecache.c +++ b/libvips/conversion/tilecache.c @@ -79,7 +79,7 @@ typedef struct { int time; /* Time of last use for flush */ int x; /* xy pos in VIPS image cods */ int y; -} Tile; +} VipsTile; typedef struct _VipsTileCache { VipsConversion parent_instance; @@ -103,7 +103,7 @@ typedef VipsConversionClass VipsTileCacheClass; G_DEFINE_TYPE( VipsTileCache, vips_tile_cache, VIPS_TYPE_CONVERSION ); static void -tile_destroy( Tile *tile ) +vips_tile_destroy( VipsTile *tile ) { VipsTileCache *cache = tile->cache; @@ -121,9 +121,9 @@ static void vips_tile_cache_drop_all( VipsTileCache *cache ) { while( cache->tiles ) { - Tile *tile = (Tile *) cache->tiles->data; + VipsTile *tile = (VipsTile *) cache->tiles->data; - tile_destroy( tile ); + vips_tile_destroy( tile ); } } @@ -138,12 +138,12 @@ vips_tile_cache_dispose( GObject *gobject ) G_OBJECT_CLASS( vips_tile_cache_parent_class )->dispose( gobject ); } -static Tile * -tile_new( VipsTileCache *cache ) +static VipsTile * +vips_tile_new( VipsTileCache *cache ) { - Tile *tile; + VipsTile *tile; - if( !(tile = VIPS_NEW( NULL, Tile )) ) + if( !(tile = VIPS_NEW( NULL, VipsTile )) ) return( NULL ); tile->cache = cache; @@ -156,7 +156,7 @@ tile_new( VipsTileCache *cache ) cache->ntiles += 1; if( !(tile->region = vips_region_new( cache->in )) ) { - tile_destroy( tile ); + vips_tile_destroy( tile ); return( NULL ); } vips__region_no_ownership( tile->region ); @@ -165,7 +165,7 @@ tile_new( VipsTileCache *cache ) } static int -tile_move( Tile *tile, int x, int y ) +vips_tile_move( VipsTile *tile, int x, int y ) { VipsRect area; @@ -184,14 +184,16 @@ tile_move( Tile *tile, int x, int y ) } /* Do we have a tile in the cache? + * + * FIXME .. use a hash? */ -static Tile * -tile_search( VipsTileCache *cache, int x, int y ) +static VipsTile * +vips_tile_search( VipsTileCache *cache, int x, int y ) { GSList *p; for( p = cache->tiles; p; p = p->next ) { - Tile *tile = (Tile *) p->data; + VipsTile *tile = (VipsTile *) p->data; if( tile->x == x && tile->y == y ) return( tile ); @@ -201,7 +203,7 @@ tile_search( VipsTileCache *cache, int x, int y ) } static void -tile_touch( Tile *tile ) +vips_tile_touch( VipsTile *tile ) { g_assert( tile->cache->ntiles >= 0 ); @@ -211,7 +213,7 @@ tile_touch( Tile *tile ) /* Fill a tile with pixels. */ static int -tile_fill( Tile *tile, VipsRegion *in ) +vips_tile_fill( VipsTile *tile, VipsRegion *in ) { VipsRect area; @@ -226,7 +228,7 @@ tile_fill( Tile *tile, VipsRegion *in ) &area, area.left, area.top ) ) return( -1 ); - tile_touch( tile ); + vips_tile_touch( tile ); return( 0 ); } @@ -234,17 +236,17 @@ tile_fill( Tile *tile, VipsRegion *in ) /* Find existing tile, make a new tile, or if we have a full set of tiles, * reuse LRU. */ -static Tile * -tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y ) +static VipsTile * +vips_tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y ) { - Tile *tile; + VipsTile *tile; int oldest, topmost; GSList *p; /* In cache already? */ - if( (tile = tile_search( cache, x, y )) ) { - tile_touch( tile ); + if( (tile = vips_tile_search( cache, x, y )) ) { + vips_tile_touch( tile ); return( tile ); } @@ -253,9 +255,9 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y ) */ if( cache->max_tiles == -1 || cache->ntiles < cache->max_tiles ) { - if( !(tile = tile_new( cache )) || - tile_move( tile, x, y ) || - tile_fill( tile, in ) ) + if( !(tile = vips_tile_new( cache )) || + vips_tile_move( tile, x, y ) || + vips_tile_fill( tile, in ) ) return( NULL ); return( tile ); @@ -268,7 +270,7 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y ) oldest = cache->time; tile = NULL; for( p = cache->tiles; p; p = p->next ) { - Tile *t = (Tile *) p->data; + VipsTile *t = (VipsTile *) p->data; if( t->time < oldest ) { oldest = t->time; @@ -281,7 +283,7 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y ) topmost = cache->in->Ysize; tile = NULL; for( p = cache->tiles; p; p = p->next ) { - Tile *t = (Tile *) p->data; + VipsTile *t = (VipsTile *) p->data; if( t->y < topmost ) { topmost = t->y; @@ -298,35 +300,13 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y ) VIPS_DEBUG_MSG( "tilecache: reusing tile %d x %d\n", tile->x, tile->y ); - if( tile_move( tile, x, y ) || - tile_fill( tile, in ) ) + if( vips_tile_move( tile, x, y ) || + vips_tile_fill( tile, in ) ) return( NULL ); return( tile ); } -/* Copy rect from @from to @to. - */ -static void -copy_region( VipsRegion *from, VipsRegion *to, VipsRect *area ) -{ - int y; - - /* Area should be inside both from and to. - */ - g_assert( vips_rect_includesrect( &from->valid, area ) ); - g_assert( vips_rect_includesrect( &to->valid, area ) ); - - /* Loop down common area, copying. - */ - for( y = area->top; y < VIPS_RECT_BOTTOM( area ); y++ ) { - VipsPel *p = VIPS_REGION_ADDR( from, area->left, y ); - VipsPel *q = VIPS_REGION_ADDR( to, area->left, y ); - - memcpy( q, p, VIPS_IMAGE_SIZEOF_PEL( from->im ) * area->width ); - } -} - /* Generate func. */ static int @@ -353,7 +333,7 @@ vips_tile_cache_gen( VipsRegion *or, * the output region directly to the tile. * * However this would mean that tile drop on minimise could then leave - * dangling pointers, if minimuse was called on an active pipeline. + * dangling pointers, if minimise were called on an active pipeline. */ VIPS_DEBUG_MSG( "vips_tile_cache_gen: " @@ -362,11 +342,11 @@ vips_tile_cache_gen( VipsRegion *or, for( y = ys; y < VIPS_RECT_BOTTOM( r ); y += th ) for( x = xs; x < VIPS_RECT_RIGHT( r ); x += tw ) { - Tile *tile; + VipsTile *tile; VipsRect tarea; VipsRect hit; - if( !(tile = tile_find( cache, in, x, y )) ) { + if( !(tile = vips_tile_find( cache, in, x, y )) ) { g_mutex_unlock( cache->lock ); return( -1 ); } @@ -381,8 +361,8 @@ vips_tile_cache_gen( VipsRegion *or, /* The part of the tile that we need. */ vips_rect_intersectrect( &tarea, r, &hit ); - - copy_region( tile->region, or, &hit ); + vips_region_copy( tile->region, or, &hit, + hit.left, hit.top ); } g_mutex_unlock( cache->lock ); diff --git a/libvips/foreign/foreign.c b/libvips/foreign/foreign.c index b3e0ce18..79ba8e0e 100644 --- a/libvips/foreign/foreign.c +++ b/libvips/foreign/foreign.c @@ -956,35 +956,6 @@ vips_foreign_load_init( VipsForeignLoad *load ) load->disc = TRUE; } -/* Make a sequential cache for a file reader. - */ -int -vips_foreign_tilecache( VipsImage *in, VipsImage **out, int strip_height ) -{ - int tile_width; - int tile_height; - int nlines; - int nstrips; - - vips_get_tile_size( in, &tile_width, &tile_height, &nlines ); - - /* We need two buffers, each with enough strips to make a complete - * buffer. And double to be safe, since input buffers must be larger - * than output, and our buffers may not align exactly. - */ - nstrips = 2 * (1 + nlines / strip_height); - - if( vips_tilecache( in, out, - "tile_width", in->Xsize, - "tile_height", strip_height * nstrips, - "max_tiles", 2, - "strategy", VIPS_CACHE_SEQUENTIAL, - NULL ) ) - return( -1 ); - - return( 0 ); -} - /* Abstract base class for image savers. */ diff --git a/libvips/foreign/jpeg2vips.c b/libvips/foreign/jpeg2vips.c index 9a105d74..750496fe 100644 --- a/libvips/foreign/jpeg2vips.c +++ b/libvips/foreign/jpeg2vips.c @@ -915,11 +915,10 @@ read_jpeg_image( ReadJpeg *jpeg, VipsImage *out ) #endif /*DEBUG*/ if( vips_image_generate( t[0], - NULL, read_jpeg_generate, NULL, - jpeg, NULL ) || + NULL, read_jpeg_generate, NULL, + jpeg, NULL ) || vips_sequential( t[0], &t[1], NULL ) || - vips_foreign_tilecache( t[1], &t[2], 8 ) || - vips_image_write( t[2], out ) ) + vips_image_write( t[1], out ) ) return( -1 ); return( 0 ); diff --git a/libvips/foreign/tiff2vips.c b/libvips/foreign/tiff2vips.c index 5d3331b4..d5b1c343 100644 --- a/libvips/foreign/tiff2vips.c +++ b/libvips/foreign/tiff2vips.c @@ -1429,8 +1429,7 @@ read_stripwise( ReadTiff *rtiff, VipsImage *out ) NULL, tiff2vips_stripwise_generate, NULL, rtiff, tbuf ) || vips_sequential( t[0], &t[1], NULL ) || - vips_foreign_tilecache( t[1], &t[2], rtiff->rows_per_strip ) || - vips_image_write( t[2], out ) ) + vips_image_write( t[1], out ) ) return( -1 ); return( 0 ); diff --git a/libvips/foreign/vipspng.c b/libvips/foreign/vipspng.c index df9f06b8..8926ca0f 100644 --- a/libvips/foreign/vipspng.c +++ b/libvips/foreign/vipspng.c @@ -75,8 +75,8 @@ */ /* -#define DEBUG */ +#define DEBUG #ifdef HAVE_CONFIG_H #include @@ -490,8 +490,7 @@ vips__png_read( const char *name, VipsImage *out ) NULL, png2vips_generate, NULL, read, NULL ) || vips_sequential( t[0], &t[1], NULL ) || - vips_foreign_tilecache( t[1], &t[2], 16 ) || - vips_image_write( t[2], out ) ) + vips_image_write( t[1], out ) ) return( -1 ); } diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index a5f4ae9a..93d7272f 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -139,10 +139,6 @@ int vips__sizealike( VipsImage *in1, VipsImage *in2, int vips__bandalike( const char *domain, VipsImage *in1, VipsImage *in2, VipsImage **out1, VipsImage **out2 ); -int vips_foreign_tilecache( VipsImage *in, VipsImage **out, int strip_height ); - - - void im__format_init( void ); diff --git a/libvips/iofuncs/operation.c b/libvips/iofuncs/operation.c index 1d75eedf..7927f2a6 100644 --- a/libvips/iofuncs/operation.c +++ b/libvips/iofuncs/operation.c @@ -281,8 +281,6 @@ vips_operation_class_init( VipsOperationClass *class ) static void vips_operation_init( VipsOperation *operation ) { - /* Init our instance fields. - */ } /** diff --git a/libvips/iofuncs/semaphore.c b/libvips/iofuncs/semaphore.c index 6b094d02..3b4959b8 100644 --- a/libvips/iofuncs/semaphore.c +++ b/libvips/iofuncs/semaphore.c @@ -87,8 +87,14 @@ vips_semaphore_upn( VipsSemaphore *s, int n ) s->v += n; value_after_op = s->v; #ifdef HAVE_THREADS + /* If we are only incrementing by one, we only need to wake a single + * thread. If we are incrementing by a lot, we must wake all threads. + */ + if( n == 1 ) + g_cond_signal( s->cond ); + else + g_cond_broadcast( s->cond ); g_mutex_unlock( s->mutex ); - g_cond_signal( s->cond ); #endif /*HAVE_THREADS*/ #ifdef DEBUG_IO diff --git a/libvips/iofuncs/sinkdisc.c b/libvips/iofuncs/sinkdisc.c index 3130c210..0d633399 100644 --- a/libvips/iofuncs/sinkdisc.c +++ b/libvips/iofuncs/sinkdisc.c @@ -38,8 +38,8 @@ */ /* -#define VIPS_DEBUG */ +#define VIPS_DEBUG #ifdef HAVE_CONFIG_H #include @@ -337,8 +337,8 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) sink_base->y += sink_base->tile_height; if( sink_base->y >= VIPS_RECT_BOTTOM( &write->buf->area ) ) { - /* Block until the last write is done, then set write - * of the front buffer going. + /* Block until the write of the previous buffer + * is done, then set write of this buffer going. */ if( wbuffer_flush( write ) ) return( -1 ); @@ -365,7 +365,8 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) /* Position buf at the new y. */ if( wbuffer_position( write->buf, - sink_base->y, sink_base->nlines ) ) + //sink_base->y, sink_base->nlines ) ) + sink_base->y, sink_base->tile_height ) ) return( -1 ); } } @@ -495,7 +496,8 @@ vips_sink_disc( VipsImage *im, VipsRegionWrite write_fn, void *a ) result = 0; if( !write.buf || !write.buf_back || - wbuffer_position( write.buf, 0, write.sink_base.nlines ) || + //wbuffer_position( write.buf, 0, write.sink_base.nlines ) || + wbuffer_position( write.buf, 0, write.sink_base.tile_height ) || vips_threadpool_run( im, write_thread_state_new, wbuffer_allocate_fn, diff --git a/libvips/resample/shrink.c b/libvips/resample/shrink.c index 747bdd6b..67b3e255 100644 --- a/libvips/resample/shrink.c +++ b/libvips/resample/shrink.c @@ -63,8 +63,8 @@ */ /* -#define DEBUG */ +#define DEBUG #ifdef HAVE_CONFIG_H #include