strip out threadgroup

This commit is contained in:
John Cupitt 2010-04-16 21:21:15 +00:00
parent f1ebc12fb1
commit fdbdba432b
22 changed files with 242 additions and 3087 deletions

View File

@ -13,6 +13,7 @@
- added VIPS_DEBUG_MSG() macro
- --vips-wbuffer2 turns on threadpool for im_iterate as well
- im_vips2tiff() uses vips_sink() instead of threadgroup
- strip out threadgroup
16/1/10 started 7.21.2
- "invalidate" is careful to keep images alive, so invalidate callbacks can do

4
TODO
View File

@ -1,4 +1,8 @@
- swap all im_wbuffer() for vips_sink_disc()
- swap all im_iterate() for vips_sink()
- im_prepare_thread() can go? I think it was only used by im_render()

View File

@ -397,3 +397,30 @@ im_isnative( im_arch_type arch )
g_assert( 0 );
}
}
/* Compatibility entry point: im_iterate() is now a thin shim that hands all
 * of its arguments straight to vips_sink() and reports its result.
 */
int
im_iterate( IMAGE *im,
	im_start_fn start, im_generate_fn generate, im_stop_fn stop,
	void *b, void *c )
{
	int status;

	status = vips_sink( im, start, generate, stop, b, c );

	return( status );
}
/* Compatibility entry point: forwards every argument, unchanged and in the
 * same order, to vips_sink_screen().
 */
int
im_render_priority( IMAGE *in, IMAGE *out, IMAGE *mask,
	int width, int height, int max,
	int priority,
	notify_fn notify, void *client )
{
	int status;

	status = vips_sink_screen( in, out, mask,
		width, height, max,
		priority,
		notify, client );

	return( status );
}
/* Compatibility entry point: im_render_priority() with no mask image,
 * priority 0 and no notify callback or client data.
 */
int
im_cache( IMAGE *in, IMAGE *out, int width, int height, int max )
{
	int status;

	status = im_render_priority( in, out, NULL,
		width, height, max,
		0,
		NULL, NULL );

	return( status );
}

View File

