stuff
This commit is contained in:
parent
064bd55a98
commit
55541cdfc8
5
TODO
5
TODO
@ -1,10 +1,7 @@
|
|||||||
- make VipsThread opaque ... have a separate VipsThreadState struct holding
|
- make VipsRegionWrite the real fun, im_wbuffer_fn the typedef
|
||||||
stuff and only expose that
|
|
||||||
|
|
||||||
- try with im_render(), ouch, that will be painful
|
- try with im_render(), ouch, that will be painful
|
||||||
|
|
||||||
- switch other file sinks over, maybe have a shim layer?
|
|
||||||
|
|
||||||
- switch im_iterate() over?
|
- switch im_iterate() over?
|
||||||
|
|
||||||
- do we have other uses of threadgroup?
|
- do we have other uses of threadgroup?
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
<xi:include href="xml/meta.xml"/>
|
<xi:include href="xml/meta.xml"/>
|
||||||
<xi:include href="xml/region.xml"/>
|
<xi:include href="xml/region.xml"/>
|
||||||
<xi:include href="xml/generate.xml"/>
|
<xi:include href="xml/generate.xml"/>
|
||||||
|
<xi:include href="xml/threadpool.xml"/>
|
||||||
<xi:include href="xml/threadgroup.xml"/>
|
<xi:include href="xml/threadgroup.xml"/>
|
||||||
<xi:include href="xml/error.xml"/>
|
<xi:include href="xml/error.xml"/>
|
||||||
<xi:include href="xml/memory.xml"/>
|
<xi:include href="xml/memory.xml"/>
|
||||||
|
@ -70,6 +70,10 @@ extern int im__concurrency;
|
|||||||
*/
|
*/
|
||||||
extern int im__progress;
|
extern int im__progress;
|
||||||
|
|
||||||
|
/* Use wbuffer2.
|
||||||
|
*/
|
||||||
|
extern int im__wbuffer2;
|
||||||
|
|
||||||
typedef int (*im__fftproc_fn)( IMAGE *, IMAGE *, IMAGE * );
|
typedef int (*im__fftproc_fn)( IMAGE *, IMAGE *, IMAGE * );
|
||||||
|
|
||||||
/* iofuncs
|
/* iofuncs
|
||||||
|
@ -42,50 +42,33 @@ extern "C" {
|
|||||||
|
|
||||||
#include <vips/semaphore.h>
|
#include <vips/semaphore.h>
|
||||||
|
|
||||||
/* What we track for each thread in the pool.
|
/* The per-thread state we expose. Allocate functions can use these members to
|
||||||
|
* communicate with work functions.
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct _VipsThreadState {
|
||||||
/* All private.
|
/* This region is created and destroyed by the threadpool for the
|
||||||
*/
|
* worker.
|
||||||
/*< private >*/
|
|
||||||
struct _VipsThreadpool *pool; /* Pool we are part of */
|
|
||||||
|
|
||||||
GThread *thread; /* Thread for this region */
|
|
||||||
|
|
||||||
/* Set this to ask the thread to exit.
|
|
||||||
*/
|
|
||||||
gboolean exit;
|
|
||||||
|
|
||||||
/* Set by the thread if work or allocate return an error.
|
|
||||||
*/
|
|
||||||
gboolean error;
|
|
||||||
|
|
||||||
/* Thread state for the worker and allocate. Handy for communication.
|
|
||||||
* The region is created and destroyed by the threadpool for the
|
|
||||||
* worker.
|
|
||||||
*/
|
*/
|
||||||
REGION *reg;
|
REGION *reg;
|
||||||
|
|
||||||
|
/* The rest are neither used nor set, do what you lke with them.
|
||||||
|
*/
|
||||||
Rect pos;
|
Rect pos;
|
||||||
int x, y;
|
int x, y;
|
||||||
void *a, *b, *c;
|
void *d, *e, *f;
|
||||||
|
} VipsThreadState;
|
||||||
#ifdef TIME_THREAD
|
|
||||||
double *btime, *etime;
|
|
||||||
int tpos;
|
|
||||||
#endif /*TIME_THREAD*/
|
|
||||||
} VipsThread;
|
|
||||||
|
|
||||||
/* A work allocate function. This is run single-threaded by a worker to
|
/* A work allocate function. This is run single-threaded by a worker to
|
||||||
* set up a new work unit.
|
* set up a new work unit.
|
||||||
* Return non-zero for errors. Set *stop for "no more work to do"
|
* Return non-zero for errors. Set *stop for "no more work to do"
|
||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolAllocate)( VipsThread *thr,
|
typedef int (*VipsThreadpoolAllocate)( VipsThreadState *state,
|
||||||
void *a, void *b, void *c, gboolean *stop );
|
void *a, void *b, void *c, gboolean *stop );
|
||||||
|
|
||||||
/* A work function. This does a unit of work (eg. processing a tile or
|
/* A work function. This does a unit of work (eg. processing a tile or
|
||||||
* whatever). Return non-zero for errors.
|
* whatever). Return non-zero for errors.
|
||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg,
|
typedef int (*VipsThreadpoolWork)( VipsThreadState *state,
|
||||||
void *a, void *b, void *c );
|
void *a, void *b, void *c );
|
||||||
|
|
||||||
/* A progress function. This is run by the main thread once for every
|
/* A progress function. This is run by the main thread once for every
|
||||||
@ -93,47 +76,6 @@ typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg,
|
|||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c );
|
typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c );
|
||||||
|
|
||||||
/* What we track for a group of threads working together.
|
|
||||||
*/
|
|
||||||
typedef struct _VipsThreadpool {
|
|
||||||
/* All private.
|
|
||||||
*/
|
|
||||||
/*< private >*/
|
|
||||||
VipsImage *im; /* Image we are calculating */
|
|
||||||
|
|
||||||
/* Do a unit of work (runs in parallel) and allocate a unit of work
|
|
||||||
* (serial). Plus the mutex we use to serialize work allocation.
|
|
||||||
*/
|
|
||||||
VipsThreadpoolAllocate allocate;
|
|
||||||
VipsThreadpoolWork work;
|
|
||||||
GMutex *allocate_lock;
|
|
||||||
void *a, *b, *c; /* User arguments to work / allocate */
|
|
||||||
|
|
||||||
int nthr; /* Number of threads in pool */
|
|
||||||
VipsThread **thr; /* Threads */
|
|
||||||
|
|
||||||
/* The caller blocks here until all threads finish.
|
|
||||||
*/
|
|
||||||
im_semaphore_t finish;
|
|
||||||
|
|
||||||
/* Workers up this for every loop to make the main thread tick.
|
|
||||||
*/
|
|
||||||
im_semaphore_t tick;
|
|
||||||
|
|
||||||
/* Set this to abort evaluation early with an error.
|
|
||||||
*/
|
|
||||||
gboolean error;
|
|
||||||
|
|
||||||
/* Set by Allocate (via an arg) to indicate normal end of computation.
|
|
||||||
*/
|
|
||||||
gboolean stop;
|
|
||||||
|
|
||||||
/* Set this if the pool has been shut down. We sometimes need to allow
|
|
||||||
* double-frees.
|
|
||||||
*/
|
|
||||||
gboolean zombie;
|
|
||||||
} VipsThreadpool;
|
|
||||||
|
|
||||||
int vips_threadpool_run( VipsImage *im,
|
int vips_threadpool_run( VipsImage *im,
|
||||||
VipsThreadpoolAllocate allocate,
|
VipsThreadpoolAllocate allocate,
|
||||||
VipsThreadpoolWork work,
|
VipsThreadpoolWork work,
|
||||||
@ -142,9 +84,9 @@ int vips_threadpool_run( VipsImage *im,
|
|||||||
void vips_get_tile_size( VipsImage *im,
|
void vips_get_tile_size( VipsImage *im,
|
||||||
int *tile_width, int *tile_height, int *nlines );
|
int *tile_width, int *tile_height, int *nlines );
|
||||||
|
|
||||||
extern int im__wbuffer2;
|
typedef im_wbuffer_fn VipsRegionWrite;
|
||||||
|
int vips_discsink( VipsImage *im,
|
||||||
int im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b );
|
VipsRegionWrite write_fn, void *a, void *b );
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ libiofuncs_la_SOURCES = \
|
|||||||
im_unmapfile.c \
|
im_unmapfile.c \
|
||||||
im_guess_prefix.c \
|
im_guess_prefix.c \
|
||||||
im_wbuffer.c \
|
im_wbuffer.c \
|
||||||
im_wbuffer2.c \
|
discsink.c \
|
||||||
im_wrapmany.c \
|
im_wrapmany.c \
|
||||||
im_writeline.c \
|
im_writeline.c \
|
||||||
memory.c \
|
memory.c \
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
/* Double-buffered write.
|
/* Write an image.
|
||||||
*
|
*
|
||||||
* 19/3/10
|
* 19/3/10
|
||||||
* - from im_wbuffer.c
|
* - from im_wbuffer.c
|
||||||
@ -99,7 +99,7 @@ typedef struct _Write {
|
|||||||
|
|
||||||
/* The file format write operation.
|
/* The file format write operation.
|
||||||
*/
|
*/
|
||||||
im_wbuffer_fn write_fn;
|
VipsRegionWrite write_fn;
|
||||||
void *a;
|
void *a;
|
||||||
void *b;
|
void *b;
|
||||||
} Write;
|
} Write;
|
||||||
@ -148,7 +148,6 @@ wbuffer_write( WriteBuffer *wbuffer )
|
|||||||
|
|
||||||
wbuffer->write_errno = write->write_fn( wbuffer->region,
|
wbuffer->write_errno = write->write_fn( wbuffer->region,
|
||||||
&wbuffer->area, write->a, write->b );
|
&wbuffer->area, write->a, write->b );
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAVE_THREADS
|
#ifdef HAVE_THREADS
|
||||||
@ -309,7 +308,7 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height )
|
|||||||
* iteration.
|
* iteration.
|
||||||
*/
|
*/
|
||||||
static gboolean
|
static gboolean
|
||||||
wbuffer_allocate_fn( VipsThread *thr,
|
wbuffer_allocate_fn( VipsThreadState *state,
|
||||||
void *a, void *b, void *c, gboolean *stop )
|
void *a, void *b, void *c, gboolean *stop )
|
||||||
{
|
{
|
||||||
Write *write = (Write *) a;
|
Write *write = (Write *) a;
|
||||||
@ -359,8 +358,8 @@ wbuffer_allocate_fn( VipsThread *thr,
|
|||||||
tile.top = write->y;
|
tile.top = write->y;
|
||||||
tile.width = write->tile_width;
|
tile.width = write->tile_width;
|
||||||
tile.height = write->tile_height;
|
tile.height = write->tile_height;
|
||||||
im_rect_intersectrect( &image, &tile, &thr->pos );
|
im_rect_intersectrect( &image, &tile, &state->pos );
|
||||||
thr->a = write->buf;
|
state->d = write->buf;
|
||||||
|
|
||||||
/* Add to the number of writers on the buffer.
|
/* Add to the number of writers on the buffer.
|
||||||
*/
|
*/
|
||||||
@ -376,17 +375,17 @@ wbuffer_allocate_fn( VipsThread *thr,
|
|||||||
/* Our VipsThreadpoolWork function ... generate a tile!
|
/* Our VipsThreadpoolWork function ... generate a tile!
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
wbuffer_work_fn( VipsThread *thr,
|
wbuffer_work_fn( VipsThreadState *state,
|
||||||
REGION *reg, void *a, void *b, void *c )
|
void *a, void *b, void *c )
|
||||||
{
|
{
|
||||||
WriteBuffer *wbuffer = (WriteBuffer *) thr->a;
|
WriteBuffer *wbuffer = (WriteBuffer *) state->d;
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
printf( "wbuffer_work_fn\n" );
|
printf( "wbuffer_work_fn\n" );
|
||||||
#endif /*DEBUG*/
|
#endif /*DEBUG*/
|
||||||
|
|
||||||
if( im_prepare_to( reg, wbuffer->region,
|
if( im_prepare_to( state->reg, wbuffer->region,
|
||||||
&thr->pos, thr->pos.left, thr->pos.top ) )
|
&state->pos, state->pos.left, state->pos.top ) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
|
||||||
/* Tell the bg write thread we've left.
|
/* Tell the bg write thread we've left.
|
||||||
@ -415,7 +414,7 @@ wbuffer_progress_fn( void *a, void *b, void *c )
|
|||||||
|
|
||||||
static void
|
static void
|
||||||
write_init( Write *write,
|
write_init( Write *write,
|
||||||
VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b )
|
VipsImage *im, VipsRegionWrite write_fn, void *a, void *b )
|
||||||
{
|
{
|
||||||
write->im = im;
|
write->im = im;
|
||||||
write->buf = wbuffer_new( write );
|
write->buf = wbuffer_new( write );
|
||||||
@ -437,8 +436,42 @@ write_free( Write *write )
|
|||||||
IM_FREEF( wbuffer_free, write->buf_back );
|
IM_FREEF( wbuffer_free, write->buf_back );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VipsRegionWrite:
|
||||||
|
* @region: pixels to write
|
||||||
|
* @a: client data
|
||||||
|
* @b: client data
|
||||||
|
*
|
||||||
|
* The function should write the pixels in @region. @a and @b are the values
|
||||||
|
* passed into vips_discsink().
|
||||||
|
*
|
||||||
|
* See also: vips_discsink().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, -1 on error.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* vips_discsink:
|
||||||
|
* @im: image to process
|
||||||
|
* @write_fn: called for every batch of pixels
|
||||||
|
* @a: client data
|
||||||
|
* @b: client data
|
||||||
|
*
|
||||||
|
* vips_discsink() loops over @im, top-to-bottom, generating it in sections.
|
||||||
|
* As each section is produced, @write_fn is called.
|
||||||
|
*
|
||||||
|
* @write_fn is always called single-threaded, it's always given image
|
||||||
|
* sections in top-to-bottom order, and there are never any gaps.
|
||||||
|
*
|
||||||
|
* This operation is handy for making image sinks which output to things like
|
||||||
|
* disc files.
|
||||||
|
*
|
||||||
|
* See also: im_concurrency_set().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, -1 on error.
|
||||||
|
*/
|
||||||
int
|
int
|
||||||
im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b )
|
vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b )
|
||||||
{
|
{
|
||||||
Write write;
|
Write write;
|
||||||
int result;
|
int result;
|
@ -56,6 +56,7 @@
|
|||||||
|
|
||||||
#include <vips/vips.h>
|
#include <vips/vips.h>
|
||||||
#include <vips/internal.h>
|
#include <vips/internal.h>
|
||||||
|
#include <vips/private.h>
|
||||||
#include <vips/thread.h>
|
#include <vips/thread.h>
|
||||||
|
|
||||||
#ifdef WITH_DMALLOC
|
#ifdef WITH_DMALLOC
|
||||||
@ -429,7 +430,7 @@ im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
|
|||||||
/* Optionally use the newer one.
|
/* Optionally use the newer one.
|
||||||
*/
|
*/
|
||||||
if( im__wbuffer2 )
|
if( im__wbuffer2 )
|
||||||
return( im_wbuffer2( tg->im, write_fn, a, b ) );
|
return( vips_discsink( tg->im, write_fn, a, b ) );
|
||||||
|
|
||||||
if( im__start_eval( tg->im ) )
|
if( im__start_eval( tg->im ) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
@ -4,6 +4,9 @@
|
|||||||
* - from threadgroup.c
|
* - from threadgroup.c
|
||||||
* - distributed work allocation idea from Christian Blenia, thank you
|
* - distributed work allocation idea from Christian Blenia, thank you
|
||||||
* very much
|
* very much
|
||||||
|
* 21/3/10
|
||||||
|
* - progress feedback
|
||||||
|
* - only expose VipsThreadState
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -66,13 +69,79 @@
|
|||||||
* @see_also: <link linkend="libvips-generate">generate</link>
|
* @see_also: <link linkend="libvips-generate">generate</link>
|
||||||
* @include: vips/vips.h
|
* @include: vips/vips.h
|
||||||
*
|
*
|
||||||
* This is like threadgroup, but the work allocation is distributed. This
|
* vips_threadpool_run() loops a set of threads over an image. Threads take it
|
||||||
* reduces the synchronisation overhead and improves scalability.
|
* in turns to allocate units of work (a unit might be a tile in an image),
|
||||||
|
* then run in parallel to process those units. An optional progress function
|
||||||
|
* can be used to give feedback.
|
||||||
*
|
*
|
||||||
* Most of this is internal to VIPS and does not need to be documented. You
|
* This is like threadgroup, but workers allocate work units themselves. This
|
||||||
* should only need vips_threadpool_new() and vips_threadpool_free().
|
* reduces the synchronisation overhead and improves scalability.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/* What we track for each thread in the pool.
|
||||||
|
*/
|
||||||
|
typedef struct {
|
||||||
|
/* All private.
|
||||||
|
*/
|
||||||
|
/*< private >*/
|
||||||
|
VipsThreadState state;
|
||||||
|
|
||||||
|
struct _VipsThreadpool *pool; /* Pool we are part of */
|
||||||
|
|
||||||
|
/* Thread we are running.
|
||||||
|
*/
|
||||||
|
GThread *thread;
|
||||||
|
|
||||||
|
/* Set this to ask the thread to exit.
|
||||||
|
*/
|
||||||
|
gboolean exit;
|
||||||
|
|
||||||
|
/* Set by the thread if work or allocate return an error.
|
||||||
|
*/
|
||||||
|
gboolean error;
|
||||||
|
|
||||||
|
#ifdef TIME_THREAD
|
||||||
|
double *btime, *etime;
|
||||||
|
int tpos;
|
||||||
|
#endif /*TIME_THREAD*/
|
||||||
|
} VipsThread;
|
||||||
|
|
||||||
|
/* What we track for a group of threads working together.
|
||||||
|
*/
|
||||||
|
typedef struct _VipsThreadpool {
|
||||||
|
/* All private.
|
||||||
|
*/
|
||||||
|
/*< private >*/
|
||||||
|
VipsImage *im; /* Image we are calculating */
|
||||||
|
|
||||||
|
/* Do a unit of work (runs in parallel) and allocate a unit of work
|
||||||
|
* (serial). Plus the mutex we use to serialize work allocation.
|
||||||
|
*/
|
||||||
|
VipsThreadpoolAllocate allocate;
|
||||||
|
VipsThreadpoolWork work;
|
||||||
|
GMutex *allocate_lock;
|
||||||
|
void *a, *b, *c; /* User arguments to work / allocate */
|
||||||
|
|
||||||
|
int nthr; /* Number of threads in pool */
|
||||||
|
VipsThread **thr; /* Threads */
|
||||||
|
|
||||||
|
/* The caller blocks here until all threads finish.
|
||||||
|
*/
|
||||||
|
im_semaphore_t finish;
|
||||||
|
|
||||||
|
/* Workers up this for every loop to make the main thread tick.
|
||||||
|
*/
|
||||||
|
im_semaphore_t tick;
|
||||||
|
|
||||||
|
/* Set this to abort evaluation early with an error.
|
||||||
|
*/
|
||||||
|
gboolean error;
|
||||||
|
|
||||||
|
/* Set by Allocate (via an arg) to indicate normal end of computation.
|
||||||
|
*/
|
||||||
|
gboolean stop;
|
||||||
|
} VipsThreadpool;
|
||||||
|
|
||||||
#ifdef TIME_THREAD
|
#ifdef TIME_THREAD
|
||||||
/* Size of time buffers.
|
/* Size of time buffers.
|
||||||
*/
|
*/
|
||||||
@ -84,7 +153,7 @@ static GTimer *thread_timer = NULL;
|
|||||||
/* Save time buffers.
|
/* Save time buffers.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
save_time_buffers( VipsThread *thr )
|
vips_thread_save_time_buffers( VipsThread *thr )
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
static int rn = 1;
|
static int rn = 1;
|
||||||
@ -92,7 +161,8 @@ save_time_buffers( VipsThread *thr )
|
|||||||
char name[256];
|
char name[256];
|
||||||
|
|
||||||
im_snprintf( name, 256, "time%d", rn++ );
|
im_snprintf( name, 256, "time%d", rn++ );
|
||||||
printf( "saving time buffer to \"%s\"\n", name );
|
printf( "vips_thread_save_time_buffers: "
|
||||||
|
"saving buffer to \"%s\"\n", name );
|
||||||
if( !(fp = fopen( name, "w" )) )
|
if( !(fp = fopen( name, "w" )) )
|
||||||
error_exit( "unable to write to \"%s\"", name );
|
error_exit( "unable to write to \"%s\"", name );
|
||||||
for( i = 0; i < thr->tpos; i++ )
|
for( i = 0; i < thr->tpos; i++ )
|
||||||
@ -106,7 +176,7 @@ save_time_buffers( VipsThread *thr )
|
|||||||
/* Junk a thread.
|
/* Junk a thread.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
thread_free( VipsThread *thr )
|
vips_thread_free( VipsThread *thr )
|
||||||
{
|
{
|
||||||
/* Is there a thread running this region? Kill it!
|
/* Is there a thread running this region? Kill it!
|
||||||
*/
|
*/
|
||||||
@ -123,12 +193,12 @@ thread_free( VipsThread *thr )
|
|||||||
thr->thread = NULL;
|
thr->thread = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
IM_FREEF( im_region_free, thr->reg );
|
IM_FREEF( im_region_free, thr->state.reg );
|
||||||
thr->pool = NULL;
|
thr->pool = NULL;
|
||||||
|
|
||||||
#ifdef TIME_THREAD
|
#ifdef TIME_THREAD
|
||||||
if( thr->btime )
|
if( thr->btime )
|
||||||
(void) save_time_buffers( thr );
|
(void) vips_thread_save_time_buffers( thr );
|
||||||
#endif /*TIME_THREAD*/
|
#endif /*TIME_THREAD*/
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,20 +206,22 @@ thread_free( VipsThread *thr )
|
|||||||
* from the main thread if threading is off.
|
* from the main thread if threading is off.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
thread_work_unit( VipsThread *thr )
|
vips_thread_work_unit( VipsThread *thr )
|
||||||
{
|
{
|
||||||
VipsThreadpool *pool = thr->pool;
|
VipsThreadpool *pool = thr->pool;
|
||||||
|
|
||||||
/* Ask for a work unit.
|
/* Ask for a work unit.
|
||||||
*/
|
*/
|
||||||
g_mutex_lock( pool->allocate_lock );
|
g_mutex_lock( pool->allocate_lock );
|
||||||
|
|
||||||
if( !pool->stop ) {
|
if( !pool->stop ) {
|
||||||
if( pool->allocate( thr,
|
if( pool->allocate( &thr->state,
|
||||||
pool->a, pool->b, pool->c, &pool->stop ) ) {
|
pool->a, pool->b, pool->c, &pool->stop ) ) {
|
||||||
thr->error = TRUE;
|
thr->error = TRUE;
|
||||||
pool->error = TRUE;
|
pool->error = TRUE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g_mutex_unlock( pool->allocate_lock );
|
g_mutex_unlock( pool->allocate_lock );
|
||||||
|
|
||||||
if( pool->stop || pool->error )
|
if( pool->stop || pool->error )
|
||||||
@ -165,7 +237,7 @@ thread_work_unit( VipsThread *thr )
|
|||||||
|
|
||||||
/* Process a work unit.
|
/* Process a work unit.
|
||||||
*/
|
*/
|
||||||
if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) ) {
|
if( pool->work( &thr->state, pool->a, pool->b, pool->c ) ) {
|
||||||
thr->error = TRUE;
|
thr->error = TRUE;
|
||||||
pool->error = TRUE;
|
pool->error = TRUE;
|
||||||
}
|
}
|
||||||
@ -185,7 +257,7 @@ thread_work_unit( VipsThread *thr )
|
|||||||
/* What runs as a thread ... loop, waiting to be told to do stuff.
|
/* What runs as a thread ... loop, waiting to be told to do stuff.
|
||||||
*/
|
*/
|
||||||
static void *
|
static void *
|
||||||
thread_main_loop( void *a )
|
vips_thread_main_loop( void *a )
|
||||||
{
|
{
|
||||||
VipsThread *thr = (VipsThread *) a;
|
VipsThread *thr = (VipsThread *) a;
|
||||||
VipsThreadpool *pool = thr->pool;
|
VipsThreadpool *pool = thr->pool;
|
||||||
@ -195,13 +267,13 @@ thread_main_loop( void *a )
|
|||||||
/* We now control the region (it was created by pool when we
|
/* We now control the region (it was created by pool when we
|
||||||
* were built).
|
* were built).
|
||||||
*/
|
*/
|
||||||
im__region_take_ownership( thr->reg );
|
im__region_take_ownership( thr->state.reg );
|
||||||
|
|
||||||
/* Process work units! Always tick, even if we are stopping, so the
|
/* Process work units! Always tick, even if we are stopping, so the
|
||||||
* main thread will wake up for exit.
|
* main thread will wake up for exit.
|
||||||
*/
|
*/
|
||||||
for(;;) {
|
for(;;) {
|
||||||
thread_work_unit( thr );
|
vips_thread_work_unit( thr );
|
||||||
im_semaphore_up( &pool->tick );
|
im_semaphore_up( &pool->tick );
|
||||||
|
|
||||||
if( pool->stop || pool->error )
|
if( pool->stop || pool->error )
|
||||||
@ -226,7 +298,7 @@ vips_thread_new( VipsThreadpool *pool )
|
|||||||
if( !(thr = IM_NEW( pool->im, VipsThread )) )
|
if( !(thr = IM_NEW( pool->im, VipsThread )) )
|
||||||
return( NULL );
|
return( NULL );
|
||||||
thr->pool = pool;
|
thr->pool = pool;
|
||||||
thr->reg = NULL;
|
thr->state.reg = NULL;
|
||||||
thr->thread = NULL;
|
thr->thread = NULL;
|
||||||
thr->exit = 0;
|
thr->exit = 0;
|
||||||
thr->error = 0;
|
thr->error = 0;
|
||||||
@ -238,14 +310,14 @@ vips_thread_new( VipsThreadpool *pool )
|
|||||||
|
|
||||||
/* Attach stuff.
|
/* Attach stuff.
|
||||||
*/
|
*/
|
||||||
if( !(thr->reg = im_region_create( pool->im )) ) {
|
if( !(thr->state.reg = im_region_create( pool->im )) ) {
|
||||||
thread_free( thr );
|
vips_thread_free( thr );
|
||||||
return( NULL );
|
return( NULL );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Get ready to hand the region over to the thread.
|
/* Get ready to hand the region over to the thread.
|
||||||
*/
|
*/
|
||||||
im__region_no_ownership( thr->reg );
|
im__region_no_ownership( thr->state.reg );
|
||||||
|
|
||||||
#ifdef TIME_THREAD
|
#ifdef TIME_THREAD
|
||||||
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
|
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
|
||||||
@ -261,17 +333,17 @@ vips_thread_new( VipsThreadpool *pool )
|
|||||||
* we need to insist on a non-tiny stack. Some platforms default to
|
* we need to insist on a non-tiny stack. Some platforms default to
|
||||||
* very small values (eg. various BSDs).
|
* very small values (eg. various BSDs).
|
||||||
*/
|
*/
|
||||||
if( !(thr->thread = g_thread_create_full( thread_main_loop, thr,
|
if( !(thr->thread = g_thread_create_full( vips_thread_main_loop, thr,
|
||||||
IM__DEFAULT_STACK_SIZE, TRUE, FALSE,
|
IM__DEFAULT_STACK_SIZE, TRUE, FALSE,
|
||||||
G_THREAD_PRIORITY_NORMAL, NULL )) ) {
|
G_THREAD_PRIORITY_NORMAL, NULL )) ) {
|
||||||
im_error( "threadgroup_thread_new",
|
im_error( "threadgroup_thread_new",
|
||||||
"%s", _( "unable to create thread" ) );
|
"%s", _( "unable to create thread" ) );
|
||||||
thread_free( thr );
|
vips_thread_free( thr );
|
||||||
return( NULL );
|
return( NULL );
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef DEBUG_CREATE
|
#ifdef DEBUG_CREATE
|
||||||
printf( "threadgroup_thread_new: g_thread_create_full()\n" );
|
printf( "vips_thread_new: g_thread_create_full()\n" );
|
||||||
#endif /*DEBUG_CREATE*/
|
#endif /*DEBUG_CREATE*/
|
||||||
#endif /*HAVE_THREADS*/
|
#endif /*HAVE_THREADS*/
|
||||||
|
|
||||||
@ -281,36 +353,23 @@ vips_thread_new( VipsThreadpool *pool )
|
|||||||
/* Kill all threads in a threadgroup, if there are any.
|
/* Kill all threads in a threadgroup, if there are any.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
threadpool_kill_threads( VipsThreadpool *pool )
|
vips_threadpool_kill_threads( VipsThreadpool *pool )
|
||||||
{
|
{
|
||||||
if( pool->thr ) {
|
if( pool->thr ) {
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for( i = 0; i < pool->nthr; i++ )
|
for( i = 0; i < pool->nthr; i++ )
|
||||||
thread_free( pool->thr[i] );
|
vips_thread_free( pool->thr[i] );
|
||||||
pool->thr = NULL;
|
pool->thr = NULL;
|
||||||
|
|
||||||
#ifdef DEBUG_IO
|
#ifdef DEBUG_IO
|
||||||
printf( "threadpool_kill_threads: killed %d threads\n",
|
printf( "vips_threadpool_kill_threads: killed %d threads\n",
|
||||||
pool->nthr );
|
pool->nthr );
|
||||||
#endif /*DEBUG_IO*/
|
#endif /*DEBUG_IO*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static int
|
||||||
* vips_threadpool_free:
|
|
||||||
* @pool: pool to free
|
|
||||||
*
|
|
||||||
* Frees a VipsThreadpool. This function can be called multiple times, though
|
|
||||||
* only the first call will have any effect.
|
|
||||||
*
|
|
||||||
* All worker threads are terminated and all resources freed.
|
|
||||||
*
|
|
||||||
* See also: vips_threadpool_new().
|
|
||||||
*
|
|
||||||
* Returns: 0.
|
|
||||||
*/
|
|
||||||
int
|
|
||||||
vips_threadpool_free( VipsThreadpool *pool )
|
vips_threadpool_free( VipsThreadpool *pool )
|
||||||
{
|
{
|
||||||
#ifdef DEBUG_IO
|
#ifdef DEBUG_IO
|
||||||
@ -318,31 +377,15 @@ vips_threadpool_free( VipsThreadpool *pool )
|
|||||||
pool->im->filename, pool );
|
pool->im->filename, pool );
|
||||||
#endif /*DEBUG_IO*/
|
#endif /*DEBUG_IO*/
|
||||||
|
|
||||||
if( !pool || pool->zombie )
|
vips_threadpool_kill_threads( pool );
|
||||||
return( 0 );
|
|
||||||
|
|
||||||
threadpool_kill_threads( pool );
|
|
||||||
IM_FREEF( g_mutex_free, pool->allocate_lock );
|
IM_FREEF( g_mutex_free, pool->allocate_lock );
|
||||||
im_semaphore_destroy( &pool->finish );
|
im_semaphore_destroy( &pool->finish );
|
||||||
im_semaphore_destroy( &pool->tick );
|
im_semaphore_destroy( &pool->tick );
|
||||||
pool->zombie = 1;
|
|
||||||
|
|
||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static VipsThreadpool *
|
||||||
* vips_threadpool_new:
|
|
||||||
* @im: image to create the threadgroup on
|
|
||||||
*
|
|
||||||
* Makes a threadpool attached to the image. The threadgroup will be freed
|
|
||||||
* for you if the image is closed, but you can free it yourself with
|
|
||||||
* vips_threadpool_free() if you wish.
|
|
||||||
*
|
|
||||||
* See also: vips_threadpool_free().
|
|
||||||
*
|
|
||||||
* Returns: an #VipsThreadpool on success, %NULL on error.
|
|
||||||
*/
|
|
||||||
VipsThreadpool *
|
|
||||||
vips_threadpool_new( VipsImage *im )
|
vips_threadpool_new( VipsImage *im )
|
||||||
{
|
{
|
||||||
VipsThreadpool *pool;
|
VipsThreadpool *pool;
|
||||||
@ -361,7 +404,6 @@ vips_threadpool_new( VipsImage *im )
|
|||||||
im_semaphore_init( &pool->tick, 0, "tick" );
|
im_semaphore_init( &pool->tick, 0, "tick" );
|
||||||
pool->stop = FALSE;
|
pool->stop = FALSE;
|
||||||
pool->error = FALSE;
|
pool->error = FALSE;
|
||||||
pool->zombie = FALSE;
|
|
||||||
|
|
||||||
/* Attach tidy-up callback.
|
/* Attach tidy-up callback.
|
||||||
*/
|
*/
|
||||||
@ -382,7 +424,7 @@ vips_threadpool_new( VipsImage *im )
|
|||||||
/* Attach a set of threads.
|
/* Attach a set of threads.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
threadpool_create_threads( VipsThreadpool *pool )
|
vips_threadpool_create_threads( VipsThreadpool *pool )
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
@ -399,13 +441,115 @@ threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
*/
|
*/
|
||||||
for( i = 0; i < pool->nthr; i++ )
|
for( i = 0; i < pool->nthr; i++ )
|
||||||
if( !(pool->thr[i] = vips_thread_new( pool )) ) {
|
if( !(pool->thr[i] = vips_thread_new( pool )) ) {
|
||||||
threadpool_kill_threads( pool );
|
vips_threadpool_kill_threads( pool );
|
||||||
return( -1 );
|
return( -1 );
|
||||||
}
|
}
|
||||||
|
|
||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VipsThreadState:
|
||||||
|
* @reg: a #REGION
|
||||||
|
* @pos: a #Rect
|
||||||
|
* @x: an int
|
||||||
|
* @y: an int
|
||||||
|
* @d: client data
|
||||||
|
* @e: client data
|
||||||
|
* @f: client data
|
||||||
|
*
|
||||||
|
* These per-thread values are carried around for your use by
|
||||||
|
* vips_threadpool_run(). They are private to each thread, so they are handy
|
||||||
|
* for VipsThreadpoolAllocate and VipsThreadpoolWork to communicate.
|
||||||
|
*
|
||||||
|
* @reg is created for you at the start of processing and freed at the end,
|
||||||
|
* but you can do what you like with it.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VipsThreadpoolAllocate:
|
||||||
|
* @state: per-thread state
|
||||||
|
* @a: client data
|
||||||
|
* @b: client data
|
||||||
|
* @c: client data
|
||||||
|
* @stop: set this to signal end of computation
|
||||||
|
*
|
||||||
|
* This function is called to allocate a new work unit for the thread. It is
|
||||||
|
* always single-threaded, so it can modify per-call state (such as a
|
||||||
|
* counter).
|
||||||
|
*
|
||||||
|
* @a, @b, @c are the values supplied to the call to
|
||||||
|
* vips_threadpool_run().
|
||||||
|
*
|
||||||
|
* It should set @stop to %TRUE to indicate that no work could be allocated
|
||||||
|
* because the job is done.
|
||||||
|
*
|
||||||
|
* It should return 0 for a normal return, or non-zero to indicate an error.
|
||||||
|
*
|
||||||
|
* See also: vips_threadpool_run().
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VipsThreadpoolWork:
|
||||||
|
* @state: per-thread state
|
||||||
|
* @a: client data
|
||||||
|
* @b: client data
|
||||||
|
* @c: client data
|
||||||
|
*
|
||||||
|
* This function is called to process a work unit. Many copies of this can run
|
||||||
|
* at once, so it should not write to the per-call state. It can write to
|
||||||
|
* per-thread state.
|
||||||
|
*
|
||||||
|
* @a, @b, @c are the values supplied to the call to
|
||||||
|
* vips_threadpool_run().
|
||||||
|
*
|
||||||
|
* It should return 0 for a normal return, or non-zero to indicate an error.
|
||||||
|
*
|
||||||
|
* See also: vips_threadpool_run().
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VipsThreadpoolProgress:
|
||||||
|
* @a: client data
|
||||||
|
* @b: client data
|
||||||
|
* @c: client data
|
||||||
|
*
|
||||||
|
* This function is called by the main thread once for every work unit
|
||||||
|
* processed. It can be used to give the user progress feedback.
|
||||||
|
*
|
||||||
|
* It should return 0 for a normal return, or non-zero to indicate an error.
|
||||||
|
*
|
||||||
|
* See also: vips_threadpool_run().
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* vips_threadpool_run:
|
||||||
|
* @im: image to loop over
|
||||||
|
* @allocate: allocate a work unit
|
||||||
|
* @work: process a work unit
|
||||||
|
* @progress: give progress feedback about a work unit, or %NULL
|
||||||
|
* @a: client data
|
||||||
|
* @b: client data
|
||||||
|
* @c: client data
|
||||||
|
*
|
||||||
|
* This function runs a set of threads over an image. Each thread run
|
||||||
|
* @allocate to set up a new work unit (perhaps the next tile in an image, for
|
||||||
|
* example), then @work to process that work unit. After each unit is
|
||||||
|
* processed, @progress is called, so that the operation can give
|
||||||
|
* progress feedback.@progress may be %NULL.
|
||||||
|
*
|
||||||
|
* Each thread has a small amount of private state that vips_threadpool_run()
|
||||||
|
* maintains for it. The @allocate and @work functions can use this to
|
||||||
|
* communicate.
|
||||||
|
*
|
||||||
|
* @allocate is always single-threaded (so it can write to the per-call state),
|
||||||
|
* whereas @work can be executed concurrently. @progress is always called by
|
||||||
|
* the main thread (ie. the thread which called vips_thtreadpool_run()).
|
||||||
|
*
|
||||||
|
* See also: im_wbuffer2(), im_concurrency_set().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, -1 on error.
|
||||||
|
*/
|
||||||
int
|
int
|
||||||
vips_threadpool_run( VipsImage *im,
|
vips_threadpool_run( VipsImage *im,
|
||||||
VipsThreadpoolAllocate allocate,
|
VipsThreadpoolAllocate allocate,
|
||||||
@ -432,7 +576,7 @@ vips_threadpool_run( VipsImage *im,
|
|||||||
|
|
||||||
/* Attach workers and set them going.
|
/* Attach workers and set them going.
|
||||||
*/
|
*/
|
||||||
if( threadpool_create_threads( pool ) ) {
|
if( vips_threadpool_create_threads( pool ) ) {
|
||||||
vips_threadpool_free( pool );
|
vips_threadpool_free( pool );
|
||||||
return( -1 );
|
return( -1 );
|
||||||
}
|
}
|
||||||
@ -445,7 +589,7 @@ vips_threadpool_run( VipsImage *im,
|
|||||||
#else
|
#else
|
||||||
/* No threads, do the work ourselves in the main thread.
|
/* No threads, do the work ourselves in the main thread.
|
||||||
*/
|
*/
|
||||||
thread_work_unit( pool->thr[0] );
|
vips_thread_work_unit( pool->thr[0] );
|
||||||
#endif /*HAVE_THREADS*/
|
#endif /*HAVE_THREADS*/
|
||||||
|
|
||||||
if( pool->stop || pool->error )
|
if( pool->stop || pool->error )
|
||||||
@ -472,8 +616,16 @@ vips_threadpool_run( VipsImage *im,
|
|||||||
return( result );
|
return( result );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Pick a tile size and a buffer height. The buffer height must be a multiple
|
/**
|
||||||
* of tile_height.
|
* vips_get_tile_size:
|
||||||
|
* @im: image to guess for
|
||||||
|
* @tile_width: return selected tile width
|
||||||
|
* @tile_height: return selected tile height
|
||||||
|
* @nlines: return buffer height in scanlines
|
||||||
|
*
|
||||||
|
* Pick a tile size and a buffer height for this image and the current
|
||||||
|
* value of im_concurrency_get(). The buffer height
|
||||||
|
* will always be a multiple of tile_height.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
vips_get_tile_size( VipsImage *im,
|
vips_get_tile_size( VipsImage *im,
|
||||||
|
Loading…
Reference in New Issue
Block a user