im_iterate() has an optional threadpool backend too
This commit is contained in:
parent
5b00c86bab
commit
56c1c9c036
@ -11,6 +11,7 @@
|
|||||||
- fixed a couple of /0 problems with scale == 0 masks
|
- fixed a couple of /0 problems with scale == 0 masks
|
||||||
- set G_LOG_DOMAIN to VIPS so we can use g_warning etc.
|
- set G_LOG_DOMAIN to VIPS so we can use g_warning etc.
|
||||||
- added VIPS_DEBUG_MSG() macro
|
- added VIPS_DEBUG_MSG() macro
|
||||||
|
- --vips-wbuffer2 turns on threadpool for im_iterate as well
|
||||||
|
|
||||||
16/1/10 started 7.21.2
|
16/1/10 started 7.21.2
|
||||||
- "invalidate" is careful to keep images alive, so invalidate callbacks can do
|
- "invalidate" is careful to keep images alive, so invalidate callbacks can do
|
||||||
|
7
TODO
7
TODO
@ -1,9 +1,8 @@
|
|||||||
|
|
||||||
- how about
|
- vips_threadpool() has three clientdata, do we need all of them? maube one is
|
||||||
|
enough
|
||||||
|
|
||||||
#define VIPS_DEBUG_MSG( STR ) printf( ...)
|
rename vips_discsink() / discsink.c as vips_sink_disc / sinkdisc
|
||||||
|
|
||||||
nicer than lots of #ifdef DEBUG ...
|
|
||||||
|
|
||||||
|
|
||||||
- vips_sink() needs to be able to supply a thread start / stop function
|
- vips_sink() needs to be able to supply a thread start / stop function
|
||||||
|
@ -35,27 +35,27 @@ extern "C" {
|
|||||||
#endif /*__cplusplus*/
|
#endif /*__cplusplus*/
|
||||||
|
|
||||||
#ifdef VIPS_DEBUG
|
#ifdef VIPS_DEBUG
|
||||||
#define VIPS_DEBUG_MSG( FMT, ... ) printf( FMT, __VA_ARGS__ )
|
#define VIPS_DEBUG_MSG( ... ) printf( __VA_ARGS__ )
|
||||||
#else
|
#else
|
||||||
#define VIPS_DEBUG_MSG( FMT, ... )
|
#define VIPS_DEBUG_MSG( ... )
|
||||||
#endif /*VIPS_DEBUG*/
|
#endif /*VIPS_DEBUG*/
|
||||||
|
|
||||||
#ifdef VIPS_DEBUG_RED
|
#ifdef VIPS_DEBUG_RED
|
||||||
#define VIPS_DEBUG_MSG_RED( FMT, ... ) printf( "red: " FMT, __VA_ARGS__ )
|
#define VIPS_DEBUG_MSG_RED( ... ) printf( "red: " __VA_ARGS__ )
|
||||||
#else
|
#else
|
||||||
#define VIPS_DEBUG_MSG_RED( FMT, ... )
|
#define VIPS_DEBUG_MSG_RED( ... )
|
||||||
#endif /*VIPS_DEBUG_RED*/
|
#endif /*VIPS_DEBUG_RED*/
|
||||||
|
|
||||||
#ifdef VIPS_DEBUG_AMBER
|
#ifdef VIPS_DEBUG_AMBER
|
||||||
#define VIPS_DEBUG_MSG_AMBER( FMT, ... ) printf( "amber: " FMT, __VA_ARGS__ )
|
#define VIPS_DEBUG_MSG_AMBER( ... ) printf( "amber: " __VA_ARGS__ )
|
||||||
#else
|
#else
|
||||||
#define VIPS_DEBUG_MSG_AMBER( FMT, ... )
|
#define VIPS_DEBUG_MSG_AMBER( ... )
|
||||||
#endif /*VIPS_DEBUG_AMBER*/
|
#endif /*VIPS_DEBUG_AMBER*/
|
||||||
|
|
||||||
#ifdef VIPS_DEBUG_GREEN
|
#ifdef VIPS_DEBUG_GREEN
|
||||||
#define VIPS_DEBUG_MSG_GREEN( FMT, ... ) printf( "green: " FMT, __VA_ARGS__ )
|
#define VIPS_DEBUG_MSG_GREEN( ... ) printf( "green: " __VA_ARGS__ )
|
||||||
#else
|
#else
|
||||||
#define VIPS_DEBUG_MSG_GREEN( FMT, ... )
|
#define VIPS_DEBUG_MSG_GREEN( ... )
|
||||||
#endif /*VIPS_DEBUG_GREEN*/
|
#endif /*VIPS_DEBUG_GREEN*/
|
||||||
|
|
||||||
/* All open image descriptors ... see im_init() and im_close().
|
/* All open image descriptors ... see im_init() and im_close().
|
||||||
|
@ -90,12 +90,6 @@ int im_render_priority( IMAGE *in, IMAGE *out, IMAGE *mask,
|
|||||||
void (*notify)( IMAGE *, Rect *, void * ), void *client );
|
void (*notify)( IMAGE *, Rect *, void * ), void *client );
|
||||||
int im_cache( IMAGE *in, IMAGE *out, int width, int height, int max );
|
int im_cache( IMAGE *in, IMAGE *out, int width, int height, int max );
|
||||||
|
|
||||||
typedef void (*VipsSinkNotify)( VipsImage *im, Rect *rect, void *client );
|
|
||||||
int vips_sink_screen( VipsImage *in, VipsImage *out, VipsImage *mask,
|
|
||||||
int width, int height, int max,
|
|
||||||
int priority,
|
|
||||||
VipsSinkNotify notify, void *client );
|
|
||||||
|
|
||||||
/* WIO.
|
/* WIO.
|
||||||
*/
|
*/
|
||||||
int im_setupout( IMAGE *im );
|
int im_setupout( IMAGE *im );
|
||||||
|
@ -42,51 +42,103 @@ extern "C" {
|
|||||||
|
|
||||||
#include <vips/semaphore.h>
|
#include <vips/semaphore.h>
|
||||||
|
|
||||||
/* The per-thread state we expose. Allocate functions can use these members to
|
/* Per-thread state. Allocate functions can use these members to
|
||||||
* communicate with work functions.
|
* communicate with work functions.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define VIPS_TYPE_THREAD_STATE (vips_thread_state_get_type())
|
||||||
|
#define VIPS_THREAD_STATE( obj ) \
|
||||||
|
(G_TYPE_CHECK_INSTANCE_CAST( (obj), \
|
||||||
|
VIPS_TYPE_THREAD_STATE, VipsThreadState ))
|
||||||
|
#define VIPS_THREAD_STATE_CLASS( klass ) \
|
||||||
|
(G_TYPE_CHECK_CLASS_CAST( (klass), \
|
||||||
|
VIPS_TYPE_THREAD_STATE, VipsThreadStateClass))
|
||||||
|
#define VIPS_IS_THREAD_STATE( obj ) \
|
||||||
|
(G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_THREAD_STATE ))
|
||||||
|
#define VIPS_IS_THREAD_STATE_CLASS( klass ) \
|
||||||
|
(G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_THREAD_STATE ))
|
||||||
|
#define VIPS_THREAD_STATE_GET_CLASS( obj ) \
|
||||||
|
(G_TYPE_INSTANCE_GET_CLASS( (obj), \
|
||||||
|
VIPS_TYPE_THREAD_STATE, VipsThreadStateClass ))
|
||||||
|
|
||||||
typedef struct _VipsThreadState {
|
typedef struct _VipsThreadState {
|
||||||
|
VipsObject parent_object;
|
||||||
|
|
||||||
|
/* Image we run on.
|
||||||
|
*/
|
||||||
|
VipsImage *im;
|
||||||
|
|
||||||
/* This region is created and destroyed by the threadpool for the
|
/* This region is created and destroyed by the threadpool for the
|
||||||
* worker.
|
* use of the worker.
|
||||||
*/
|
*/
|
||||||
REGION *reg;
|
REGION *reg;
|
||||||
|
|
||||||
/* The rest are neither used nor set, do what you like with them.
|
/* Neither used nor set, do what you like with them.
|
||||||
*/
|
*/
|
||||||
Rect pos;
|
Rect pos;
|
||||||
int x, y;
|
int x, y;
|
||||||
void *d, *e, *f;
|
|
||||||
|
/* The client data passed to the enclosing vips_threadpool_run().
|
||||||
|
*/
|
||||||
|
void *a;
|
||||||
|
|
||||||
} VipsThreadState;
|
} VipsThreadState;
|
||||||
|
|
||||||
|
typedef struct _VipsThreadStateClass {
|
||||||
|
VipsObjectClass parent_class;
|
||||||
|
|
||||||
|
} VipsThreadStateClass;
|
||||||
|
|
||||||
|
void *vips_thread_state_set( VipsObject *object, void *a, void *b );
|
||||||
|
GType vips_thread_state_get_type( void );
|
||||||
|
|
||||||
|
VipsThreadState *vips_thread_state_new( VipsImage *im, void *a );
|
||||||
|
|
||||||
|
/* Constructor for per-thread state.
|
||||||
|
*/
|
||||||
|
typedef VipsThreadState *(*VipsThreadStart)( VipsImage *im, void *a );
|
||||||
|
|
||||||
/* A work allocate function. This is run single-threaded by a worker to
|
/* A work allocate function. This is run single-threaded by a worker to
|
||||||
* set up a new work unit.
|
* set up a new work unit.
|
||||||
* Return non-zero for errors. Set *stop for "no more work to do"
|
* Return non-zero for errors. Set *stop for "no more work to do"
|
||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolAllocate)( VipsThreadState *state,
|
typedef int (*VipsThreadpoolAllocate)( VipsThreadState *state,
|
||||||
void *a, void *b, void *c, gboolean *stop );
|
void *a, gboolean *stop );
|
||||||
|
|
||||||
/* A work function. This does a unit of work (eg. processing a tile or
|
/* A work function. This does a unit of work (eg. processing a tile or
|
||||||
* whatever). Return non-zero for errors.
|
* whatever). Return non-zero for errors.
|
||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolWork)( VipsThreadState *state,
|
typedef int (*VipsThreadpoolWork)( VipsThreadState *state, void *a );
|
||||||
void *a, void *b, void *c );
|
|
||||||
|
|
||||||
/* A progress function. This is run by the main thread once for every
|
/* A progress function. This is run by the main thread once for every
|
||||||
* allocation. Return an error to kill computation early.
|
* allocation. Return an error to kill computation early.
|
||||||
*/
|
*/
|
||||||
typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c );
|
typedef int (*VipsThreadpoolProgress)( void *a );
|
||||||
|
|
||||||
int vips_threadpool_run( VipsImage *im,
|
int vips_threadpool_run( VipsImage *im,
|
||||||
|
VipsThreadStart start,
|
||||||
VipsThreadpoolAllocate allocate,
|
VipsThreadpoolAllocate allocate,
|
||||||
VipsThreadpoolWork work,
|
VipsThreadpoolWork work,
|
||||||
VipsThreadpoolProgress progress,
|
VipsThreadpoolProgress progress,
|
||||||
void *a, void *b, void *c );
|
void *a );
|
||||||
void vips_get_tile_size( VipsImage *im,
|
void vips_get_tile_size( VipsImage *im,
|
||||||
int *tile_width, int *tile_height, int *nlines );
|
int *tile_width, int *tile_height, int *nlines );
|
||||||
|
|
||||||
typedef int (*VipsRegionWrite)( REGION *region, Rect *area, void *a, void *b );
|
typedef int (*VipsRegionWrite)( REGION *region, Rect *area, void *a );
|
||||||
int vips_discsink( VipsImage *im,
|
int vips_sink_disc( VipsImage *im, VipsRegionWrite write_fn, void *a );
|
||||||
VipsRegionWrite write_fn, void *a, void *b );
|
|
||||||
|
typedef void *(*VipsStart)( VipsImage *out, void *a, void *b );
|
||||||
|
typedef int (*VipsGenerate)( REGION *out, void *seq, void *a, void *b );
|
||||||
|
typedef int (*VipsStop)( void *seq, void *a, void *b );
|
||||||
|
int vips_sink( VipsImage *im,
|
||||||
|
VipsStart start, VipsGenerate generate, VipsStop stop,
|
||||||
|
void *a, void *b );
|
||||||
|
|
||||||
|
typedef void (*VipsSinkNotify)( VipsImage *im, Rect *rect, void *client );
|
||||||
|
int vips_sink_screen( VipsImage *in, VipsImage *out, VipsImage *mask,
|
||||||
|
int width, int height, int max,
|
||||||
|
int priority,
|
||||||
|
VipsSinkNotify notify, void *client );
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,8 @@ libiofuncs_la_SOURCES = \
|
|||||||
im_unmapfile.c \
|
im_unmapfile.c \
|
||||||
im_guess_prefix.c \
|
im_guess_prefix.c \
|
||||||
im_wbuffer.c \
|
im_wbuffer.c \
|
||||||
discsink.c \
|
sinkdisc.c \
|
||||||
|
sink.c \
|
||||||
im_wrapmany.c \
|
im_wrapmany.c \
|
||||||
im_writeline.c \
|
im_writeline.c \
|
||||||
memory.c \
|
memory.c \
|
||||||
|
@ -379,6 +379,9 @@ im_iterate( IMAGE *im,
|
|||||||
Iterate iter;
|
Iterate iter;
|
||||||
int result;
|
int result;
|
||||||
|
|
||||||
|
if( im__wbuffer2 )
|
||||||
|
return( vips_sink( im, start, generate, stop, b, c ) );
|
||||||
|
|
||||||
g_assert( !im_image_sanity( im ) );
|
g_assert( !im_image_sanity( im ) );
|
||||||
|
|
||||||
/* We don't use this, but make sure it's set in case any old binaries
|
/* We don't use this, but make sure it's set in case any old binaries
|
||||||
|
@ -430,7 +430,8 @@ im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
|
|||||||
/* Optionally use the newer one.
|
/* Optionally use the newer one.
|
||||||
*/
|
*/
|
||||||
if( im__wbuffer2 )
|
if( im__wbuffer2 )
|
||||||
return( vips_discsink( tg->im, write_fn, a, b ) );
|
return( vips_sink_disc( tg->im,
|
||||||
|
(VipsRegionWrite) write_fn, a ) );
|
||||||
|
|
||||||
if( im__start_eval( tg->im ) )
|
if( im__start_eval( tg->im ) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
@ -31,8 +31,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
#define DEBUG
|
|
||||||
*/
|
*/
|
||||||
|
#define VIPS_DEBUG
|
||||||
|
|
||||||
#ifdef HAVE_CONFIG_H
|
#ifdef HAVE_CONFIG_H
|
||||||
#include <config.h>
|
#include <config.h>
|
||||||
@ -72,7 +72,7 @@ typedef struct _Sink {
|
|||||||
int tile_height;
|
int tile_height;
|
||||||
int nlines;
|
int nlines;
|
||||||
|
|
||||||
/* Keep the sequence value in VipsThreadState->d.
|
/* Call params.
|
||||||
*/
|
*/
|
||||||
im_start_fn start;
|
im_start_fn start;
|
||||||
im_generate_fn generate;
|
im_generate_fn generate;
|
||||||
@ -81,6 +81,124 @@ typedef struct _Sink {
|
|||||||
void *b;
|
void *b;
|
||||||
} Sink;
|
} Sink;
|
||||||
|
|
||||||
|
/* Our per-thread state.
|
||||||
|
*/
|
||||||
|
typedef struct _SinkThreadState {
|
||||||
|
VipsThreadState parent_object;
|
||||||
|
|
||||||
|
/* Sequence value for this thread.
|
||||||
|
*/
|
||||||
|
void *seq;
|
||||||
|
|
||||||
|
/* The region we walk over sink.t copy. We can't use
|
||||||
|
* parent_object.reg, it's defined on the outer image.
|
||||||
|
*/
|
||||||
|
REGION *reg;
|
||||||
|
} SinkThreadState;
|
||||||
|
|
||||||
|
typedef struct _SinkThreadStateClass {
|
||||||
|
VipsThreadStateClass parent_class;
|
||||||
|
|
||||||
|
} SinkThreadStateClass;
|
||||||
|
|
||||||
|
G_DEFINE_TYPE( SinkThreadState, sink_thread_state, VIPS_TYPE_THREAD_STATE );
|
||||||
|
|
||||||
|
/* Call a thread's stop function.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
sink_call_stop( Sink *sink, SinkThreadState *state )
|
||||||
|
{
|
||||||
|
if( state->seq && sink->stop ) {
|
||||||
|
VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state );
|
||||||
|
|
||||||
|
if( sink->stop( state->seq, sink->a, sink->b ) ) {
|
||||||
|
im_error( "vips_sink",
|
||||||
|
_( "stop function failed for image \"%s\"" ),
|
||||||
|
sink->im->filename );
|
||||||
|
return( -1 );
|
||||||
|
}
|
||||||
|
|
||||||
|
state->seq = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return( 0 );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
sink_thread_state_dispose( GObject *gobject )
|
||||||
|
{
|
||||||
|
SinkThreadState *state = (SinkThreadState *) gobject;
|
||||||
|
Sink *sink = (Sink *) ((VipsThreadState *) state)->a;
|
||||||
|
|
||||||
|
sink_call_stop( sink, state );
|
||||||
|
IM_FREEF( im_region_free, state->reg );
|
||||||
|
|
||||||
|
G_OBJECT_CLASS( sink_thread_state_parent_class )->dispose( gobject );
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Call the start function for this thread, if necessary.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
sink_call_start( Sink *sink, SinkThreadState *state )
|
||||||
|
{
|
||||||
|
if( !state->seq && sink->start ) {
|
||||||
|
VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state );
|
||||||
|
|
||||||
|
state->seq = sink->start( sink->t, sink->a, sink->b );
|
||||||
|
|
||||||
|
if( !state->seq ) {
|
||||||
|
im_error( "vips_sink",
|
||||||
|
_( "start function failed for image \"%s\"" ),
|
||||||
|
sink->im->filename );
|
||||||
|
return( -1 );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return( 0 );
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
sink_thread_state_build( VipsObject *object )
|
||||||
|
{
|
||||||
|
SinkThreadState *state = (SinkThreadState *) object;
|
||||||
|
Sink *sink = (Sink *) ((VipsThreadState *) state)->a;
|
||||||
|
|
||||||
|
if( !(state->reg = im_region_create( sink->t )) ||
|
||||||
|
sink_call_start( sink, state ) )
|
||||||
|
return( -1 );
|
||||||
|
|
||||||
|
return( VIPS_OBJECT_CLASS(
|
||||||
|
sink_thread_state_parent_class )->build( object ) );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
sink_thread_state_class_init( SinkThreadStateClass *class )
|
||||||
|
{
|
||||||
|
GObjectClass *gobject_class = G_OBJECT_CLASS( class );
|
||||||
|
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
|
||||||
|
|
||||||
|
gobject_class->dispose = sink_thread_state_dispose;
|
||||||
|
|
||||||
|
object_class->build = sink_thread_state_build;
|
||||||
|
object_class->nickname = "sinkthreadstate";
|
||||||
|
object_class->description = _( "per-thread state for sink" );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
sink_thread_state_init( SinkThreadState *state )
|
||||||
|
{
|
||||||
|
state->seq = NULL;
|
||||||
|
state->reg = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static VipsThreadState *
|
||||||
|
sink_thread_state_new( VipsImage *im, void *a )
|
||||||
|
{
|
||||||
|
return( VIPS_THREAD_STATE( vips_object_new(
|
||||||
|
sink_thread_state_get_type(),
|
||||||
|
vips_thread_state_set, im, a ) ) );
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
sink_free( Sink *sink )
|
sink_free( Sink *sink )
|
||||||
{
|
{
|
||||||
@ -95,6 +213,8 @@ sink_init( Sink *sink,
|
|||||||
{
|
{
|
||||||
sink->im = im;
|
sink->im = im;
|
||||||
sink->t = NULL;
|
sink->t = NULL;
|
||||||
|
sink->x = 0;
|
||||||
|
sink->y = 0;
|
||||||
sink->start = start;
|
sink->start = start;
|
||||||
sink->generate = generate;
|
sink->generate = generate;
|
||||||
sink->stop = stop;
|
sink->stop = stop;
|
||||||
@ -113,54 +233,8 @@ sink_init( Sink *sink,
|
|||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Call the start function for this thread, if necessary.
|
|
||||||
*/
|
|
||||||
static int
|
|
||||||
sink_call_start( Sink *sink, VipsThreadState *state )
|
|
||||||
{
|
|
||||||
if( !state->d && sink->start ) {
|
|
||||||
#ifdef DEBUG
|
|
||||||
printf( "sink_call_start: state = %p\n", state );
|
|
||||||
#endif /*DEBUG*/
|
|
||||||
state->d = sink->start( sink->t, sink->a, sink->b );
|
|
||||||
|
|
||||||
if( !state->d ) {
|
|
||||||
im_error( "vips_sink",
|
|
||||||
_( "start function failed for image \"%s\"" ),
|
|
||||||
sink->im->filename );
|
|
||||||
return( -1 );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return( 0 );
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Call a thread's stop function.
|
|
||||||
*/
|
|
||||||
static int
|
|
||||||
sink_call_stop( Sink *sink, VipsThreadState *state )
|
|
||||||
{
|
|
||||||
if( state->d && sink->stop ) {
|
|
||||||
#ifdef DEBUG
|
|
||||||
printf( "sink_call_stop: state = %p\n", state );
|
|
||||||
#endif /*DEBUG*/
|
|
||||||
|
|
||||||
if( sink->stop( state->d, sink->a, sink->b ) ) {
|
|
||||||
im_error( "vips_sink",
|
|
||||||
_( "stop function failed for image \"%s\"" ),
|
|
||||||
sink->im->filename );
|
|
||||||
return( -1 );
|
|
||||||
}
|
|
||||||
|
|
||||||
state->d = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return( 0 );
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
sink_allocate( VipsThreadState *state,
|
sink_allocate( VipsThreadState *state, void *a, gboolean *stop )
|
||||||
void *a, void *b, void *c, gboolean *stop )
|
|
||||||
{
|
{
|
||||||
Sink *sink = (Sink *) a;
|
Sink *sink = (Sink *) a;
|
||||||
|
|
||||||
@ -173,14 +247,12 @@ sink_allocate( VipsThreadState *state,
|
|||||||
sink->y += sink->tile_height;
|
sink->y += sink->tile_height;
|
||||||
|
|
||||||
if( sink->y >= sink->im->Ysize ) {
|
if( sink->y >= sink->im->Ysize ) {
|
||||||
sink_call_stop( sink, state );
|
|
||||||
*stop = TRUE;
|
*stop = TRUE;
|
||||||
|
|
||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sink_call_start( sink, state );
|
|
||||||
|
|
||||||
/* x, y and buf are good: save params for thread.
|
/* x, y and buf are good: save params for thread.
|
||||||
*/
|
*/
|
||||||
image.left = 0;
|
image.left = 0;
|
||||||
@ -195,27 +267,26 @@ sink_allocate( VipsThreadState *state,
|
|||||||
|
|
||||||
/* Move state on.
|
/* Move state on.
|
||||||
*/
|
*/
|
||||||
write->x += write->tile_width;
|
sink->x += sink->tile_width;
|
||||||
|
|
||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
sink_work( VipsThreadState *state, void *a, void *b, void *c )
|
sink_work( VipsThreadState *state, void *a )
|
||||||
{
|
{
|
||||||
|
SinkThreadState *sstate = (SinkThreadState *) state;
|
||||||
Sink *sink = (Sink *) a;
|
Sink *sink = (Sink *) a;
|
||||||
|
|
||||||
if( im_prepare( state->reg, &state->pos ) ||
|
if( im_prepare( sstate->reg, &state->pos ) ||
|
||||||
sink->generate( state->reg, state->d, sink->a, sink->b ) ) {
|
sink->generate( sstate->reg, sstate->seq, sink->a, sink->b ) )
|
||||||
sink_call_stop( sink, state );
|
|
||||||
return( -1 );
|
return( -1 );
|
||||||
}
|
|
||||||
|
|
||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
sink_progress( void *a, void *b, void *c )
|
sink_progress( void *a )
|
||||||
{
|
{
|
||||||
Sink *sink = (Sink *) a;
|
Sink *sink = (Sink *) a;
|
||||||
|
|
||||||
@ -239,7 +310,7 @@ sink_progress( void *a, void *b, void *c )
|
|||||||
* @b: user data
|
* @b: user data
|
||||||
*
|
*
|
||||||
* Loops over an image. @generate is called for every pixel in the image, with
|
* Loops over an image. @generate is called for every pixel in the image, with
|
||||||
* the @reg argument being a region of pixels for processing. im_iterate() is
|
* the @reg argument being a region of pixels for processing. vips_sink() is
|
||||||
* used to implement operations like im_avg() which have no image output.
|
* used to implement operations like im_avg() which have no image output.
|
||||||
*
|
*
|
||||||
* See also: im_generate(), im_open().
|
* See also: im_generate(), im_open().
|
||||||
@ -270,7 +341,11 @@ vips_sink( VipsImage *im,
|
|||||||
}
|
}
|
||||||
|
|
||||||
result = vips_threadpool_run( im,
|
result = vips_threadpool_run( im,
|
||||||
sink_allocate, sink_work, sink_progress, &sink, NULL );
|
sink_thread_state_new,
|
||||||
|
sink_allocate,
|
||||||
|
sink_work,
|
||||||
|
sink_progress,
|
||||||
|
&sink );
|
||||||
|
|
||||||
im__end_eval( sink.t );
|
im__end_eval( sink.t );
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
/* Write an image.
|
/* Write an image to a disc file.
|
||||||
*
|
*
|
||||||
* 19/3/10
|
* 19/3/10
|
||||||
* - from im_wbuffer.c
|
* - from im_wbuffer.c
|
||||||
@ -101,13 +101,51 @@ typedef struct _Write {
|
|||||||
*/
|
*/
|
||||||
VipsRegionWrite write_fn;
|
VipsRegionWrite write_fn;
|
||||||
void *a;
|
void *a;
|
||||||
void *b;
|
|
||||||
} Write;
|
} Write;
|
||||||
|
|
||||||
/* Enable im_wbuffer2 ... set from the cmd line, tested by our users.
|
/* Enable im_wbuffer2 ... set from the cmd line, tested by our users.
|
||||||
*/
|
*/
|
||||||
int im__wbuffer2 = 0;
|
int im__wbuffer2 = 0;
|
||||||
|
|
||||||
|
/* Our per-thread state ... we need to also track the buffer that pos is
|
||||||
|
* supposed to write to.
|
||||||
|
*/
|
||||||
|
typedef struct _WriteThreadState {
|
||||||
|
VipsThreadState parent_object;
|
||||||
|
|
||||||
|
WriteBuffer *buf;
|
||||||
|
} WriteThreadState;
|
||||||
|
|
||||||
|
typedef struct _WriteThreadStateClass {
|
||||||
|
VipsThreadStateClass parent_class;
|
||||||
|
|
||||||
|
} WriteThreadStateClass;
|
||||||
|
|
||||||
|
G_DEFINE_TYPE( WriteThreadState, write_thread_state, VIPS_TYPE_THREAD_STATE );
|
||||||
|
|
||||||
|
static void
|
||||||
|
write_thread_state_class_init( WriteThreadStateClass *class )
|
||||||
|
{
|
||||||
|
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
|
||||||
|
|
||||||
|
object_class->nickname = "writethreadstate";
|
||||||
|
object_class->description = _( "per-thread state for sinkdisc" );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
write_thread_state_init( WriteThreadState *state )
|
||||||
|
{
|
||||||
|
state->buf = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static VipsThreadState *
|
||||||
|
write_thread_state_new( VipsImage *im, void *a )
|
||||||
|
{
|
||||||
|
return( VIPS_THREAD_STATE( vips_object_new(
|
||||||
|
write_thread_state_get_type(),
|
||||||
|
vips_thread_state_set, im, a ) ) );
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
wbuffer_free( WriteBuffer *wbuffer )
|
wbuffer_free( WriteBuffer *wbuffer )
|
||||||
{
|
{
|
||||||
@ -147,7 +185,7 @@ wbuffer_write( WriteBuffer *wbuffer )
|
|||||||
#endif /*DEBUG*/
|
#endif /*DEBUG*/
|
||||||
|
|
||||||
wbuffer->write_errno = write->write_fn( wbuffer->region,
|
wbuffer->write_errno = write->write_fn( wbuffer->region,
|
||||||
&wbuffer->area, write->a, write->b );
|
&wbuffer->area, write->a );
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef HAVE_THREADS
|
#ifdef HAVE_THREADS
|
||||||
@ -308,9 +346,9 @@ wbuffer_position( WriteBuffer *wbuffer, int top, int height )
|
|||||||
* iteration.
|
* iteration.
|
||||||
*/
|
*/
|
||||||
static gboolean
|
static gboolean
|
||||||
wbuffer_allocate_fn( VipsThreadState *state,
|
wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
|
||||||
void *a, void *b, void *c, gboolean *stop )
|
|
||||||
{
|
{
|
||||||
|
WriteThreadState *wstate = (WriteThreadState *) state;
|
||||||
Write *write = (Write *) a;
|
Write *write = (Write *) a;
|
||||||
|
|
||||||
Rect image;
|
Rect image;
|
||||||
@ -359,7 +397,7 @@ wbuffer_allocate_fn( VipsThreadState *state,
|
|||||||
tile.width = write->tile_width;
|
tile.width = write->tile_width;
|
||||||
tile.height = write->tile_height;
|
tile.height = write->tile_height;
|
||||||
im_rect_intersectrect( &image, &tile, &state->pos );
|
im_rect_intersectrect( &image, &tile, &state->pos );
|
||||||
state->d = write->buf;
|
wstate->buf = write->buf;
|
||||||
|
|
||||||
/* Add to the number of writers on the buffer.
|
/* Add to the number of writers on the buffer.
|
||||||
*/
|
*/
|
||||||
@ -375,22 +413,21 @@ wbuffer_allocate_fn( VipsThreadState *state,
|
|||||||
/* Our VipsThreadpoolWork function ... generate a tile!
|
/* Our VipsThreadpoolWork function ... generate a tile!
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
wbuffer_work_fn( VipsThreadState *state,
|
wbuffer_work_fn( VipsThreadState *state, void *a )
|
||||||
void *a, void *b, void *c )
|
|
||||||
{
|
{
|
||||||
WriteBuffer *wbuffer = (WriteBuffer *) state->d;
|
WriteThreadState *wstate = (WriteThreadState *) state;
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
printf( "wbuffer_work_fn\n" );
|
printf( "wbuffer_work_fn\n" );
|
||||||
#endif /*DEBUG*/
|
#endif /*DEBUG*/
|
||||||
|
|
||||||
if( im_prepare_to( state->reg, wbuffer->region,
|
if( im_prepare_to( state->reg, wstate->buf->region,
|
||||||
&state->pos, state->pos.left, state->pos.top ) )
|
&state->pos, state->pos.left, state->pos.top ) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
|
||||||
/* Tell the bg write thread we've left.
|
/* Tell the bg write thread we've left.
|
||||||
*/
|
*/
|
||||||
im_semaphore_upn( &wbuffer->nwrite, 1 );
|
im_semaphore_upn( &wstate->buf->nwrite, 1 );
|
||||||
|
|
||||||
return( 0 );
|
return( 0 );
|
||||||
}
|
}
|
||||||
@ -398,7 +435,7 @@ wbuffer_work_fn( VipsThreadState *state,
|
|||||||
/* Our VipsThreadpoolProgress function ... send some eval progress feedback.
|
/* Our VipsThreadpoolProgress function ... send some eval progress feedback.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
wbuffer_progress_fn( void *a, void *b, void *c )
|
wbuffer_progress_fn( void *a )
|
||||||
{
|
{
|
||||||
Write *write = (Write *) a;
|
Write *write = (Write *) a;
|
||||||
|
|
||||||
@ -414,7 +451,7 @@ wbuffer_progress_fn( void *a, void *b, void *c )
|
|||||||
|
|
||||||
static void
|
static void
|
||||||
write_init( Write *write,
|
write_init( Write *write,
|
||||||
VipsImage *im, VipsRegionWrite write_fn, void *a, void *b )
|
VipsImage *im, VipsRegionWrite write_fn, void *a )
|
||||||
{
|
{
|
||||||
write->im = im;
|
write->im = im;
|
||||||
write->buf = wbuffer_new( write );
|
write->buf = wbuffer_new( write );
|
||||||
@ -423,7 +460,6 @@ write_init( Write *write,
|
|||||||
write->y = 0;
|
write->y = 0;
|
||||||
write->write_fn = write_fn;
|
write->write_fn = write_fn;
|
||||||
write->a = a;
|
write->a = a;
|
||||||
write->b = b;
|
|
||||||
|
|
||||||
vips_get_tile_size( im,
|
vips_get_tile_size( im,
|
||||||
&write->tile_width, &write->tile_height, &write->nlines );
|
&write->tile_width, &write->tile_height, &write->nlines );
|
||||||
@ -440,7 +476,6 @@ write_free( Write *write )
|
|||||||
* VipsRegionWrite:
|
* VipsRegionWrite:
|
||||||
* @region: pixels to write
|
* @region: pixels to write
|
||||||
* @a: client data
|
* @a: client data
|
||||||
* @b: client data
|
|
||||||
*
|
*
|
||||||
* The function should write the pixels in @region. @a and @b are the values
|
* The function should write the pixels in @region. @a and @b are the values
|
||||||
* passed into vips_discsink().
|
* passed into vips_discsink().
|
||||||
@ -451,16 +486,16 @@ write_free( Write *write )
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* vips_discsink:
|
* vips_sink_disc:
|
||||||
* @im: image to process
|
* @im: image to process
|
||||||
* @write_fn: called for every batch of pixels
|
* @write_fn: called for every batch of pixels
|
||||||
* @a: client data
|
* @a: client data
|
||||||
* @b: client data
|
|
||||||
*
|
*
|
||||||
* vips_discsink() loops over @im, top-to-bottom, generating it in sections.
|
* vips_sink_disc() loops over @im, top-to-bottom, generating it in sections.
|
||||||
* As each section is produced, @write_fn is called.
|
* As each section is produced, @write_fn is called.
|
||||||
*
|
*
|
||||||
* @write_fn is always called single-threaded, it's always given image
|
* @write_fn is always called single-threaded (though not always from the same
|
||||||
|
* thread), it's always given image
|
||||||
* sections in top-to-bottom order, and there are never any gaps.
|
* sections in top-to-bottom order, and there are never any gaps.
|
||||||
*
|
*
|
||||||
* This operation is handy for making image sinks which output to things like
|
* This operation is handy for making image sinks which output to things like
|
||||||
@ -471,7 +506,7 @@ write_free( Write *write )
|
|||||||
* Returns: 0 on success, -1 on error.
|
* Returns: 0 on success, -1 on error.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b )
|
vips_sink_disc( VipsImage *im, VipsRegionWrite write_fn, void *a )
|
||||||
{
|
{
|
||||||
Write write;
|
Write write;
|
||||||
int result;
|
int result;
|
||||||
@ -479,7 +514,13 @@ vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b )
|
|||||||
if( im__start_eval( im ) )
|
if( im__start_eval( im ) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
|
||||||
write_init( &write, im, write_fn, a, b );
|
write_init( &write, im, write_fn, a );
|
||||||
|
|
||||||
|
printf( "initing thread state\n" );
|
||||||
|
|
||||||
|
vips_thread_state_get_type();
|
||||||
|
|
||||||
|
printf( "made thread state\n" );
|
||||||
|
|
||||||
result = 0;
|
result = 0;
|
||||||
|
|
||||||
@ -487,10 +528,11 @@ vips_discsink( VipsImage *im, VipsRegionWrite write_fn, void *a, void *b )
|
|||||||
!write.buf_back ||
|
!write.buf_back ||
|
||||||
wbuffer_position( write.buf, 0, write.nlines ) ||
|
wbuffer_position( write.buf, 0, write.nlines ) ||
|
||||||
vips_threadpool_run( im,
|
vips_threadpool_run( im,
|
||||||
|
write_thread_state_new,
|
||||||
wbuffer_allocate_fn,
|
wbuffer_allocate_fn,
|
||||||
wbuffer_work_fn,
|
wbuffer_work_fn,
|
||||||
wbuffer_progress_fn,
|
wbuffer_progress_fn,
|
||||||
&write, NULL, NULL ) )
|
&write ) )
|
||||||
result = -1;
|
result = -1;
|
||||||
|
|
||||||
/* We've set all the buffers writing, but not waited for the BG
|
/* We've set all the buffers writing, but not waited for the BG
|
@ -38,8 +38,8 @@
|
|||||||
/*
|
/*
|
||||||
#define TIME_THREAD
|
#define TIME_THREAD
|
||||||
#define VIPS_DEBUG_RED
|
#define VIPS_DEBUG_RED
|
||||||
#define VIPS_DEBUG
|
|
||||||
*/
|
*/
|
||||||
|
#define VIPS_DEBUG
|
||||||
|
|
||||||
#ifdef HAVE_CONFIG_H
|
#ifdef HAVE_CONFIG_H
|
||||||
#include <config.h>
|
#include <config.h>
|
||||||
@ -76,19 +76,106 @@
|
|||||||
* can be used to give feedback.
|
* can be used to give feedback.
|
||||||
*
|
*
|
||||||
* This is like threadgroup, but workers allocate work units themselves. This
|
* This is like threadgroup, but workers allocate work units themselves. This
|
||||||
* reduces the synchronisation overhead and improves scalability.
|
* reduces synchronisation overhead and improves scalability.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* VipsThreadState:
|
||||||
|
* @reg: a #REGION
|
||||||
|
* @pos: a #Rect
|
||||||
|
* @x: an int
|
||||||
|
* @y: an int
|
||||||
|
* @a: client data
|
||||||
|
*
|
||||||
|
* These per-thread values are carried around for your use by
|
||||||
|
* vips_threadpool_run(). They are private to each thread, so they are a
|
||||||
|
* useful place
|
||||||
|
* for #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate.
|
||||||
|
*
|
||||||
|
* @reg is created for you at the start of processing and freed at the end,
|
||||||
|
* but you can do what you like with it.
|
||||||
|
*/
|
||||||
|
|
||||||
|
G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
|
||||||
|
|
||||||
|
static void
|
||||||
|
vips_thread_state_dispose( GObject *gobject )
|
||||||
|
{
|
||||||
|
VipsThreadState *state = (VipsThreadState *) gobject;
|
||||||
|
|
||||||
|
VIPS_DEBUG_MSG( "vips_thread_state_dispose:\n" );
|
||||||
|
|
||||||
|
IM_FREEF( im_region_free, state->reg );
|
||||||
|
|
||||||
|
G_OBJECT_CLASS( vips_thread_state_parent_class )->dispose( gobject );
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
vips_thread_state_build( VipsObject *object )
|
||||||
|
{
|
||||||
|
VipsThreadState *state = (VipsThreadState *) object;
|
||||||
|
|
||||||
|
if( !(state->reg = im_region_create( state->im )) )
|
||||||
|
return( -1 );
|
||||||
|
|
||||||
|
return( VIPS_OBJECT_CLASS(
|
||||||
|
vips_thread_state_parent_class )->build( object ) );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
vips_thread_state_class_init( VipsThreadStateClass *class )
|
||||||
|
{
|
||||||
|
GObjectClass *gobject_class = G_OBJECT_CLASS( class );
|
||||||
|
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
|
||||||
|
|
||||||
|
gobject_class->dispose = vips_thread_state_dispose;
|
||||||
|
|
||||||
|
object_class->build = vips_thread_state_build;
|
||||||
|
object_class->nickname = "threadstate";
|
||||||
|
object_class->description = _( "per-thread state for vipsthreadpool" );
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
vips_thread_state_init( VipsThreadState *state )
|
||||||
|
{
|
||||||
|
VIPS_DEBUG_MSG( "vips_thread_state_init:\n" );
|
||||||
|
|
||||||
|
state->reg = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *
|
||||||
|
vips_thread_state_set( VipsObject *object, void *a, void *b )
|
||||||
|
{
|
||||||
|
VipsThreadState *state = (VipsThreadState *) object;
|
||||||
|
VipsImage *im = (VipsImage *) a;
|
||||||
|
|
||||||
|
VIPS_DEBUG_MSG( "vips_thread_state_set:\n" );
|
||||||
|
|
||||||
|
state->im = im;
|
||||||
|
state->a = b;
|
||||||
|
|
||||||
|
return( NULL );
|
||||||
|
}
|
||||||
|
|
||||||
|
VipsThreadState *
|
||||||
|
vips_thread_state_new( VipsImage *im, void *a )
|
||||||
|
{
|
||||||
|
VIPS_DEBUG_MSG( "vips_thread_state_new:\n" );
|
||||||
|
|
||||||
|
return( VIPS_THREAD_STATE( vips_object_new(
|
||||||
|
VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
|
||||||
|
}
|
||||||
|
|
||||||
/* What we track for each thread in the pool.
|
/* What we track for each thread in the pool.
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
/* All private.
|
/* All private.
|
||||||
*/
|
*/
|
||||||
/*< private >*/
|
/*< private >*/
|
||||||
VipsThreadState state;
|
|
||||||
|
|
||||||
struct _VipsThreadpool *pool; /* Pool we are part of */
|
struct _VipsThreadpool *pool; /* Pool we are part of */
|
||||||
|
|
||||||
|
VipsThreadState *state;
|
||||||
|
|
||||||
/* Thread we are running.
|
/* Thread we are running.
|
||||||
*/
|
*/
|
||||||
GThread *thread;
|
GThread *thread;
|
||||||
@ -115,13 +202,15 @@ typedef struct _VipsThreadpool {
|
|||||||
/*< private >*/
|
/*< private >*/
|
||||||
VipsImage *im; /* Image we are calculating */
|
VipsImage *im; /* Image we are calculating */
|
||||||
|
|
||||||
/* Do a unit of work (runs in parallel) and allocate a unit of work
|
/* STart a thread, do a unit of work (runs in parallel) and allocate
|
||||||
* (serial). Plus the mutex we use to serialize work allocation.
|
* a unit of work (serial). Plus the mutex we use to serialize work
|
||||||
|
* allocation.
|
||||||
*/
|
*/
|
||||||
|
VipsThreadStart start;
|
||||||
VipsThreadpoolAllocate allocate;
|
VipsThreadpoolAllocate allocate;
|
||||||
VipsThreadpoolWork work;
|
VipsThreadpoolWork work;
|
||||||
GMutex *allocate_lock;
|
GMutex *allocate_lock;
|
||||||
void *a, *b, *c; /* User arguments to work / allocate */
|
void *a; /* User argument to start / allocate / etc. */
|
||||||
|
|
||||||
int nthr; /* Number of threads in pool */
|
int nthr; /* Number of threads in pool */
|
||||||
VipsThread **thr; /* Threads */
|
VipsThread **thr; /* Threads */
|
||||||
@ -191,7 +280,7 @@ vips_thread_free( VipsThread *thr )
|
|||||||
thr->thread = NULL;
|
thr->thread = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
IM_FREEF( im_region_free, thr->state.reg );
|
IM_FREEF( g_object_unref, thr->state );
|
||||||
thr->pool = NULL;
|
thr->pool = NULL;
|
||||||
|
|
||||||
#ifdef TIME_THREAD
|
#ifdef TIME_THREAD
|
||||||
@ -212,9 +301,14 @@ vips_thread_work_unit( VipsThread *thr )
|
|||||||
*/
|
*/
|
||||||
g_mutex_lock( pool->allocate_lock );
|
g_mutex_lock( pool->allocate_lock );
|
||||||
|
|
||||||
if( !pool->stop ) {
|
if( !thr->state )
|
||||||
if( pool->allocate( &thr->state,
|
if( !(thr->state = pool->start( pool->im, pool->a )) ) {
|
||||||
pool->a, pool->b, pool->c, &pool->stop ) ) {
|
thr->error = TRUE;
|
||||||
|
pool->error = TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if( !pool->stop && !pool->error ) {
|
||||||
|
if( pool->allocate( thr->state, pool->a, &pool->stop ) ) {
|
||||||
thr->error = TRUE;
|
thr->error = TRUE;
|
||||||
pool->error = TRUE;
|
pool->error = TRUE;
|
||||||
}
|
}
|
||||||
@ -235,7 +329,7 @@ vips_thread_work_unit( VipsThread *thr )
|
|||||||
|
|
||||||
/* Process a work unit.
|
/* Process a work unit.
|
||||||
*/
|
*/
|
||||||
if( pool->work( &thr->state, pool->a, pool->b, pool->c ) ) {
|
if( pool->work( thr->state, pool->a ) ) {
|
||||||
thr->error = TRUE;
|
thr->error = TRUE;
|
||||||
pool->error = TRUE;
|
pool->error = TRUE;
|
||||||
}
|
}
|
||||||
@ -262,11 +356,6 @@ vips_thread_main_loop( void *a )
|
|||||||
|
|
||||||
g_assert( pool == thr->pool );
|
g_assert( pool == thr->pool );
|
||||||
|
|
||||||
/* We now control the region (it was created by pool when we
|
|
||||||
* were built).
|
|
||||||
*/
|
|
||||||
im__region_take_ownership( thr->state.reg );
|
|
||||||
|
|
||||||
/* Process work units! Always tick, even if we are stopping, so the
|
/* Process work units! Always tick, even if we are stopping, so the
|
||||||
* main thread will wake up for exit.
|
* main thread will wake up for exit.
|
||||||
*/
|
*/
|
||||||
@ -296,7 +385,7 @@ vips_thread_new( VipsThreadpool *pool )
|
|||||||
if( !(thr = IM_NEW( pool->im, VipsThread )) )
|
if( !(thr = IM_NEW( pool->im, VipsThread )) )
|
||||||
return( NULL );
|
return( NULL );
|
||||||
thr->pool = pool;
|
thr->pool = pool;
|
||||||
thr->state.reg = NULL;
|
thr->state = NULL;
|
||||||
thr->thread = NULL;
|
thr->thread = NULL;
|
||||||
thr->exit = 0;
|
thr->exit = 0;
|
||||||
thr->error = 0;
|
thr->error = 0;
|
||||||
@ -306,16 +395,10 @@ vips_thread_new( VipsThreadpool *pool )
|
|||||||
thr->tpos = 0;
|
thr->tpos = 0;
|
||||||
#endif /*TIME_THREAD*/
|
#endif /*TIME_THREAD*/
|
||||||
|
|
||||||
/* Attach stuff.
|
/* We can't build the state here, it has to be done by the worker
|
||||||
|
* itself the first time that allocate runs so that any regions are
|
||||||
|
* owned by the correct thread.
|
||||||
*/
|
*/
|
||||||
if( !(thr->state.reg = im_region_create( pool->im )) ) {
|
|
||||||
vips_thread_free( thr );
|
|
||||||
return( NULL );
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Get ready to hand the region over to the thread.
|
|
||||||
*/
|
|
||||||
im__region_no_ownership( thr->state.reg );
|
|
||||||
|
|
||||||
#ifdef TIME_THREAD
|
#ifdef TIME_THREAD
|
||||||
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
|
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
|
||||||
@ -363,6 +446,8 @@ vips_threadpool_kill_threads( VipsThreadpool *pool )
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This can be called multiple times, careful.
|
||||||
|
*/
|
||||||
static int
|
static int
|
||||||
vips_threadpool_free( VipsThreadpool *pool )
|
vips_threadpool_free( VipsThreadpool *pool )
|
||||||
{
|
{
|
||||||
@ -439,21 +524,21 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* VipsThreadState:
|
* VipsThreadpoolStart:
|
||||||
* @reg: a #REGION
|
* @a: client data
|
||||||
* @pos: a #Rect
|
* @b: client data
|
||||||
* @x: an int
|
* @c: client data
|
||||||
* @y: an int
|
|
||||||
* @d: client data
|
|
||||||
* @e: client data
|
|
||||||
* @f: client data
|
|
||||||
*
|
*
|
||||||
* These per-thread values are carried around for your use by
|
* This function is called once by each worker just before the first time work
|
||||||
* vips_threadpool_run(). They are private to each thread, so they are handy
|
* is allocated to it to build the per-thread state. Per-thread state is used
|
||||||
* for VipsThreadpoolAllocate and VipsThreadpoolWork to communicate.
|
* by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate.
|
||||||
*
|
*
|
||||||
* @reg is created for you at the start of processing and freed at the end,
|
* #VipsThreadState is a subclass of #VipsObject. Start functions are called
|
||||||
* but you can do what you like with it.
|
* from allocate, that is, they are single-threaded.
|
||||||
|
*
|
||||||
|
* See also: vips_threadpool_run().
|
||||||
|
*
|
||||||
|
* Returns: a new #VipsThreadState object, or NULL on error
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -465,7 +550,7 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
* @stop: set this to signal end of computation
|
* @stop: set this to signal end of computation
|
||||||
*
|
*
|
||||||
* This function is called to allocate a new work unit for the thread. It is
|
* This function is called to allocate a new work unit for the thread. It is
|
||||||
* always single-threaded, so it can modify per-call state (such as a
|
* always single-threaded, so it can modify per-pool state (such as a
|
||||||
* counter).
|
* counter).
|
||||||
*
|
*
|
||||||
* @a, @b, @c are the values supplied to the call to
|
* @a, @b, @c are the values supplied to the call to
|
||||||
@ -474,9 +559,9 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
* It should set @stop to %TRUE to indicate that no work could be allocated
|
* It should set @stop to %TRUE to indicate that no work could be allocated
|
||||||
* because the job is done.
|
* because the job is done.
|
||||||
*
|
*
|
||||||
* It should return 0 for a normal return, or non-zero to indicate an error.
|
|
||||||
*
|
|
||||||
* See also: vips_threadpool_run().
|
* See also: vips_threadpool_run().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, or -1 on error
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -487,15 +572,15 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
* @c: client data
|
* @c: client data
|
||||||
*
|
*
|
||||||
* This function is called to process a work unit. Many copies of this can run
|
* This function is called to process a work unit. Many copies of this can run
|
||||||
* at once, so it should not write to the per-call state. It can write to
|
* at once, so it should not write to the per-pool state. It can write to
|
||||||
* per-thread state.
|
* per-thread state.
|
||||||
*
|
*
|
||||||
* @a, @b, @c are the values supplied to the call to
|
* @a, @b, @c are the values supplied to the call to
|
||||||
* vips_threadpool_run().
|
* vips_threadpool_run().
|
||||||
*
|
*
|
||||||
* It should return 0 for a normal return, or non-zero to indicate an error.
|
|
||||||
*
|
|
||||||
* See also: vips_threadpool_run().
|
* See also: vips_threadpool_run().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, or -1 on error
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -507,14 +592,15 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
* This function is called by the main thread once for every work unit
|
* This function is called by the main thread once for every work unit
|
||||||
* processed. It can be used to give the user progress feedback.
|
* processed. It can be used to give the user progress feedback.
|
||||||
*
|
*
|
||||||
* It should return 0 for a normal return, or non-zero to indicate an error.
|
|
||||||
*
|
|
||||||
* See also: vips_threadpool_run().
|
* See also: vips_threadpool_run().
|
||||||
|
*
|
||||||
|
* Returns: 0 on success, or -1 on error
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* vips_threadpool_run:
|
* vips_threadpool_run:
|
||||||
* @im: image to loop over
|
* @im: image to loop over
|
||||||
|
* @start: allocate per-thread state
|
||||||
* @allocate: allocate a work unit
|
* @allocate: allocate a work unit
|
||||||
* @work: process a work unit
|
* @work: process a work unit
|
||||||
* @progress: give progress feedback about a work unit, or %NULL
|
* @progress: give progress feedback about a work unit, or %NULL
|
||||||
@ -522,30 +608,33 @@ vips_threadpool_create_threads( VipsThreadpool *pool )
|
|||||||
* @b: client data
|
* @b: client data
|
||||||
* @c: client data
|
* @c: client data
|
||||||
*
|
*
|
||||||
* This function runs a set of threads over an image. Each thread run
|
* This function runs a set of threads over an image. Each thread first calls
|
||||||
|
* @start to create new per-thread state, then runs
|
||||||
* @allocate to set up a new work unit (perhaps the next tile in an image, for
|
* @allocate to set up a new work unit (perhaps the next tile in an image, for
|
||||||
* example), then @work to process that work unit. After each unit is
|
* example), then @work to process that work unit. After each unit is
|
||||||
* processed, @progress is called, so that the operation can give
|
* processed, @progress is called, so that the operation can give
|
||||||
* progress feedback.@progress may be %NULL.
|
* progress feedback. @progress may be %NULL.
|
||||||
*
|
*
|
||||||
* Each thread has a small amount of private state that vips_threadpool_run()
|
* Each thread has private state that the @allocate and @work functions can
|
||||||
* maintains for it. The @allocate and @work functions can use this to
|
* use to communicate. This state is created by each worker as it starts using
|
||||||
* communicate.
|
* @start. Use the state destructor to clean up.
|
||||||
*
|
*
|
||||||
* @allocate is always single-threaded (so it can write to the per-call state),
|
* @allocate and @start are always single-threaded (so they can write to the
|
||||||
* whereas @work can be executed concurrently. @progress is always called by
|
* per-pool state), whereas @work can be executed concurrently. @progress is
|
||||||
* the main thread (ie. the thread which called vips_thtreadpool_run()).
|
* always called by
|
||||||
|
* the main thread (ie. the thread which called vips_threadpool_run()).
|
||||||
*
|
*
|
||||||
* See also: im_wbuffer2(), im_concurrency_set().
|
* See also: im_wbuffer2(), im_concurrency_set().
|
||||||
*
|
*
|
||||||
* Returns: 0 on success, -1 on error.
|
* Returns: 0 on success, or -1 on error.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
vips_threadpool_run( VipsImage *im,
|
vips_threadpool_run( VipsImage *im,
|
||||||
|
VipsThreadStart start,
|
||||||
VipsThreadpoolAllocate allocate,
|
VipsThreadpoolAllocate allocate,
|
||||||
VipsThreadpoolWork work,
|
VipsThreadpoolWork work,
|
||||||
VipsThreadpoolProgress progress,
|
VipsThreadpoolProgress progress,
|
||||||
void *a, void *b, void *c )
|
void *a )
|
||||||
{
|
{
|
||||||
VipsThreadpool *pool;
|
VipsThreadpool *pool;
|
||||||
int result;
|
int result;
|
||||||
@ -558,11 +647,10 @@ vips_threadpool_run( VipsImage *im,
|
|||||||
if( !(pool = vips_threadpool_new( im )) )
|
if( !(pool = vips_threadpool_new( im )) )
|
||||||
return( -1 );
|
return( -1 );
|
||||||
|
|
||||||
|
pool->start = start;
|
||||||
pool->allocate = allocate;
|
pool->allocate = allocate;
|
||||||
pool->work = work;
|
pool->work = work;
|
||||||
pool->a = a;
|
pool->a = a;
|
||||||
pool->b = b;
|
|
||||||
pool->c = c;
|
|
||||||
|
|
||||||
/* Attach workers and set them going.
|
/* Attach workers and set them going.
|
||||||
*/
|
*/
|
||||||
@ -586,7 +674,7 @@ vips_threadpool_run( VipsImage *im,
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
if( progress &&
|
if( progress &&
|
||||||
progress( pool->a, pool->b, pool->c ) )
|
progress( pool->a ) )
|
||||||
pool->error = TRUE;
|
pool->error = TRUE;
|
||||||
|
|
||||||
if( pool->stop || pool->error )
|
if( pool->stop || pool->error )
|
||||||
@ -631,10 +719,10 @@ vips_get_tile_size( VipsImage *im,
|
|||||||
*tile_height = im__tile_height;
|
*tile_height = im__tile_height;
|
||||||
|
|
||||||
/* Enough lines of tiles that we can expect to be able to keep
|
/* Enough lines of tiles that we can expect to be able to keep
|
||||||
* nthr busy.
|
* nthr busy. Then double it.
|
||||||
*/
|
*/
|
||||||
*nlines = *tile_height *
|
*nlines = *tile_height *
|
||||||
(1 + nthr / IM_MAX( 1, im->Xsize / *tile_width ));
|
(1 + nthr / IM_MAX( 1, im->Xsize / *tile_width )) * 2;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case IM_FATSTRIP:
|
case IM_FATSTRIP:
|
||||||
|
@ -147,6 +147,7 @@ vips_interpolate( VipsInterpolate *interpolate,
|
|||||||
VipsInterpolateClass *class = VIPS_INTERPOLATE_GET_CLASS( interpolate );
|
VipsInterpolateClass *class = VIPS_INTERPOLATE_GET_CLASS( interpolate );
|
||||||
|
|
||||||
g_assert( class->interpolate );
|
g_assert( class->interpolate );
|
||||||
|
|
||||||
class->interpolate( interpolate, out, in, x, y );
|
class->interpolate( interpolate, out, in, x, y );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user