experiment with thread stalling

This commit is contained in:
John Cupitt 2012-08-21 16:18:25 +01:00
parent 45e3c0bd39
commit f5f0dda551
12 changed files with 133 additions and 150 deletions

View File

@ -10,6 +10,7 @@
- better error msgs for enum args
- fix compiler warnings in production build (thanks Dmitry)
- fix spurious warnings about exif updates
- VipsSequential has an integrated cache and stalls out of order threads
20/7/12 started 7.30.0
- support "rs" mode in vips7

View File

@ -9,6 +9,9 @@
* - support skip forwards as well, so we can do extract/insert
* 10/8/12
* - add @trace option
* 21/8/12
* - remove skip forward, instead do thread stalling and have an
* integrated cache
*/
/*
@ -38,8 +41,8 @@
*/
/*
#define VIPS_DEBUG
*/
#define VIPS_DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>
@ -61,14 +64,32 @@ typedef struct _VipsSequential {
VipsImage *in;
int y_pos;
gboolean trace;
/* Lock access to y_pos with this, use the cond to wake up stalled
* threads.
*/
GMutex *lock;
GCond *ready;
int y_pos;
} VipsSequential;
typedef VipsConversionClass VipsSequentialClass;
G_DEFINE_TYPE( VipsSequential, vips_sequential, VIPS_TYPE_CONVERSION );
static void
vips_sequential_dispose( GObject *gobject )
{
VipsSequential *sequential = (VipsSequential *) gobject;
VIPS_FREEF( g_mutex_free, sequential->lock );
VIPS_FREEF( g_cond_free, sequential->ready );
G_OBJECT_CLASS( vips_sequential_parent_class )->dispose( gobject );
}
static int
vips_sequential_generate( VipsRegion *or,
void *seq, void *a, void *b, gboolean *stop )
@ -77,55 +98,53 @@ vips_sequential_generate( VipsRegion *or,
VipsRect *r = &or->valid;
VipsRegion *ir = (VipsRegion *) seq;
printf( "vips_sequential_generate: thread %p "
"request for %d lines, starting at line %d\n",
g_thread_self(),
r->height, r->top );
if( sequential->trace )
vips_diag( "VipsSequential",
"%d lines, starting at line %d", r->height, r->top );
"request for %d lines, starting at line %d",
r->height, r->top );
retry:
/* We can't go backwards, but we can skip forwards.
g_mutex_lock( sequential->lock );
if( r->top > sequential->y_pos ) {
/* This is for stuff in the future, stall.
*/
printf( "thread %p stalling ...\n", g_thread_self() );
g_cond_wait( sequential->ready, sequential->lock );
printf( "thread %p awake again, retrying ...\n",
g_thread_self() );
goto retry;
}
/* This is a request for old or present pixels -- serve from cache.
* This may trigger further, sequential reads.
*/
if( r->top < sequential->y_pos ) {
vips_error( "VipsSequential",
_( "at line %d in file, but line %d requested" ),
sequential->y_pos, r->top );
printf( "thread %p reading ...\n", g_thread_self() );
if( vips_region_prepare( ir, r ) ||
vips_region_region( or, ir, r, r->left, r->top ) ) {
g_mutex_unlock( sequential->lock );
return( -1 );
}
/* We're inside a tilecache where tiles are the full image width, so
* this should always be true.
*/
g_assert( r->left == 0 );
g_assert( r->width == or->im->Xsize );
g_assert( VIPS_RECT_BOTTOM( r ) <= or->im->Ysize );
if( VIPS_RECT_BOTTOM( r ) >= sequential->y_pos ) {
/* This request has straddled or continued on from the read
* point. Update it, and wake up all stalled threads for a
* retry.
*/
sequential->y_pos = VIPS_RECT_BOTTOM( r );
/* Skip forwards, if necessary.
*/
while( sequential->y_pos < r->top ) {
VipsRect rect;
printf( "thread %p updating y_pos and waking stalled\n",
g_thread_self() );
rect.top = sequential->y_pos;
rect.left = 0;
rect.width = or->im->Xsize;
rect.height = VIPS_MIN( r->top - sequential->y_pos,
VIPS__FATSTRIP_HEIGHT );
if( vips_region_prepare( ir, &rect ) )
return( -1 );
sequential->y_pos += rect.height;
if( sequential->trace )
vips_diag( "VipsSequential",
"skipping %d lines", rect.height );
g_cond_broadcast( sequential->ready );
}
g_assert( sequential->y_pos == r->top );
/* Pointer copy.
*/
if( vips_region_prepare( ir, r ) ||
vips_region_region( or, ir, r, r->left, r->top ) )
return( -1 );
sequential->y_pos += r->height;
g_mutex_unlock( sequential->lock );
return( 0 );
}
@ -136,6 +155,8 @@ vips_sequential_build( VipsObject *object )
VipsConversion *conversion = VIPS_CONVERSION( object );
VipsSequential *sequential = (VipsSequential *) object;
VipsImage *t;
VIPS_DEBUG_MSG( "vips_sequential_build\n" );
if( VIPS_OBJECT_CLASS( vips_sequential_parent_class )->build( object ) )
@ -144,14 +165,22 @@ vips_sequential_build( VipsObject *object )
if( vips_image_pio_input( sequential->in ) )
return( -1 );
if( vips_image_copy_fields( conversion->out, sequential->in ) )
if( vips_tilecache( sequential->in, &t,
"tile_width", sequential->in->Xsize,
"tile_height", 1,
"max_tiles", 500,
"strategy", VIPS_CACHE_SEQUENTIAL,
NULL ) )
return( -1 );
vips_demand_hint( conversion->out,
VIPS_DEMAND_STYLE_FATSTRIP, sequential->in, NULL );
vips_object_local( object, t );
if( vips_image_copy_fields( conversion->out, t ) )
return( -1 );
vips_demand_hint( conversion->out,
VIPS_DEMAND_STYLE_FATSTRIP, t, NULL );
if( vips_image_generate( conversion->out,
vips_start_one, vips_sequential_generate, vips_stop_one,
sequential->in, sequential ) )
t, sequential ) )
return( -1 );
return( 0 );
@ -165,6 +194,7 @@ vips_sequential_class_init( VipsSequentialClass *class )
VIPS_DEBUG_MSG( "vips_sequential_class_init\n" );
gobject_class->dispose = vips_sequential_dispose;
gobject_class->set_property = vips_object_set_property;
gobject_class->get_property = vips_object_get_property;
@ -190,6 +220,8 @@ static void
vips_sequential_init( VipsSequential *sequential )
{
sequential->trace = FALSE;
sequential->lock = g_mutex_new();
sequential->ready = g_cond_new();
}
/**

View File

@ -79,7 +79,7 @@ typedef struct {
int time; /* Time of last use for flush */
int x; /* xy pos in VIPS image cods */
int y;
} Tile;
} VipsTile;
typedef struct _VipsTileCache {
VipsConversion parent_instance;
@ -103,7 +103,7 @@ typedef VipsConversionClass VipsTileCacheClass;
G_DEFINE_TYPE( VipsTileCache, vips_tile_cache, VIPS_TYPE_CONVERSION );
static void
tile_destroy( Tile *tile )
vips_tile_destroy( VipsTile *tile )
{
VipsTileCache *cache = tile->cache;
@ -121,9 +121,9 @@ static void
vips_tile_cache_drop_all( VipsTileCache *cache )
{
while( cache->tiles ) {
Tile *tile = (Tile *) cache->tiles->data;
VipsTile *tile = (VipsTile *) cache->tiles->data;
tile_destroy( tile );
vips_tile_destroy( tile );
}
}
@ -138,12 +138,12 @@ vips_tile_cache_dispose( GObject *gobject )
G_OBJECT_CLASS( vips_tile_cache_parent_class )->dispose( gobject );
}
static Tile *
tile_new( VipsTileCache *cache )
static VipsTile *
vips_tile_new( VipsTileCache *cache )
{
Tile *tile;
VipsTile *tile;
if( !(tile = VIPS_NEW( NULL, Tile )) )
if( !(tile = VIPS_NEW( NULL, VipsTile )) )
return( NULL );
tile->cache = cache;
@ -156,7 +156,7 @@ tile_new( VipsTileCache *cache )
cache->ntiles += 1;
if( !(tile->region = vips_region_new( cache->in )) ) {
tile_destroy( tile );
vips_tile_destroy( tile );
return( NULL );
}
vips__region_no_ownership( tile->region );
@ -165,7 +165,7 @@ tile_new( VipsTileCache *cache )
}
static int
tile_move( Tile *tile, int x, int y )
vips_tile_move( VipsTile *tile, int x, int y )
{
VipsRect area;
@ -184,14 +184,16 @@ tile_move( Tile *tile, int x, int y )
}
/* Do we have a tile in the cache?
*
* FIXME .. use a hash?
*/
static Tile *
tile_search( VipsTileCache *cache, int x, int y )
static VipsTile *
vips_tile_search( VipsTileCache *cache, int x, int y )
{
GSList *p;
for( p = cache->tiles; p; p = p->next ) {
Tile *tile = (Tile *) p->data;
VipsTile *tile = (VipsTile *) p->data;
if( tile->x == x && tile->y == y )
return( tile );
@ -201,7 +203,7 @@ tile_search( VipsTileCache *cache, int x, int y )
}
static void
tile_touch( Tile *tile )
vips_tile_touch( VipsTile *tile )
{
g_assert( tile->cache->ntiles >= 0 );
@ -211,7 +213,7 @@ tile_touch( Tile *tile )
/* Fill a tile with pixels.
*/
static int
tile_fill( Tile *tile, VipsRegion *in )
vips_tile_fill( VipsTile *tile, VipsRegion *in )
{
VipsRect area;
@ -226,7 +228,7 @@ tile_fill( Tile *tile, VipsRegion *in )
&area, area.left, area.top ) )
return( -1 );
tile_touch( tile );
vips_tile_touch( tile );
return( 0 );
}
@ -234,17 +236,17 @@ tile_fill( Tile *tile, VipsRegion *in )
/* Find existing tile, make a new tile, or if we have a full set of tiles,
* reuse LRU.
*/
static Tile *
tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y )
static VipsTile *
vips_tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y )
{
Tile *tile;
VipsTile *tile;
int oldest, topmost;
GSList *p;
/* In cache already?
*/
if( (tile = tile_search( cache, x, y )) ) {
tile_touch( tile );
if( (tile = vips_tile_search( cache, x, y )) ) {
vips_tile_touch( tile );
return( tile );
}
@ -253,9 +255,9 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y )
*/
if( cache->max_tiles == -1 ||
cache->ntiles < cache->max_tiles ) {
if( !(tile = tile_new( cache )) ||
tile_move( tile, x, y ) ||
tile_fill( tile, in ) )
if( !(tile = vips_tile_new( cache )) ||
vips_tile_move( tile, x, y ) ||
vips_tile_fill( tile, in ) )
return( NULL );
return( tile );
@ -268,7 +270,7 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y )
oldest = cache->time;
tile = NULL;
for( p = cache->tiles; p; p = p->next ) {
Tile *t = (Tile *) p->data;
VipsTile *t = (VipsTile *) p->data;
if( t->time < oldest ) {
oldest = t->time;
@ -281,7 +283,7 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y )
topmost = cache->in->Ysize;
tile = NULL;
for( p = cache->tiles; p; p = p->next ) {
Tile *t = (Tile *) p->data;
VipsTile *t = (VipsTile *) p->data;
if( t->y < topmost ) {
topmost = t->y;
@ -298,35 +300,13 @@ tile_find( VipsTileCache *cache, VipsRegion *in, int x, int y )
VIPS_DEBUG_MSG( "tilecache: reusing tile %d x %d\n", tile->x, tile->y );
if( tile_move( tile, x, y ) ||
tile_fill( tile, in ) )
if( vips_tile_move( tile, x, y ) ||
vips_tile_fill( tile, in ) )
return( NULL );
return( tile );
}
/* Copy rect from @from to @to.
*/
static void
copy_region( VipsRegion *from, VipsRegion *to, VipsRect *area )
{
int y;
/* Area should be inside both from and to.
*/
g_assert( vips_rect_includesrect( &from->valid, area ) );
g_assert( vips_rect_includesrect( &to->valid, area ) );
/* Loop down common area, copying.
*/
for( y = area->top; y < VIPS_RECT_BOTTOM( area ); y++ ) {
VipsPel *p = VIPS_REGION_ADDR( from, area->left, y );
VipsPel *q = VIPS_REGION_ADDR( to, area->left, y );
memcpy( q, p, VIPS_IMAGE_SIZEOF_PEL( from->im ) * area->width );
}
}
/* Generate func.
*/
static int
@ -353,7 +333,7 @@ vips_tile_cache_gen( VipsRegion *or,
* the output region directly to the tile.
*
* However this would mean that tile drop on minimise could then leave
* dangling pointers, if minimuse was called on an active pipeline.
* dangling pointers, if minimise were called on an active pipeline.
*/
VIPS_DEBUG_MSG( "vips_tile_cache_gen: "
@ -362,11 +342,11 @@ vips_tile_cache_gen( VipsRegion *or,
for( y = ys; y < VIPS_RECT_BOTTOM( r ); y += th )
for( x = xs; x < VIPS_RECT_RIGHT( r ); x += tw ) {
Tile *tile;
VipsTile *tile;
VipsRect tarea;
VipsRect hit;
if( !(tile = tile_find( cache, in, x, y )) ) {
if( !(tile = vips_tile_find( cache, in, x, y )) ) {
g_mutex_unlock( cache->lock );
return( -1 );
}
@ -381,8 +361,8 @@ vips_tile_cache_gen( VipsRegion *or,
/* The part of the tile that we need.
*/
vips_rect_intersectrect( &tarea, r, &hit );
copy_region( tile->region, or, &hit );
vips_region_copy( tile->region, or, &hit,
hit.left, hit.top );
}
g_mutex_unlock( cache->lock );

View File

@ -956,35 +956,6 @@ vips_foreign_load_init( VipsForeignLoad *load )
load->disc = TRUE;
}
/* Make a sequential cache for a file reader.
*/
int
vips_foreign_tilecache( VipsImage *in, VipsImage **out, int strip_height )
{
int tile_width;
int tile_height;
int nlines;
int nstrips;
vips_get_tile_size( in, &tile_width, &tile_height, &nlines );
/* We need two buffers, each with enough strips to make a complete
* buffer. And double to be safe, since input buffers must be larger
* than output, and our buffers may not align exactly.
*/
nstrips = 2 * (1 + nlines / strip_height);
if( vips_tilecache( in, out,
"tile_width", in->Xsize,
"tile_height", strip_height * nstrips,
"max_tiles", 2,
"strategy", VIPS_CACHE_SEQUENTIAL,
NULL ) )
return( -1 );
return( 0 );
}
/* Abstract base class for image savers.
*/

View File

@ -915,11 +915,10 @@ read_jpeg_image( ReadJpeg *jpeg, VipsImage *out )
#endif /*DEBUG*/
if( vips_image_generate( t[0],
NULL, read_jpeg_generate, NULL,
jpeg, NULL ) ||
NULL, read_jpeg_generate, NULL,
jpeg, NULL ) ||
vips_sequential( t[0], &t[1], NULL ) ||
vips_foreign_tilecache( t[1], &t[2], 8 ) ||
vips_image_write( t[2], out ) )
vips_image_write( t[1], out ) )
return( -1 );
return( 0 );

View File

@ -1429,8 +1429,7 @@ read_stripwise( ReadTiff *rtiff, VipsImage *out )
NULL, tiff2vips_stripwise_generate, NULL,
rtiff, tbuf ) ||
vips_sequential( t[0], &t[1], NULL ) ||
vips_foreign_tilecache( t[1], &t[2], rtiff->rows_per_strip ) ||
vips_image_write( t[2], out ) )
vips_image_write( t[1], out ) )
return( -1 );
return( 0 );

View File

@ -75,8 +75,8 @@
*/
/*
#define DEBUG
*/
#define DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>
@ -490,8 +490,7 @@ vips__png_read( const char *name, VipsImage *out )
NULL, png2vips_generate, NULL,
read, NULL ) ||
vips_sequential( t[0], &t[1], NULL ) ||
vips_foreign_tilecache( t[1], &t[2], 16 ) ||
vips_image_write( t[2], out ) )
vips_image_write( t[1], out ) )
return( -1 );
}

