diff --git a/ChangeLog b/ChangeLog index e39b27bc..1a318b9d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -13,6 +13,10 @@ master - add ".pnm" save [ewelot] - threaded tiff jp2k and jpeg decompress - improve speed and efficiency of animated WebP write [dloebl] +- add rgb mode to openslideload +- new thread recycler +- threadpools size dynamically with load +- operations can hint threadpool size - support for N-colour ICC profiles 11/10/22 started 8.13.3 diff --git a/libvips/colour/colour.c b/libvips/colour/colour.c index 96121ce0..2bef067b 100644 --- a/libvips/colour/colour.c +++ b/libvips/colour/colour.c @@ -234,6 +234,13 @@ vips_colour_gen( VipsRegion *or, int i, y; VipsPel *p[MAX_INPUT_IMAGES], *q; + /* + printf( "vips_colour_gen: %s, " + "left = %d, top = %d, width = %d, height = %d\n", + VIPS_OBJECT_CLASS( class )->nickname, + r->left, r->top, r->width, r->height ); + */ + if( vips_reorder_prepare_many( or->im, ir, r ) ) return( -1 ); diff --git a/libvips/conversion/sequential.c b/libvips/conversion/sequential.c index 1b3bce4a..01e5b747 100644 --- a/libvips/conversion/sequential.c +++ b/libvips/conversion/sequential.c @@ -126,7 +126,7 @@ vips_sequential_generate( VipsRegion *or, VIPS_GATE_START( "vips_sequential_generate: wait" ); - g_mutex_lock( sequential->lock ); + vips__worker_lock( sequential->lock ); VIPS_GATE_STOP( "vips_sequential_generate: wait" ); diff --git a/libvips/conversion/tilecache.c b/libvips/conversion/tilecache.c index fe261eeb..72453810 100644 --- a/libvips/conversion/tilecache.c +++ b/libvips/conversion/tilecache.c @@ -620,7 +620,7 @@ vips_tile_cache_gen( VipsRegion *or, VIPS_GATE_START( "vips_tile_cache_gen: wait1" ); - g_mutex_lock( cache->lock ); + vips__worker_lock( cache->lock ); VIPS_GATE_STOP( "vips_tile_cache_gen: wait1" ); @@ -895,7 +895,7 @@ vips_line_cache_gen( VipsRegion *or, VIPS_GATE_START( "vips_line_cache_gen: wait" ); - g_mutex_lock( block_cache->lock ); + vips__worker_lock( block_cache->lock ); VIPS_GATE_STOP( "vips_line_cache_gen: wait" ); diff --git a/libvips/create/black.c b/libvips/create/black.c index 84ef17cc..eca30999 100644 --- a/libvips/create/black.c +++ b/libvips/create/black.c @@ -83,6 +83,14 @@ static int vips_black_gen( VipsRegion *or, void *seq, void *a, void *b, gboolean *stop ) { + /* + VipsRect *r = &or->valid; + + printf( "vips_black_gen: " + "left = %d, top = %d, width = %d, height = %d\n", + r->left, r->top, r->width, r->height ); + */ + vips_region_black( or ); return( 0 ); diff --git a/libvips/foreign/dzsave.c b/libvips/foreign/dzsave.c index 5b3fd713..55ccbd37 100644 --- a/libvips/foreign/dzsave.c +++ b/libvips/foreign/dzsave.c @@ -1900,6 +1900,8 @@ strip_work( VipsThreadState *state, void *a ) g_mutex_unlock( vips__global_lock ); + vips_image_set_int( x, VIPS_META_CONCURRENCY, 1 ); + if( write_image( dz, out, x, dz->suffix ) ) { g_object_unref( out ); g_object_unref( x ); diff --git a/libvips/foreign/fits.c b/libvips/foreign/fits.c index f4d973e7..9d1dae77 100644 --- a/libvips/foreign/fits.c +++ b/libvips/foreign/fits.c @@ -450,11 +450,13 @@ fits2vips_generate( VipsRegion *out, q = VIPS_REGION_ADDR( out, r->left, r->top ); - g_mutex_lock( fits->lock ); + vips__worker_lock( fits->lock ); + if( vips_fits_read_subset( fits, fpixel, lpixel, inc, q ) ) { g_mutex_unlock( fits->lock ); return( -1 ); } + g_mutex_unlock( fits->lock ); } else { @@ -478,12 +480,14 @@ fits2vips_generate( VipsRegion *out, q = VIPS_REGION_ADDR( out, r->left, y ); - g_mutex_lock( fits->lock ); + vips__worker_lock( fits->lock ); + if( vips_fits_read_subset( fits, fpixel, lpixel, inc, q ) ) { g_mutex_unlock( fits->lock ); return( -1 ); } + g_mutex_unlock( fits->lock ); } } diff --git a/libvips/foreign/magick2vips.c b/libvips/foreign/magick2vips.c index bc6672a4..9d052f2a 100644 --- a/libvips/foreign/magick2vips.c +++ b/libvips/foreign/magick2vips.c @@ -754,9 +754,11 @@ magick_fill_region( VipsRegion *out, PixelPacket *pixels; - g_mutex_lock( read->lock ); + vips__worker_lock( read->lock ); + pixels = get_pixels( read->frames[frame], r->left, line, r->width, 1 ); + g_mutex_unlock( read->lock ); if( !pixels ) { diff --git a/libvips/foreign/magick7load.c b/libvips/foreign/magick7load.c index 3c3543c5..5a2e330d 100644 --- a/libvips/foreign/magick7load.c +++ b/libvips/foreign/magick7load.c @@ -676,10 +676,12 @@ vips_foreign_load_magick7_fill_region( VipsRegion *or, Quantum * restrict p; VipsPel * restrict q; - g_mutex_lock( magick7->lock ); + vips__worker_lock( magick7->lock ); + p = GetCacheViewAuthenticPixels( magick7->cache_view[frame], r->left, line, r->width, 1, magick7->exception ); + g_mutex_unlock( magick7->lock ); if( !p ) diff --git a/libvips/foreign/openslideload.c b/libvips/foreign/openslideload.c index 16bffae7..32e50c9d 100644 --- a/libvips/foreign/openslideload.c +++ b/libvips/foreign/openslideload.c @@ -110,6 +110,7 @@ typedef struct { gboolean autocrop; char *associated; gboolean attach_associated; + gboolean rgb; openslide_t *osr; @@ -237,7 +238,7 @@ get_bounds( openslide_t *osr, VipsRect *rect ) static ReadSlide * readslide_new( const char *filename, VipsImage *out, int level, gboolean autocrop, - const char *associated, gboolean attach_associated ) + const char *associated, gboolean attach_associated, gboolean rgb ) { ReadSlide *rslide; @@ -268,6 +269,7 @@ readslide_new( const char *filename, VipsImage *out, rslide->autocrop = autocrop; rslide->associated = g_strdup( associated ); rslide->attach_associated = attach_associated; + rslide->rgb = rgb; /* Non-crazy defaults, override in _parse() if we can. */ @@ -289,11 +291,11 @@ readslide_new( const char *filename, VipsImage *out, * compatibility with older vipses. */ static void -argb2rgba( uint32_t * restrict buf, int n, uint32_t bg ) +argb2rgba( uint32_t * restrict buf, int64_t n, uint32_t bg ) { const uint32_t pbg = GUINT32_TO_BE( (bg << 8) | 255 ); - int i; + int64_t i; for( i = 0; i < n; i++ ) { uint32_t * restrict p = buf + i; @@ -318,6 +320,90 @@ argb2rgba( uint32_t * restrict buf, int n, uint32_t bg ) } } +/* Convert from ARGB to RGB. In RGB mode, assume a is always 255. + */ +static void +argb2rgb( uint32_t * restrict buf, VipsPel *restrict q, int64_t n ) +{ + int64_t i; + + for( i = 0; i < n; i++ ) { + uint32_t x = buf[i]; + + q[0] = ((x >> 16) & 0xff); + q[1] = ((x >> 8) & 0xff); + q[2] = (x & 0xff); + q += 3; + } +} + +static VipsImage * +vips__openslide_get_associated( ReadSlide *rslide, const char *associated_name ) +{ + VipsImage *associated; + int64_t w, h; + const char *error; + + associated = vips_image_new_memory(); + openslide_get_associated_image_dimensions( rslide->osr, + associated_name, &w, &h ); + + /* Always 4 bands, since this is the image that gets the ARGB from + * cairo. + */ + vips_image_init_fields( associated, w, h, 4, + VIPS_FORMAT_UCHAR, + VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, 1.0, 1.0 ); + if( vips_image_pipelinev( associated, + VIPS_DEMAND_STYLE_THINSTRIP, NULL ) || + vips_image_write_prepare( associated ) ) { + g_object_unref( associated ); + return( NULL ); + } + + openslide_read_associated_image( rslide->osr, + associated_name, + (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ) ); + error = openslide_get_error( rslide->osr ); + if( error ) { + vips_error( "openslide2vips", + _( "reading associated image: %s" ), error ); + g_object_unref( associated ); + return( NULL ); + } + + /* In RGB mode we make a second RGB image and repack to that. + */ + if( rslide->rgb ) { + VipsImage *rgb; + + rgb = vips_image_new_memory(); + vips_object_local( rgb, associated ); + + vips_image_init_fields( rgb, w, h, 3, + VIPS_FORMAT_UCHAR, + VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, 1.0, 1.0 ); + if( vips_image_pipelinev( rgb, + VIPS_DEMAND_STYLE_THINSTRIP, NULL ) || + vips_image_write_prepare( rgb ) ) { + g_object_unref( rgb ); + return( NULL ); + } + + argb2rgb( (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ), + VIPS_IMAGE_ADDR( rgb, 0, 0 ), w * h ); + + associated = rgb; + } + else + /* We can do this in place. + */ + argb2rgba( (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ), + w * h, rslide->bg ); + + return( associated ); +} + static int readslide_attach_associated( ReadSlide *rslide, VipsImage *image ) { @@ -326,38 +412,17 @@ readslide_attach_associated( ReadSlide *rslide, VipsImage *image ) for( associated_name = openslide_get_associated_image_names( rslide->osr ); *associated_name != NULL; associated_name++ ) { - int64_t w, h; VipsImage *associated; - uint32_t *p; - const char *error; - char buf[256]; + char buf[256]; - associated = vips_image_new_memory(); - openslide_get_associated_image_dimensions( rslide->osr, - *associated_name, &w, &h ); - vips_image_init_fields( associated, w, h, 4, VIPS_FORMAT_UCHAR, - VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, 1.0, 1.0 ); - if( vips_image_pipelinev( associated, - VIPS_DEMAND_STYLE_THINSTRIP, NULL ) || - vips_image_write_prepare( associated ) ) { - g_object_unref( associated ); - return( -1 ); - } - p = (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ); - openslide_read_associated_image( rslide->osr, - *associated_name, p ); - error = openslide_get_error( rslide->osr ); - if( error ) { - vips_error( "openslide2vips", - _( "reading associated image: %s" ), error ); - g_object_unref( associated ); - return( -1 ); - } - argb2rgba( p, w * h, rslide->bg ); + if( !(associated = vips__openslide_get_associated( rslide, + *associated_name )) ) + return( -1 ); vips_snprintf( buf, 256, "openslide.associated.%s", *associated_name ); vips_image_set_image( image, buf, associated ); + g_object_unref( associated ); } @@ -542,8 +607,9 @@ readslide_parse( ReadSlide *rslide, VipsImage *image ) "slide-associated-images", associated_names ); VIPS_FREE( associated_names ); - vips_image_init_fields( image, w, h, 4, VIPS_FORMAT_UCHAR, - VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, xres, yres ); + vips_image_init_fields( image, w, h, rslide->rgb ? 3 : 4, + VIPS_FORMAT_UCHAR, VIPS_CODING_NONE, + VIPS_INTERPRETATION_sRGB, xres, yres ); return( 0 ); } @@ -551,28 +617,46 @@ readslide_parse( ReadSlide *rslide, VipsImage *image ) static int vips__openslide_read_header( const char *filename, VipsImage *out, int level, gboolean autocrop, - char *associated, gboolean attach_associated ) + char *associated, gboolean attach_associated, gboolean rgb ) { ReadSlide *rslide; if( !(rslide = readslide_new( filename, - out, level, autocrop, associated, attach_associated )) || + out, level, autocrop, associated, attach_associated, rgb )) || readslide_parse( rslide, out ) ) return( -1 ); return( 0 ); } +/* Allocate a tile buffer. Have one of these for each thread so we can unpack + * to vips in parallel. + */ +static void * +vips__openslide_start( VipsImage *out, void *a, void *b ) +{ + ReadSlide *rslide = (ReadSlide *) a; + + uint32_t *tile_buffer; + + if( !(tile_buffer = VIPS_MALLOC( NULL, + (size_t) rslide->tile_width * rslide->tile_height * 4 )) ) + return( NULL ); + + return( (void *) tile_buffer ); +} + static int vips__openslide_generate( VipsRegion *out, void *_seq, void *_rslide, void *unused, gboolean *stop ) { + uint32_t *tile_buffer = (uint32_t *) _seq; ReadSlide *rslide = _rslide; uint32_t bg = rslide->bg; VipsRect *r = &out->valid; int n = r->width * r->height; - uint32_t *buf = (uint32_t *) VIPS_REGION_ADDR( out, r->left, r->top ); + uint32_t *buf; const char *error; VIPS_DEBUG_MSG( "vips__openslide_generate: %dx%d @ %dx%d\n", @@ -586,10 +670,21 @@ vips__openslide_generate( VipsRegion *out, g_assert( r->width <= rslide->tile_width ); g_assert( r->height <= rslide->tile_height ); - /* The memory on the region should be contiguous for our ARGB->RGBA - * loop below. - */ - g_assert( VIPS_REGION_LSKIP( out ) == r->width * 4 ); + /* The memory on the region should be contiguous. + */ + g_assert( VIPS_REGION_LSKIP( out ) == r->width * out->im->Bands ); + + /* In RGB mode we need to read to the tile buffer. + */ + if( rslide->rgb ) { + g_assert( tile_buffer ); + g_assert( rslide->tile_width >= r->width ); + g_assert( rslide->tile_height >= r->height ); + + buf = tile_buffer; + } + else + buf = (uint32_t *) VIPS_REGION_ADDR( out, r->left, r->top ); openslide_read_region( rslide->osr, buf, @@ -612,16 +707,29 @@ vips__openslide_generate( VipsRegion *out, return( -1 ); } - /* Since we are inside a cache, we know buf must be continuous. - */ - argb2rgba( buf, n, bg ); + if( rslide->rgb ) + argb2rgb( tile_buffer, + VIPS_REGION_ADDR( out, r->left, r->top ), n ); + else + argb2rgba( buf, n, bg ); + + return( 0 ); +} + +static int +vips__openslide_stop( void *_seq, void *a, void *b ) +{ + uint32_t *tile_buffer = (uint32_t *) _seq; + + VIPS_FREE( tile_buffer ); return( 0 ); } static int vips__openslide_read( const char *filename, VipsImage *out, - int level, gboolean autocrop, gboolean attach_associated ) + int level, gboolean autocrop, gboolean attach_associated, + gboolean rgb ) { ReadSlide *rslide; VipsImage *raw; @@ -631,7 +739,7 @@ vips__openslide_read( const char *filename, VipsImage *out, filename, level ); if( !(rslide = readslide_new( filename, out, level, autocrop, - NULL, attach_associated )) ) + NULL, attach_associated, rgb )) ) return( -1 ); raw = vips_image_new(); @@ -639,7 +747,9 @@ vips__openslide_read( const char *filename, VipsImage *out, if( readslide_parse( rslide, raw ) || vips_image_generate( raw, - NULL, vips__openslide_generate, NULL, rslide, NULL ) ) + vips__openslide_start, + vips__openslide_generate, + vips__openslide_stop, rslide, NULL ) ) return( -1 ); /* Copy to out, adding a cache. Enough tiles for two complete rows, @@ -665,41 +775,27 @@ vips__openslide_read( const char *filename, VipsImage *out, static int vips__openslide_read_associated( const char *filename, VipsImage *out, - const char *associated ) + const char *associated_name, gboolean rgb ) { ReadSlide *rslide; - VipsImage *raw; - uint32_t *buf; - const char *error; + VipsImage *associated; VIPS_DEBUG_MSG( "vips__openslide_read_associated: %s %s\n", - filename, associated ); + filename, associated_name ); - if( !(rslide = readslide_new( filename, out, 0, FALSE, - associated, FALSE )) ) + if( !(rslide = readslide_new( filename, + out, 0, FALSE, associated_name, FALSE, rgb )) ) return( -1 ); - /* Memory buffer. Get associated directly to this, then copy to out. - */ - raw = vips_image_new_memory(); - vips_object_local( out, raw ); + if( !(associated = vips__openslide_get_associated( rslide, + associated_name )) ) + return( -1 ); - if( readslide_parse( rslide, raw ) || - vips_image_write_prepare( raw ) ) - return( -1 ); - - buf = (uint32_t *) VIPS_IMAGE_ADDR( raw, 0, 0 ); - openslide_read_associated_image( rslide->osr, rslide->associated, buf ); - error = openslide_get_error( rslide->osr ); - if( error ) { - vips_error( "openslide2vips", - _( "reading associated image: %s" ), error ); - return( -1 ); - } - argb2rgba( buf, raw->Xsize * raw->Ysize, rslide->bg ); - - if( vips_image_write( raw, out ) ) + if( vips_image_write( associated, out ) ) { + VIPS_UNREF( associated ); return( -1 ); + } + VIPS_UNREF( associated ); return( 0 ); } @@ -731,6 +827,10 @@ typedef struct _VipsForeignLoadOpenslide { */ gboolean attach_associated; + /* Read as RGB, not RGBA. + */ + gboolean rgb; + } VipsForeignLoadOpenslide; typedef VipsForeignLoadClass VipsForeignLoadOpenslideClass; @@ -826,7 +926,8 @@ vips_foreign_load_openslide_header( VipsForeignLoad *load ) if( vips__openslide_read_header( openslide->filename, load->out, openslide->level, openslide->autocrop, - openslide->associated, openslide->attach_associated ) ) + openslide->associated, openslide->attach_associated, + openslide->rgb ) ) return( -1 ); VIPS_SETSTR( load->out->filename, openslide->filename ); @@ -842,12 +943,13 @@ vips_foreign_load_openslide_load( VipsForeignLoad *load ) if( !openslide->associated ) { if( vips__openslide_read( openslide->filename, load->real, openslide->level, openslide->autocrop, - openslide->attach_associated ) ) + openslide->attach_associated, + openslide->rgb ) ) return( -1 ); } else { if( vips__openslide_read_associated( openslide->filename, - load->real, openslide->associated ) ) + load->real, openslide->associated, openslide->rgb ) ) return( -1 ); } @@ -917,13 +1019,20 @@ vips_foreign_load_openslide_class_init( VipsForeignLoadOpenslideClass *class ) G_STRUCT_OFFSET( VipsForeignLoadOpenslide, associated ), NULL ); - VIPS_ARG_BOOL( class, "attach_associated", 13, + VIPS_ARG_BOOL( class, "attach_associated", 23, _( "Attach associated" ), _( "Attach all associated images" ), VIPS_ARGUMENT_OPTIONAL_INPUT, G_STRUCT_OFFSET( VipsForeignLoadOpenslide, attach_associated ), FALSE ); + VIPS_ARG_BOOL( class, "rgb", 24, + _( "RGB" ), + _( "Output RGB (not RGBA)" ), + VIPS_ARGUMENT_OPTIONAL_INPUT, + G_STRUCT_OFFSET( VipsForeignLoadOpenslide, rgb ), + FALSE ); + } static void diff --git a/libvips/foreign/pdfiumload.c b/libvips/foreign/pdfiumload.c index 1c195434..7f622469 100644 --- a/libvips/foreign/pdfiumload.c +++ b/libvips/foreign/pdfiumload.c @@ -572,7 +572,7 @@ vips_foreign_load_pdf_generate( VipsRegion *or, if( vips_foreign_load_pdf_get_page( pdf, pdf->page_no + i ) ) return( -1 ); - g_mutex_lock( vips_pdfium_mutex ); + vips__worker_lock( vips_pdfium_mutex ); /* 4 means RGBA. */ diff --git a/libvips/include/vips/header.h b/libvips/include/vips/header.h index 502bf329..6e4294f2 100644 --- a/libvips/include/vips/header.h +++ b/libvips/include/vips/header.h @@ -157,6 +157,13 @@ extern "C" { */ #define VIPS_META_N_SUBIFDS "n-subifds" +/** + * VIPS_META_CONCURRENCY: + * + * If set, the suggested concurrency for this image. + */ +#define VIPS_META_CONCURRENCY "concurrency" + VIPS_API guint64 vips_format_sizeof( VipsBandFormat format ); VIPS_API @@ -207,6 +214,8 @@ int vips_image_get_orientation( VipsImage *image ); VIPS_API gboolean vips_image_get_orientation_swap( VipsImage *image ); VIPS_API +int vips_image_get_concurrency( VipsImage *image, int default_concurrency ); +VIPS_API const void *vips_image_get_data( VipsImage *image ); VIPS_API diff --git a/libvips/include/vips/internal.h b/libvips/include/vips/internal.h index 363ef829..a9f2802f 100644 --- a/libvips/include/vips/internal.h +++ b/libvips/include/vips/internal.h @@ -143,9 +143,11 @@ extern char *vips__disc_threshold; extern gboolean vips__cache_dump; extern gboolean vips__cache_trace; +void vips__thread_init( void ); void vips__threadpool_init( void ); void vips__threadpool_shutdown( void ); int vips__thread_execute( const char *name, GFunc func, gpointer data ); +VIPS_API void vips__worker_lock( GMutex *mutex ); void vips__cache_init( void ); diff --git a/libvips/include/vips/thread.h b/libvips/include/vips/thread.h index 964c94f2..ac3cd528 100644 --- a/libvips/include/vips/thread.h +++ b/libvips/include/vips/thread.h @@ -59,7 +59,13 @@ VIPS_API void *vips_g_thread_join( GThread *thread ); VIPS_API -gboolean vips_thread_isworker( void ); +gboolean vips_thread_isvips( void ); + +typedef struct _VipsThreadset VipsThreadset; +VipsThreadset *vips_threadset_new( int max_threads ); +int vips_threadset_run( VipsThreadset *set, + const char *domain, GFunc func, gpointer data ); +void vips_threadset_free( VipsThreadset *set ); #ifdef __cplusplus } diff --git a/libvips/iofuncs/buffer.c b/libvips/iofuncs/buffer.c index ecbc00b2..f7a6f1df 100644 --- a/libvips/iofuncs/buffer.c +++ b/libvips/iofuncs/buffer.c @@ -316,8 +316,10 @@ buffer_thread_get( void ) { VipsBufferThread *buffer_thread; - if( vips_thread_isworker() ) { - /* Workers get a private set of buffers. + if( vips_thread_isvips() ) { + /* Our threads get a set of private buffers, since we know we + * will be calling vips_thread_shutdown() on thread + * termination. */ if( !(buffer_thread = g_private_get( buffer_thread_key )) ) { buffer_thread = buffer_thread_new(); @@ -327,7 +329,7 @@ buffer_thread_get( void ) g_assert( buffer_thread->thread == g_thread_self() ); } else - /* Non-workers don't have one. + /* Non-vips threads don't have one. */ buffer_thread = NULL; diff --git a/libvips/iofuncs/header.c b/libvips/iofuncs/header.c index b6e0b50f..49f46b8b 100644 --- a/libvips/iofuncs/header.c +++ b/libvips/iofuncs/header.c @@ -951,6 +951,30 @@ vips_image_get_n_pages( VipsImage *image ) return( 1 ); } +/** + * vips_image_get_concurrency: + * @image: image to get from + * + * Fetch and sanity-check #VIPS_CONCURRENCY. Default to 1 if not present or + * crazy. + * + * Returns: the suggested concurrency for this image + */ +int +vips_image_get_concurrency( VipsImage *image, int default_concurrency ) +{ + int concurrency; + + if( vips_image_get_typeof( image, VIPS_META_CONCURRENCY ) && + !vips_image_get_int( image, + VIPS_META_CONCURRENCY, &concurrency ) && + concurrency >= 1 && + concurrency < 100 ) + return( concurrency ); + + return( default_concurrency ); +} + /** * vips_image_get_n_subifds: * @image: image to get from diff --git a/libvips/iofuncs/init.c b/libvips/iofuncs/init.c index 3346b6bc..93f0b17b 100644 --- a/libvips/iofuncs/init.c +++ b/libvips/iofuncs/init.c @@ -505,6 +505,7 @@ vips_init( const char *argv0 ) (void) _setmaxstdio( 2048 ); #endif /*G_OS_WIN32*/ + vips__thread_init(); vips__threadpool_init(); vips__buffer_init(); vips__meta_init(); diff --git a/libvips/iofuncs/meson.build b/libvips/iofuncs/meson.build index 9d85774a..d96fdbf9 100644 --- a/libvips/iofuncs/meson.build +++ b/libvips/iofuncs/meson.build @@ -1,4 +1,7 @@ iofuncs_sources = files( + 'thread.c', + 'threadset.c', + 'threadpool.c', 'ginputsource.c', 'sourceginput.c', 'connection.c', @@ -28,7 +31,6 @@ iofuncs_sources = files( 'region.c', 'rect.c', 'semaphore.c', - 'threadpool.c', 'util.c', 'init.c', 'buf.c', diff --git a/libvips/iofuncs/sink.c b/libvips/iofuncs/sink.c index 5a75012e..dcd2f5a5 100644 --- a/libvips/iofuncs/sink.c +++ b/libvips/iofuncs/sink.c @@ -73,10 +73,6 @@ typedef struct _Sink { */ VipsImage *t; - /* Mutex for serialising calls to VipsStartFn and VipsStopFn. - */ - GMutex *sslock; - /* Call params. */ VipsStartFn start_fn; @@ -257,16 +253,8 @@ sink_call_stop( Sink *sink, SinkThreadState *state ) VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state ); - VIPS_GATE_START( "sink_call_stop: wait" ); - - g_mutex_lock( sink->sslock ); - - VIPS_GATE_STOP( "sink_call_stop: wait" ); - result = sink->stop_fn( state->seq, sink->a, sink->b ); - g_mutex_unlock( sink->sslock ); - if( result ) { SinkBase *sink_base = (SinkBase *) sink; @@ -302,16 +290,8 @@ sink_call_start( Sink *sink, SinkThreadState *state ) if( !state->seq && sink->start_fn ) { VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state ); - VIPS_GATE_START( "sink_call_start: wait" ); - - g_mutex_lock( sink->sslock ); - - VIPS_GATE_STOP( "sink_call_start: wait" ); - state->seq = sink->start_fn( sink->t, sink->a, sink->b ); - g_mutex_unlock( sink->sslock ); - if( !state->seq ) { SinkBase *sink_base = (SinkBase *) sink; @@ -370,7 +350,6 @@ vips_sink_thread_state_new( VipsImage *im, void *a ) static void sink_free( Sink *sink ) { - VIPS_FREEF( vips_g_mutex_free, sink->sslock ); VIPS_FREEF( sink_area_free, sink->area ); VIPS_FREEF( sink_area_free, sink->old_area ); VIPS_FREEF( g_object_unref, sink->t ); @@ -406,7 +385,6 @@ sink_init( Sink *sink, vips_sink_base_init( &sink->sink_base, image ); sink->t = NULL; - sink->sslock = vips_g_mutex_new(); sink->start_fn = start_fn; sink->generate_fn = generate_fn; sink->stop_fn = stop_fn; diff --git a/libvips/iofuncs/thread.c b/libvips/iofuncs/thread.c new file mode 100644 index 00000000..ae75f6b2 --- /dev/null +++ b/libvips/iofuncs/thread.c @@ -0,0 +1,459 @@ +/* Basic functions to support threading. + * + * 29/9/22 + * - from threadpool.c + */ + +/* + + This file is part of VIPS. + + VIPS is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301 USA + + */ + +/* + + These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk + + */ + +/* +#define VIPS_DEBUG +#define VIPS_DEBUG_RED + */ + +#ifdef HAVE_CONFIG_H +#include +#endif /*HAVE_CONFIG_H*/ +#include + +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif /*HAVE_UNISTD_H*/ +#include + +#include +#include +#include +#include + +#ifdef G_OS_WIN32 +#include +#endif /*G_OS_WIN32*/ + +/* Maximum value we allow for VIPS_CONCURRENCY. We need to stop huge values + * killing the system. + */ +#define MAX_THREADS (1024) + +/* Default n threads ... 0 means get from environment. + */ +int vips__concurrency = 0; + +/* Default tile geometry ... can be set by vips_init(). + */ +int vips__tile_width = VIPS__TILE_WIDTH; +int vips__tile_height = VIPS__TILE_HEIGHT; +int vips__fatstrip_height = VIPS__FATSTRIP_HEIGHT; +int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT; + +/* Set this GPrivate to indicate that is a libvips thread. + */ +static GPrivate *is_vips_thread_key = NULL; + +/* TRUE if we are a vips thread. We sometimes manage resource allocation + * differently for vips threads since we can cheaply free stuff on thread + * termination. + */ +gboolean +vips_thread_isvips( void ) +{ + return( g_private_get( is_vips_thread_key ) != NULL ); +} + +/* Glib 2.32 revised the thread API. We need some compat functions. + */ + +GMutex * +vips_g_mutex_new( void ) +{ + GMutex *mutex; + + mutex = g_new( GMutex, 1 ); + g_mutex_init( mutex ); + + return( mutex ); +} + +void +vips_g_mutex_free( GMutex *mutex ) +{ + g_mutex_clear( mutex ); + g_free( mutex ); +} + +GCond * +vips_g_cond_new( void ) +{ + GCond *cond; + + cond = g_new( GCond, 1 ); + g_cond_init( cond ); + + return( cond ); +} + +void +vips_g_cond_free( GCond *cond ) +{ + g_cond_clear( cond ); + g_free( cond ); +} + +typedef struct { + const char *domain; + GThreadFunc func; + gpointer data; +} VipsThreadInfo; + +static void * +vips_thread_run( gpointer data ) +{ + VipsThreadInfo *info = (VipsThreadInfo *) data; + + void *result; + + /* Set this to something (anything) to tag this thread as a vips + * worker. No need to call g_private_replace as there is no + * GDestroyNotify handler associated with a worker. + */ + g_private_set( is_vips_thread_key, info ); + + result = info->func( info->data ); + + g_free( info ); + + vips_thread_shutdown(); + + return( result ); +} + +GThread * +vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data ) +{ + GThread *thread; + VipsThreadInfo *info; + GError *error = NULL; + + info = g_new( VipsThreadInfo, 1 ); + info->domain = domain; + info->func = func; + info->data = data; + + thread = g_thread_try_new( domain, vips_thread_run, info, &error ); + + VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n", + domain, thread ); + + if( !thread ) { + if( error ) + vips_g_error( &error ); + else + vips_error( domain, + "%s", _( "unable to create thread" ) ); + } + + return( thread ); +} + +void * +vips_g_thread_join( GThread *thread ) +{ + void *result; + + result = g_thread_join( thread ); + + VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n", + thread ); + + return( result ); +} + +static int +get_num_processors( void ) +{ +#if GLIB_CHECK_VERSION( 2, 48, 1 ) + /* We could use g_get_num_processors when GLib >= 2.48.1, see: + * https://gitlab.gnome.org/GNOME/glib/commit/999711abc82ea3a698d05977f9f91c0b73957f7f + * https://gitlab.gnome.org/GNOME/glib/commit/2149b29468bb99af3c29d5de61f75aad735082dc + */ + return( g_get_num_processors() ); +#else + int nproc; + + nproc = 1; + +#ifdef G_OS_UNIX + +#if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN) +{ + /* POSIX style. + */ + int x; + + x = sysconf( _SC_NPROCESSORS_ONLN ); + if( x > 0 ) + nproc = x; +} +#elif defined HW_NCPU +{ + /* BSD style. + */ + int x; + size_t len = sizeof(x); + + sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, &x, &len, NULL, 0 ); + if( x > 0 ) + nproc = x; +} +#endif + + /* libgomp has some very complex code on Linux to count the number of + * processors available to the current process taking pthread affinity + * into account, but we don't attempt that here. Perhaps we should? + */ + +#endif /*G_OS_UNIX*/ + +#ifdef G_OS_WIN32 +{ + /* Count the CPUs currently available to this process. + */ + SYSTEM_INFO sysinfo; + DWORD_PTR process_cpus; + DWORD_PTR system_cpus; + + /* This *never* fails, use it as fallback + */ + GetNativeSystemInfo( &sysinfo ); + nproc = (int) sysinfo.dwNumberOfProcessors; + + if( GetProcessAffinityMask( GetCurrentProcess(), + &process_cpus, &system_cpus ) ) { + unsigned int af_count; + + for( af_count = 0; process_cpus != 0; process_cpus >>= 1 ) + if( process_cpus & 1 ) + af_count++; + + /* Prefer affinity-based result, if available + */ + if( af_count > 0 ) + nproc = af_count; + } +} +#endif /*G_OS_WIN32*/ + + return( nproc ); +#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/ +} + +/* The default concurrency, set by the environment variable VIPS_CONCURRENCY, + * or if that is not set, the number of threads available on the host machine. + */ +static int +vips__concurrency_get_default( void ) +{ + const char *str; + int nthr; + int x; + + /* Tell the threads system how much concurrency we expect. + */ + if( vips__concurrency > 0 ) + nthr = vips__concurrency; + else if( ((str = g_getenv( "VIPS_CONCURRENCY" )) +#if ENABLE_DEPRECATED + || (str = g_getenv( "IM_CONCURRENCY" )) +#endif + ) && (x = atoi( str )) > 0 ) + nthr = x; + else + nthr = get_num_processors(); + + if( nthr < 1 || + nthr > MAX_THREADS ) { + nthr = VIPS_CLIP( 1, nthr, MAX_THREADS ); + + g_warning( _( "threads clipped to %d" ), nthr ); + } + + return( nthr ); +} + +/** + * vips_concurrency_set: + * @concurrency: number of threads to run + * + * Sets the number of worker threads that vips should use when running a + * #VipsThreadPool. + * + * The special value 0 means "default". In this case, the number of threads + * is set by the environment variable VIPS_CONCURRENCY, or if that is not + * set, the number of threads available on the host machine. + * + * See also: vips_concurrency_get(). + */ +void +vips_concurrency_set( int concurrency ) +{ + /* Tell the threads system how much concurrency we expect. + */ + if( concurrency < 1 ) + concurrency = vips__concurrency_get_default(); + else if( concurrency > MAX_THREADS ) { + concurrency = MAX_THREADS; + + g_warning( _( "threads clipped to %d" ), MAX_THREADS ); + } + + vips__concurrency = concurrency; +} + +/** + * vips_concurrency_get: + * + * Returns the number of worker threads that vips should use when running a + * #VipsThreadPool. + * + * vips gets this values from these sources in turn: + * + * If vips_concurrency_set() has been called, this value is used. The special + * value 0 means "default". You can also use the command-line argument + * "--vips-concurrency" to set this value. + * + * If vips_concurrency_set() has not been called and no command-line argument + * was used, vips uses the value of the environment variable VIPS_CONCURRENCY, + * + * If VIPS_CONCURRENCY has not been set, vips finds the number of hardware + * threads that the host machine can run in parallel and uses that value. + * + * The final value is clipped to the range 1 - 1024. + * + * See also: vips_concurrency_get(). + * + * Returns: number of worker threads to use. + */ +int +vips_concurrency_get( void ) +{ + return( vips__concurrency ); +} + +/** + * vips_get_tile_size: (method) + * @im: image to guess for + * @tile_width: (out): return selected tile width + * @tile_height: (out): return selected tile height + * @n_lines: (out): return buffer height in scanlines + * + * Pick a tile size and a buffer height for this image and the current + * value of vips_concurrency_get(). The buffer height + * will always be a multiple of tile_height. + * + * The buffer height is the height of each buffer we fill in sink disc. Since + * we have two buffers, the largest range of input locality is twice the output + * buffer size, plus whatever margin we add for things like convolution. + */ +void +vips_get_tile_size( VipsImage *im, + int *tile_width, int *tile_height, int *n_lines ) +{ + const int nthr = vips_concurrency_get(); + const int typical_image_width = 1000; + + /* Compiler warnings. + */ + *tile_width = 1; + *tile_height = 1; + + /* Pick a render geometry. + */ + switch( im->dhint ) { + case VIPS_DEMAND_STYLE_SMALLTILE: + *tile_width = vips__tile_width; + *tile_height = vips__tile_height; + break; + + case VIPS_DEMAND_STYLE_ANY: + case VIPS_DEMAND_STYLE_FATSTRIP: + *tile_width = im->Xsize; + *tile_height = vips__fatstrip_height; + break; + + case VIPS_DEMAND_STYLE_THINSTRIP: + *tile_width = im->Xsize; + /* Only enable thinstrip height for very wide images -- the + * overheads are too high to be worthwhile otherwise. + */ + *tile_height = im->Xsize > 10000 ? + vips__thinstrip_height : vips__fatstrip_height; + break; + + default: + g_assert_not_reached(); + } + + /* We can't set n_lines for the current demand style: a later bit of + * the pipeline might see a different hint and we need to synchronise + * buffer sizes everywhere. + * + * We also can't depend on the current image size, since that might + * change down the pipeline too. Pick a typical image width. + * + * Pick the maximum buffer size we might possibly need, then round up + * to a multiple of tileheight. + */ + *n_lines = vips__tile_height * + VIPS_ROUND_UP( vips__tile_width * nthr, + typical_image_width ) / + typical_image_width; + *n_lines = VIPS_MAX( *n_lines, vips__fatstrip_height * nthr ); + *n_lines = VIPS_MAX( *n_lines, vips__thinstrip_height * nthr ); + *n_lines = VIPS_ROUND_UP( *n_lines, *tile_height ); + + /* We make this assumption in several places. + */ + g_assert( *n_lines % *tile_height == 0 ); + + VIPS_DEBUG_MSG( "vips_get_tile_size: %d by %d patches, " + "groups of %d scanlines\n", + *tile_width, *tile_height, *n_lines ); +} + +void +vips__thread_init( void ) +{ + static GPrivate private = { 0 }; + + is_vips_thread_key = &private; + + if( vips__concurrency == 0 ) + vips__concurrency = vips__concurrency_get_default(); +} diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index ab7f6271..8e392faf 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -23,9 +23,6 @@ * - don't depend on image width when setting n_lines * 27/2/19 jtorresfabra * - free threadpool earlier - * 02/02/20 kleisauke - * - reuse threads by using GLib's threadpool - * - remove mutex lock for VipsThreadStartFn */ /* @@ -55,7 +52,7 @@ */ -/* +/* #define VIPS_DEBUG #define VIPS_DEBUG_RED */ @@ -89,383 +86,78 @@ * @include: vips/vips.h * @title: VipsThreadpool * - * A threadpool which allows reusing already started threads. Implementing - * this can be tedious and error-prone. Therefore we use the GLib - * provided threadpool for our convenience. An added advantage is, that - * the threads can be shared between the different subsystems, when they - * are using GLib. - * - * The threadpool is created during vips_init() and is destroyed by - * vips_shutdown(). - * * vips_threadpool_run() loops a set of threads over an image. Threads take it * in turns to allocate units of work (a unit might be a tile in an image), * then run in parallel to process those units. An optional progress function * can be used to give feedback. */ -/* Maximum number of concurrent threads we allow. It prevents huge values of - * VIPS_CONCURRENCY killing the system. - */ -#define MAX_THREADS (1024) - -/* Default tile geometry ... can be set by vips_init(). - */ -int vips__tile_width = VIPS__TILE_WIDTH; -int vips__tile_height = VIPS__TILE_HEIGHT; -int vips__fatstrip_height = VIPS__FATSTRIP_HEIGHT; -int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT; - -/* Default n threads ... 0 means get from environment. - */ -int vips__concurrency = 0; - -/* Set this GPrivate to indicate that this thread is a worker inside - * the vips threadpool. - */ -static GPrivate *is_worker_key = NULL; - /* Set to stall threads for debugging. */ static gboolean vips__stall = FALSE; -/* The thread pool we'll use. +/* The global threadset we run workers in. */ -static GThreadPool *vips__pool = NULL; +static VipsThreadset *vips__threadset = NULL; -/* Glib 2.32 revised the thread API. We need some compat functions. +/* Set this GPrivate to link a thread back to its VipsWorker struct. */ +static GPrivate *worker_key = NULL; -GMutex * -vips_g_mutex_new( void ) +/* Maximum value we allow for VIPS_CONCURRENCY. We need to stop huge values + * killing the system. + */ +#define MAX_THREADS (1024) + +/* Start up threadpools. This is called during vips_init. + */ +void +vips__threadpool_init( void ) { - GMutex *mutex; + static GPrivate private = { 0 }; - mutex = g_new( GMutex, 1 ); - g_mutex_init( mutex ); + /* 3 is the useful minimum, and huge values can crash the machine. + */ + const char *max_threads_env = g_getenv( "VIPS_MAX_THREADS" ); + int max_threads = max_threads_env ? + VIPS_CLIP( 3, atoi( max_threads_env ), MAX_THREADS ) : + 0; - return( mutex ); + worker_key = &private; + + if( g_getenv( "VIPS_STALL" ) ) + vips__stall = TRUE; + + /* max_threads > 0 will create a set of threads on startup. This is + * necessary for wasm, but may break on systems that try to fork() + * after init. + */ + vips__threadset = vips_threadset_new( max_threads ); } void -vips_g_mutex_free( GMutex *mutex ) +vips__threadpool_shutdown( void ) { - g_mutex_clear( mutex ); - g_free( mutex ); -} - -GCond * -vips_g_cond_new( void ) -{ - GCond *cond; - - cond = g_new( GCond, 1 ); - g_cond_init( cond ); - - return( cond ); -} - -void -vips_g_cond_free( GCond *cond ) -{ - g_cond_clear( cond ); - g_free( cond ); -} - -/* TRUE if we are a vips worker thread. We sometimes manage resource allocation - * differently for vips workers since we can cheaply free stuff on thread - * termination. - */ -gboolean -vips_thread_isworker( void ) -{ - return( g_private_get( is_worker_key ) != NULL ); -} - -typedef struct { - const char *domain; - GThreadFunc func; - gpointer data; -} VipsThreadInfo; - -static void * -vips_thread_run( gpointer data ) -{ - VipsThreadInfo *info = (VipsThreadInfo *) data; - - void *result; - - /* Set this to something (anything) to tag this thread as a vips - * worker. - */ - g_private_set( is_worker_key, data ); - - if( vips__thread_profile ) - vips__thread_profile_attach( info->domain ); - - result = info->func( info->data ); - - g_free( info ); - - vips_thread_shutdown(); - - return( result ); -} - -GThread * -vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data ) -{ - GThread *thread; - VipsThreadInfo *info; - GError *error = NULL; - - info = g_new( VipsThreadInfo, 1 ); - info->domain = domain; - info->func = func; - info->data = data; - - thread = g_thread_try_new( domain, vips_thread_run, info, &error ); - - VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n", - domain, thread ); - - if( !thread ) { - if( error ) - vips_g_error( &error ); - else - vips_error( domain, - "%s", _( "unable to create thread" ) ); - } - - return( thread ); -} - -void * -vips_g_thread_join( GThread *thread ) -{ - void *result; - - result = g_thread_join( thread ); - - VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n", - thread ); - - return( result ); -} - -typedef struct { - /* An name for this thread. - */ - const char *name; - - /* The function to execute within the #VipsThreadPool. - */ - GFunc func; - - /* User data that is handed over to func when it is called. - */ - gpointer data; -} VipsThreadExec; - -static void -vips_thread_main_loop( gpointer thread_data, gpointer pool_data ) -{ - VipsThreadExec *exec = (VipsThreadExec *) thread_data; - - /* Set this to something (anything) to tag this thread as a vips - * worker. No need to call g_private_replace as there is no - * GDestroyNotify handler associated with a worker. - */ - g_private_set( is_worker_key, thread_data ); - - if( vips__thread_profile ) - vips__thread_profile_attach( exec->name ); - - exec->func( exec->data, pool_data ); - - g_free( exec ); - - /* Free all thread-private caches, since they probably won't be valid - * for the next task this thread is given. - */ - vips_thread_shutdown(); -} - -static int -get_num_processors( void ) -{ -#if GLIB_CHECK_VERSION( 2, 48, 1 ) - /* We could use g_get_num_processors when GLib >= 2.48.1, see: - * https://gitlab.gnome.org/GNOME/glib/commit/999711abc82ea3a698d05977f9f91c0b73957f7f - * https://gitlab.gnome.org/GNOME/glib/commit/2149b29468bb99af3c29d5de61f75aad735082dc - */ - return( g_get_num_processors() ); -#else - int nproc; - - nproc = 1; - -#ifdef G_OS_UNIX - -#if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN) -{ - /* POSIX style. - */ - int x; - - x = sysconf( _SC_NPROCESSORS_ONLN ); - if( x > 0 ) - nproc = x; -} -#elif defined HW_NCPU -{ - /* BSD style. - */ - int x; - size_t len = sizeof(x); - - sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, &x, &len, NULL, 0 ); - if( x > 0 ) - nproc = x; -} -#endif - - /* libgomp has some very complex code on Linux to count the number of - * processors available to the current process taking pthread affinity - * into account, but we don't attempt that here. Perhaps we should? - */ - -#endif /*G_OS_UNIX*/ - -#ifdef G_OS_WIN32 -{ - /* Count the CPUs currently available to this process. - */ - SYSTEM_INFO sysinfo; - DWORD_PTR process_cpus; - DWORD_PTR system_cpus; - - /* This *never* fails, use it as fallback - */ - GetNativeSystemInfo( &sysinfo ); - nproc = (int) sysinfo.dwNumberOfProcessors; - - if( GetProcessAffinityMask( GetCurrentProcess(), - &process_cpus, &system_cpus ) ) { - unsigned int af_count; - - for( af_count = 0; process_cpus != 0; process_cpus >>= 1 ) - if( process_cpus & 1 ) - af_count++; - - /* Prefer affinity-based result, if available - */ - if( af_count > 0 ) - nproc = af_count; - } -} -#endif /*G_OS_WIN32*/ - - return( nproc ); -#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/ -} - -/* The default concurrency, set by the environment variable VIPS_CONCURRENCY, - * or if that is not set, the number of threads available on the host machine. - */ -static int -vips__concurrency_get_default( void ) -{ - const char *str; - int nthr; - int x; - - /* Tell the threads system how much concurrency we expect. - */ - if( vips__concurrency > 0 ) - nthr = vips__concurrency; - else if( ((str = g_getenv( "VIPS_CONCURRENCY" )) -#if ENABLE_DEPRECATED - || (str = g_getenv( "IM_CONCURRENCY" )) -#endif - ) && (x = atoi( str )) > 0 ) - nthr = x; - else - nthr = get_num_processors(); - - if( nthr < 1 || - nthr > MAX_THREADS ) { - nthr = VIPS_CLIP( 1, nthr, MAX_THREADS ); - - g_warning( _( "threads clipped to %d" ), nthr ); - } - - return( nthr ); + VIPS_FREEF( vips_threadset_free, vips__threadset ); } /** - * vips_concurrency_set: - * @concurrency: number of threads to run + * vips__thread_execute: + * @name: a name for the thread + * @func: a function to execute in the global threadset + * @data: an argument to supply to @func * - * Sets the number of worker threads that vips should use when running a - * #VipsThreadPool. + * A newly created or reused thread will execute @func with the + * argument @data. * - * The special value 0 means "default". In this case, the number of threads is - * set by the environment variable VIPS_CONCURRENCY, or if that is not set, the - * number of threads available on the host machine. - * - * See also: vips_concurrency_get(). - */ -void -vips_concurrency_set( int concurrency ) -{ - /* Tell the threads system how much concurrency we expect. - */ - if( concurrency < 1 ) - concurrency = vips__concurrency_get_default(); - else if( concurrency > MAX_THREADS ) { - concurrency = MAX_THREADS; - - g_warning( _( "threads clipped to %d" ), MAX_THREADS ); - } - - vips__concurrency = concurrency; -} - -/** - * vips_concurrency_get: - * - * Returns the number of worker threads that vips should use when running a - * #VipsThreadPool. - * - * vips gets this values from these sources in turn: - * - * If vips_concurrency_set() has been called, this value is used. The special - * value 0 means "default". You can also use the command-line argument - * "--vips-concurrency" to set this value. - * - * If vips_concurrency_set() has not been called and no command-line argument - * was used, vips uses the value of the environment variable VIPS_CONCURRENCY, - * - * If VIPS_CONCURRENCY has not been set, vips finds the number of hardware - * threads that the host machine can run in parallel and uses that value. - * - * The final value is clipped to the range 1 - 1024. - * - * See also: vips_concurrency_get(). - * - * Returns: number of worker threads to use. + * Returns: 0 on success, -1 on error. */ int -vips_concurrency_get( void ) +vips__thread_execute( const char *domain, GFunc func, gpointer data ) { - return( vips__concurrency ); + return( vips_threadset_run( vips__threadset, domain, func, data ) ); } -/* The VipsThreadStartFn arg to vips_threadpool_run() is called once for each - * thread to make one of these things to hold the thread state. - */ - G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT ); static void @@ -538,251 +230,308 @@ vips_thread_state_new( VipsImage *im, void *a ) VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) ); } -/* A VipsTask is the state of one call to vips_threadpool_run(). +/* What we track for each thread in the pool. */ -typedef struct _VipsTask { - /* All private. - */ +typedef struct _VipsWorker { + /*< private >*/ + struct _VipsThreadpool *pool; /* Pool we are part of */ + + VipsThreadState *state; + + gboolean stop; + +} VipsWorker; + +/* What we track for a group of threads working together. + */ +typedef struct _VipsThreadpool { /*< private >*/ VipsImage *im; /* Image we are calculating */ - /* Start or reuse a thread, do a unit of work (runs in parallel) - * and allocate a unit of work (serial). Plus the mutex we use to - * serialize work allocation. + /* Start a thread, do a unit of work (runs in parallel) and allocate + * a unit of work (serial). Plus the mutex we use to serialize work + * allocation. */ VipsThreadStartFn start; VipsThreadpoolAllocateFn allocate; VipsThreadpoolWorkFn work; GMutex *allocate_lock; - void *a; /* User argument to start / allocate / etc. */ + void *a; /* User argument to start / allocate / etc. */ - /* The caller blocks here until all tasks finish. + int max_workers; /* Max number of workers in pool */ + + /* The number of workers in the pool (as a negative number, so + * -4 means 4 workers are running). */ - VipsSemaphore finish; + VipsSemaphore n_workers; /* Workers up this for every loop to make the main thread tick. */ - VipsSemaphore tick; + VipsSemaphore tick; + + /* The number of workers queueing up on allocate_lock. Use this to + * grow and shrink the threadpool. + */ + int n_waiting; /* Set this to abort evaluation early with an error. */ gboolean error; - /* Set by Allocate (via an arg) to indicate normal end of computation. + /* Ask threads to exit, either set by allocate, or on free. */ gboolean stop; -} VipsTask; -/* Allocate some work (single-threaded), then do it (many-threaded). + /* Set this and the next worker to see it will clear the flag and exit + * (used to downsize the threadpool). + */ + int exit; +} VipsThreadpool; + +static int +vips_worker_allocate( VipsWorker *worker ) +{ + VipsThreadpool *pool = worker->pool; + + g_assert( !pool->stop ); + + if( !worker->state && + !(worker->state = pool->start( pool->im, pool->a )) ) + return( -1 ); + + if( pool->allocate( worker->state, pool->a, &pool->stop ) ) + return( -1 ); + + return( 0 ); +} + +/* Run this once per main loop. Get some work (single-threaded), then do it + * (many-threaded). * * The very first workunit is also executed single-threaded. This gives * loaders a change to seek to the correct spot, see vips_sequential(). */ static void -vips_task_work_unit( VipsTask *task, VipsThreadState *state ) +vips_worker_work_unit( VipsWorker *worker ) { - if( task->error ) - return; + VipsThreadpool *pool = worker->pool; - VIPS_GATE_START( "vips_task_work_unit: wait" ); + VIPS_GATE_START( "vips_worker_work_unit: wait" ); - g_mutex_lock( task->allocate_lock ); + vips__worker_lock( pool->allocate_lock ); - VIPS_GATE_STOP( "vips_task_work_unit: wait" ); + VIPS_GATE_STOP( "vips_worker_work_unit: wait" ); /* Has another worker signaled stop while we've been waiting? */ - if( task->stop ) { - g_mutex_unlock( task->allocate_lock ); + if( pool->stop ) { + worker->stop = TRUE; + g_mutex_unlock( pool->allocate_lock ); return; } - if( task->allocate( state, task->a, &task->stop ) ) { - task->error = TRUE; - g_mutex_unlock( task->allocate_lock ); + /* Has a thread been asked to exit? Volunteer if yes. + */ + if( g_atomic_int_add( &pool->exit, -1 ) > 0 ) { + /* A thread had been asked to exit, and we've grabbed the + * flag. + */ + worker->stop = TRUE; + g_mutex_unlock( pool->allocate_lock ); + return; + } + else { + /* No one had been asked to exit and we've mistakenly taken + * the exit count below zero. Put it back up again. + */ + g_atomic_int_add( &pool->exit, 1 ); + } + + if( vips_worker_allocate( worker ) ) { + pool->error = TRUE; + worker->stop = TRUE; + g_mutex_unlock( pool->allocate_lock ); return; } /* Have we just signalled stop? */ - if( task->stop ) { - g_mutex_unlock( task->allocate_lock ); + if( pool->stop ) { + worker->stop = TRUE; + g_mutex_unlock( pool->allocate_lock ); return; } - g_mutex_unlock( task->allocate_lock ); + g_mutex_unlock( pool->allocate_lock ); - if( state->stall && + if( worker->state->stall && vips__stall ) { /* Sleep for 0.5s. Handy for stressing the seq system. Stall * is set by allocate funcs in various places. */ g_usleep( 500000 ); - state->stall = FALSE; - printf( "vips_task_work_unit: " - "stall done, releasing y = %d ...\n", state->y ); + worker->state->stall = FALSE; + printf( "vips_worker_work_unit: stall done, " + "releasing y = %d ...\n", worker->state->y ); } /* Process a work unit. */ - if( task->work( state, task->a ) ) - task->error = TRUE; + if( pool->work( worker->state, pool->a ) ) { + worker->stop = TRUE; + pool->error = TRUE; + } } -/* What runs as a pipeline thread ... loop, waiting to be told to do stuff. +/* What runs as a thread ... loop, waiting to be told to do stuff. */ static void -vips_task_run( gpointer data, gpointer user_data ) +vips_thread_main_loop( void *a, void *b ) { - VipsTask *task = (VipsTask *) data; - VipsThreadState *state; + VipsWorker *worker = (VipsWorker *) a; + VipsThreadpool *pool = worker->pool; - VIPS_GATE_START( "vips_task_run: thread" ); + g_assert( pool == worker->pool ); - if( !(state = task->start( task->im, task->a )) ) - task->error = TRUE; + VIPS_GATE_START( "vips_thread_main_loop: thread" ); + + g_private_set( worker_key, worker ); /* Process work units! Always tick, even if we are stopping, so the * main thread will wake up for exit. */ - for(;;) { - VIPS_GATE_START( "vips_task_work_unit: u" ); - vips_task_work_unit( task, state ); - VIPS_GATE_STOP( "vips_task_work_unit: u" ); - vips_semaphore_up( &task->tick ); - - if( task->stop || - task->error ) - break; + while( !pool->stop && + !worker->stop && + !pool->error ) { + VIPS_GATE_START( "vips_worker_work_unit: u" ); + vips_worker_work_unit( worker ); + VIPS_GATE_STOP( "vips_thread_work_unit: u" ); + vips_semaphore_up( &pool->tick ); } - VIPS_FREEF( g_object_unref, state ); + VIPS_GATE_STOP( "vips_thread_main_loop: thread" ); - /* We are exiting: tell the main thread. + /* unreffing the worker state will trigger stop in the threadstate, so + * we need to single-thread. */ - vips_semaphore_up( &task->finish ); + g_mutex_lock( pool->allocate_lock ); - VIPS_GATE_STOP( "vips_task_run: thread" ); + VIPS_FREEF( g_object_unref, worker->state ); + + g_mutex_unlock( pool->allocate_lock ); + + VIPS_FREE( worker ); + g_private_set( worker_key, NULL ); + + /* We are done: tell the main thread. + */ + vips_semaphore_upn( &pool->n_workers, 1 ); } -/* Called from vips_shutdown(). +/* Attach another thread to a threadpool. */ -void -vips__threadpool_shutdown( void ) +static int +vips_worker_new( VipsThreadpool *pool ) { - /* We may come here without having inited. - */ - if( vips__pool ) { - VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n", - vips__pool ); + VipsWorker *worker; - g_thread_pool_free( vips__pool, TRUE, TRUE ); - vips__pool = NULL; + if( !(worker = VIPS_NEW( NULL, VipsWorker )) ) + return( -1 ); + worker->pool = pool; + worker->state = NULL; + + /* We can't build the state here, it has to be done by the worker + * itself the first time that allocate runs so that any regions are + * owned by the correct thread. + */ + + if( vips__thread_execute( "worker", + vips_thread_main_loop, worker ) ) { + g_free( worker ); + return( -1 ); } + + /* One more worker in the pool. + */ + vips_semaphore_upn( &pool->n_workers, -1 ); + + return( 0 ); } -static VipsTask * -vips_task_new( VipsImage *im, int *n_tasks ) +void +vips__worker_lock( GMutex *mutex ) { - VipsTask *task; + VipsWorker *worker = (VipsWorker *) g_private_get( worker_key ); + + if( worker ) + g_atomic_int_add( &worker->pool->n_waiting, 1 ); + g_mutex_lock( mutex ); + if( worker ) + g_atomic_int_add( &worker->pool->n_waiting, -1 ); +} + +static void +vips_threadpool_free( VipsThreadpool *pool ) +{ + VIPS_DEBUG_MSG( "vips_threadpool_free: \"%s\" (%p)\n", + pool->im->filename, pool ); + + /* Wait for them all to exit. + */ + pool->stop = TRUE; + vips_semaphore_downn( &pool->n_workers, 0 ); + + VIPS_FREEF( vips_g_mutex_free, pool->allocate_lock ); + vips_semaphore_destroy( &pool->n_workers ); + vips_semaphore_destroy( &pool->tick ); + VIPS_FREE( pool ); +} + +static VipsThreadpool * +vips_threadpool_new( VipsImage *im ) +{ + VipsThreadpool *pool; int tile_width; int tile_height; gint64 n_tiles; int n_lines; - if( !(task = VIPS_NEW( NULL, VipsTask )) ) + /* Allocate and init new thread block. + */ + if( !(pool = VIPS_NEW( NULL, VipsThreadpool )) ) return( NULL ); - task->im = im; - task->allocate = NULL; - task->work = NULL; - task->allocate_lock = vips_g_mutex_new(); - vips_semaphore_init( &task->finish, 0, "finish" ); - vips_semaphore_init( &task->tick, 0, "tick" ); - task->error = FALSE; - task->stop = FALSE; + pool->im = im; + pool->allocate = NULL; + pool->work = NULL; + pool->allocate_lock = vips_g_mutex_new(); + pool->max_workers = vips_concurrency_get(); + vips_semaphore_init( &pool->n_workers, 0, "n_workers" ); + vips_semaphore_init( &pool->tick, 0, "tick" ); + pool->error = FALSE; + pool->stop = FALSE; + pool->exit = 0; - *n_tasks = vips_concurrency_get(); - - /* If this is a tiny image, we won't need all n_tasks. Guess how + /* If this is a tiny image, we won't need all max_workers threads. + * Guess how * many tiles we might need to cover the image and use that to limit - * the number of tasks we create. + * the number of threads we create. */ vips_get_tile_size( im, &tile_width, &tile_height, &n_lines ); n_tiles = (1 + (gint64) im->Xsize / tile_width) * (1 + (gint64) im->Ysize / tile_height); - n_tiles = VIPS_MAX( 1, n_tiles ); - *n_tasks = VIPS_MIN( *n_tasks, n_tiles ); + n_tiles = VIPS_CLIP( 1, n_tiles, 1024 ); + pool->max_workers = VIPS_MIN( pool->max_workers, n_tiles ); - VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n", - im->filename, task, *n_tasks ); - - return( task ); -} - -static void -vips_task_free( VipsTask *task ) -{ - VIPS_DEBUG_MSG( "vips_task_free: \"%s\" (%p)\n", - task->im->filename, task ); - - VIPS_FREEF( vips_g_mutex_free, task->allocate_lock ); - vips_semaphore_destroy( &task->finish ); - vips_semaphore_destroy( &task->tick ); - VIPS_FREE( task ); -} - -static void * -vips__thread_once_init( void *data ) -{ - /* We can have many more than vips__concurrency threads -- each active - * pipeline will make vips__concurrency more, see - * vips_threadpool_run(). + /* VIPS_META_CONCURRENCY on the image can optionally override + * concurrency. */ - vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL, - -1, FALSE, NULL ); + pool->max_workers = vips_image_get_concurrency( im, pool->max_workers ); - return( NULL ); -} + VIPS_DEBUG_MSG( "vips_threadpool_new: " + "\"%s\" (%p), with %d threads\n", + im->filename, pool, pool->max_workers ); -/** - * vips__thread_execute: - * @name: a name for the thread - * @func: a function to execute in the thread pool - * @data: an argument to supply to @func - * - * A newly created or reused thread will execute @func with with the - * argument data. - * - * See also: vips_concurrency_set(). - * - * Returns: 0 on success, -1 on error. - */ -int -vips__thread_execute( const char *name, GFunc func, gpointer data ) -{ - static GOnce once = G_ONCE_INIT; - - VipsThreadExec *exec; - GError *error = NULL; - gboolean result; - - VIPS_ONCE( &once, vips__thread_once_init, NULL ); - - exec = g_new( VipsThreadExec, 1 ); - exec->name = name; - exec->func = func; - exec->data = data; - - result = g_thread_pool_push( vips__pool, exec, &error ); - if( error ) { - vips_g_error( &error ); - return( -1 ); - } - - VIPS_DEBUG_MSG( "vips__thread_execute: %u threads in pool\n", - g_thread_pool_get_num_threads( vips__pool ) ); - - return( result ? 0 : -1 ); + return( pool ); } /** @@ -795,8 +544,8 @@ vips__thread_execute( const char *name, GFunc func, gpointer data ) * is allocated to it to build the per-thread state. Per-thread state is used * by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate. * - * #VipsThreadState is a subclass of #VipsObject. Start functions can be - * executed concurrently. + * #VipsThreadState is a subclass of #VipsObject. Start functions are called + * from allocate, that is, they are single-threaded. * * See also: vips_threadpool_run(). * @@ -868,8 +617,7 @@ vips__thread_execute( const char *name, GFunc func, gpointer data ) * @progress: give progress feedback about a work unit, or %NULL * @a: client data * - * This function runs a set of threads over an image. It will use a newly - * created or reused thread within the #VipsThreadPool. Each thread first calls + * This function runs a set of threads over an image. Each thread first calls * @start to create new per-thread state, then runs * @allocate to set up a new work unit (perhaps the next tile in an image, for * example), then @work to process that work unit. After each unit is @@ -879,9 +627,9 @@ vips__thread_execute( const char *name, GFunc func, gpointer data ) * The object returned by @start must be an instance of a subclass of * #VipsThreadState. Use this to communicate between @allocate and @work. * - * @allocate is always single-threaded (so it can write to the - * per-pool state), whereas @start and @work can be executed concurrently. - * @progress is always called by + * @allocate and @start are always single-threaded (so they can write to the + * per-pool state), whereas @work can be executed concurrently. @progress is + * always called by * the main thread (ie. the thread which called vips_threadpool_run()). * * See also: vips_concurrency_set(). @@ -896,155 +644,82 @@ vips_threadpool_run( VipsImage *im, VipsThreadpoolProgressFn progress, void *a ) { - VipsTask *task; - int n_tasks; - int i; + VipsThreadpool *pool; int result; + int n_waiting; + int n_working; - if( !(task = vips_task_new( im, &n_tasks )) ) + if( !(pool = vips_threadpool_new( im )) ) return( -1 ); - task->start = start; - task->allocate = allocate; - task->work = work; - task->a = a; + pool->start = start; + pool->allocate = allocate; + pool->work = work; + pool->a = a; - /* Create a set of workers for this pipeline. + /* Start with half of the max number of threads, then let it drift up + * and down with load. */ - for( i = 0; i < n_tasks; i++ ) - if( vips__thread_execute( "worker", vips_task_run, task ) ) + for( n_working = 0; n_working < 1 + pool->max_workers / 2; n_working++ ) + if( vips_worker_new( pool ) ) { + vips_threadpool_free( pool ); return( -1 ); + } for(;;) { /* Wait for a tick from a worker. */ - vips_semaphore_down( &task->tick ); + vips_semaphore_down( &pool->tick ); VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" ); - if( task->stop || - task->error ) + if( pool->stop || + pool->error ) break; if( progress && - progress( task->a ) ) - task->error = TRUE; + progress( pool->a ) ) + pool->error = TRUE; - if( task->stop || - task->error ) + if( pool->stop || + pool->error ) break; + + n_waiting = g_atomic_int_get( &pool->n_waiting ); + VIPS_DEBUG_MSG( "n_waiting = %d\n", n_waiting ); + VIPS_DEBUG_MSG( "n_working = %d\n", n_working ); + VIPS_DEBUG_MSG( "exit = %d\n", pool->exit ); + + if( n_waiting > 3 && + n_working > 1 ) { + VIPS_DEBUG_MSG( "shrinking thread pool\n" ); + g_atomic_int_add( &pool->exit, 1 ); + n_working -= 1; + } + else if( n_waiting < 2 && + n_working < pool->max_workers ) { + VIPS_DEBUG_MSG( "expanding thread pool\n" ); + if( vips_worker_new( pool ) ) { + vips_threadpool_free( pool ); + return( -1 ); + } + n_working += 1; + } } - /* Wait for them all to hit finish. + /* + if( !vips_image_get_concurrency( im, 0 ) ) + printf( "vips_threadpool_run: " + "finished with %d workers in pool\n", n_working ); */ - vips_semaphore_downn( &task->finish, n_tasks ); /* Return 0 for success. */ - result = task->error ? -1 : 0; + result = pool->error ? -1 : 0; - vips_task_free( task ); + vips_threadpool_free( pool ); + + vips_image_minimise_all( im ); return( result ); } - -/* Create the vips threadpool. This is called during vips_init. - */ -void -vips__threadpool_init( void ) -{ - static GPrivate private = { 0 }; - - is_worker_key = &private; - - if( g_getenv( "VIPS_STALL" ) ) - vips__stall = TRUE; - - if( vips__concurrency == 0 ) - vips__concurrency = vips__concurrency_get_default(); - - /* The threadpool is built in the first vips__thread_execute() - * call, since we want thread creation to happen as late as possible. - * - * Many web platforms start up in a base environment, then fork() for - * each request. We must not make the threadpool before the fork. - */ - - VIPS_DEBUG_MSG( "vips__threadpool_init: (%p)\n", vips__pool ); -} - -/** - * vips_get_tile_size: (method) - * @im: image to guess for - * @tile_width: (out): return selected tile width - * @tile_height: (out): return selected tile height - * @n_lines: (out): return buffer height in scanlines - * - * Pick a tile size and a buffer height for this image and the current - * value of vips_concurrency_get(). The buffer height - * will always be a multiple of tile_height. - * - * The buffer height is the height of each buffer we fill in sink disc. Since - * we have two buffers, the largest range of input locality is twice the output - * buffer size, plus whatever margin we add for things like convolution. - */ -void -vips_get_tile_size( VipsImage *im, - int *tile_width, int *tile_height, int *n_lines ) -{ - const int nthr = vips_concurrency_get(); - const int typical_image_width = 1000; - - /* Compiler warnings. - */ - *tile_width = 1; - *tile_height = 1; - - /* Pick a render geometry. - */ - switch( im->dhint ) { - case VIPS_DEMAND_STYLE_SMALLTILE: - *tile_width = vips__tile_width; - *tile_height = vips__tile_height; - break; - - case VIPS_DEMAND_STYLE_ANY: - case VIPS_DEMAND_STYLE_FATSTRIP: - *tile_width = im->Xsize; - *tile_height = vips__fatstrip_height; - break; - - case VIPS_DEMAND_STYLE_THINSTRIP: - *tile_width = im->Xsize; - *tile_height = vips__thinstrip_height; - break; - - default: - g_assert_not_reached(); - } - - /* We can't set n_lines for the current demand style: a later bit of - * the pipeline might see a different hint and we need to synchronise - * buffer sizes everywhere. - * - * We also can't depend on the current image size, since that might - * change down the pipeline too. Pick a typical image width. - * - * Pick the maximum buffer size we might possibly need, then round up - * to a multiple of tileheight. - */ - *n_lines = vips__tile_height * - VIPS_ROUND_UP( vips__tile_width * nthr, typical_image_width ) / - typical_image_width; - *n_lines = VIPS_MAX( *n_lines, vips__fatstrip_height * nthr ); - *n_lines = VIPS_MAX( *n_lines, vips__thinstrip_height * nthr ); - *n_lines = VIPS_ROUND_UP( *n_lines, *tile_height ); - - /* We make this assumption in several places. - */ - g_assert( *n_lines % *tile_height == 0 ); - - VIPS_DEBUG_MSG( "vips_get_tile_size: %d by %d patches, " - "groups of %d scanlines\n", - *tile_width, *tile_height, *n_lines ); -} diff --git a/libvips/iofuncs/threadset.c b/libvips/iofuncs/threadset.c new file mode 100644 index 00000000..4e9cb10c --- /dev/null +++ b/libvips/iofuncs/threadset.c @@ -0,0 +1,330 @@ +/* A set of threads. + * + * Creating and destroying threads can be expensive on some platforms, so we + * try to only create once, then reuse. + */ + +/* + + This file is part of VIPS. + + VIPS is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301 USA + + */ + +/* + + These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk + + */ + +#ifdef HAVE_CONFIG_H +#include +#endif /*HAVE_CONFIG_H*/ +#include + +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif /*HAVE_UNISTD_H*/ +#include + +#include +#include +#include +#include + +typedef struct _VipsThreadsetMember { + /* The set we are part of. + */ + VipsThreadset *set; + + /* The underlying glib thread object. + */ + GThread *thread; + + /* The task the thread should run next. + */ + const char *domain; + GFunc func; + void *data; + void *user_data; + + /* The thread waits on this when it's free. + */ + VipsSemaphore idle; + + /* Set by our controller to request exit. + */ + gboolean kill; +} VipsThreadsetMember; + +struct _VipsThreadset { + GMutex *lock; + + /* All the VipsThreadsetMember we have created. + */ + GSList *members; + + /* The set of currently idle threads. + */ + GSList *free; + + /* The current number of threads, the highwater mark, and + * the max we allow before blocking thread creation. + */ + int n_threads; + int n_threads_highwater; + int max_threads; +}; + +/* The thread work function. + */ +static void * +vips_threadset_work( void *pointer ) +{ + VipsThreadsetMember *member = (VipsThreadsetMember *) pointer; + VipsThreadset *set = member->set; + + for(;;) { + /* Wait to be given work. + */ + vips_semaphore_down( &member->idle ); + if( member->kill || + !member->func ) + break; + + /* If we're profiling, attach a prof struct to this thread. + */ + if( vips__thread_profile ) + vips__thread_profile_attach( member->domain ); + + /* Execute the task. + */ + member->func( member->data, member->user_data ); + + /* Free any thread-private resources -- they will not be + * useful for the next task to use this thread. + */ + vips_thread_shutdown(); + + member->domain = NULL; + member->func = NULL; + member->data = NULL; + member->user_data = NULL; + + /* We are free ... back on the free list! + */ + g_mutex_lock( set->lock ); + set->free = g_slist_prepend( set->free, member ); + g_mutex_unlock( set->lock ); + } + + /* Kill has been requested. We leave this thread on the members + * list so it can be found and joined. + */ + + return( NULL ); +} + +/* Create a new idle member for the set. + */ +static VipsThreadsetMember * +vips_threadset_add( VipsThreadset *set ) +{ + VipsThreadsetMember *member; + + if( set->max_threads && + set->n_threads >= set->max_threads ) { + vips_error( "VipsThreadset", + "%s", _( "threadset is exhausted" ) ); + return( NULL ); + } + + member = g_new0( VipsThreadsetMember, 1 ); + member->set = set; + + vips_semaphore_init( &member->idle, 0, "idle" ); + + if( !(member->thread = vips_g_thread_new( "libvips worker", + vips_threadset_work, member )) ) { + vips_semaphore_destroy( &member->idle ); + VIPS_FREE( member ); + + return( NULL ); + } + + g_mutex_lock( set->lock ); + set->members = g_slist_prepend( set->members, member ); + set->n_threads += 1; + set->n_threads_highwater = + VIPS_MAX( set->n_threads_highwater, set->n_threads );; + g_mutex_unlock( set->lock ); + + return( member ); +} + +/** + * vips_threadset_new: + * @max_threads: maxium number of system threads + * + * Create a new threadset. + * + * If @max_threads is 0, new threads will be created when necessary by + * vips_threadset_run(), with no limit on the number of threads. + * + * If @max_threads is > 0, then that many threads will be created by + * vips_threadset_new() during startup and vips_threadset_run() will fail if + * no free threads are available. + * + * Returns: the new threadset. + */ +VipsThreadset * +vips_threadset_new( int max_threads ) +{ + VipsThreadset *set; + + set = g_new0( VipsThreadset, 1 ); + set->lock = vips_g_mutex_new(); + set->max_threads = max_threads; + + if( set->max_threads > 0 ) + for( int i = 0; i < set->max_threads; i++ ) { + VipsThreadsetMember *member; + + if( !(member = vips_threadset_add( set )) ) { + vips_threadset_free( set ); + return( NULL ); + } + + set->free = g_slist_prepend( set->free, member ); + } + + return( set ); +} + +/** + * vips_threadset_run: + * @set: the threadset to run the task in + * @domain: the name of the task (useful for debugging) + * @func: the task to execute + * @data: the task's data + * + * Execute a task in a thread. If there are no idle threads, create a new one, + * provided we are under @max_threads. + * + * See also: vips_threadset_new(). + * + * Returns: 0 on success, or -1 on error. + */ +int +vips_threadset_run( VipsThreadset *set, + const char *domain, GFunc func, gpointer data ) +{ + VipsThreadsetMember *member; + + member = NULL; + + /* Try to get an idle thread. + */ + g_mutex_lock( set->lock ); + if( set->free ) { + member = (VipsThreadsetMember *) set->free->data; + set->free = g_slist_remove( set->free, member ); + } + g_mutex_unlock( set->lock ); + + /* None? Make a new idle but not free member. + */ + if( !member ) + member = vips_threadset_add( set ); + + /* Still nothing? Thread create has failed. + */ + if( !member ) + return( -1 ); + + /* Allocate the task and set it going. + */ + member->domain = domain; + member->func = func; + member->data = data; + member->user_data = NULL; + vips_semaphore_up( &member->idle ); + + return( 0 ); +} + +/* Kill a member. + */ +static void +vips_threadset_kill_member( VipsThreadsetMember *member ) +{ + VipsThreadset *set = member->set; + + member->kill = TRUE; + vips_semaphore_up( &member->idle ); + g_thread_join( member->thread ); + + vips_semaphore_destroy( &member->idle ); + + g_mutex_lock( set->lock ); + set->free = g_slist_remove( set->free, member ); + set->n_threads -= 1; + g_mutex_unlock( set->lock ); + + VIPS_FREE( member ); +} + +/** + * vips_threadset_free: + * @set: the threadset to free + * + * Free a threadset. This call will block until all pending tasks are + * finished. + */ +void +vips_threadset_free( VipsThreadset *set ) +{ + VIPS_DEBUG_MSG( "vips_threadset_free: %p\n", set ); + + /* Try to get and finish a thread. + */ + for(;;) { + VipsThreadsetMember *member; + + member = NULL; + g_mutex_lock( set->lock ); + if( set->members ) { + member = (VipsThreadsetMember *) set->members->data; + set->members = g_slist_remove( set->members, member ); + } + g_mutex_unlock( set->lock ); + + if( !member ) + break; + + vips_threadset_kill_member( member ); + } + + if( vips__leak ) + printf( "vips_threadset_free: peak of %d threads\n", + set->n_threads_highwater ); + + VIPS_FREEF( vips_g_mutex_free, set->lock ); + VIPS_FREE( set ); +} diff --git a/suppressions/lsan.supp b/suppressions/lsan.supp index 47fb1f07..8ffe8a9f 100644 --- a/suppressions/lsan.supp +++ b/suppressions/lsan.supp @@ -3,4 +3,5 @@ leak:bash leak:libfontconfig.so leak:libIlmImf-2_5.so leak:libIlmThread-2_5.so +leak:libMagickCore-6.Q16.so leak:libx265.so diff --git a/test/test_threading.sh b/test/test_threading.sh index f8cf027b..79da603f 100755 --- a/test/test_threading.sh +++ b/test/test_threading.sh @@ -43,6 +43,15 @@ if [ $(echo "$max > 0" | bc) -eq 1 ]; then echo error, max == $max exit 1 else - echo all threading tests passed + echo all benchmark threading tests passed fi +# setting VIPS_MAX_THREADS low should force a small thread limit +echo -n "checking threadset size limit ... " +VIPS_MAX_THREADS=5 VIPS_CONCURRENCY=3 vips copy $image x.v || exit_code=$? +if [ $exit_code -ne 0 ]; then + echo FAILED + exit 1 +fi +echo ok + diff --git a/test/variables.sh.in b/test/variables.sh.in index 1af7b47c..ea4e8b93 100644 --- a/test/variables.sh.in +++ b/test/variables.sh.in @@ -2,7 +2,7 @@ top_srcdir=@abs_top_srcdir@ top_builddir=@abs_top_builddir@ PYTHON=@PYTHON@ # we need a different tmp for each script since make can run tests in parallel -tmp=$top_srcdir/test/tmp-$$ +tmp=$top_builddir/test/tmp-$$ test_images=$top_srcdir/test/test-suite/images image=$test_images/sample.jpg mkdir -p $tmp