This commit is contained in:
John Cupitt 2009-10-15 14:22:23 +00:00
parent 5472ee939b
commit bb6ce837e8
7 changed files with 197 additions and 121 deletions

View File

@ -60,8 +60,12 @@
- im_jpeg2vips.c, set scale_num on shrink (thanks Guido) - im_jpeg2vips.c, set scale_num on shrink (thanks Guido)
- heh argh reading history always stopped after the first line (thanks Haida) - heh argh reading history always stopped after the first line (thanks Haida)
- added im_histindexed - added im_histindexed
- new im_iterate() calls start functions from workers so resources they make - new im_iterate() calls start and stop functions from workers so resources
are owned by the worker thread they make are owned by the worker thread ... this makes it possible to have
start functions which create mutiple regions and therefore allows
im_iterate() to scan more than one image at once
- threadgroup no longer has any default action, you must attach a work
function
25/3/09 started 7.18.0 25/3/09 started 7.18.0
- revised version numbers - revised version numbers

15
TODO
View File

@ -1,18 +1,3 @@
- need to call stop funcs from the worker as well
im_generate() does this my having im_region_free() call stop, if necessary
but we can't use that mechanism, since it relies on im->client1 etc. which
for us will be the copy operation that generated the image
regions need a call-this-to-free-seq function which is installed by
generate or iterate as appropriate
no, that won't work, when could we install the callbacks?
we must call the stop functions from the worker threads, that'll take care of
freeing the regions in the right context for us
- memory.c - memory.c

View File