@ -201,7 +201,6 @@ typedef struct {
IMAGE *in;
struct jpeg_compress_struct cinfo;
ErrorManager eman;
im_threadgroup_t *tg;
JSAMPROW *row_pointer;
char *profile_bytes;
unsigned int profile_length;
@ -212,7 +211,6 @@ static void
write_destroy( Write *write )
{
jpeg_destroy_compress( &write->cinfo );
IM_FREEF( im_threadgroup_free, write->tg );
IM_FREEF( im_close, write->in );
IM_FREEF( fclose, write->eman.fp );
IM_FREE( write->row_pointer );
@ -237,7 +235,6 @@ write_new( IMAGE *in )
return( NULL );
}
write->tg = NULL;
write->row_pointer = NULL;
write->cinfo.err = jpeg_std_error( &write->eman.pub );
write->eman.pub.error_exit = new_error_exit;
@ -523,7 +520,7 @@ write_profile_meta( Write *write )
}
static int
write_jpeg_block( REGION *region, Rect *area, void *a, void *b )
write_jpeg_block( REGION *region, Rect *area, void *a )
{
Write *write = (Write *) a;
int i;
@ -597,9 +594,8 @@ write_vips( Write *write, int qfac, const char *profile )
/* Build VIPS output stuff now we know the image we'll be writing.
*/
write->tg = im_threadgroup_create( in );
write->row_pointer = IM_ARRAY( NULL, write->tg->nlines, JSAMPROW );
if( !write->tg || !write->row_pointer )
if( !(write->row_pointer =
IM_ARRAY( NULL, write->in->Ysize, JSAMPROW )) )
return( -1 );
/* Rest to default.
@ -630,7 +626,7 @@ write_vips( Write *write, int qfac, const char *profile )
/* Write data. Note that the write function grabs the longjmp()!
*/
if( im_wbuffer( write->tg, write_jpeg_block, write, NULL ) )
if( vips_sink_disc( write->in, write_jpeg_block, write ) )
return( -1 );
/* We have to reinstate the setjmp() before we jpeg_finish_compress().

View File

@ -98,7 +98,6 @@ user_warning_function( png_structp png_ptr, png_const_charp warning_msg )
*/
typedef struct {
IMAGE *in;
im_threadgroup_t *tg;
FILE *fp;
png_structp pPng;
@ -109,7 +108,6 @@ typedef struct {
static void
write_destroy( Write *write )
{
IM_FREEF( im_threadgroup_free, write->tg );
IM_FREEF( im_close, write->in );
IM_FREEF( fclose, write->fp );
if( write->pPng )
@ -135,13 +133,12 @@ write_new( IMAGE *in )
return( NULL );
}
write->tg = im_threadgroup_create( write->in );
write->row_pointer = IM_ARRAY( NULL, write->tg->nlines, png_bytep );
write->row_pointer = IM_ARRAY( NULL, in->Ysize, png_bytep );
write->fp = NULL;
write->pPng = NULL;
write->pInfo = NULL;
if( !write->tg || !write->row_pointer ) {
if( !write->row_pointer ) {
write_destroy( write );
return( NULL );
}
@ -169,7 +166,7 @@ write_new( IMAGE *in )
}
static int
write_png_block( REGION *region, Rect *area, void *a, void *b )
write_png_block( REGION *region, Rect *area, void *a )
{
Write *write = (Write *) a;
int i;
@ -252,7 +249,7 @@ write_vips( Write *write, int compress, int interlace )
/* Write data.
*/
for( i = 0; i < nb_passes; i++ )
if( im_wbuffer( write->tg, write_png_block, write, NULL ) )
if( vips_sink_disc( write->in, write_png_block, write ) )
return( -1 );
/* The setjmp() was held by our background writer: reset it.

View File

@ -56,19 +56,20 @@
#include <dmalloc.h>
#endif /*WITH_DMALLOC*/
typedef int (*write_fn)( IMAGE *in, FILE *fp, PEL *p );
/* What we track during a PPM write.
*/
typedef struct {
IMAGE *in;
im_threadgroup_t *tg;
FILE *fp;
char *name;
write_fn fn;
} Write;
static void
write_destroy( Write *write )
{
IM_FREEF( im_threadgroup_free, write->tg );
IM_FREEF( fclose, write->fp );
IM_FREE( write->name );
@ -84,11 +85,10 @@ write_new( IMAGE *in, const char *name )
return( NULL );
write->in = in;
write->tg = im_threadgroup_create( write->in );
write->name = im_strdup( NULL, name );
write->fp = im__file_open_write( name );
if( !write->tg || !write->name || !write->fp ) {
if( !write->name || !write->fp ) {
write_destroy( write );
return( NULL );
}
@ -96,8 +96,6 @@ write_new( IMAGE *in, const char *name )
return( write );
}
typedef int (*write_fn)( IMAGE *in, FILE *fp, PEL *p );
static int
write_ppm_line_ascii( IMAGE *in, FILE *fp, PEL *p )
{
@ -163,16 +161,15 @@ write_ppm_line_binary( IMAGE *in, FILE *fp, PEL *p )
}
static int
write_ppm_block( REGION *region, Rect *area, void *a, void *b )
write_ppm_block( REGION *region, Rect *area, void *a )
{
Write *write = (Write *) a;
write_fn fn = (write_fn) b;
int i;
for( i = 0; i < area->height; i++ ) {
PEL *p = (PEL *) IM_REGION_ADDR( region, 0, area->top + i );
if( fn( write->in, write->fp, p ) )
if( write->fn( write->in, write->fp, p ) )
return( -1 );
}
@ -183,7 +180,6 @@ static int
write_ppm( Write *write, int ascii )
{
IMAGE *in = write->in;
write_fn fn = ascii ? write_ppm_line_ascii : write_ppm_line_binary;
int max_value;
char *magic;
@ -223,7 +219,9 @@ write_ppm( Write *write, int ascii )
fprintf( write->fp, "%d %d\n", in->Xsize, in->Ysize );
fprintf( write->fp, "%d\n", max_value );
if( im_wbuffer( write->tg, write_ppm_block, write, fn ) )
write->fn = ascii ? write_ppm_line_ascii : write_ppm_line_binary;
if( vips_sink_disc( write->in, write_ppm_block, write ) )
return( -1 );
return( 0 );

View File

@ -57,7 +57,6 @@
*/
typedef struct {
IMAGE *in;
im_threadgroup_t *tg;
int fd;
} Write;
@ -66,7 +65,6 @@ typedef struct {
static void
write_destroy( Write *write )
{
IM_FREEF( im_threadgroup_free, write->tg );
im_free( write );
}
@ -80,10 +78,9 @@ write_new( IMAGE *in, int fd )
return( NULL );
write->in = in;
write->tg = im_threadgroup_create( write->in );
write->fd = fd;
if( !write->tg || !write->fd ) {
if( !write->fd ) {
write_destroy( write );
return( NULL );
}
@ -93,7 +90,7 @@ write_new( IMAGE *in, int fd )
static int
write_block( REGION *region, Rect *area, void *a, void *b )
write_block( REGION *region, Rect *area, void *a )
{
Write *write = (Write *) a;
int i;
@ -128,7 +125,7 @@ im_vips2raw( IMAGE *in, int fd )
if( im_pincheck( in ) || !(write = write_new( in, fd )) )
return( -1 );
if( im_wbuffer( write->tg, write_block, write, NULL ) ) {
if( vips_sink_disc( in, write_block, write ) ) {
write_destroy( write );
return( -1 );
}

View File

@ -1023,7 +1023,6 @@ typedef struct {
IMAGE *in;
char *filename;
im_threadgroup_t *tg;
FILE *fout;
char format[256];
double expos;
@ -1036,7 +1035,6 @@ typedef struct {
static void
write_destroy( Write *write )
{
IM_FREEF( im_threadgroup_free, write->tg );
IM_FREE( write->filename );
IM_FREEF( fclose, write->fout );
@ -1054,7 +1052,6 @@ write_new( IMAGE *in, const char *filename )
write->in = in;
write->filename = im_strdup( NULL, filename );
write->tg = im_threadgroup_create( write->in );
write->fout = im__file_open_write( filename );
strcpy( write->format, COLRFMT );
write->expos = 1.0;
@ -1070,7 +1067,7 @@ write_new( IMAGE *in, const char *filename )
write->prims[3][0] = CIE_x_w;
write->prims[3][1] = CIE_y_w;
if( !write->filename || !write->tg || !write->fout ) {
if( !write->filename || !write->fout ) {
write_destroy( write );
return( NULL );
}
@ -1125,7 +1122,7 @@ vips2rad_put_header( Write *write )
}
static int
vips2rad_put_data_block( REGION *region, Rect *area, void *a, void *b )
vips2rad_put_data_block( REGION *region, Rect *area, void *a )
{
Write *write = (Write *) a;
int i;
@ -1143,7 +1140,7 @@ vips2rad_put_data_block( REGION *region, Rect *area, void *a, void *b )
static int
vips2rad_put_data( Write *write )
{
if( im_wbuffer( write->tg, vips2rad_put_data_block, write, NULL ) )
if( vips_sink_disc( write->in, vips2rad_put_data_block, write ) )
return( -1 );
return( 0 );

View File

@ -38,7 +38,6 @@ pkginclude_HEADERS = \
resample.h \
semaphore.h \
struct.h \
threadgroup.h \
threadpool.h \
thread.h \
transform.h \

View File

@ -70,10 +70,6 @@ extern int im__concurrency;
*/
extern int im__progress;
/* Use wbuffer2.
*/
extern int im__wbuffer2;
typedef int (*im__fftproc_fn)( IMAGE *, IMAGE *, IMAGE * );
/* iofuncs

View File

@ -1,129 +0,0 @@
/* Thread eval for VIPS.
*
* 29/9/99 JC
* - from thread.h
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
#ifndef IM_THREADGROUP_H
#define IM_THREADGROUP_H
#ifdef __cplusplus
extern "C" {
#endif /*__cplusplus*/
#include <vips/semaphore.h>
/* Per-thread state for one worker in a threadgroup. Callers fill in the
* oreg / pos / x / y / a / b / c fields and then trigger the thread; the
* threadgroup's work function reads them back.
*/
typedef struct {
/* All private.
*/
/*< private >*/
REGION *reg; /* Region this thread operates on */
struct im__threadgroup_t *tg; /* Thread group we are part of */
GThread *thread; /* Thread for this region */
im_semaphore_t go; /* Thread waits here to start work */
int kill; /* Set this to make thread exit */
int error; /* Set by thread if work fn fails */
REGION *oreg; /* Output region for inplace work fns (passed to im_prepare_to()) */
Rect pos; /* Area of the image this thread should generate */
int x, y; /* Position in oreg at which the result is placed */
void *a, *b, *c; /* User arguments to work fns */
#ifdef TIME_THREAD
/* NOTE(review): presumably per-thread timing buffers and a write index;
* nothing visible here uses them, so confirm against threadgroup.c.
*/
double *btime, *etime;
int tpos;
#endif /*TIME_THREAD*/
} im_thread_t;
/* A work function: the unit of work a threadgroup runs on one of its
* threads. It receives the thread's own state, a region and three untyped
* arguments (presumably the thread's a, b and c fields; confirm against
* threadgroup.c). The work fns in this code base return 0 for success and
* -1 for failure.
*/
typedef int (*im__work_fn)( im_thread_t *thr,
REGION *, void *, void *, void * );
/* What we track for a group of threads working together on one image:
* the image, the tile geometry used to split it up, the work function, the
* threads themselves and the list of currently idle threads.
*/
typedef struct im__threadgroup_t {
/* All private.
*/
/*< private >*/
int zombie; /* Set if has been freed */
IMAGE *im; /* Image we are calculating */
int pw, ph; /* Tile size: width and height of one unit of work */
int nlines; /* Scanlines-at-once we prefer for iteration */
im__work_fn work; /* Work fn for this threadgroup */
int nthr; /* Number of threads in group */
im_thread_t **thr; /* Threads: array of nthr pointers */
im_semaphore_t idle_sem;/* The number of idle threads */
GSList *idle; /* All the idle threads */
GMutex *idle_lock; /* NOTE(review): presumably guards the idle list; confirm */
#ifdef DEBUG_HIGHWATER
int nidle; /* Number of idles */
int min_idle; /* How short idle got */
#endif /*DEBUG_HIGHWATER*/
int kill; /* Set this to stop threadgroup early */
int progress; /* Set this to get eval progress feedback */
} im_threadgroup_t;
/* Set and get the concurrency level (presumably the number of worker
* threads a threadgroup is created with; confirm against the
* implementation).
*/
void im_concurrency_set( int concurrency );
int im_concurrency_get( void );
/* Thread group functions.
*
* Typical use, as seen in the callers: create a group for an image, set
* tg->work, then repeatedly im_threadgroup_get() an idle thread, fill in its
* parameters and im_threadgroup_trigger() it. im_threadgroup_wait() blocks
* until all threads are idle again and im_threadgroup_iserror() reports
* whether any work function failed.
*/
im_threadgroup_t *im_threadgroup_create( IMAGE *im );
int im_threadgroup_free( im_threadgroup_t *tg );
im_thread_t *im_threadgroup_get( im_threadgroup_t *tg );
void im_threadgroup_trigger( im_thread_t *thr );
void im_threadgroup_wait( im_threadgroup_t *tg );
int im_threadgroup_iserror( im_threadgroup_t *tg );
/* Threaded im_prepare()
*/
int im_prepare_thread( im_threadgroup_t *tg, REGION *oreg, Rect *r );
/* Threaded, double-buffered eval to file.
*
* The write function is handed a filled region, the area of the image it
* covers and the two client pointers given to im_wbuffer(). Its return
* value is stored as the write error for that buffer.
*/
typedef int (*im_wbuffer_fn)( REGION *region, Rect *area, void *a, void *b );
int im_wbuffer( im_threadgroup_t *tg,
im_wbuffer_fn write_fn, void *a, void *b );
#ifdef __cplusplus
}
#endif /*__cplusplus*/
#endif /*IM_THREADGROUP_H*/

View File

@ -144,8 +144,13 @@ int vips_sink_screen( VipsImage *in, VipsImage *out, VipsImage *mask,
int priority,
VipsSinkNotify notify, void *a );
int vips_sink_memory( VipsImage *im );
void im__print_renders( void );
void im_concurrency_set( int concurrency );
int im_concurrency_get( void );
#ifdef __cplusplus
}
#endif /*__cplusplus*/

View File

@ -116,7 +116,6 @@ extern "C" {
#include <vips/check.h>
#include <vips/interpolate.h>
#include <vips/semaphore.h>
#include <vips/threadgroup.h>
#include <vips/threadpool.h>
#include <vips/meta.h>

View File

@ -17,18 +17,16 @@ libiofuncs_la_SOURCES = \
im_histlin.c \
im_image.c \
init.c \
im_iterate.c \
im_mapfile.c \
im_open.c \
im_open_vips.c \
im_partial.c \
im_prepare.c \
im_render.c \
im_setbuf.c \
im_setupout.c \
im_unmapfile.c \
im_guess_prefix.c \
im_wbuffer.c \
sinkmemory.c \
sinkscreen.c \
sinkdisc.c \
sink.c \
@ -41,7 +39,6 @@ libiofuncs_la_SOURCES = \
region.c \
rect.c \
semaphore.c \
threadgroup.c \
threadpool.c \
util.c \
im_init_world.c \

View File

@ -45,6 +45,8 @@
* - new start/end eval callbacks
* 7/10/09
* - gtkdoc comments
* 16/4/10
* - remove threadgroup stuff
*/
/*
@ -294,184 +296,6 @@ im_allocate_input_array( IMAGE *out, ... )
* Returns: 0 on success, -1 on error.
*/
/* Inplace work function: render the area in thr->pos straight into the
 * output region thr->oreg at position (thr->x, thr->y). The caller must set
 * those fields before triggering the thread. Returns 0 on success, -1 if
 * im_prepare_to() fails.
 */
static int
generate_work( im_thread_t *thr,
	REGION *reg, void *a, void *b, void *c )
{
	IMAGE *im = thr->tg->im;
	Rect whole;

	/* The requested area has to lie inside the image.
	 */
	whole.left = 0;
	whole.top = 0;
	whole.width = im->Xsize;
	whole.height = im->Ysize;
	g_assert( im_rect_includesrect( &whole, &thr->pos ) );

	return( im_prepare_to( reg, thr->oreg,
		&thr->pos, thr->x, thr->y ) ? -1 : 0 );
}
/* Fill the valid area of a large region by cutting it into tiles of
 * tg->pw by tg->ph pixels and handing each tile to an idle thread of the
 * threadgroup. Returns 0 on success, -1 as soon as any thread reports an
 * error.
 */
static int
eval_to_region( REGION *or, im_threadgroup_t *tg )
{
	Rect *valid = &or->valid;
	Rect whole;
	int left, top;
#ifdef DEBUG_IO
	int ntiles = 0;

	printf( "eval_to_region: partial image output to region\n" );
	printf( "\tleft = %d, top = %d, width = %d, height = %d\n",
		valid->left, valid->top, valid->width, valid->height );
#endif /*DEBUG_IO*/

	whole.left = 0;
	whole.top = 0;
	whole.width = or->im->Xsize;
	whole.height = or->im->Ysize;

	/* All threads run the inplace work function.
	 */
	tg->work = generate_work;

	/* Walk the valid area tile by tile.
	 */
	for( top = valid->top; top < IM_RECT_BOTTOM( valid );
		top += tg->ph )
		for( left = valid->left; left < IM_RECT_RIGHT( valid );
			left += tg->pw ) {
			im_thread_t *worker;
			Rect tile;
			Rect hit;

			/* Blocks until a worker is idle: workers put
			 * themselves on the idle list and then wait on
			 * their 'go' semaphore.
			 */
			worker = im_threadgroup_get( tg );

			/* The tile for this worker, clipped against the
			 * image and against the space available in or.
			 */
			tile.left = left;
			tile.top = top;
			tile.width = tg->pw;
			tile.height = tg->ph;
			im_rect_intersectrect( &tile, &whole, &hit );
			im_rect_intersectrect( &hit, valid, &hit );

			/* Hand over the parameters and start the worker.
			 */
			worker->oreg = or;
			worker->pos = hit;
			worker->x = hit.left;
			worker->y = hit.top;
			im_threadgroup_trigger( worker );

			if( im_threadgroup_iserror( tg ) ) {
				/* Let the threads finish rather than
				 * killing them: we may want their error
				 * information.
				 */
				im_threadgroup_wait( tg );

				return( -1 );
			}

#ifdef DEBUG_IO
			ntiles++;
#endif /*DEBUG_IO*/
		}

	/* Wait until every thread is idle again, then pick up any late
	 * errors.
	 */
	im_threadgroup_wait( tg );

	if( im_threadgroup_iserror( tg ) )
		return( -1 );

#ifdef DEBUG_IO
	printf( "eval_to_region: %d patches calculated\n", ntiles );
#endif /*DEBUG_IO*/

	return( 0 );
}
/* Output to a memory area. Might be im_setbuf(), im_mmapin()/im_makerw() or
 * im_mmapinrw().
 *
 * The image is evaluated in horizontal strips: or is attached to each strip
 * in turn, the strip is filled with eval_to_region() and the eval callbacks
 * of the image are triggered. Start-of-eval is signalled before the loop
 * and end-of-eval after it.
 *
 * Returns 0 on success and non-zero on error.
 */
static int
eval_to_memory( im_threadgroup_t *tg, REGION *or )
{
	IMAGE *im = or->im;
	int y, chunk;
	int result = 0;
#ifdef DEBUG_IO
	/* Declared together with the other locals: a declaration following
	 * a statement does not compile as C89.
	 */
	int ntiles = 0;
#endif /*DEBUG_IO*/

#ifdef DEBUG_IO
	printf( "eval_to_memory: partial image output to memory area\n" );
#endif /*DEBUG_IO*/

	/* Signal start of eval.
	 */
	if( im__start_eval( im ) )
		return( -1 );

	/* Choose a chunk size ... 1/100th of the height of the image, about.
	 * This sets the granularity of user feedback on eval progress, but
	 * does not affect mem requirements etc.
	 */
	chunk = (im->Ysize / 100) + 1;

	/* Loop down the output image, evaling each chunk.
	 */
	for( y = 0; y < im->Ysize; y += chunk ) {
		Rect pos;

		/* Attach or to this position in image. The last chunk may
		 * be shorter than the others.
		 */
		pos.left = 0;
		pos.top = y;
		pos.width = im->Xsize;
		pos.height = IM_MIN( chunk, im->Ysize - y );
		if( (result = im_region_image( or, &pos )) )
			break;

		/* Ask for evaluation of this area.
		 */
		if( (result = eval_to_region( or, tg )) )
			break;

		/* Trigger any eval callbacks on our source image.
		 */
		if( (result = im__handle_eval( im, pos.width, pos.height )) )
			break;

#ifdef DEBUG_IO
		ntiles++;
#endif /*DEBUG_IO*/
	}

	/* Signal end of eval. This always runs, so a failure in the loop
	 * still gets a matching end-of-eval.
	 */
	result |= im__end_eval( im );

#ifdef DEBUG_IO
	printf( "eval_to_memory: %d patches calculated\n", ntiles );
#endif /*DEBUG_IO*/

	return( result );
}
/* A write function for VIPS images. Just write() the pixel data.
*/
static int
@ -597,10 +421,8 @@ im_generate( IMAGE *im,
if( im->dtype == IM_OPENOUT )
res = vips_sink_disc( im,
(VipsRegionWrite) write_vips, NULL );
/*
else
res = eval_to_memory( tg, or );
*/
res = vips_sink_memory( im );
/* Error?
*/

View File

@ -268,8 +268,6 @@ static GOptionEntry option_entries[] = {
N_( "set fatstrip height to N (DEBUG)" ), "N" },
{ "vips-progress", 'p', 0, G_OPTION_ARG_NONE, &im__progress,
N_( "show progress feedback" ), NULL },
{ "vips-wbuffer2", 'd', 0, G_OPTION_ARG_NONE, &im__wbuffer2,
N_( "use distributed work allocation" ), NULL },
{ NULL }
};

View File

@ -1,417 +0,0 @@
/* Manage pipelines of partial images.
*
* J.Cupitt, 17/4/93.
* 1/7/93 JC
* - adapted for partial v2
* 9/5/94
* - new thread stuff added, with a define to turn it off
* 21/11/94 JC
* - pw and ph wrong way round!
* 24/5/95 JC
* - redone, now works in pipelines!
* 30/8/96 JC
* - more sharing with im_generate()
* 2/3/98 JC
* - IM_ANY added
* 19/1/99 JC
* - oops, threads were broken :(
* 30/7/99 RP JC
* - threads reorganised for POSIX
* 25/10/03 JC
* - read via a buffer image so we work with mmap window images
* 27/11/06
* - merge threadgroup stuff
* 7/11/07
* - new eval start/progress/end system
* 7/10/09
* - gtkdoc comments
* 15/10/09
* - call start and stop functions from the worker threads
* - reworked a bit
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
#define DEBUG
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#include <vips/vips.h>
#include <vips/thread.h>
#include <vips/internal.h>
#include <vips/debug.h>
#ifdef WITH_DMALLOC
#include <dmalloc.h>
#endif /*WITH_DMALLOC*/
/* Track this stuff during an im_iterate(): the image being scanned, the
* private copy and threadgroup used to scan it, and the user's callbacks
* and client data.
*/
typedef struct _Iterate {
/* The image the caller asked us to iterate over.
*/
IMAGE *im;
/* We need a temp "p" image between the source image and us to
* make sure we can't damage the original.
*/
IMAGE *t;
/* Store our sequence values in tg->thr[i]->a. The seq values in the
* regions are used by the im_copy() to t.
*/
im_threadgroup_t *tg;
/* User callbacks: start makes a per-thread sequence value, generate is
* run on each prepared region, stop frees the sequence value. b and c are
* passed through to all three untouched.
*/
im_start_fn start;
im_generate_fn generate;
im_stop_fn stop;
void *b;
void *c;
/* Set this to signal to workers that we want to shut down sequences.
*/
gboolean shutdown;
} Iterate;
/* Run the user's stop function on this thread's sequence value, if the
 * thread has one and a stop function was supplied. The sequence value is
 * cleared on success. Returns 0 on success, -1 if the stop function fails.
 */
static int
iterate_call_stop( Iterate *iter, im_thread_t *thr )
{
#ifdef DEBUG
	printf( "iterate_call_stop: thr = %p\n", thr );
#endif /*DEBUG*/

	/* Nothing to stop.
	 */
	if( !thr->a || !iter->stop )
		return( 0 );

	if( iter->stop( thr->a, iter->b, iter->c ) ) {
		im_error( "im_iterate",
			_( "stop function failed "
			"for image \"%s\"" ),
			iter->im->filename );

		return( -1 );
	}

	thr->a = NULL;

	return( 0 );
}
/* Make every worker thread run its stop function. Stop functions have to
 * be called from the worker threads, so we set the shutdown flag and run
 * each thread one final time.
 */
static void
iterate_call_all_stop( Iterate *iter )
{
	im_threadgroup_t *group = iter->tg;
	int n;

#ifdef DEBUG
	printf( "iterate_call_all_stop: start\n" );
#endif /*DEBUG*/

	/* Let any work in progress finish: all threads back at 'go'.
	 */
	im_threadgroup_wait( group );

	/* Take every thread off the idle list so each is waiting for work.
	 */
	n = 0;
	while( n < group->nthr ) {
		(void) im_threadgroup_get( group );
		n += 1;
	}

	/* With 'shutdown' set the work function frees the sequence value
	 * instead of generating pixels, see iterate_work(). Run each thread
	 * once more in that mode.
	 */
	iter->shutdown = TRUE;

	for( n = 0; n < group->nthr; n++ ) {
		im_thread_t *worker = group->thr[n];

		/* A thread which has never run will not have this set yet,
		 * and the work fn needs it.
		 */
		worker->b = iter;

		im_threadgroup_trigger( worker );
	}

	/* And wait for them all to go idle again.
	 */
	im_threadgroup_wait( group );

#ifdef DEBUG
	printf( "iterate_call_all_stop: done\n" );
#endif /*DEBUG*/
}
/* Release the threadgroup and the temporary image held by an Iterate.
 * Safe to call on a partially initialised Iterate.
 */
static void
iterate_free( Iterate *iter )
{
	im_threadgroup_t *group = iter->tg;
	int n;

	/* By now every thread must have dropped its sequence value.
	 */
	for( n = 0; group && n < group->nthr; n++ )
		g_assert( !group->thr[n]->a );

	IM_FREEF( im_threadgroup_free, iter->tg );
	IM_FREEF( im_close, iter->t );
}
/* Make sure this thread has a sequence value: if it has none yet and a
 * start function was supplied, run the start function with the image's
 * sslock held. Returns 0 on success, -1 if the start function fails.
 */
static int
iterate_call_start( Iterate *iter, im_thread_t *thr )
{
	/* Already started, or nothing to start with.
	 */
	if( thr->a || !iter->start )
		return( 0 );

	g_mutex_lock( iter->t->sslock );
	thr->a = iter->start( iter->t, iter->b, iter->c );
	g_mutex_unlock( iter->t->sslock );

	if( thr->a )
		return( 0 );

	im_error( "im_iterate",
		_( "start function failed for image \"%s\"" ),
		iter->im->filename );

	return( -1 );
}
/* Our work function. This is complicated because we need to make sure we
 * all call our start and stop functions from the worker thread, so that any
 * regions created are owned by the worker and not the main thread.
 *
 * @thr: the worker thread we are running on; thr->pos must already be set
 * @reg: the region to prepare and hand to the user's generate function
 * @seq: ignored on entry, the sequence value is always taken from thr->a
 * @a: the Iterate we are working for
 * @b: unused
 *
 * Returns 0 on success, -1 if the start, stop or generate function fails,
 * or if the pixels cannot be prepared.
 */
static int
iterate_work( im_thread_t *thr, REGION *reg, void *seq, void *a, void *b )
{
	Iterate *iter = (Iterate *) a;

	/* 'shutdown' is set at the end of computation to ask workers to free
	 * context.
	 */
	if( iter->shutdown ) {
		if( iterate_call_stop( iter, thr ) )
			return( -1 );
	}
	else {
		/* Make sure the start function has run and we have the
		 * sequence value set. If start fails we must not go on to
		 * generate with a missing sequence value.
		 */
		if( iterate_call_start( iter, thr ) )
			return( -1 );
		seq = thr->a;

		/* thr pos needs to be set before coming here ... check.
		 */
		{
			Rect image;

			image.left = 0;
			image.top = 0;
			image.width = thr->tg->im->Xsize;
			image.height = thr->tg->im->Ysize;

			g_assert( im_rect_includesrect( &image, &thr->pos ) );
		}

		/* Generate pixels for the user.
		 */
		if( im_prepare( reg, &thr->pos ) )
			return( -1 );

		/* Run the user code on this area.
		 */
		if( iter->generate( reg, seq, iter->b, iter->c ) )
			return( -1 );
	}

	return( 0 );
}
/* Fill in an Iterate: note the user's arguments, open a temporary "p"
 * image, copy the source image to it and build a threadgroup on the copy
 * which runs iterate_work(). Returns 0 on success; on failure everything
 * made so far is freed and -1 is returned.
 */
static int
iterate_init( Iterate *iter,
	IMAGE *im,
	im_start_fn start, im_generate_fn generate, im_stop_fn stop,
	void *b, void *c )
{
	iter->im = im;
	iter->t = NULL;
	iter->tg = NULL;
	iter->start = start;
	iter->generate = generate;
	iter->stop = stop;
	iter->b = b;
	iter->c = c;
	iter->shutdown = FALSE;

	/* Each step is only attempted if the one before it worked; tg
	 * stays NULL if any of them fails.
	 */
	iter->t = im_open( "iterate", "p" );
	if( iter->t &&
		!im_copy( iter->im, iter->t ) )
		iter->tg = im_threadgroup_create( iter->t );

	if( !iter->tg ) {
		iterate_free( iter );

		return( -1 );
	}

	iter->tg->work = iterate_work;

#ifdef DEBUG
	im_diag( "im_iterate", "using %d threads", iter->tg->nthr );
#endif /*DEBUG*/

	return( 0 );
}
/* Scan the whole of image t in tiles of tg->pw by tg->ph pixels, giving
 * each tile to an idle thread of the threadgroup. Eval callbacks on t are
 * triggered after each tile is started. Returns 0 on success, -1 if an
 * eval callback or a worker reports an error.
 */
static int
iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t )
{
	Rect whole;
	int left, top;

	whole.left = 0;
	whole.top = 0;
	whole.width = t->Xsize;
	whole.height = t->Ysize;

	for( top = 0; top < t->Ysize; top += tg->ph )
		for( left = 0; left < t->Xsize; left += tg->pw ) {
			im_thread_t *worker;
			Rect tile;
			Rect hit;

			/* Blocks until a worker is idle: workers put
			 * themselves on the idle list and then wait on
			 * their 'go' semaphore.
			 */
			worker = im_threadgroup_get( tg );

			/* The area this worker should generate: the tile,
			 * clipped against the image.
			 */
			tile.left = left;
			tile.top = top;
			tile.width = tg->pw;
			tile.height = tg->ph;
			im_rect_intersectrect( &whole, &tile, &hit );
			worker->pos = hit;

			/* iterate_work() needs to find the Iterate.
			 */
			worker->b = iter;

			im_threadgroup_trigger( worker );

			/* Run eval callbacks on the image and look for
			 * errors from the workers.
			 */
			if( im__handle_eval( t, tg->pw, tg->ph ) ||
				im_threadgroup_iserror( tg ) ) {
				/* Let the threads finish rather than
				 * killing them: we may want their error
				 * information.
				 */
				im_threadgroup_wait( tg );

				return( -1 );
			}
		}

	/* All tiles have been handed out: wait for the last ones.
	 */
	im_threadgroup_wait( tg );

	return( 0 );
}
/**
 * im_iterate:
 * @im: scan over this image
 * @start: start sequences with this function
 * @generate: generate pixels with this function
 * @stop: stop sequences with this function
 * @b: user data
 * @c: user data
 *
 * Loops over an image. The image is scanned in tiles and @generate is
 * called once for each tile, with the @reg argument being a region of
 * pixels for processing. im_iterate() is used to implement operations like
 * im_avg() which have no image output.
 *
 * If im__wbuffer2 is set, the whole job is passed to vips_sink() instead.
 *
 * See also: im_generate(), im_open().
 *
 * Returns: 0 on success, or non-zero on error.
 */
int
im_iterate( IMAGE *im,
	im_start_fn start, im_generate_fn generate, im_stop_fn stop,
	void *b, void *c )
{
	Iterate iter;
	int status;

	if( im__wbuffer2 )
		return( vips_sink( im, start, generate, stop, b, c ) );

	g_assert( !im_image_sanity( im ) );

	/* We don't use this, but make sure it's set in case any old binaries
	 * are expecting it.
	 */
	im->Bbits = im_bits_of_fmt( im->BandFmt );

	if( iterate_init( &iter, im, start, generate, stop, b, c ) )
		return( -1 );

	/* Signal start of eval.
	 */
	if( im__start_eval( iter.t ) ) {
		iterate_free( &iter );

		return( -1 );
	}

	/* Scan the image with the threadgroup, then signal end of eval
	 * whether or not the scan worked.
	 */
	status = iterate_loop( &iter, iter.tg, iter.t );
	status |= im__end_eval( iter.t );

	/* Have the workers run their stop functions, then collect any error
	 * they raised on the way.
	 */
	iterate_call_all_stop( &iter );
	status |= im_threadgroup_iserror( iter.tg );

	iterate_free( &iter );

	return( status );
}

File diff suppressed because it is too large Load Diff

View File

@ -1,452 +0,0 @@
/* Double-buffered write.
*
* 2/11/07
* - cut from im_generate
* 7/11/07
* - trigger start/end eval callbacks
* 21/3/10
* - optionally use wbuffer2
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
#define DEBUG
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/private.h>
#include <vips/thread.h>
#ifdef WITH_DMALLOC
#include <dmalloc.h>
#endif /*WITH_DMALLOC*/
/* A buffer we are going to write to disc in a background thread. Worker
* threads from the threadgroup fill the region with pixels; once they have
* all finished, the background thread hands the region to write_fn.
*/
typedef struct _WriteBuffer {
im_threadgroup_t *tg; /* What makes the pixels */
REGION *region; /* Pixels */
Rect area; /* Part of image this region covers */
im_semaphore_t go; /* Start bg thread loop */
im_semaphore_t nwrite; /* Number of threads writing to region */
im_semaphore_t done; /* Bg thread has done write */
int write_errno; /* Save write errors here: the value write_fn returned */
GThread *thread; /* BG writer thread */
gboolean kill; /* Set to ask thread to exit */
im_wbuffer_fn write_fn; /* BG write with this */
void *a; /* Client data, passed to write_fn */
void *b;
} WriteBuffer;
/* Free a WriteBuffer: stop and join its background writer thread if it has
 * one, then release the region, the semaphores and the struct itself.
 */
static void
wbuffer_free( WriteBuffer *wbuffer )
{
	GThread *writer = wbuffer->thread;

	if( writer ) {
		/* Ask the writer to exit and wake it up so that it sees the
		 * request.
		 */
		wbuffer->kill = TRUE;
		im_semaphore_up( &wbuffer->go );

		/* The thread function always returns NULL (see
		 * wbuffer_write_thread), so there is no result to keep.
		 */
		(void) g_thread_join( wbuffer->thread );
#ifdef DEBUG_CREATE
		printf( "wbuffer_free: g_thread_join()\n" );
#endif /*DEBUG_CREATE*/

		wbuffer->thread = NULL;
	}

	IM_FREEF( im_region_free, wbuffer->region );

	im_semaphore_destroy( &wbuffer->go );
	im_semaphore_destroy( &wbuffer->nwrite );
	im_semaphore_destroy( &wbuffer->done );

	im_free( wbuffer );
}
/* Pass the filled region of a WriteBuffer to the user's write function and
 * keep what it returns in write_errno.
 */
static void
wbuffer_write( WriteBuffer *wbuffer )
{
	im_wbuffer_fn fn = wbuffer->write_fn;
	int status;

	status = fn( wbuffer->region, &wbuffer->area,
		wbuffer->a, wbuffer->b );
	wbuffer->write_errno = status;

#ifdef DEBUG
	printf( "wbuffer_write: %d bytes from wbuffer %p\n",
		wbuffer->region->bpl * wbuffer->area.height, wbuffer );
#endif /*DEBUG*/
}
#ifdef HAVE_THREADS
/* Body of the background writer thread. Each time 'go' is raised, either
 * exit (if kill is set) or wait for the workers to finish filling the
 * buffer, write it out and raise 'done'. Always returns NULL.
 */
static void *
wbuffer_write_thread( void *data )
{
	WriteBuffer *wb = (WriteBuffer *) data;

	im_semaphore_down( &wb->go );

	while( !wb->kill ) {
		/* Wait for all writer threads to leave this wbuffer.
		 */
		im_semaphore_downn( &wb->nwrite, 0 );

		wbuffer_write( wb );

		/* Tell the main thread the write has finished.
		 */
		im_semaphore_up( &wb->done );

		/* Sleep until we are asked to go round again.
		 */
		im_semaphore_down( &wb->go );
	}

	return( NULL );
}
#endif /*HAVE_THREADS*/
/* Make a WriteBuffer for a threadgroup: allocate and initialise the
 * struct, make a region on the threadgroup's image and, when built with
 * threads, start the background writer. Returns NULL on error.
 */
static WriteBuffer *
wbuffer_new( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
{
	WriteBuffer *wb;

	wb = IM_NEW( NULL, WriteBuffer );
	if( !wb )
		return( NULL );

	wb->tg = tg;
	wb->region = NULL;
	im_semaphore_init( &wb->go, 0, "go" );
	im_semaphore_init( &wb->nwrite, 0, "nwrite" );
	im_semaphore_init( &wb->done, 0, "done" );
	wb->write_errno = 0;
	wb->thread = NULL;
	wb->kill = FALSE;
	wb->write_fn = write_fn;
	wb->a = a;
	wb->b = b;

	wb->region = im_region_create( tg->im );
	if( !wb->region ) {
		wbuffer_free( wb );

		return( NULL );
	}

#ifdef HAVE_THREADS
	/* Start the writer last: it picks up parts of the wbuffer as soon
	 * as it runs.
	 */
	wb->thread = g_thread_create( wbuffer_write_thread, wb,
		TRUE, NULL );
	if( !wb->thread ) {
		im_error( "wbuffer_new", "%s", _( "unable to create thread" ) );
		wbuffer_free( wb );

		return( NULL );
	}
#endif /*HAVE_THREADS*/

	return( wb );
}
/* Work function for buffered writes: render the tile in thr->pos into the
 * output region thr->oreg at (thr->x, thr->y), then raise the wbuffer's
 * nwrite semaphore to tell the write thread that this tile is done.
 * Returns 0 on success, -1 if the tile cannot be prepared (nwrite is not
 * raised in that case).
 */
static int
wbuffer_work_fn( im_thread_t *thr,
	REGION *reg, void *a, void *b, void *c )
{
	WriteBuffer *wb = (WriteBuffer *) a;
	IMAGE *im = thr->tg->im;
	Rect whole;

	/* The requested area has to lie inside the image.
	 */
	whole.left = 0;
	whole.top = 0;
	whole.width = im->Xsize;
	whole.height = im->Ysize;
	g_assert( im_rect_includesrect( &whole, &thr->pos ) );

	if( im_prepare_to( reg, thr->oreg, &thr->pos, thr->x, thr->y ) )
		return( -1 );

	im_semaphore_upn( &wb->nwrite, 1 );

	return( 0 );
}
/* Move a wbuffer to a new position: the requested rectangle is clipped
 * against the image, stored as the wbuffer's area, and a buffer for that
 * area is attached to the wbuffer's region. Returns 0 on success, -1 if
 * the buffer cannot be attached.
 */
static int
wbuffer_position( WriteBuffer *wbuffer,
	int left, int top, int width, int height )
{
	IMAGE *im = wbuffer->tg->im;
	Rect whole;
	Rect wanted;

	whole.left = 0;
	whole.top = 0;
	whole.width = im->Xsize;
	whole.height = im->Ysize;

	wanted.left = left;
	wanted.top = top;
	wanted.width = width;
	wanted.height = height;

	im_rect_intersectrect( &wanted, &whole, &wbuffer->area );

	if( im_region_buffer( wbuffer->region, &wbuffer->area ) )
		return( -1 );

	/* We expect a buffer of our own, one nobody has marked as done.
	 */
	assert( !wbuffer->region->buffer->done );

	return( 0 );
}
/* Fill the area of a wbuffer using the threadgroup: cut the area into
 * tiles of tg->pw by tg->ph pixels and start a worker on each one. This
 * does not wait for the workers to finish. Returns 0 on success, -1 if an
 * eval callback or a worker reports an error.
 */
static int
wbuffer_fill( WriteBuffer *wbuffer )
{
	Rect *area = &wbuffer->area;
	im_threadgroup_t *tg = wbuffer->tg;
	IMAGE *im = tg->im;
	Rect whole;
	int left, top;

#ifdef DEBUG
	printf( "wbuffer_fill: starting for wbuffer %p at line %d\n",
		wbuffer, area->top );
#endif /*DEBUG*/

	whole.left = 0;
	whole.top = 0;
	whole.width = im->Xsize;
	whole.height = im->Ysize;

	for( top = area->top; top < IM_RECT_BOTTOM( area ); top += tg->ph )
		for( left = area->left; left < IM_RECT_RIGHT( area );
			left += tg->pw ) {
			im_thread_t *worker;
			Rect tile;
			Rect hit;

			/* Blocks until a worker is idle: workers put
			 * themselves on the idle list and then wait on
			 * their 'go' semaphore.
			 */
			worker = im_threadgroup_get( tg );

			/* The tile for this worker, clipped against the
			 * image and against the area of the wbuffer.
			 */
			tile.left = left;
			tile.top = top;
			tile.width = tg->pw;
			tile.height = tg->ph;
			im_rect_intersectrect( &tile, &whole, &hit );
			im_rect_intersectrect( &hit, area, &hit );

			/* Parameters for wbuffer_work_fn().
			 */
			worker->oreg = wbuffer->region;
			worker->pos = hit;
			worker->x = hit.left;
			worker->y = hit.top;
			worker->a = wbuffer;

#ifdef DEBUG
			printf( "wbuffer_fill: starting for tile at %d x %d\n",
				left, top );
#endif /*DEBUG*/

			/* Count this worker as a writer on the wbuffer
			 * before it starts; the worker undoes this when its
			 * tile is done.
			 */
			im_semaphore_upn( &wbuffer->nwrite, -1 );
			im_threadgroup_trigger( worker );

			/* Run eval callbacks on the image and look for
			 * errors from the workers.
			 */
			if( im__handle_eval( tg->im, tg->pw, tg->ph ) ||
				im_threadgroup_iserror( tg ) ) {
				/* Let the threads finish rather than
				 * killing them: we may want their error
				 * information.
				 */
				im_threadgroup_wait( tg );

				return( -1 );
			}
		}

	return( 0 );
}
/* Eval to file.
 *
 * Walk down the image in strips of tg->nlines scanlines, alternating between
 * the two wbuffers: one is positioned and filled by the worker threads while
 * the write of the other is (in threaded builds) still in progress in the 
 * background. Both wbuffers must share the same threadgroup.
 *
 * Returns 0 on success, -1 on error. Write errors are reported via 
 * im_error_system() using the write_errno recorded on the wbuffer.
 */
static int
wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 )
{
	im_threadgroup_t *tg = b1->tg;
	IMAGE *im = tg->im;
	int y;

	assert( b1->tg == b2->tg );

#ifdef DEBUG
	int nstrips;

	nstrips = 0;
	printf( "wbuffer_eval_to_file: partial image output to file\n" );
#endif /*DEBUG*/

	/* What threads do at the end of each tile ... decrement the nwrite
	 * semaphore.
	 */
	tg->work = wbuffer_work_fn;

	/* Fill to in steps, write each to the output.
	 */
	for( y = 0; y < im->Ysize; y += tg->nlines ) {
		/* Attach to this position in image.
		 */
		if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) )
			return( -1 );

		/* Spark off threads to fill with data.
		 */
		if( wbuffer_fill( b1 ) )
			return( -1 );

		/* We have to keep the ordering on wbuffer writes, so we can't
		 * have more than one background write going at once. Plus we
		 * want to make sure write()s don't get interleaved. Wait for
		 * the previous BG write (if any) to finish.
		 *
		 * b2 is the wbuffer we set going on the previous iteration;
		 * on the first iteration there is nothing to wait for.
		 */
		if( y > 0 ) {
			im_semaphore_down( &b2->done );

			/* Previous write succeeded?
			 */
			if( b2->write_errno ) {
				im_error_system( b2->write_errno,
					"im__eval_to_file", 
					"%s", _( "write failed" ) );
				return( -1 );
			}
		}

		/* b1 write can go.
		 */
		im_semaphore_up( &b1->go );

#ifndef HAVE_THREADS
		/* No threading ... just write.
		 */
		wbuffer_write( b1 );
#endif /*HAVE_THREADS*/

		/* Rotate wbuffers. After this, b2 is the one whose write we
		 * have just started and b1 is free to be refilled.
		 */
		{
			WriteBuffer *t;

			t = b1; b1 = b2; b2 = t;
		}

#ifdef DEBUG
		nstrips++;
#endif /*DEBUG*/
	}

	/* Wait for all threads to finish, check for any errors. b2 is the
	 * last wbuffer we set writing, so wait on its 'done' too.
	 */
	im_threadgroup_wait( tg );
	im_semaphore_down( &b2->done );
	if( im_threadgroup_iserror( tg ) )
		return( -1 );
	if( b1->write_errno || b2->write_errno ) {
		im_error_system( 
			b1->write_errno ? b1->write_errno : b2->write_errno,
			"im__eval_to_file", "%s", _( "write failed" ) );
		return( -1 );
	}

#ifdef DEBUG
	printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips );
#endif /*DEBUG*/

	return( 0 );
}
/* Evaluate the threadgroup's image in strips, passing each completed strip
 * to @write_fn.
 *
 * @tg: threadgroup to evaluate with; tg->im is the image that is generated
 * @write_fn: called to write each strip
 * @a: client data for @write_fn
 * @b: client data for @write_fn (not passed on when the vips_sink_disc() 
 * path is taken)
 *
 * If im__wbuffer2 is set, the work is handed to vips_sink_disc() instead.
 * Otherwise two write buffers are made and used alternately by
 * wbuffer_eval_to_file().
 *
 * Returns 0 on success, -1 on error.
 */
int
im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
{
	WriteBuffer *b1, *b2;
	int result;

	/* Optionally use the newer one.
	 */
	if( im__wbuffer2 )
		return( vips_sink_disc( tg->im, 
			(VipsRegionWrite) write_fn, a ) );

	if( im__start_eval( tg->im ) )
		return( -1 );

	result = 0;

	b1 = wbuffer_new( tg, write_fn, a, b );
	b2 = wbuffer_new( tg, write_fn, a, b );

	if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) )
		result = -1;

	im__end_eval( tg->im );

	/* wbuffer_new() returns NULL on failure, so either pointer may be 
	 * NULL here. Don't rely on wbuffer_free() coping with that.
	 */
	if( b1 )
		wbuffer_free( b1 );
	if( b2 )
		wbuffer_free( b2 );

	return( result );
}

View File

@ -56,6 +56,10 @@
typedef struct _Sink {
VipsImage *im;
/* A big region for the image memory. All the threads write to this.
*/
REGION *all;
/* The position we're at in the image.
*/
int x;
@ -68,13 +72,32 @@ typedef struct _Sink {
int nlines;
} Sink;
/* Release the resources held by a Sink: the 'all' region is passed to 
 * im_region_free() via IM_FREEF. The Sink struct itself is not freed here.
 */
static void
sink_free( Sink *sink )
{
	IM_FREEF( im_region_free, sink->all );
}
static int
sink_init( Sink *sink, VipsImage *im )
{
Rect all;
sink->im = im;
sink->x = 0;
sink->y = 0;
all.left = 0;
all.top = 0;
all.width = im->Xsize;
all.height = im->Ysize;
if( !(sink->all = im_region_create( im )) ||
im_region_image( sink->all, &all ) ) {
sink_free( sink );
return( -1 );
}
vips_get_tile_size( im,
&sink->tile_width, &sink->tile_height, &sink->nlines );
@ -125,7 +148,8 @@ sink_work( VipsThreadState *state, void *a )
{
Sink *sink = (Sink *) a;
if( im_prepare_to( state->reg, &state->pos ) )
if( im_prepare_to( state->reg, sink->all,
&state->pos, state->pos.left, state->pos.top ) )
return( -1 );
return( 0 );
@ -170,11 +194,13 @@ vips_sink_memory( VipsImage *im )
*/
im->Bbits = im_bits_of_fmt( im->BandFmt );
if( sink_init( &sink ) )
if( sink_init( &sink, im ) )
return( -1 );
if( im__start_eval( im ) )
if( im__start_eval( im ) ) {
sink_free( &sink );
return( -1 );
}
result = vips_threadpool_run( im,
vips_thread_state_new,
@ -185,5 +211,7 @@ vips_sink_memory( VipsImage *im )
im__end_eval( im );
sink_free( &sink );
return( result );
}

View File

@ -1,707 +0,0 @@
/* Support for thread groups.
*
* 29/9/99 JC
* - from thread.c
* 23/10/03 JC
* - threadgroup now has a kill flag as well
* 9/6/06
* - use an idle queue instead of flags
* 24/11/06
* - double-buffer file writes
* 27/11/06
* - break stuff out to generate / iterate
* 15/10/09
* - get rid of inplace and default work stuff, you must now always set a
* work function
* 22/10/09
* - gtkdoc
* 14/3/10
* - scale nlines with nthr for smalltile
* - better nprocs guesser
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
#define TIME_THREAD
#define DEBUG_CREATE
#define DEBUG_HIGHWATER
#define DEBUG_IO
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <errno.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#ifdef WITH_DMALLOC
#include <dmalloc.h>
#endif /*WITH_DMALLOC*/
/**
* SECTION: threadgroup
* @short_description: groups of worker threads
* @stability: Stable
* @see_also: <link linkend="libvips-generate">generate</link>
* @include: vips/vips.h
*
* VIPS has its own threadpool system, used by (for example)
* im_prepare_thread().
*
* Most of this is internal to VIPS and does not need to be documented. You
* should only need im_threadgroup_create() and im_threadgroup_free().
*/
#ifdef TIME_THREAD
/* Size of time buffers.
*/
#define IM_TBUF_SIZE (20000)
#endif /*TIME_THREAD*/
/* Maximum number of concurrent threads we allow. No reason for the limit,
 * it's just there to stop mad values for IM_CONCURRENCY killing the system.
 * im_concurrency_get() clips its result to 1 .. IM_MAX_THREADS.
 */
#define IM_MAX_THREADS (1024)

/* Name of environment variable we get concurrency level from. Only 
 * consulted by im_concurrency_get() when im__concurrency is not positive.
 */
#define IM_CONCURRENCY "IM_CONCURRENCY"

/* Default tile geometry ... can be set by init_world.
 */
int im__tile_width = IM__TILE_WIDTH;
int im__tile_height = IM__TILE_HEIGHT;
int im__fatstrip_height = IM__FATSTRIP_HEIGHT;
int im__thinstrip_height = IM__THINSTRIP_HEIGHT;

/* Default n threads ... 0 means get from environment. Written by 
 * im_concurrency_set(), read by im_concurrency_get().
 */
int im__concurrency = 0;
#ifndef HAVE_THREADS
/* No gthread in this build: supply do-nothing versions of the g_thread_*()
 * and g_mutex_*() functions. <vips/thread.h> has #defines which point the
 * g_ names at these.
 */

void
im__g_thread_init( GThreadFunctions *vtable )
{
}

gpointer
im__g_thread_join( GThread *dummy )
{
	return( NULL );
}

gpointer
im__g_thread_self( void )
{
	return( NULL );
}

GThread *
im__g_thread_create_full( GThreadFunc d1,
	gpointer d2, gulong d3, gboolean d4, gboolean d5, GThreadPriority d6,
	GError **d7 )
{
	return( NULL );
}

GMutex *
im__g_mutex_new( void )
{
	return( NULL );
}

void
im__g_mutex_free( GMutex *d )
{
}

void
im__g_mutex_lock( GMutex *d )
{
}

void
im__g_mutex_unlock( GMutex *d )
{
}

#endif /*!HAVE_THREADS*/
/* Set the number of worker threads to use. The value is stored in 
 * im__concurrency; a value of 0 or less makes im_concurrency_get() fall back
 * to the IM_CONCURRENCY environment variable or the processor count.
 */
void
im_concurrency_set( int concurrency )
{
	im__concurrency = concurrency;
}
/* Guess the number of processors available to us. 
 *
 * Uses sysconf() or sysctl() on unix and GetProcessAffinityMask() on 
 * Windows, depending on what the build has defined. 
 *
 * Returns the count found, or 1 if no method is available or the query
 * fails.
 */
static int
get_num_processors( void )
{
	int nproc;

	nproc = 1;

#ifdef G_OS_UNIX

#if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN)
{
	/* POSIX style.
	 */
	int x;

	x = sysconf( _SC_NPROCESSORS_ONLN );
	if( x > 0 )
		nproc = x;
}
#elif defined HW_NCPU
{
	/* BSD style. 
	 */
	int x;
	size_t len = sizeof(x);

	/* Only trust x if the call succeeded: on failure sysctl() leaves 
	 * it unset and we'd be reading an uninitialised value.
	 */
	x = 0;
	if( sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, 
		&x, &len, NULL, 0 ) == 0 &&
		x > 0 )
		nproc = x;
}
#endif

	/* libgomp has some very complex code on Linux to count the number of
	 * processors available to the current process taking pthread affinity
	 * into account, but we don't attempt that here. Perhaps we should?
	 */

#endif /*G_OS_UNIX*/

#ifdef OS_WIN32
{
	/* Count the CPUs currently available to this process.
	 */
	DWORD_PTR process_cpus;
	DWORD_PTR system_cpus;

	if( GetProcessAffinityMask( GetCurrentProcess(),
		&process_cpus, &system_cpus ) ) {
		unsigned int count;

		/* One set bit per usable CPU.
		 */
		for( count = 0; process_cpus != 0; process_cpus >>= 1 )
			if( process_cpus & 1 )
				count++;

		if( count > 0 )
			nproc = count;
	}
}
#endif /*OS_WIN32*/

	return( nproc );
}
/* Work out how many worker threads to use.
 *
 * In order of preference: the value given to im_concurrency_set(), the 
 * IM_CONCURRENCY environment variable, the number of processors we can find.
 * The result is forced into the range 1 .. IM_MAX_THREADS (with a warning if
 * it had to be changed) and remembered for next time.
 *
 * Returns the number of threads.
 */
int
im_concurrency_get( void )
{
	int nthr;

	nthr = im__concurrency;

	if( nthr <= 0 ) {
		const char *env = g_getenv( IM_CONCURRENCY );

		/* An unset or non-positive variable means "ask the system".
		 */
		nthr = env ? atoi( env ) : 0;
		if( nthr <= 0 )
			nthr = get_num_processors();
	}

	if( nthr < 1 || nthr > IM_MAX_THREADS ) {
		nthr = IM_CLIP( 1, nthr, IM_MAX_THREADS );
		im_warn( "im_concurrency_get", 
			_( "threads clipped to %d" ), nthr );
	}

	/* Cache the answer so later calls take the first branch.
	 */
	im_concurrency_set( nthr );

	return( nthr );
}
#ifdef TIME_THREAD
/* Save time buffers.
 *
 * Write the start and stop times recorded for a worker to a new file called
 * "time<n>", where <n> counts up from 1 across calls. Each unit of work
 * produces two lines: the start time, then the stop time.
 *
 * Takes the im_thread_t that owns the buffers: btime, etime and tpos are 
 * per-thread fields set up in threadgroup_thread_new(), and an im_thread_t is
 * what thread_free() passes in.
 *
 * error_exit() is called if the file cannot be opened. Returns 0.
 */
static int
save_time_buffers( im_thread_t *thr )
{
	static int rn = 1;

	char name[256];
	FILE *fp;
	int i;

	im_snprintf( name, sizeof( name ), "time%d", rn++ );
	if( !(fp = fopen( name, "w" )) )
		error_exit( "unable to write to \"%s\"", name );
	for( i = 0; i < thr->tpos; i++ )
		fprintf( fp, "%lld\n%lld\n", thr->btime[i], thr->etime[i] );
	fclose( fp );

	return( 0 );
}
#endif /*TIME_THREAD*/
/* Handle the idle list.
*/
/* Take thr off its threadgroup's idle list, if it is there. Used by 
 * thread_free() so a dead thread can never be handed out as idle.
 */
static void
threadgroup_idle_remove( im_thread_t *thr )
{
	im_threadgroup_t *group = thr->tg;

	g_mutex_lock( group->idle_lock );

	group->idle = g_slist_remove( group->idle, thr );

#ifdef DEBUG_HIGHWATER
	group->nidle -= 1;
	g_assert( group->nidle >= 0 && group->nidle <= group->nthr );
#endif /*DEBUG_HIGHWATER*/

	g_mutex_unlock( group->idle_lock );
}
/* Put thr on the front of its threadgroup's idle list and up the idle 
 * semaphore so that anyone blocked in im_threadgroup_get() or 
 * im_threadgroup_wait() can make progress. thr must not already be on the
 * list.
 */
static void
threadgroup_idle_add( im_thread_t *thr )
{
	im_threadgroup_t *group = thr->tg;

	g_mutex_lock( group->idle_lock );

	g_assert( !g_slist_find( group->idle, thr ) );
	group->idle = g_slist_prepend( group->idle, thr );

#ifdef DEBUG_HIGHWATER
	group->nidle += 1;
	g_assert( group->nidle >= 0 && group->nidle <= group->nthr );
#endif /*DEBUG_HIGHWATER*/

	/* Signal while we still hold the lock.
	 */
	im_semaphore_up( &group->idle_sem );

	g_mutex_unlock( group->idle_lock );
}
/* Fetch an idle worker from the threadgroup, blocking until one has been 
 * added to the idle list. The worker is removed from the list before it is
 * returned.
 */
im_thread_t *
im_threadgroup_get( im_threadgroup_t *tg )
{
	im_thread_t *worker;

	/* Each up on idle_sem corresponds to one entry on the idle list.
	 */
	im_semaphore_down( &tg->idle_sem );

	g_mutex_lock( tg->idle_lock );

	g_assert( tg->idle );
	g_assert( tg->idle->data );

	/* Pop the head of the list.
	 */
	worker = (im_thread_t *) tg->idle->data;
	tg->idle = g_slist_remove( tg->idle, worker );

#ifdef DEBUG_HIGHWATER
	tg->nidle -= 1;
	g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr );
	if( tg->min_idle > tg->nidle )
		tg->min_idle = tg->nidle;
#endif /*DEBUG_HIGHWATER*/

	g_mutex_unlock( tg->idle_lock );

	return( worker );
}
/* Block until every worker in the threadgroup is idle, then leave the idle
 * semaphore as we found it so the workers can be handed out again.
 */
void
im_threadgroup_wait( im_threadgroup_t *tg )
{
	const int n_workers = tg->nthr;

	/* One down per worker: we only get through once they have all 
	 * signalled idle.
	 */
	im_semaphore_downn( &tg->idle_sem, n_workers );

	g_mutex_lock( tg->idle_lock );
	g_assert( tg->idle );
	g_assert( g_slist_length( tg->idle ) == (unsigned int) n_workers );
	g_mutex_unlock( tg->idle_lock );

	/* Every worker is now waiting on its 'go' semaphore and idle_sem is
	 * at zero. Put the count back, ready for the next loop.
	 */
	im_semaphore_upn( &tg->idle_sem, n_workers );
}
/* Junk a thread.
 *
 * Stops the worker's OS thread (if one was started), destroys its 'go'
 * semaphore, takes it off the idle list and frees its region. The 
 * im_thread_t struct itself is not freed here.
 */
static void
thread_free( im_thread_t *thr )
{
	/* Is there a thread running this region? Kill it!
	 */
	if( thr->thread ) {
		/* Set the kill flag first, then wake the thread: it tests 
		 * kill straight after coming out of its wait on 'go'.
		 */
		thr->kill = -1;
		im_semaphore_up( &thr->go );

		/* Return value is always NULL (see thread_main_loop).
		 */
		(void) g_thread_join( thr->thread );
#ifdef DEBUG_CREATE
		printf( "thread_free: g_thread_join()\n" );
#endif /*DEBUG_CREATE*/

		thr->thread = NULL;
	}

	/* Only safe now the thread (if any) has been joined.
	 */
	im_semaphore_destroy( &thr->go );

	threadgroup_idle_remove( thr );
	IM_FREEF( im_region_free, thr->reg );
	thr->oreg = NULL;
	thr->tg = NULL;

#ifdef TIME_THREAD
	/* Dump the timing data if buffers were allocated.
	 */
	if( thr->btime )
		(void) save_time_buffers( thr );
#endif /*TIME_THREAD*/
}
/* One unit of work for a worker: call the threadgroup's work function on 
 * this thread's region and parameters, note any failure in thr->error, then 
 * put the worker back on the idle list. Runs on the worker thread, or on the
 * main thread in builds without threading.
 *
 * Once a worker has an error set, the work function is not called for it
 * again.
 */
static void
work_fn( im_thread_t *thr )
{
	im_threadgroup_t *tg = thr->tg;

	/* A region must only ever be driven by its own thread, and there 
	 * must be something to run.
	 */
	g_assert( thr->thread == g_thread_self() );
	g_assert( tg->work );

	if( !thr->error ) {
		if( tg->work( thr, thr->reg, thr->a, thr->b, thr->c ) )
			thr->error = -1;
	}

	/* Available for more work.
	 */
	threadgroup_idle_add( thr );
}
#ifdef HAVE_THREADS
/* What runs as a thread ... loop, waiting to be told to do stuff.
 *
 * @a is the im_thread_t for this worker. Each time round the loop we block
 * on thr->go, exit if thr->kill has been set, and otherwise run work_fn()
 * once. Always returns NULL.
 */
static void *
thread_main_loop( void *a )
{
	im_thread_t *thr = (im_thread_t *) a;
	im_threadgroup_t *tg = thr->tg;

	/* We now control the region (it was created by tg when we were
	 * built).
	 */
	im__region_take_ownership( thr->reg );

	for(;;) {
		g_assert( tg == thr->tg );

		/* Block until someone ups 'go': either a trigger asking for 
		 * work, or thread_free() asking us to exit. We are put on the
		 * idle list elsewhere (by threadgroup_thread_new() at startup
		 * and by work_fn() after each unit of work), not here.
		 */
		im_semaphore_down( &thr->go );

		/* Asked to exit?
		 */
		if( thr->kill )
			break;

#ifdef TIME_THREAD
		/* Note start time.
		 */
		if( thr->btime )
			thr->btime[thr->tpos] = gethrtime();
#endif /*TIME_THREAD*/

		/* Loop once.
		 */
		work_fn( thr );

#ifdef TIME_THREAD
		/* Note stop time.
		 */
		if( thr->etime ) {
			thr->etime[thr->tpos] = gethrtime();
			thr->tpos++;
		}
#endif /*TIME_THREAD*/
	}

	return( NULL );
}
#endif /*HAVE_THREADS*/
/* Attach another thread to a threadgroup.
 *
 * Allocates an im_thread_t on tg->im, gives it a region on that image and 
 * (in threaded builds) starts a worker running thread_main_loop(). The new
 * worker is put on the idle list before we return.
 *
 * Returns the new worker, or NULL on error.
 */
static im_thread_t *
threadgroup_thread_new( im_threadgroup_t *tg )
{
	im_thread_t *thr;

	if( !(thr = IM_NEW( tg->im, im_thread_t )) )
		return( NULL );

	/* Put every field into a known state before anything can fail, so 
	 * thread_free() is safe to call on a half-built worker.
	 */
	thr->tg = tg;
	thr->reg = NULL;
	thr->thread = NULL;
	im_semaphore_init( &thr->go, 0, "go" );
	thr->kill = 0;
	thr->error = 0;
	thr->oreg = NULL;
	thr->a = thr->b = thr->c = NULL;
#ifdef TIME_THREAD
	thr->btime = NULL;
	thr->etime = NULL;
	thr->tpos = 0;
#endif /*TIME_THREAD*/

	/* Attach stuff.
	 */
	if( !(thr->reg = im_region_create( tg->im )) ) {
		thread_free( thr );
		return( NULL );
	}

	/* Get ready to hand the region over to the thread. The worker claims
	 * it with im__region_take_ownership() when it starts.
	 */
	im__region_no_ownership( thr->reg );

#ifdef TIME_THREAD
	thr->btime = IM_ARRAY( tg->im, IM_TBUF_SIZE, hrtime_t );
	thr->etime = IM_ARRAY( tg->im, IM_TBUF_SIZE, hrtime_t );
	if( !thr->btime || !thr->etime ) {
		thread_free( thr );
		return( NULL );
	}
#endif /*TIME_THREAD*/

#ifdef HAVE_THREADS
	/* Make a worker thread. We have to use g_thread_create_full() because
	 * we need to insist on a non-tiny stack. Some platforms default to
	 * very small values (eg. various BSDs).
	 */
	if( !(thr->thread = g_thread_create_full( thread_main_loop, thr,
		IM__DEFAULT_STACK_SIZE, TRUE, FALSE,
		G_THREAD_PRIORITY_NORMAL, NULL )) ) {
		im_error( "threadgroup_thread_new",
			"%s", _( "unable to create thread" ) );
		thread_free( thr );
		return( NULL );
	}

#ifdef DEBUG_CREATE
	printf( "threadgroup_thread_new: g_thread_create_full()\n" );
#endif /*DEBUG_CREATE*/
#endif /*HAVE_THREADS*/

	/* It's idle.
	 */
	threadgroup_idle_add( thr );

	return( thr );
}
/* Kill all threads in a threadgroup, if there are any.
 *
 * Every worker in tg->thr is passed to thread_free(), the array pointer is
 * cleared and the idle semaphore is reset to zero. Safe to call on a 
 * threadgroup that has no threads, or one that was only partly built.
 */
static void
threadgroup_kill_threads( im_threadgroup_t *tg )
{
	if( tg->thr ) {
		int i;

		/* Slots can still be NULL if im_threadgroup_create() failed
		 * part way through making the workers: it NULLs the whole 
		 * array, then returns early on the first failure. Skip those
		 * rather than handing NULL to thread_free().
		 */
		for( i = 0; i < tg->nthr; i++ )
			if( tg->thr[i] ) {
				thread_free( tg->thr[i] );
				tg->thr[i] = NULL;
			}
		tg->thr = NULL;

		/* thread_free() takes each worker off the idle list.
		 */
		g_assert( !tg->idle );

		/* Reset the idle semaphore.
		 */
		im_semaphore_destroy( &tg->idle_sem );
		im_semaphore_init( &tg->idle_sem, 0, "idle" );

#ifdef DEBUG_IO
		printf( "threadgroup_kill_threads: killed %d threads\n",
			tg->nthr );
#endif /*DEBUG_IO*/
	}
}
/**
 * im_threadgroup_free:
 * @tg: threadgroup to free
 *
 * Frees a threadgroup. This function can be called multiple times, though
 * only the first time will have any effect. A %NULL @tg is ignored.
 *
 * All worker threads are terminated and all resources freed.
 *
 * See also: im_threadgroup_create().
 *
 * Returns: 0.
 */
int
im_threadgroup_free( im_threadgroup_t *tg )
{
	/* Test before we touch tg at all, including in debug output.
	 */
	if( !tg || tg->zombie )
		return( 0 );

#ifdef DEBUG_IO
	printf( "im_threadgroup_free: \"%s\" (%p)\n", tg->im->filename, tg );
#endif /*DEBUG_IO*/

	threadgroup_kill_threads( tg );

	im_semaphore_destroy( &tg->idle_sem );
	IM_FREEF( g_mutex_free, tg->idle_lock );

	/* Mark as dead so later calls do nothing.
	 */
	tg->zombie = -1;

#ifdef DEBUG_HIGHWATER
	printf( "im_threadgroup_free %p: max busy workers = %d\n",
		tg, tg->nthr - tg->min_idle );
#endif /*DEBUG_HIGHWATER*/

	return( 0 );
}
/**
 * im_threadgroup_create:
 * @im: image to create the threadgroup on
 *
 * Makes a threadgroup attached to the image. The threadgroup will be freed
 * for you if the image is closed, but you can free it yourself with
 * im_threadgroup_free() if you wish.
 *
 * See also: im_threadgroup_free(), im_prepare_thread().
 *
 * Returns: an #im_threadgroup_t on success, %NULL on error.
 */
im_threadgroup_t *
im_threadgroup_create( IMAGE *im )
{
	im_threadgroup_t *tg;
	int i;

	/* Allocate and init new thread block.
	 */
	if( !(tg = IM_NEW( im, im_threadgroup_t )) )
		return( NULL );
	tg->zombie = 0;
	tg->im = im;
	tg->work = NULL;
	tg->nthr = im_concurrency_get();
	tg->thr = NULL;
	tg->kill = 0;

	/* Pick a render geometry.
	 */
	vips_get_tile_size( tg->im, &tg->pw, &tg->ph, &tg->nlines );

	/* Init locks. This has to happen before we attach the close 
	 * callback: im_threadgroup_free() destroys idle_sem and idle_lock, 
	 * and it can run as soon as the callback is registered (and is 
	 * called directly below if registration fails).
	 */
	im_semaphore_init( &tg->idle_sem, 0, "idle" );
	tg->idle = NULL;
	tg->idle_lock = g_mutex_new();

#ifdef DEBUG_HIGHWATER
	tg->nidle = 0;
	tg->min_idle = tg->nthr;
#endif /*DEBUG_HIGHWATER*/

	/* Attach tidy-up callback.
	 */
	if( im_add_close_callback( im,
		(im_callback_fn) im_threadgroup_free, tg, NULL ) ) {
		(void) im_threadgroup_free( tg );
		return( NULL );
	}

#ifdef DEBUG_IO
	printf( "im_threadgroup_create: %d by %d patches, "
		"groups of %d scanlines\n", tg->pw, tg->ph, tg->nlines );
#endif /*DEBUG_IO*/

	/* Make thread array. From here on, failures just return NULL: the
	 * close callback tidies up what we managed to build when the image
	 * is closed.
	 */
	if( !(tg->thr = IM_ARRAY( im, tg->nthr + 1, im_thread_t * )) )
		return( NULL );
	for( i = 0; i < tg->nthr + 1; i++ )
		tg->thr[i] = NULL;

	/* Attach threads.
	 */
	for( i = 0; i < tg->nthr; i++ )
		if( !(tg->thr[i] = threadgroup_thread_new( tg )) )
			return( NULL );

#ifdef DEBUG_IO
	printf( "im_threadgroup_create: \"%s\" (%p), with %d threads\n",
		im->filename, tg, tg->nthr );
#endif /*DEBUG_IO*/

	return( tg );
}
/* Trigger work. If not threading, just call fn directly.
 *
 * The caller should have fetched @thr with im_threadgroup_get() and filled
 * in the parameters the work function will read.
 */
void
im_threadgroup_trigger( im_thread_t *thr )
{
	/* Start worker going. In threaded builds this wakes the worker, 
	 * which is blocked on 'go' in thread_main_loop().
	 */
	im_semaphore_up( &thr->go );

#ifndef HAVE_THREADS
	/* No threading ... just eval directly. work_fn() puts thr back on
	 * the idle list when it finishes.
	 */
	work_fn( thr );
#endif /*HAVE_THREADS*/
}
/* Has anything gone wrong in this threadgroup? Returns -1 if the 
 * threadgroup or its image has the kill flag set, or if any worker has 
 * recorded an error; 0 otherwise.
 */
int
im_threadgroup_iserror( im_threadgroup_t *tg )
{
	int i;

	if( tg->kill || tg->im->kill )
		return( -1 );

	for( i = 0; i < tg->nthr; i++ ) {
		const im_thread_t *worker = tg->thr[i];

		if( worker->error )
			return( -1 );
	}

	return( 0 );
}

View File

@ -64,8 +64,7 @@
/**
* SECTION: threadpool
* @short_description: pools of worker threads ... a lighter version of
* threadgroups
* @short_description: pools of worker threads
* @stability: Stable
* @see_also: <link linkend="libvips-generate">generate</link>
* @include: vips/vips.h
@ -74,11 +73,151 @@
* in turns to allocate units of work (a unit might be a tile in an image),
* then run in parallel to process those units. An optional progress function
* can be used to give feedback.
*
* This is like threadgroup, but workers allocate work units themselves. This
* reduces synchronisation overhead and improves scalability.
*/
/* Maximum number of concurrent threads we allow. No reason for the limit,
 * it's just there to stop mad values for IM_CONCURRENCY killing the system.
 * im_concurrency_get() clips its result to 1 .. IM_MAX_THREADS.
 */
#define IM_MAX_THREADS (1024)

/* Name of environment variable we get concurrency level from. Only 
 * consulted by im_concurrency_get() when im__concurrency is not positive.
 */
#define IM_CONCURRENCY "IM_CONCURRENCY"

/* Default tile geometry ... can be set by init_world.
 */
int im__tile_width = IM__TILE_WIDTH;
int im__tile_height = IM__TILE_HEIGHT;
int im__fatstrip_height = IM__FATSTRIP_HEIGHT;
int im__thinstrip_height = IM__THINSTRIP_HEIGHT;

/* Default n threads ... 0 means get from environment. Written by 
 * im_concurrency_set(), read by im_concurrency_get().
 */
int im__concurrency = 0;
#ifndef HAVE_THREADS
/* No gthread in this build: supply do-nothing versions of the g_thread_*()
 * and g_mutex_*() functions. <vips/thread.h> has #defines which point the
 * g_ names at these.
 */

void
im__g_thread_init( GThreadFunctions *vtable )
{
}

gpointer
im__g_thread_join( GThread *dummy )
{
	return( NULL );
}

gpointer
im__g_thread_self( void )
{
	return( NULL );
}

GThread *
im__g_thread_create_full( GThreadFunc d1,
	gpointer d2, gulong d3, gboolean d4, gboolean d5, GThreadPriority d6,
	GError **d7 )
{
	return( NULL );
}

GMutex *
im__g_mutex_new( void )
{
	return( NULL );
}

void
im__g_mutex_free( GMutex *d )
{
}

void
im__g_mutex_lock( GMutex *d )
{
}

void
im__g_mutex_unlock( GMutex *d )
{
}

#endif /*!HAVE_THREADS*/
/* Set the number of worker threads to use. The value is stored in 
 * im__concurrency; a value of 0 or less makes im_concurrency_get() fall back
 * to the IM_CONCURRENCY environment variable or the processor count.
 */
void
im_concurrency_set( int concurrency )
{
	im__concurrency = concurrency;
}
/* Guess the number of processors available to us. 
 *
 * Uses sysconf() or sysctl() on unix and GetProcessAffinityMask() on 
 * Windows, depending on what the build has defined. 
 *
 * Returns the count found, or 1 if no method is available or the query
 * fails.
 */
static int
get_num_processors( void )
{
	int nproc;

	nproc = 1;

#ifdef G_OS_UNIX

#if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN)
{
	/* POSIX style.
	 */
	int x;

	x = sysconf( _SC_NPROCESSORS_ONLN );
	if( x > 0 )
		nproc = x;
}
#elif defined HW_NCPU
{
	/* BSD style. 
	 */
	int x;
	size_t len = sizeof(x);

	/* Only trust x if the call succeeded: on failure sysctl() leaves 
	 * it unset and we'd be reading an uninitialised value.
	 */
	x = 0;
	if( sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, 
		&x, &len, NULL, 0 ) == 0 &&
		x > 0 )
		nproc = x;
}
#endif

	/* libgomp has some very complex code on Linux to count the number of
	 * processors available to the current process taking pthread affinity
	 * into account, but we don't attempt that here. Perhaps we should?
	 */

#endif /*G_OS_UNIX*/

#ifdef OS_WIN32
{
	/* Count the CPUs currently available to this process.
	 */
	DWORD_PTR process_cpus;
	DWORD_PTR system_cpus;

	if( GetProcessAffinityMask( GetCurrentProcess(),
		&process_cpus, &system_cpus ) ) {
		unsigned int count;

		/* One set bit per usable CPU.
		 */
		for( count = 0; process_cpus != 0; process_cpus >>= 1 )
			if( process_cpus & 1 )
				count++;

		if( count > 0 )
			nproc = count;
	}
}
#endif /*OS_WIN32*/

	return( nproc );
}
/* Work out how many worker threads to use.
 *
 * In order of preference: the value given to im_concurrency_set(), the 
 * IM_CONCURRENCY environment variable, the number of processors we can find.
 * The result is forced into the range 1 .. IM_MAX_THREADS (with a warning if
 * it had to be changed) and remembered for next time.
 *
 * Returns the number of threads.
 */
int
im_concurrency_get( void )
{
	int nthr;

	nthr = im__concurrency;

	if( nthr <= 0 ) {
		const char *env = g_getenv( IM_CONCURRENCY );

		/* An unset or non-positive variable means "ask the system".
		 */
		nthr = env ? atoi( env ) : 0;
		if( nthr <= 0 )
			nthr = get_num_processors();
	}

	if( nthr < 1 || nthr > IM_MAX_THREADS ) {
		nthr = IM_CLIP( 1, nthr, IM_MAX_THREADS );
		im_warn( "im_concurrency_get", 
			_( "threads clipped to %d" ), nthr );
	}

	/* Cache the answer so later calls take the first branch.
	 */
	im_concurrency_set( nthr );

	return( nthr );
}
/**
* VipsThreadState:
* @reg: a #REGION
@ -375,7 +514,7 @@ vips_thread_main_loop( void *a )
}
#endif /*HAVE_THREADS*/
/* Attach another thread to a threadgroup.
/* Attach another thread to a threadpool.
*/
static VipsThread *
vips_thread_new( VipsThreadpool *pool )
@ -417,7 +556,7 @@ vips_thread_new( VipsThreadpool *pool )
if( !(thr->thread = g_thread_create_full( vips_thread_main_loop, thr,
IM__DEFAULT_STACK_SIZE, TRUE, FALSE,
G_THREAD_PRIORITY_NORMAL, NULL )) ) {
im_error( "threadgroup_thread_new",
im_error( "vips_thread_new",
"%s", _( "unable to create thread" ) );
vips_thread_free( thr );
return( NULL );
@ -429,7 +568,7 @@ vips_thread_new( VipsThreadpool *pool )
return( thr );
}
/* Kill all threads in a threadgroup, if there are any.
/* Kill all threads in a threadpool, if there are any.
*/
static void
vips_threadpool_kill_threads( VipsThreadpool *pool )