View File

@ -139,10 +139,6 @@ int vips__sizealike( VipsImage *in1, VipsImage *in2,
int vips__bandalike( const char *domain,
VipsImage *in1, VipsImage *in2, VipsImage **out1, VipsImage **out2 );
int vips_foreign_tilecache( VipsImage *in, VipsImage **out, int strip_height );
void im__format_init( void );

View File

@ -281,8 +281,6 @@ vips_operation_class_init( VipsOperationClass *class )
static void
vips_operation_init( VipsOperation *operation )
{
/* Init our instance fields.
*/
}
/**

View File

@ -87,8 +87,14 @@ vips_semaphore_upn( VipsSemaphore *s, int n )
s->v += n;
value_after_op = s->v;
#ifdef HAVE_THREADS
/* If we are only incrementing by one, we only need to wake a single
* thread. If we are incrementing by a lot, we must wake all threads.
*/
if( n == 1 )
g_cond_signal( s->cond );
else
g_cond_broadcast( s->cond );
g_mutex_unlock( s->mutex );
g_cond_signal( s->cond );
#endif /*HAVE_THREADS*/
#ifdef DEBUG_IO

View File

@ -38,8 +38,8 @@
*/
/*
#define VIPS_DEBUG
*/
#define VIPS_DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>
@ -337,8 +337,8 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
sink_base->y += sink_base->tile_height;
if( sink_base->y >= VIPS_RECT_BOTTOM( &write->buf->area ) ) {
/* Block until the last write is done, then set write
* of the front buffer going.
/* Block until the write of the previous buffer
* is done, then set write of this buffer going.
*/
if( wbuffer_flush( write ) )
return( -1 );
@ -365,7 +365,8 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
/* Position buf at the new y.
*/
if( wbuffer_position( write->buf,
sink_base->y, sink_base->nlines ) )
//sink_base->y, sink_base->nlines ) )
sink_base->y, sink_base->tile_height ) )
return( -1 );
}
}
@ -495,7 +496,8 @@ vips_sink_disc( VipsImage *im, VipsRegionWrite write_fn, void *a )
result = 0;
if( !write.buf ||
!write.buf_back ||
wbuffer_position( write.buf, 0, write.sink_base.nlines ) ||
//wbuffer_position( write.buf, 0, write.sink_base.nlines ) ||
wbuffer_position( write.buf, 0, write.sink_base.tile_height ) ||
vips_threadpool_run( im,
write_thread_state_new,
wbuffer_allocate_fn,

View File

@ -63,8 +63,8 @@
*/
/*
#define DEBUG
*/
#define DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>