Revised threading system (#3105)

* reimplement threadpool

just a set of threads that get recycled

"ninja test" passes and dzsave seems to work, though pytest fails for
some reason

* clean up threading code a bit

move base stuff into thread.c, so we have a simple

	thread -> threadset -> threadpool

layering

* start trying to revert g_threadpool

based on the original commit

* working!

nice low systime again

still need to repply cnages to threadpool.c since 80e0cc3d1

* reapply fixes from master

so threadpool.c is now up to date

* rename VipsThread as VipsWorker

a bit less confusing

* use a semaphore to count workers in a pool

* tidy up

* formatting

* dynamic threadpool sizing

based on counting the number of blocked threads in each pool

it works, but the improvement is not great :(

* add "concurrency" metadata item

so operators can hint threadpool size (dzsave especially)

* don't use thinstrip for small images

* add RGB mode to openslide

since flatten was taking 20% of CPU time for dzsave

* fix up rgb mode

now actually works

* make the tile buffer per thread

in the new openslideload rgb mode

* fix dynamic pool downsize

* mild refactoring

* fix the buffer system

oops, turned it off by mistake

* all done!

* revise changelog

* Update libvips/iofuncs/threadset.c

Co-authored-by: Kleis Auke Wolthuizen <github@kleisauke.nl>

* Update libvips/iofuncs/threadset.c

Co-authored-by: Kleis Auke Wolthuizen <github@kleisauke.nl>

* LSan: add libMagickCore to suppression file

* Revert "Remove mutex lock for VipsThreadStartFn"

This reverts commit 41440491.

* add VIPS_MAX_THREADS

to set a hard limit on the threadset size

* Revert "Revert "Remove mutex lock for VipsThreadStartFn""

This reverts commit 77e8520966ba79194fff3b4e648bbd295cd5c260.

* remove sslock from sink.c

* move fixed threadpool build to init

not first use

* add some doc comments

* revert test suite threshold change

* add a test for MAX_THREADS

and move the test tmp/ area into the builddir

* limit VIPS_MAX_THREADS to sane values

* use tabs rather than spaces

Co-authored-by: Kleis Auke Wolthuizen <github@kleisauke.nl>
This commit is contained in:
John Cupitt 2022-10-26 15:25:19 +01:00 committed by GitHub
parent ea0912f23b
commit 976db37f84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1376 additions and 740 deletions

View File

@ -13,6 +13,10 @@ master
- add ".pnm" save [ewelot]
- threaded tiff jp2k and jpeg decompress
- improve speed and efficiency of animated WebP write [dloebl]
- add rgb mode to openslideload
- new thread recycler
- threadpools size dynamically with load
- operations can hint threadpool size
- support for N-colour ICC profiles
11/10/22 started 8.13.3

View File

@ -234,6 +234,13 @@ vips_colour_gen( VipsRegion *or,
int i, y;
VipsPel *p[MAX_INPUT_IMAGES], *q;
/*
printf( "vips_colour_gen: %s, "
"left = %d, top = %d, width = %d, height = %d\n",
VIPS_OBJECT_CLASS( class )->nickname,
r->left, r->top, r->width, r->height );
*/
if( vips_reorder_prepare_many( or->im, ir, r ) )
return( -1 );

View File

@ -126,7 +126,7 @@ vips_sequential_generate( VipsRegion *or,
VIPS_GATE_START( "vips_sequential_generate: wait" );
g_mutex_lock( sequential->lock );
vips__worker_lock( sequential->lock );
VIPS_GATE_STOP( "vips_sequential_generate: wait" );

View File

@ -620,7 +620,7 @@ vips_tile_cache_gen( VipsRegion *or,
VIPS_GATE_START( "vips_tile_cache_gen: wait1" );
g_mutex_lock( cache->lock );
vips__worker_lock( cache->lock );
VIPS_GATE_STOP( "vips_tile_cache_gen: wait1" );
@ -895,7 +895,7 @@ vips_line_cache_gen( VipsRegion *or,
VIPS_GATE_START( "vips_line_cache_gen: wait" );
g_mutex_lock( block_cache->lock );
vips__worker_lock( block_cache->lock );
VIPS_GATE_STOP( "vips_line_cache_gen: wait" );

View File

@ -83,6 +83,14 @@ static int
vips_black_gen( VipsRegion *or, void *seq, void *a, void *b,
gboolean *stop )
{
/*
VipsRect *r = &or->valid;
printf( "vips_black_gen: "
"left = %d, top = %d, width = %d, height = %d\n",
r->left, r->top, r->width, r->height );
*/
vips_region_black( or );
return( 0 );

View File

@ -1900,6 +1900,8 @@ strip_work( VipsThreadState *state, void *a )
g_mutex_unlock( vips__global_lock );
vips_image_set_int( x, VIPS_META_CONCURRENCY, 1 );
if( write_image( dz, out, x, dz->suffix ) ) {
g_object_unref( out );
g_object_unref( x );

View File

@ -450,11 +450,13 @@ fits2vips_generate( VipsRegion *out,
q = VIPS_REGION_ADDR( out, r->left, r->top );
g_mutex_lock( fits->lock );
vips__worker_lock( fits->lock );
if( vips_fits_read_subset( fits, fpixel, lpixel, inc, q ) ) {
g_mutex_unlock( fits->lock );
return( -1 );
}
g_mutex_unlock( fits->lock );
}
else {
@ -478,12 +480,14 @@ fits2vips_generate( VipsRegion *out,
q = VIPS_REGION_ADDR( out, r->left, y );
g_mutex_lock( fits->lock );
vips__worker_lock( fits->lock );
if( vips_fits_read_subset( fits,
fpixel, lpixel, inc, q ) ) {
g_mutex_unlock( fits->lock );
return( -1 );
}
g_mutex_unlock( fits->lock );
}
}

View File

@ -754,9 +754,11 @@ magick_fill_region( VipsRegion *out,
PixelPacket *pixels;
g_mutex_lock( read->lock );
vips__worker_lock( read->lock );
pixels = get_pixels( read->frames[frame],
r->left, line, r->width, 1 );
g_mutex_unlock( read->lock );
if( !pixels ) {

View File

@ -676,10 +676,12 @@ vips_foreign_load_magick7_fill_region( VipsRegion *or,
Quantum * restrict p;
VipsPel * restrict q;
g_mutex_lock( magick7->lock );
vips__worker_lock( magick7->lock );
p = GetCacheViewAuthenticPixels( magick7->cache_view[frame],
r->left, line, r->width, 1,
magick7->exception );
g_mutex_unlock( magick7->lock );
if( !p )

View File

@ -110,6 +110,7 @@ typedef struct {
gboolean autocrop;
char *associated;
gboolean attach_associated;
gboolean rgb;
openslide_t *osr;
@ -237,7 +238,7 @@ get_bounds( openslide_t *osr, VipsRect *rect )
static ReadSlide *
readslide_new( const char *filename, VipsImage *out,
int level, gboolean autocrop,
const char *associated, gboolean attach_associated )
const char *associated, gboolean attach_associated, gboolean rgb )
{
ReadSlide *rslide;
@ -268,6 +269,7 @@ readslide_new( const char *filename, VipsImage *out,
rslide->autocrop = autocrop;
rslide->associated = g_strdup( associated );
rslide->attach_associated = attach_associated;
rslide->rgb = rgb;
/* Non-crazy defaults, override in _parse() if we can.
*/
@ -289,11 +291,11 @@ readslide_new( const char *filename, VipsImage *out,
* compatibility with older vipses.
*/
static void
argb2rgba( uint32_t * restrict buf, int n, uint32_t bg )
argb2rgba( uint32_t * restrict buf, int64_t n, uint32_t bg )
{
const uint32_t pbg = GUINT32_TO_BE( (bg << 8) | 255 );
int i;
int64_t i;
for( i = 0; i < n; i++ ) {
uint32_t * restrict p = buf + i;
@ -318,6 +320,90 @@ argb2rgba( uint32_t * restrict buf, int n, uint32_t bg )
}
}
/* Convert from ARGB to RGB. In RGB mode, assume a is always 255.
*/
static void
argb2rgb( uint32_t * restrict buf, VipsPel *restrict q, int64_t n )
{
int64_t i;
for( i = 0; i < n; i++ ) {
uint32_t x = buf[i];
q[0] = ((x >> 16) & 0xff);
q[1] = ((x >> 8) & 0xff);
q[2] = (x & 0xff);
q += 3;
}
}
static VipsImage *
vips__openslide_get_associated( ReadSlide *rslide, const char *associated_name )
{
VipsImage *associated;
int64_t w, h;
const char *error;
associated = vips_image_new_memory();
openslide_get_associated_image_dimensions( rslide->osr,
associated_name, &w, &h );
/* Always 4 bands, since this is the image that gets the ARGB from
* cairo.
*/
vips_image_init_fields( associated, w, h, 4,
VIPS_FORMAT_UCHAR,
VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, 1.0, 1.0 );
if( vips_image_pipelinev( associated,
VIPS_DEMAND_STYLE_THINSTRIP, NULL ) ||
vips_image_write_prepare( associated ) ) {
g_object_unref( associated );
return( NULL );
}
openslide_read_associated_image( rslide->osr,
associated_name,
(uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ) );
error = openslide_get_error( rslide->osr );
if( error ) {
vips_error( "openslide2vips",
_( "reading associated image: %s" ), error );
g_object_unref( associated );
return( NULL );
}
/* In RGB mode we make a second RGB image and repack to that.
*/
if( rslide->rgb ) {
VipsImage *rgb;
rgb = vips_image_new_memory();
vips_object_local( rgb, associated );
vips_image_init_fields( rgb, w, h, 3,
VIPS_FORMAT_UCHAR,
VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, 1.0, 1.0 );
if( vips_image_pipelinev( rgb,
VIPS_DEMAND_STYLE_THINSTRIP, NULL ) ||
vips_image_write_prepare( rgb ) ) {
g_object_unref( rgb );
return( NULL );
}
argb2rgb( (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ),
VIPS_IMAGE_ADDR( rgb, 0, 0 ), w * h );
associated = rgb;
}
else
/* We can do this in place.
*/
argb2rgba( (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 ),
w * h, rslide->bg );
return( associated );
}
static int
readslide_attach_associated( ReadSlide *rslide, VipsImage *image )
{
@ -326,38 +412,17 @@ readslide_attach_associated( ReadSlide *rslide, VipsImage *image )
for( associated_name =
openslide_get_associated_image_names( rslide->osr );
*associated_name != NULL; associated_name++ ) {
int64_t w, h;
VipsImage *associated;
uint32_t *p;
const char *error;
char buf[256];
associated = vips_image_new_memory();
openslide_get_associated_image_dimensions( rslide->osr,
*associated_name, &w, &h );
vips_image_init_fields( associated, w, h, 4, VIPS_FORMAT_UCHAR,
VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, 1.0, 1.0 );
if( vips_image_pipelinev( associated,
VIPS_DEMAND_STYLE_THINSTRIP, NULL ) ||
vips_image_write_prepare( associated ) ) {
g_object_unref( associated );
if( !(associated = vips__openslide_get_associated( rslide,
*associated_name )) )
return( -1 );
}
p = (uint32_t *) VIPS_IMAGE_ADDR( associated, 0, 0 );
openslide_read_associated_image( rslide->osr,
*associated_name, p );
error = openslide_get_error( rslide->osr );
if( error ) {
vips_error( "openslide2vips",
_( "reading associated image: %s" ), error );
g_object_unref( associated );
return( -1 );
}
argb2rgba( p, w * h, rslide->bg );
vips_snprintf( buf, 256,
"openslide.associated.%s", *associated_name );
vips_image_set_image( image, buf, associated );
g_object_unref( associated );
}
@ -542,8 +607,9 @@ readslide_parse( ReadSlide *rslide, VipsImage *image )
"slide-associated-images", associated_names );
VIPS_FREE( associated_names );
vips_image_init_fields( image, w, h, 4, VIPS_FORMAT_UCHAR,
VIPS_CODING_NONE, VIPS_INTERPRETATION_sRGB, xres, yres );
vips_image_init_fields( image, w, h, rslide->rgb ? 3 : 4,
VIPS_FORMAT_UCHAR, VIPS_CODING_NONE,
VIPS_INTERPRETATION_sRGB, xres, yres );
return( 0 );
}
@ -551,28 +617,46 @@ readslide_parse( ReadSlide *rslide, VipsImage *image )
static int
vips__openslide_read_header( const char *filename, VipsImage *out,
int level, gboolean autocrop,
char *associated, gboolean attach_associated )
char *associated, gboolean attach_associated, gboolean rgb )
{
ReadSlide *rslide;
if( !(rslide = readslide_new( filename,
out, level, autocrop, associated, attach_associated )) ||
out, level, autocrop, associated, attach_associated, rgb )) ||
readslide_parse( rslide, out ) )
return( -1 );
return( 0 );
}
/* Allocate a tile buffer. Have one of these for each thread so we can unpack
* to vips in parallel.
*/
static void *
vips__openslide_start( VipsImage *out, void *a, void *b )
{
ReadSlide *rslide = (ReadSlide *) a;
uint32_t *tile_buffer;
if( !(tile_buffer = VIPS_MALLOC( NULL,
(size_t) rslide->tile_width * rslide->tile_height * 4 )) )
return( NULL );
return( (void *) tile_buffer );
}
static int
vips__openslide_generate( VipsRegion *out,
void *_seq, void *_rslide, void *unused, gboolean *stop )
{
uint32_t *tile_buffer = (uint32_t *) _seq;
ReadSlide *rslide = _rslide;
uint32_t bg = rslide->bg;
VipsRect *r = &out->valid;
int n = r->width * r->height;
uint32_t *buf = (uint32_t *) VIPS_REGION_ADDR( out, r->left, r->top );
uint32_t *buf;
const char *error;
VIPS_DEBUG_MSG( "vips__openslide_generate: %dx%d @ %dx%d\n",
@ -586,10 +670,21 @@ vips__openslide_generate( VipsRegion *out,
g_assert( r->width <= rslide->tile_width );
g_assert( r->height <= rslide->tile_height );
/* The memory on the region should be contiguous for our ARGB->RGBA
* loop below.
/* The memory on the region should be contiguous.
*/
g_assert( VIPS_REGION_LSKIP( out ) == r->width * 4 );
g_assert( VIPS_REGION_LSKIP( out ) == r->width * out->im->Bands );
/* In RGB mode we need to read to the tile buffer.
*/
if( rslide->rgb ) {
g_assert( tile_buffer );
g_assert( rslide->tile_width >= r->width );
g_assert( rslide->tile_height >= r->height );
buf = tile_buffer;
}
else
buf = (uint32_t *) VIPS_REGION_ADDR( out, r->left, r->top );
openslide_read_region( rslide->osr,
buf,
@ -612,16 +707,29 @@ vips__openslide_generate( VipsRegion *out,
return( -1 );
}
/* Since we are inside a cache, we know buf must be continuous.
*/
if( rslide->rgb )
argb2rgb( tile_buffer,
VIPS_REGION_ADDR( out, r->left, r->top ), n );
else
argb2rgba( buf, n, bg );
return( 0 );
}
static int
vips__openslide_stop( void *_seq, void *a, void *b )
{
uint32_t *tile_buffer = (uint32_t *) _seq;
VIPS_FREE( tile_buffer );
return( 0 );
}
static int
vips__openslide_read( const char *filename, VipsImage *out,
int level, gboolean autocrop, gboolean attach_associated )
int level, gboolean autocrop, gboolean attach_associated,
gboolean rgb )
{
ReadSlide *rslide;
VipsImage *raw;
@ -631,7 +739,7 @@ vips__openslide_read( const char *filename, VipsImage *out,
filename, level );
if( !(rslide = readslide_new( filename, out, level, autocrop,
NULL, attach_associated )) )
NULL, attach_associated, rgb )) )
return( -1 );
raw = vips_image_new();
@ -639,7 +747,9 @@ vips__openslide_read( const char *filename, VipsImage *out,
if( readslide_parse( rslide, raw ) ||
vips_image_generate( raw,
NULL, vips__openslide_generate, NULL, rslide, NULL ) )
vips__openslide_start,
vips__openslide_generate,
vips__openslide_stop, rslide, NULL ) )
return( -1 );
/* Copy to out, adding a cache. Enough tiles for two complete rows,
@ -665,41 +775,27 @@ vips__openslide_read( const char *filename, VipsImage *out,
static int
vips__openslide_read_associated( const char *filename, VipsImage *out,
const char *associated )
const char *associated_name, gboolean rgb )
{
ReadSlide *rslide;
VipsImage *raw;
uint32_t *buf;
const char *error;
VipsImage *associated;
VIPS_DEBUG_MSG( "vips__openslide_read_associated: %s %s\n",
filename, associated );
filename, associated_name );
if( !(rslide = readslide_new( filename, out, 0, FALSE,
associated, FALSE )) )
if( !(rslide = readslide_new( filename,
out, 0, FALSE, associated_name, FALSE, rgb )) )
return( -1 );
/* Memory buffer. Get associated directly to this, then copy to out.
*/
raw = vips_image_new_memory();
vips_object_local( out, raw );
if( readslide_parse( rslide, raw ) ||
vips_image_write_prepare( raw ) )
if( !(associated = vips__openslide_get_associated( rslide,
associated_name )) )
return( -1 );
buf = (uint32_t *) VIPS_IMAGE_ADDR( raw, 0, 0 );
openslide_read_associated_image( rslide->osr, rslide->associated, buf );
error = openslide_get_error( rslide->osr );
if( error ) {
vips_error( "openslide2vips",
_( "reading associated image: %s" ), error );
if( vips_image_write( associated, out ) ) {
VIPS_UNREF( associated );
return( -1 );
}
argb2rgba( buf, raw->Xsize * raw->Ysize, rslide->bg );
if( vips_image_write( raw, out ) )
return( -1 );
VIPS_UNREF( associated );
return( 0 );
}
@ -731,6 +827,10 @@ typedef struct _VipsForeignLoadOpenslide {
*/
gboolean attach_associated;
/* Read as RGB, not RGBA.
*/
gboolean rgb;
} VipsForeignLoadOpenslide;
typedef VipsForeignLoadClass VipsForeignLoadOpenslideClass;
@ -826,7 +926,8 @@ vips_foreign_load_openslide_header( VipsForeignLoad *load )
if( vips__openslide_read_header( openslide->filename, load->out,
openslide->level, openslide->autocrop,
openslide->associated, openslide->attach_associated ) )
openslide->associated, openslide->attach_associated,
openslide->rgb ) )
return( -1 );
VIPS_SETSTR( load->out->filename, openslide->filename );
@ -842,12 +943,13 @@ vips_foreign_load_openslide_load( VipsForeignLoad *load )
if( !openslide->associated ) {
if( vips__openslide_read( openslide->filename, load->real,
openslide->level, openslide->autocrop,
openslide->attach_associated ) )
openslide->attach_associated,
openslide->rgb ) )
return( -1 );
}
else {
if( vips__openslide_read_associated( openslide->filename,
load->real, openslide->associated ) )
load->real, openslide->associated, openslide->rgb ) )
return( -1 );
}
@ -917,13 +1019,20 @@ vips_foreign_load_openslide_class_init( VipsForeignLoadOpenslideClass *class )
G_STRUCT_OFFSET( VipsForeignLoadOpenslide, associated ),
NULL );
VIPS_ARG_BOOL( class, "attach_associated", 13,
VIPS_ARG_BOOL( class, "attach_associated", 23,
_( "Attach associated" ),
_( "Attach all associated images" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsForeignLoadOpenslide, attach_associated ),
FALSE );
VIPS_ARG_BOOL( class, "rgb", 24,
_( "RGB" ),
_( "Output RGB (not RGBA)" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsForeignLoadOpenslide, rgb ),
FALSE );
}
static void

View File

@ -572,7 +572,7 @@ vips_foreign_load_pdf_generate( VipsRegion *or,
if( vips_foreign_load_pdf_get_page( pdf, pdf->page_no + i ) )
return( -1 );
g_mutex_lock( vips_pdfium_mutex );
vips__worker_lock( vips_pdfium_mutex );
/* 4 means RGBA.
*/

View File

@ -157,6 +157,13 @@ extern "C" {
*/
#define VIPS_META_N_SUBIFDS "n-subifds"
/**
* VIPS_META_CONCURRENCY:
*
* If set, the suggested concurrency for this image.
*/
#define VIPS_META_CONCURRENCY "concurrency"
VIPS_API
guint64 vips_format_sizeof( VipsBandFormat format );
VIPS_API
@ -207,6 +214,8 @@ int vips_image_get_orientation( VipsImage *image );
VIPS_API
gboolean vips_image_get_orientation_swap( VipsImage *image );
VIPS_API
int vips_image_get_concurrency( VipsImage *image, int default_concurrency );
VIPS_API
const void *vips_image_get_data( VipsImage *image );
VIPS_API

View File

@ -143,9 +143,11 @@ extern char *vips__disc_threshold;
extern gboolean vips__cache_dump;
extern gboolean vips__cache_trace;
void vips__thread_init( void );
void vips__threadpool_init( void );
void vips__threadpool_shutdown( void );
int vips__thread_execute( const char *name, GFunc func, gpointer data );
VIPS_API void vips__worker_lock( GMutex *mutex );
void vips__cache_init( void );

View File

@ -59,7 +59,13 @@ VIPS_API
void *vips_g_thread_join( GThread *thread );
VIPS_API
gboolean vips_thread_isworker( void );
gboolean vips_thread_isvips( void );
typedef struct _VipsThreadset VipsThreadset;
VipsThreadset *vips_threadset_new( int max_threads );
int vips_threadset_run( VipsThreadset *set,
const char *domain, GFunc func, gpointer data );
void vips_threadset_free( VipsThreadset *set );
#ifdef __cplusplus
}

View File

@ -316,8 +316,10 @@ buffer_thread_get( void )
{
VipsBufferThread *buffer_thread;
if( vips_thread_isworker() ) {
/* Workers get a private set of buffers.
if( vips_thread_isvips() ) {
/* Our threads get a set of private buffers, since we know we
* will be calling vips_thread_shutdown() on thread
* termination.
*/
if( !(buffer_thread = g_private_get( buffer_thread_key )) ) {
buffer_thread = buffer_thread_new();
@ -327,7 +329,7 @@ buffer_thread_get( void )
g_assert( buffer_thread->thread == g_thread_self() );
}
else
/* Non-workers don't have one.
/* Non-vips threads don't have one.
*/
buffer_thread = NULL;

View File

@ -951,6 +951,30 @@ vips_image_get_n_pages( VipsImage *image )
return( 1 );
}
/**
* vips_image_get_concurrency:
* @image: image to get from
*
* Fetch and sanity-check #VIPS_CONCURRENCY. Default to 1 if not present or
* crazy.
*
* Returns: the suggested concurrency for this image
*/
int
vips_image_get_concurrency( VipsImage *image, int default_concurrency )
{
int concurrency;
if( vips_image_get_typeof( image, VIPS_META_CONCURRENCY ) &&
!vips_image_get_int( image,
VIPS_META_CONCURRENCY, &concurrency ) &&
concurrency >= 1 &&
concurrency < 100 )
return( concurrency );
return( default_concurrency );
}
/**
* vips_image_get_n_subifds:
* @image: image to get from

View File

@ -505,6 +505,7 @@ vips_init( const char *argv0 )
(void) _setmaxstdio( 2048 );
#endif /*G_OS_WIN32*/
vips__thread_init();
vips__threadpool_init();
vips__buffer_init();
vips__meta_init();

View File

@ -1,4 +1,7 @@
iofuncs_sources = files(
'thread.c',
'threadset.c',
'threadpool.c',
'ginputsource.c',
'sourceginput.c',
'connection.c',
@ -28,7 +31,6 @@ iofuncs_sources = files(
'region.c',
'rect.c',
'semaphore.c',
'threadpool.c',
'util.c',
'init.c',
'buf.c',

View File

@ -73,10 +73,6 @@ typedef struct _Sink {
*/
VipsImage *t;
/* Mutex for serialising calls to VipsStartFn and VipsStopFn.
*/
GMutex *sslock;
/* Call params.
*/
VipsStartFn start_fn;
@ -257,16 +253,8 @@ sink_call_stop( Sink *sink, SinkThreadState *state )
VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state );
VIPS_GATE_START( "sink_call_stop: wait" );
g_mutex_lock( sink->sslock );
VIPS_GATE_STOP( "sink_call_stop: wait" );
result = sink->stop_fn( state->seq, sink->a, sink->b );
g_mutex_unlock( sink->sslock );
if( result ) {
SinkBase *sink_base = (SinkBase *) sink;
@ -302,16 +290,8 @@ sink_call_start( Sink *sink, SinkThreadState *state )
if( !state->seq && sink->start_fn ) {
VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state );
VIPS_GATE_START( "sink_call_start: wait" );
g_mutex_lock( sink->sslock );
VIPS_GATE_STOP( "sink_call_start: wait" );
state->seq = sink->start_fn( sink->t, sink->a, sink->b );
g_mutex_unlock( sink->sslock );
if( !state->seq ) {
SinkBase *sink_base = (SinkBase *) sink;
@ -370,7 +350,6 @@ vips_sink_thread_state_new( VipsImage *im, void *a )
static void
sink_free( Sink *sink )
{
VIPS_FREEF( vips_g_mutex_free, sink->sslock );
VIPS_FREEF( sink_area_free, sink->area );
VIPS_FREEF( sink_area_free, sink->old_area );
VIPS_FREEF( g_object_unref, sink->t );
@ -406,7 +385,6 @@ sink_init( Sink *sink,
vips_sink_base_init( &sink->sink_base, image );
sink->t = NULL;
sink->sslock = vips_g_mutex_new();
sink->start_fn = start_fn;
sink->generate_fn = generate_fn;
sink->stop_fn = stop_fn;

459
libvips/iofuncs/thread.c Normal file
View File

@ -0,0 +1,459 @@
/* Basic functions to support threading.
*
* 29/9/22
* - from threadpool.c
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
#define VIPS_DEBUG
#define VIPS_DEBUG_RED
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <glib/gi18n-lib.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <errno.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#include <vips/debug.h>
#ifdef G_OS_WIN32
#include <windows.h>
#endif /*G_OS_WIN32*/
/* Maximum value we allow for VIPS_CONCURRENCY. We need to stop huge values
* killing the system.
*/
#define MAX_THREADS (1024)
/* Default n threads ... 0 means get from environment.
*/
int vips__concurrency = 0;
/* Default tile geometry ... can be set by vips_init().
*/
int vips__tile_width = VIPS__TILE_WIDTH;
int vips__tile_height = VIPS__TILE_HEIGHT;
int vips__fatstrip_height = VIPS__FATSTRIP_HEIGHT;
int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT;
/* Set this GPrivate to indicate that is a libvips thread.
*/
static GPrivate *is_vips_thread_key = NULL;
/* TRUE if we are a vips thread. We sometimes manage resource allocation
* differently for vips threads since we can cheaply free stuff on thread
* termination.
*/
gboolean
vips_thread_isvips( void )
{
return( g_private_get( is_vips_thread_key ) != NULL );
}
/* Glib 2.32 revised the thread API. We need some compat functions.
*/
GMutex *
vips_g_mutex_new( void )
{
GMutex *mutex;
mutex = g_new( GMutex, 1 );
g_mutex_init( mutex );
return( mutex );
}
void
vips_g_mutex_free( GMutex *mutex )
{
g_mutex_clear( mutex );
g_free( mutex );
}
GCond *
vips_g_cond_new( void )
{
GCond *cond;
cond = g_new( GCond, 1 );
g_cond_init( cond );
return( cond );
}
void
vips_g_cond_free( GCond *cond )
{
g_cond_clear( cond );
g_free( cond );
}
typedef struct {
const char *domain;
GThreadFunc func;
gpointer data;
} VipsThreadInfo;
static void *
vips_thread_run( gpointer data )
{
VipsThreadInfo *info = (VipsThreadInfo *) data;
void *result;
/* Set this to something (anything) to tag this thread as a vips
* worker. No need to call g_private_replace as there is no
* GDestroyNotify handler associated with a worker.
*/
g_private_set( is_vips_thread_key, info );
result = info->func( info->data );
g_free( info );
vips_thread_shutdown();
return( result );
}
GThread *
vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data )
{
GThread *thread;
VipsThreadInfo *info;
GError *error = NULL;
info = g_new( VipsThreadInfo, 1 );
info->domain = domain;
info->func = func;
info->data = data;
thread = g_thread_try_new( domain, vips_thread_run, info, &error );
VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n",
domain, thread );
if( !thread ) {
if( error )
vips_g_error( &error );
else
vips_error( domain,
"%s", _( "unable to create thread" ) );
}
return( thread );
}
void *
vips_g_thread_join( GThread *thread )
{
void *result;
result = g_thread_join( thread );
VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n",
thread );
return( result );
}
static int
get_num_processors( void )
{
#if GLIB_CHECK_VERSION( 2, 48, 1 )
/* We could use g_get_num_processors when GLib >= 2.48.1, see:
* https://gitlab.gnome.org/GNOME/glib/commit/999711abc82ea3a698d05977f9f91c0b73957f7f
* https://gitlab.gnome.org/GNOME/glib/commit/2149b29468bb99af3c29d5de61f75aad735082dc
*/
return( g_get_num_processors() );
#else
int nproc;
nproc = 1;
#ifdef G_OS_UNIX
#if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN)
{
/* POSIX style.
*/
int x;
x = sysconf( _SC_NPROCESSORS_ONLN );
if( x > 0 )
nproc = x;
}
#elif defined HW_NCPU
{
/* BSD style.
*/
int x;
size_t len = sizeof(x);
sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, &x, &len, NULL, 0 );
if( x > 0 )
nproc = x;
}
#endif
/* libgomp has some very complex code on Linux to count the number of
* processors available to the current process taking pthread affinity
* into account, but we don't attempt that here. Perhaps we should?
*/
#endif /*G_OS_UNIX*/
#ifdef G_OS_WIN32
{
/* Count the CPUs currently available to this process.
*/
SYSTEM_INFO sysinfo;
DWORD_PTR process_cpus;
DWORD_PTR system_cpus;
/* This *never* fails, use it as fallback
*/
GetNativeSystemInfo( &sysinfo );
nproc = (int) sysinfo.dwNumberOfProcessors;
if( GetProcessAffinityMask( GetCurrentProcess(),
&process_cpus, &system_cpus ) ) {
unsigned int af_count;
for( af_count = 0; process_cpus != 0; process_cpus >>= 1 )
if( process_cpus & 1 )
af_count++;
/* Prefer affinity-based result, if available
*/
if( af_count > 0 )
nproc = af_count;
}
}
#endif /*G_OS_WIN32*/
return( nproc );
#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/
}
/* The default concurrency, set by the environment variable VIPS_CONCURRENCY,
* or if that is not set, the number of threads available on the host machine.
*/
static int
vips__concurrency_get_default( void )
{
const char *str;
int nthr;
int x;
/* Tell the threads system how much concurrency we expect.
*/
if( vips__concurrency > 0 )
nthr = vips__concurrency;
else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
#if ENABLE_DEPRECATED
|| (str = g_getenv( "IM_CONCURRENCY" ))
#endif
) && (x = atoi( str )) > 0 )
nthr = x;
else
nthr = get_num_processors();
if( nthr < 1 ||
nthr > MAX_THREADS ) {
nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );
g_warning( _( "threads clipped to %d" ), nthr );
}
return( nthr );
}
/**
* vips_concurrency_set:
* @concurrency: number of threads to run
*
* Sets the number of worker threads that vips should use when running a
* #VipsThreadPool.
*
* The special value 0 means "default". In this case, the number of threads
* is set by the environment variable VIPS_CONCURRENCY, or if that is not
* set, the number of threads available on the host machine.
*
* See also: vips_concurrency_get().
*/
void
vips_concurrency_set( int concurrency )
{
/* Tell the threads system how much concurrency we expect.
*/
if( concurrency < 1 )
concurrency = vips__concurrency_get_default();
else if( concurrency > MAX_THREADS ) {
concurrency = MAX_THREADS;
g_warning( _( "threads clipped to %d" ), MAX_THREADS );
}
vips__concurrency = concurrency;
}
/**
* vips_concurrency_get:
*
* Returns the number of worker threads that vips should use when running a
* #VipsThreadPool.
*
* vips gets this values from these sources in turn:
*
* If vips_concurrency_set() has been called, this value is used. The special
* value 0 means "default". You can also use the command-line argument
* "--vips-concurrency" to set this value.
*
* If vips_concurrency_set() has not been called and no command-line argument
* was used, vips uses the value of the environment variable VIPS_CONCURRENCY,
*
* If VIPS_CONCURRENCY has not been set, vips finds the number of hardware
* threads that the host machine can run in parallel and uses that value.
*
* The final value is clipped to the range 1 - 1024.
*
* See also: vips_concurrency_get().
*
* Returns: number of worker threads to use.
*/
int
vips_concurrency_get( void )
{
return( vips__concurrency );
}
/**
* vips_get_tile_size: (method)
* @im: image to guess for
* @tile_width: (out): return selected tile width
* @tile_height: (out): return selected tile height
* @n_lines: (out): return buffer height in scanlines
*
* Pick a tile size and a buffer height for this image and the current
* value of vips_concurrency_get(). The buffer height
* will always be a multiple of tile_height.
*
* The buffer height is the height of each buffer we fill in sink disc. Since
* we have two buffers, the largest range of input locality is twice the output
* buffer size, plus whatever margin we add for things like convolution.
*/
void
vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *n_lines )
{
const int nthr = vips_concurrency_get();
const int typical_image_width = 1000;
/* Compiler warnings.
*/
*tile_width = 1;
*tile_height = 1;
/* Pick a render geometry.
*/
switch( im->dhint ) {
case VIPS_DEMAND_STYLE_SMALLTILE:
*tile_width = vips__tile_width;
*tile_height = vips__tile_height;
break;
case VIPS_DEMAND_STYLE_ANY:
case VIPS_DEMAND_STYLE_FATSTRIP:
*tile_width = im->Xsize;
*tile_height = vips__fatstrip_height;
break;
case VIPS_DEMAND_STYLE_THINSTRIP:
*tile_width = im->Xsize;
/* Only enable thinstrip height for very wide images -- the
* overheads are too high to be worthwhile otherwise.
*/
*tile_height = im->Xsize > 10000 ?
vips__thinstrip_height : vips__fatstrip_height;
break;
default:
g_assert_not_reached();
}
/* We can't set n_lines for the current demand style: a later bit of
* the pipeline might see a different hint and we need to synchronise
* buffer sizes everywhere.
*
* We also can't depend on the current image size, since that might
* change down the pipeline too. Pick a typical image width.
*
* Pick the maximum buffer size we might possibly need, then round up
* to a multiple of tileheight.
*/
*n_lines = vips__tile_height *
VIPS_ROUND_UP( vips__tile_width * nthr,
typical_image_width ) /
typical_image_width;
*n_lines = VIPS_MAX( *n_lines, vips__fatstrip_height * nthr );
*n_lines = VIPS_MAX( *n_lines, vips__thinstrip_height * nthr );
*n_lines = VIPS_ROUND_UP( *n_lines, *tile_height );
/* We make this assumption in several places.
*/
g_assert( *n_lines % *tile_height == 0 );
VIPS_DEBUG_MSG( "vips_get_tile_size: %d by %d patches, "
"groups of %d scanlines\n",
*tile_width, *tile_height, *n_lines );
}
void
vips__thread_init( void )
{
static GPrivate private = { 0 };
is_vips_thread_key = &private;
if( vips__concurrency == 0 )
vips__concurrency = vips__concurrency_get_default();
}

File diff suppressed because it is too large Load Diff

330
libvips/iofuncs/threadset.c Normal file
View File

@ -0,0 +1,330 @@
/* A set of threads.
*
* Creating and destroying threads can be expensive on some platforms, so we
* try to only create once, then reuse.
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <glib/gi18n-lib.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <errno.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#include <vips/debug.h>
typedef struct _VipsThreadsetMember {
/* The set we are part of.
*/
VipsThreadset *set;
/* The underlying glib thread object.
*/
GThread *thread;
/* The task the thread should run next.
*/
const char *domain;
GFunc func;
void *data;
void *user_data;
/* The thread waits on this when it's free.
*/
VipsSemaphore idle;
/* Set by our controller to request exit.
*/
gboolean kill;
} VipsThreadsetMember;
struct _VipsThreadset {
GMutex *lock;
/* All the VipsThreadsetMember we have created.
*/
GSList *members;
/* The set of currently idle threads.
*/
GSList *free;
/* The current number of threads, the highwater mark, and
* the max we allow before blocking thread creation.
*/
int n_threads;
int n_threads_highwater;
int max_threads;
};
/* The thread work function.
*/
static void *
vips_threadset_work( void *pointer )
{
VipsThreadsetMember *member = (VipsThreadsetMember *) pointer;
VipsThreadset *set = member->set;
for(;;) {
/* Wait to be given work.
*/
vips_semaphore_down( &member->idle );
if( member->kill ||
!member->func )
break;
/* If we're profiling, attach a prof struct to this thread.
*/
if( vips__thread_profile )
vips__thread_profile_attach( member->domain );
/* Execute the task.
*/
member->func( member->data, member->user_data );
/* Free any thread-private resources -- they will not be
* useful for the next task to use this thread.
*/
vips_thread_shutdown();
member->domain = NULL;
member->func = NULL;
member->data = NULL;
member->user_data = NULL;
/* We are free ... back on the free list!
*/
g_mutex_lock( set->lock );
set->free = g_slist_prepend( set->free, member );
g_mutex_unlock( set->lock );
}
/* Kill has been requested. We leave this thread on the members
* list so it can be found and joined.
*/
return( NULL );
}
/* Create a new idle member for the set.
*/
static VipsThreadsetMember *
vips_threadset_add( VipsThreadset *set )
{
VipsThreadsetMember *member;
if( set->max_threads &&
set->n_threads >= set->max_threads ) {
vips_error( "VipsThreadset",
"%s", _( "threadset is exhausted" ) );
return( NULL );
}
member = g_new0( VipsThreadsetMember, 1 );
member->set = set;
vips_semaphore_init( &member->idle, 0, "idle" );
if( !(member->thread = vips_g_thread_new( "libvips worker",
vips_threadset_work, member )) ) {
vips_semaphore_destroy( &member->idle );
VIPS_FREE( member );
return( NULL );
}
g_mutex_lock( set->lock );
set->members = g_slist_prepend( set->members, member );
set->n_threads += 1;
set->n_threads_highwater =
VIPS_MAX( set->n_threads_highwater, set->n_threads );;
g_mutex_unlock( set->lock );
return( member );
}
/**
* vips_threadset_new:
* @max_threads: maxium number of system threads
*
* Create a new threadset.
*
* If @max_threads is 0, new threads will be created when necessary by
* vips_threadset_run(), with no limit on the number of threads.
*
* If @max_threads is > 0, then that many threads will be created by
* vips_threadset_new() during startup and vips_threadset_run() will fail if
* no free threads are available.
*
* Returns: the new threadset.
*/
VipsThreadset *
vips_threadset_new( int max_threads )
{
VipsThreadset *set;
set = g_new0( VipsThreadset, 1 );
set->lock = vips_g_mutex_new();
set->max_threads = max_threads;
if( set->max_threads > 0 )
for( int i = 0; i < set->max_threads; i++ ) {
VipsThreadsetMember *member;
if( !(member = vips_threadset_add( set )) ) {
vips_threadset_free( set );
return( NULL );
}
set->free = g_slist_prepend( set->free, member );
}
return( set );
}
/**
* vips_threadset_run:
* @set: the threadset to run the task in
* @domain: the name of the task (useful for debugging)
* @func: the task to execute
* @data: the task's data
*
* Execute a task in a thread. If there are no idle threads, create a new one,
* provided we are under @max_threads.
*
* See also: vips_threadset_new().
*
* Returns: 0 on success, or -1 on error.
*/
int
vips_threadset_run( VipsThreadset *set,
const char *domain, GFunc func, gpointer data )
{
VipsThreadsetMember *member;
member = NULL;
/* Try to get an idle thread.
*/
g_mutex_lock( set->lock );
if( set->free ) {
member = (VipsThreadsetMember *) set->free->data;
set->free = g_slist_remove( set->free, member );
}
g_mutex_unlock( set->lock );
/* None? Make a new idle but not free member.
*/
if( !member )
member = vips_threadset_add( set );
/* Still nothing? Thread create has failed.
*/
if( !member )
return( -1 );
/* Allocate the task and set it going.
*/
member->domain = domain;
member->func = func;
member->data = data;
member->user_data = NULL;
vips_semaphore_up( &member->idle );
return( 0 );
}
/* Kill a member.
*/
static void
vips_threadset_kill_member( VipsThreadsetMember *member )
{
VipsThreadset *set = member->set;
member->kill = TRUE;
vips_semaphore_up( &member->idle );
g_thread_join( member->thread );
vips_semaphore_destroy( &member->idle );
g_mutex_lock( set->lock );
set->free = g_slist_remove( set->free, member );
set->n_threads -= 1;
g_mutex_unlock( set->lock );
VIPS_FREE( member );
}
/**
* vips_threadset_free:
* @set: the threadset to free
*
* Free a threadset. This call will block until all pending tasks are
* finished.
*/
void
vips_threadset_free( VipsThreadset *set )
{
VIPS_DEBUG_MSG( "vips_threadset_free: %p\n", set );
/* Try to get and finish a thread.
*/
for(;;) {
VipsThreadsetMember *member;
member = NULL;
g_mutex_lock( set->lock );
if( set->members ) {
member = (VipsThreadsetMember *) set->members->data;
set->members = g_slist_remove( set->members, member );
}
g_mutex_unlock( set->lock );
if( !member )
break;
vips_threadset_kill_member( member );
}
if( vips__leak )
printf( "vips_threadset_free: peak of %d threads\n",
set->n_threads_highwater );
VIPS_FREEF( vips_g_mutex_free, set->lock );
VIPS_FREE( set );
}

View File

@ -3,4 +3,5 @@ leak:bash
leak:libfontconfig.so
leak:libIlmImf-2_5.so
leak:libIlmThread-2_5.so
leak:libMagickCore-6.Q16.so
leak:libx265.so

View File

@ -43,6 +43,15 @@ if [ $(echo "$max > 0" | bc) -eq 1 ]; then
echo error, max == $max
exit 1
else
echo all threading tests passed
echo all benchmark threading tests passed
fi
# setting VIPS_MAX_THREADS low should force a small thread limit
echo -n "checking threadset size limit ... "
VIPS_MAX_THREADS=5 VIPS_CONCURRENCY=3 vips copy $image x.v || exit_code=$?
if [ $exit_code -ne 0 ]; then
echo FAILED
exit 1
fi
echo ok

View File

@ -2,7 +2,7 @@ top_srcdir=@abs_top_srcdir@
top_builddir=@abs_top_builddir@
PYTHON=@PYTHON@
# we need a different tmp for each script since make can run tests in parallel
tmp=$top_srcdir/test/tmp-$$
tmp=$top_builddir/test/tmp-$$
test_images=$top_srcdir/test/test-suite/images
image=$test_images/sample.jpg
mkdir -p $tmp