@ -47,10 +47,6 @@ extern "C" {
*/ */
#define IM__DEFAULT_STACK_SIZE (2 * 1024 * 1024) #define IM__DEFAULT_STACK_SIZE (2 * 1024 * 1024)
/* A work function.
*/
typedef int (*im__work_fn)( REGION *, void *, void *, void * );
/* What we track for each thread. /* What we track for each thread.
*/ */
typedef struct { typedef struct {
@ -63,8 +59,8 @@ typedef struct {
int error; /* Set by thread if work fn fails */ int error; /* Set by thread if work fn fails */
REGION *oreg; /* If part of an inplace threadgroup, */ REGION *oreg; /* If part of an inplace threadgroup, */
Rect pos; /* where this thread should write */ Rect pos; /* Where this thread should write */
int x, y; /* it's result */ int x, y; /* Its result */
void *a, *b, *c; /* User arguments to work fns */ void *a, *b, *c; /* User arguments to work fns */
@ -74,6 +70,11 @@ typedef struct {
#endif /*TIME_THREAD*/ #endif /*TIME_THREAD*/
} im_thread_t; } im_thread_t;
/* A work function.
*/
typedef int (*im__work_fn)( im_thread_t *thr,
REGION *, void *, void *, void * );
/* What we track for a group of threads working together. /* What we track for a group of threads working together.
*/ */
typedef struct im__threadgroup_t { typedef struct im__threadgroup_t {
@ -84,7 +85,6 @@ typedef struct im__threadgroup_t {
int nlines; /* Scanlines-at-once we prefer for iteration */ int nlines; /* Scanlines-at-once we prefer for iteration */
im__work_fn work; /* Work fn for this threadgroup */ im__work_fn work; /* Work fn for this threadgroup */
int inplace; /* Regions should be contiguous */
int nthr; /* Number of threads in group */ int nthr; /* Number of threads in group */
im_thread_t **thr; /* Threads */ im_thread_t **thr; /* Threads */

View File

@ -294,6 +294,29 @@ im_allocate_input_array( IMAGE *out, ... )
* Returns: 0 on success, -1 on error. * Returns: 0 on success, -1 on error.
*/ */
static int
generate_work( im_thread_t *thr,
REGION *reg, void *a, void *b, void *c )
{
/* thr pos needs to be set before coming here ... check.
*/
{
Rect image;
image.left = 0;
image.top = 0;
image.width = thr->tg->im->Xsize;
image.height = thr->tg->im->Ysize;
g_assert( im_rect_includesrect( &image, &thr->pos ) );
}
if( im_prepare_to( reg, thr->oreg, &thr->pos, thr->x, thr->y ) )
return( -1 );
return( 0 );
}
/* Loop over a big region, filling it in many small pieces with threads. /* Loop over a big region, filling it in many small pieces with threads.
*/ */
static int static int
@ -309,9 +332,9 @@ eval_to_region( REGION *or, im_threadgroup_t *tg )
image.width = or->im->Xsize; image.width = or->im->Xsize;
image.height = or->im->Ysize; image.height = or->im->Ysize;
/* Note we'll be working to fill a contigious area. /* Our work function ... an inplace one.
*/ */
tg->inplace = 1; tg->work = generate_work;
/* Loop over or, attaching to all sub-parts in turn. /* Loop over or, attaching to all sub-parts in turn.
*/ */

View File

@ -25,6 +25,9 @@
* - new eval start/progress/end system * - new eval start/progress/end system
* 7/10/09 * 7/10/09
* - gtkdoc comments * - gtkdoc comments
* 15/10/09
* - call start and stop functions from the worker threads
* - reworked a bit
*/ */
/* /*
@ -54,7 +57,7 @@
*/ */
/* /*
#define DEBUG_IO #define DEBUG
*/ */
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
@ -93,35 +96,83 @@ typedef struct _Iterate {
im_stop_fn stop; im_stop_fn stop;
void *b; void *b;
void *c; void *c;
/* Set this to signal to workers that we want to shut down sequences.
*/
gboolean shutdown;
} Iterate; } Iterate;
/* Call all stop functions. /* Call a thread's stop function.
*/ */
static int static int
iterate_call_all_stop( Iterate *iter, im_threadgroup_t *tg ) iterate_call_stop( Iterate *iter, im_thread_t *thr )
{ {
int i; #ifdef DEBUG
printf( "iterate_call_stop: thr = %p\n", thr );
#endif /*DEBUG*/
for( i = 0; i < tg->nthr; i++ ) { if( thr->a && iter->stop ) {
if( tg->thr[i]->a && iter->stop ) { if( iter->stop( thr->a, iter->b, iter->c ) ) {
if( iter->stop( tg->thr[i]->a, iter->b, iter->c ) ) im_error( "im_iterate",
/* Drastic! _( "stop function failed "
*/ "for image \"%s\"" ),
im_error( "im_iterate", iter->im->filename );
_( "stop function failed " return( -1 );
"for image \"%s\"" ),
iter->im->filename );
tg->thr[i]->a = NULL;
} }
thr->a = NULL;
} }
return( 0 ); return( 0 );
} }
static void
iterate_call_all_stop( Iterate *iter )
{
im_threadgroup_t *tg = iter->tg;
int i;
#ifdef DEBUG
printf( "iterate_call_all_stop: start\n" );
#endif /*DEBUG*/
/* Wait for all threads to hit 'go'.
*/
im_threadgroup_wait( tg );
/* Get all threads off idle and waiting for work.
*/
for( i = 0; i < tg->nthr; i++ )
(void) im_threadgroup_get( tg );
/* Now run all threads one more time and ask them to junk their
* sequence value. 'shutdown' changes the behaviour of the work
* function, see below.
*/
iter->shutdown = TRUE;
for( i = 0; i < tg->nthr; i++ ) {
/* The work fn will need this set if it's not been run
* before.
*/
tg->thr[i]->b = iter;
im_threadgroup_trigger( tg->thr[i] );
}
/* And wait for them to idle again.
*/
im_threadgroup_wait( tg );
#ifdef DEBUG
printf( "iterate_call_all_stop: done\n" );
#endif /*DEBUG*/
}
static void static void
iterate_free( Iterate *iter ) iterate_free( Iterate *iter )
{ {
/* Check all the stop functions have been called. /* All threads should have junked their sequence values.
*/ */
if( iter->tg ) { if( iter->tg ) {
int i; int i;
@ -155,22 +206,54 @@ iterate_call_start( Iterate *iter, im_thread_t *thr )
return( 0 ); return( 0 );
} }
/* Our generate function. We need to call the user's start function from the /* Our work function. This is complicated because we need to make sure we
* worker thread so that any regions it makes are owned by the thread. * all call our start and stop functions from the worker thread, so that any
* regions created are owned by the worker and not the main thread.
*/ */
static int static int
iterate_gen( REGION *reg, void *seq, void *a, void *b ) iterate_work( im_thread_t *thr, REGION *reg, void *seq, void *a, void *b )
{ {
Iterate *iter = (Iterate *) a; Iterate *iter = (Iterate *) a;
im_thread_t *thr = (im_thread_t *) b;
/* Make sure the start function has run and we have the sequence value /* 'shutdown' is set at the end of computation to ask workers to free
* set. * context.
*/ */
iterate_call_start( iter, thr ); if( iter->shutdown ) {
seq = thr->a; if( iterate_call_stop( iter, thr ) )
return( -1 );
}
else {
/* Make sure the start function has run and we have the
* sequence value set.
*/
iterate_call_start( iter, thr );
seq = thr->a;
return( iter->generate( reg, seq, iter->b, iter->c ) ); /* thr pos needs to be set before coming here ... check.
*/
{
Rect image;
image.left = 0;
image.top = 0;
image.width = thr->tg->im->Xsize;
image.height = thr->tg->im->Ysize;
g_assert( im_rect_includesrect( &image, &thr->pos ) );
}
/* Generate pixels for the user.
*/
if( im_prepare( reg, &thr->pos ) )
return( -1 );
/* Run the user code on this area.
*/
if( iter->generate( reg, seq, iter->b, iter->c ) )
return( -1 );
}
return( 0 );
} }
static int static int
@ -187,6 +270,7 @@ iterate_init( Iterate *iter,
iter->stop = stop; iter->stop = stop;
iter->b = b; iter->b = b;
iter->c = c; iter->c = c;
iter->shutdown = FALSE;
if( !(iter->t = im_open( "iterate", "p" )) || if( !(iter->t = im_open( "iterate", "p" )) ||
im_copy( iter->im, iter->t ) || im_copy( iter->im, iter->t ) ||
@ -195,14 +279,11 @@ iterate_init( Iterate *iter,
return( -1 ); return( -1 );
} }
iter->tg->work = iterate_gen; iter->tg->work = iterate_work;
iter->tg->inplace = 0;
#ifdef DEBUG_IO #ifdef DEBUG
if( iter->tg->nthr > 1 ) im_diag( "im_iterate", "using %d threads", iter->tg->nthr );
im_diagnostics( "im_iterate: using %d threads", #endif /*DEBUG*/
iter->tg->nthr );
#endif /*DEBUG_IO*/
return( 0 ); return( 0 );
} }
@ -215,10 +296,6 @@ iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t )
int x, y; int x, y;
Rect image; Rect image;
/* Set up.
*/
tg->inplace = 0;
image.left = 0; image.left = 0;
image.top = 0; image.top = 0;
image.width = t->Xsize; image.width = t->Xsize;
@ -248,10 +325,9 @@ iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t )
thr->pos = clipped; thr->pos = clipped;
/* Other stuff we want passed to iterate_gen(). /* Other stuff we want passed to iterate_work().
*/ */
thr->b = iter; thr->b = iter;
thr->c = thr;
/* Start worker going. /* Start worker going.
*/ */
@ -270,15 +346,10 @@ iterate_loop( Iterate *iter, im_threadgroup_t *tg, IMAGE *t )
} }
} }
/* Wait for all threads to hit 'go' again. /* Make sure all processing is done.
*/ */
im_threadgroup_wait( tg ); im_threadgroup_wait( tg );
/* Test for any errors.
*/
if( im_threadgroup_iserror( tg ) )
return( -1 );
return( 0 ); return( 0 );
} }
@ -326,7 +397,11 @@ im_iterate( IMAGE *im,
/* Signal end of eval. /* Signal end of eval.
*/ */
result |= im__end_eval( iter.t ); result |= im__end_eval( iter.t );
result |= iterate_call_all_stop( &iter, iter.tg );
/* Shut down and test for any errors.
*/
iterate_call_all_stop( &iter );
result |= im_threadgroup_iserror( iter.tg );
iterate_free( &iter ); iterate_free( &iter );
return( result ); return( result );

View File

@ -182,11 +182,31 @@ wbuffer_new( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
return( wbuffer ); return( wbuffer );
} }
/* At end of work_fn ... need to tell wbuffer write thread that we're done. /* Our work function ... generate a tile and tell the wbuffer write thread that
* we're done.
*/ */
static int static int
wbuffer_work_fn( REGION *region, WriteBuffer *wbuffer ) wbuffer_work_fn( im_thread_t *thr,
REGION *reg, void *a, void *b, void *c )
{ {
WriteBuffer *wbuffer = (WriteBuffer *) a;
/* thr pos needs to be set before coming here ... check.
*/
{
Rect image;
image.left = 0;
image.top = 0;
image.width = thr->tg->im->Xsize;
image.height = thr->tg->im->Ysize;
g_assert( im_rect_includesrect( &image, &thr->pos ) );
}
if( im_prepare_to( reg, thr->oreg, &thr->pos, thr->x, thr->y ) )
return( -1 );
im_semaphore_upn( &wbuffer->nwrite, 1 ); im_semaphore_upn( &wbuffer->nwrite, 1 );
return( 0 ); return( 0 );
@ -319,14 +339,10 @@ wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 )
printf( "wbuffer_eval_to_file: partial image output to file\n" ); printf( "wbuffer_eval_to_file: partial image output to file\n" );
#endif /*DEBUG*/ #endif /*DEBUG*/
/* Note we'll be working to fill a contigious area.
*/
tg->inplace = 1;
/* What threads do at the end of each tile ... decrement the nwrite /* What threads do at the end of each tile ... decrement the nwrite
* semaphore. * semaphore.
*/ */
tg->work = (im__work_fn) wbuffer_work_fn; tg->work = wbuffer_work_fn;
/* Fill to in steps, write each to the output. /* Fill to in steps, write each to the output.
*/ */
@ -403,8 +419,7 @@ wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 )
} }
int int
im_wbuffer( im_threadgroup_t *tg, im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
im_wbuffer_fn write_fn, void *a, void *b )
{ {
WriteBuffer *b1, *b2; WriteBuffer *b1, *b2;
int result; int result;

View File

@ -10,6 +10,9 @@
* - double-buffer file writes * - double-buffer file writes
* 27/11/06 * 27/11/06
* - break stuff out to generate / iterate * - break stuff out to generate / iterate
* 15/10/09
* - get rid of inplace and default work stuff, you must now always set a
* work function
*/ */
/* /*
@ -52,7 +55,6 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <assert.h>
#ifdef HAVE_UNISTD_H #ifdef HAVE_UNISTD_H
#include <unistd.h> #include <unistd.h>
#endif /*HAVE_UNISTD_H*/ #endif /*HAVE_UNISTD_H*/
@ -215,7 +217,7 @@ threadgroup_idle_remove( im_thread_t *thr )
#ifdef DEBUG_HIGHWATER #ifdef DEBUG_HIGHWATER
tg->nidle -= 1; tg->nidle -= 1;
assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr );
#endif /*DEBUG_HIGHWATER*/ #endif /*DEBUG_HIGHWATER*/
g_mutex_unlock( tg->idle_lock ); g_mutex_unlock( tg->idle_lock );
@ -230,12 +232,12 @@ threadgroup_idle_add( im_thread_t *thr )
g_mutex_lock( tg->idle_lock ); g_mutex_lock( tg->idle_lock );
assert( !g_slist_find( tg->idle, thr ) ); g_assert( !g_slist_find( tg->idle, thr ) );
tg->idle = g_slist_prepend( tg->idle, thr ); tg->idle = g_slist_prepend( tg->idle, thr );
#ifdef DEBUG_HIGHWATER #ifdef DEBUG_HIGHWATER
tg->nidle += 1; tg->nidle += 1;
assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr );
#endif /*DEBUG_HIGHWATER*/ #endif /*DEBUG_HIGHWATER*/
im_semaphore_up( &tg->idle_sem ); im_semaphore_up( &tg->idle_sem );
@ -256,14 +258,14 @@ im_threadgroup_get( im_threadgroup_t *tg )
g_mutex_lock( tg->idle_lock ); g_mutex_lock( tg->idle_lock );
assert( tg->idle ); g_assert( tg->idle );
assert( tg->idle->data ); g_assert( tg->idle->data );
thr = (im_thread_t *) tg->idle->data; thr = (im_thread_t *) tg->idle->data;
tg->idle = g_slist_remove( tg->idle, thr ); tg->idle = g_slist_remove( tg->idle, thr );
#ifdef DEBUG_HIGHWATER #ifdef DEBUG_HIGHWATER
tg->nidle -= 1; tg->nidle -= 1;
assert( tg->nidle >= 0 && tg->nidle <= tg->nthr ); g_assert( tg->nidle >= 0 && tg->nidle <= tg->nthr );
if( tg->nidle < tg->min_idle ) if( tg->nidle < tg->min_idle )
tg->min_idle = tg->nidle; tg->min_idle = tg->nidle;
#endif /*DEBUG_HIGHWATER*/ #endif /*DEBUG_HIGHWATER*/
@ -336,26 +338,14 @@ work_fn( im_thread_t *thr )
{ {
/* Doublecheck only one thread per region. /* Doublecheck only one thread per region.
*/ */
assert( thr->thread == g_thread_self() ); g_assert( thr->thread == g_thread_self() );
/* Prepare this area. g_assert( thr->tg->work );
*/
if( thr->tg->inplace ) {
if( im_prepare_to( thr->reg, thr->oreg,
&thr->pos, thr->x, thr->y ) )
thr->error = -1;
}
else {
/* Attach to this position.
*/
if( im_prepare( thr->reg, &thr->pos ) )
thr->error = -1;
}
/* Call our work function. /* Call our work function.
*/ */
if( !thr->error && thr->tg->work && if( !thr->error &&
thr->tg->work( thr->reg, thr->a, thr->b, thr->c ) ) thr->tg->work( thr, thr->reg, thr->a, thr->b, thr->c ) )
thr->error = -1; thr->error = -1;
/* Back on the idle queue again. /* Back on the idle queue again.
@ -364,7 +354,7 @@ work_fn( im_thread_t *thr )
} }
#ifdef HAVE_THREADS #ifdef HAVE_THREADS
/* What runs as a thread ... loop, waiting to be told to fill our region. /* What runs as a thread ... loop, waiting to be told to do stuff.
*/ */
static void * static void *
thread_main_loop( void *a ) thread_main_loop( void *a )
@ -378,7 +368,7 @@ thread_main_loop( void *a )
im__region_take_ownership( thr->reg ); im__region_take_ownership( thr->reg );
for(;;) { for(;;) {
assert( tg == thr->tg ); g_assert( tg == thr->tg );
/* Signal the main thread that we are idle, and block. /* Signal the main thread that we are idle, and block.
*/ */
@ -495,7 +485,7 @@ threadgroup_kill_threads( im_threadgroup_t *tg )
thread_free( tg->thr[i] ); thread_free( tg->thr[i] );
tg->thr = NULL; tg->thr = NULL;
assert( !tg->idle ); g_assert( !tg->idle );
/* Reset the idle semaphore. /* Reset the idle semaphore.
*/ */
@ -550,7 +540,6 @@ im_threadgroup_create( IMAGE *im )
tg->zombie = 0; tg->zombie = 0;
tg->im = im; tg->im = im;
tg->work = NULL; tg->work = NULL;
tg->inplace = 0;
if( (tg->nthr = im_concurrency_get()) < 0 ) if( (tg->nthr = im_concurrency_get()) < 0 )
return( NULL ); return( NULL );
tg->thr = NULL; tg->thr = NULL;
@ -632,21 +621,6 @@ im_threadgroup_create( IMAGE *im )
void void
im_threadgroup_trigger( im_thread_t *thr ) im_threadgroup_trigger( im_thread_t *thr )
{ {
im_threadgroup_t *tg = thr->tg;
/* thr pos needs to be set before coming here ... check.
*/
{
Rect image;
image.left = 0;
image.top = 0;
image.width = tg->im->Xsize;
image.height = tg->im->Ysize;
assert( im_rect_includesrect( &image, &thr->pos ) );
}
/* Start worker going. /* Start worker going.
*/ */
im_semaphore_up( &thr->go ); im_semaphore_up( &thr->go );