threadpool compiles

This commit is contained in:
John Cupitt 2010-03-18 22:08:07 +00:00
parent b68a25a229
commit b0c8ddc1df
4 changed files with 78 additions and 97 deletions

View File

@ -60,7 +60,7 @@ typedef struct {
void *a, *b, *c; /* User arguments to work fns */
#ifdef TIME_THREAD
hrtime_t *btime, *etime;
double *btime, *etime;
int tpos;
#endif /*TIME_THREAD*/
} im_thread_t;

View File

@ -48,7 +48,7 @@ typedef struct {
/* All private.
*/
/*< private >*/
struct _VipsTreadpool *pool; /* Pool we are part of */
struct _VipsThreadpool *pool; /* Pool we are part of */
REGION *reg; /* Region this thread operates on */
@ -64,7 +64,7 @@ typedef struct {
void *a, *b, *c; /* User arguments to work fns */
#ifdef TIME_THREAD
hrtime_t *btime, *etime;
double *btime, *etime;
int tpos;
#endif /*TIME_THREAD*/
} VipsThread;
@ -93,9 +93,10 @@ typedef struct _VipsThreadpool {
/* Do a unit of work (runs in parallel) and allocate a unit of work
* (serial). Plus the mutex we use to serialize work allocation.
*/
VipsThreadPoolAllocate allocate;
VipsThreadPoolWork work;
VipsThreadpoolAllocate allocate;
VipsThreadpoolWork work;
GMutex *allocate_lock;
void *a, *b, *c; /* User arguments to work / allocate */
int nthr; /* Number of threads in pool */
VipsThread **thr; /* Threads */
@ -114,11 +115,9 @@ typedef struct _VipsThreadpool {
gboolean zombie;
} VipsThreadpool;
/* Thread pool functions.
*/
VipsThreadpool *vips_threadpool_new( IMAGE *im );
int vips_threadpool_free( VipsThreadpool *pool );
void vips_threadpool_run( VipsThreadpool *pool );
int vips_threadpool_run( VipsImage *im,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work,
void *a, void *b, void *c );
#ifdef __cplusplus
}

View File

@ -39,6 +39,7 @@ libiofuncs_la_SOURCES = \
rect.c \
semaphore.c \
threadgroup.c \
threadpool.c \
util.c \
im_init_world.c \
buf.c \

View File

@ -31,11 +31,10 @@
*/
/*
*/
#define TIME_THREAD
#define DEBUG_CREATE
#define DEBUG_HIGHWATER
#define DEBUG_IO
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
@ -77,13 +76,14 @@
/* Size of time buffers.
*/
#define IM_TBUF_SIZE (20000)
static GTimer *thread_timer = NULL;
#endif /*TIME_THREAD*/
#ifdef TIME_THREAD
/* Save time buffers.
*/
static int
save_time_buffers( REGION *reg )
save_time_buffers( VipsThread *thr )
{
int i;
static int rn = 1;
@ -93,8 +93,8 @@ save_time_buffers( REGION *reg )
im_snprintf( name, 256, "time%d", rn++ );
if( !(fp = fopen( name, "w" )) )
error_exit( "unable to write to \"%s\"", name );
for( i = 0; i < reg->tpos; i++ )
fprintf( fp, "%lld\n%lld\n", reg->btime[i], reg->etime[i] );
for( i = 0; i < thr->tpos; i++ )
fprintf( fp, "%g, %g\n", thr->btime[i], thr->etime[i] );
fclose( fp );
return( 0 );
@ -110,7 +110,6 @@ thread_free( VipsThread *thr )
*/
if( thr->thread ) {
thr->kill = 1;
im_semaphore_up( &thr->go );
/* Return value is always NULL (see thread_main_loop).
*/
@ -121,11 +120,10 @@ thread_free( VipsThread *thr )
thr->thread = NULL;
}
im_semaphore_destroy( &thr->go );
IM_FREEF( im_region_free, thr->reg );
thr->oreg = NULL;
thr->tg = NULL;
thr->pool = NULL;
#ifdef TIME_THREAD
if( thr->btime )
@ -169,35 +167,17 @@ thread_main_loop( void *a )
im__region_take_ownership( thr->reg );
for(;;) {
/* Block until work is ready for us.
*/
im_semaphore_down( &thr->go );
/* Asked to exit?
*/
if( thr->kill )
break;
/* Starting to work on something.
*/
for(;;) {
gboolean alldone;
/* Ask for a work unit.
*/
g_mutex_lock( &pool->allocate_lock );
alldone = pool->allocate( thr );
if( alldone && !pool->stop ) {
/* The allocate function has returned TRUE
* (all done) for the first time. Set
* everything to make all threads return to
* their 'go' semaphores and restart the main
* thread.
*/
g_mutex_lock( pool->allocate_lock );
if( !pool->stop ) {
gboolean morework;
morework = pool->allocate( thr );
if( !morework )
pool->stop = TRUE;
im_semaphore_up( &pool->main );
}
g_mutex_unlock( &pool->allocate_lock );
g_mutex_unlock( pool->allocate_lock );
/* Asked to stop work?
*/
@ -209,7 +189,8 @@ thread_main_loop( void *a )
/* Note start time.
*/
if( thr->btime )
thr->btime[thr->tpos] = gethrtime();
thr->btime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
#endif /*TIME_THREAD*/
/* Loop once.
@ -220,12 +201,16 @@ thread_main_loop( void *a )
/* Note stop time.
*/
if( thr->etime ) {
thr->etime[thr->tpos] = gethrtime();
thr->etime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
thr->tpos++;
}
#endif /*TIME_THREAD*/
}
}
/* We are exiting: tell the main thread.
*/
im_semaphore_up( &pool->finish );
return( NULL );
}
@ -266,8 +251,8 @@ vips_thread_new( VipsThreadpool *pool )
im__region_no_ownership( thr->reg );
#ifdef TIME_THREAD
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, hrtime_t );
thr->etime = IM_ARRAY( pool->im, IM_TBUF_SIZE, hrtime_t );
thr->btime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
thr->etime = IM_ARRAY( pool->im, IM_TBUF_SIZE, double );
if( !thr->btime || !thr->etime ) {
thread_free( thr );
return( NULL );
@ -343,11 +328,6 @@ vips_threadpool_free( VipsThreadpool *pool )
IM_FREEF( g_mutex_free, pool->allocate_lock );
pool->zombie = 1;
#ifdef DEBUG_HIGHWATER
printf( "vips_threadpool_free %p: max busy workers = %d\n",
pool, pool->nthr - pool->min_idle );
#endif /*DEBUG_HIGHWATER*/
return( 0 );
}
@ -367,7 +347,6 @@ VipsThreadpool *
vips_threadpool_new( VipsImage *im )
{
VipsThreadpool *pool;
int i;
/* Allocate and init new thread block.
*/
@ -430,11 +409,6 @@ vips_threadpool_new( VipsImage *im )
"groups of %d scanlines\n", pool->pw, pool->ph, pool->nlines );
#endif /*DEBUG_IO*/
#ifdef DEBUG_HIGHWATER
pool->nidle = 0;
pool->min_idle = pool->nthr;
#endif /*DEBUG_HIGHWATER*/
#ifdef DEBUG_IO
printf( "vips_threadpool_new: \"%s\" (%p), with %d threads\n",
im->filename, pool, pool->nthr );
@ -454,7 +428,7 @@ threadpool_create_threads( VipsThreadpool *pool )
/* Make thread array.
*/
if( !(pool->thr = IM_ARRAY( im, pool->nthr, VipsThread * )) )
if( !(pool->thr = IM_ARRAY( pool->im, pool->nthr, VipsThread * )) )
return( -1 );
for( i = 0; i < pool->nthr; i++ )
pool->thr[i] = NULL;
@ -470,25 +444,34 @@ threadpool_create_threads( VipsThreadpool *pool )
return( 0 );
}
/* Run a threadpool. The allocate and work fns need to have been set.
*
* If we don't have threading enabled, do the work in the main thread.
*/
int
vips_threadpool_run( VipsThreadpool *pool,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work )
vips_threadpool_run( VipsImage *im,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work,
void *a, void *b, void *c )
{
VipsThreadpool *pool;
int result;
#ifdef TIME_THREAD
if( !thread_timer )
thread_timer = g_timer_new();
#endif /*TIME_THREAD*/
if( !(pool = vips_threadpool_new( im )) )
return( -1 );
pool->allocate = allocate;
pool->work = work;
pool->kill = 0;
pool->stop = 0;
pool->a = a;
pool->b = b;
pool->c = c;
/* Attach workers and set them going.
*/
if( threadpool_create_threads( pool ) )
if( threadpool_create_threads( pool ) ) {
vips_threadpool_free( pool );
return( -1 );
}
#ifdef HAVE_THREADS
/* Wait for them all to hit finish.
@ -498,10 +481,10 @@ vips_threadpool_run( VipsThreadpool *pool,
/* No threads, do the work ourselves in the main thread.
*/
for(;;) {
gboolean alldone;
gboolean morework;
alldone = pool->allocate( pool->thr[0] );
if( alldone && !pool->stop )
morework = pool->allocate( pool->thr[0] );
if( !morework && !pool->stop )
pool->stop = TRUE;
if( pool->thr[0]->error )
break;
@ -520,17 +503,15 @@ vips_threadpool_run( VipsThreadpool *pool,
pool->im->kill )
result = -1;
else {
int i;
for( i = 0; i < pool->nthr; i++ )
if( pool->thr[i]->error )
result = -1;
}
if( result ) {
threadpool_kill_threads( pool );
return( -1 );
vips_threadpool_free( pool );
return( result );
}
pool->kill = 0;
pool->stop = 0;
return( 0 );
}