threadpool progress feedback

This commit is contained in:
John Cupitt 2010-03-21 13:54:56 +00:00
parent cdbda60d2e
commit 2bbcf5aa5e
5 changed files with 143 additions and 90 deletions

View File

@ -1,3 +1,6 @@
21/3/10 started 7.21.3
- added progress feedback to threadpool
16/1/10 started 7.21.2 16/1/10 started 7.21.2
- "invalidate" is careful to keep images alive, so invalidate callbacks can do - "invalidate" is careful to keep images alive, so invalidate callbacks can do
im_close() im_close()

3
TODO
View File

@ -1,9 +1,6 @@
- make VipsThread opaque ... have a separate VipsThreadState struct holding - make VipsThread opaque ... have a separate VipsThreadState struct holding
stuff and only expose that stuff and only expose that
- add progress feedback ... main thread should wake on every tile? or every
buffer write? or every second?
- try with im_render(), ouch, that will be painful - try with im_render(), ouch, that will be painful
- switch other file sinks over, maybe have a shim layer? - switch other file sinks over, maybe have a shim layer?

View File

@ -88,6 +88,11 @@ typedef int (*VipsThreadpoolAllocate)( VipsThread *thr,
typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg,
void *a, void *b, void *c ); void *a, void *b, void *c );
/* A progress function. This is run by the main thread once for every
* allocation. Return an error to kill computation early.
*/
typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c );
/* What we track for a group of threads working together. /* What we track for a group of threads working together.
*/ */
typedef struct _VipsThreadpool { typedef struct _VipsThreadpool {
@ -111,9 +116,13 @@ typedef struct _VipsThreadpool {
*/ */
im_semaphore_t finish; im_semaphore_t finish;
/* Workers up this for every loop to make the main thread tick.
*/
im_semaphore_t tick;
/* Set this to abort evaluation early with an error. /* Set this to abort evaluation early with an error.
*/ */
gboolean kill; gboolean error;
/* Set by Allocate (via an arg) to indicate normal end of computation. /* Set by Allocate (via an arg) to indicate normal end of computation.
*/ */
@ -125,9 +134,10 @@ typedef struct _VipsThreadpool {
gboolean zombie; gboolean zombie;
} VipsThreadpool; } VipsThreadpool;
int vips_threadpool_run( VipsImage *im, int vips_threadpool_run( VipsImage *im,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, VipsThreadpoolAllocate allocate,
VipsThreadpoolWork work,
VipsThreadpoolProgress progress,
void *a, void *b, void *c ); void *a, void *b, void *c );
void vips_get_tile_size( VipsImage *im, void vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *nlines ); int *tile_width, int *tile_height, int *nlines );

View File

@ -396,34 +396,67 @@ wbuffer_work_fn( VipsThread *thr,
return( 0 ); return( 0 );
} }
/* Our VipsThreadpoolProgress function ... send some eval progress feedback.
*/
static int
wbuffer_progress_fn( void *a, void *b, void *c )
{
Write *write = (Write *) a;
/* Trigger any eval callbacks on our source image and
* check for errors.
*/
if( im__handle_eval( write->im,
write->tile_width, write->tile_height ) )
return( -1 );
return( 0 );
}
static void
write_init( Write *write,
VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b )
{
write->im = im;
write->buf = wbuffer_new( write );
write->buf_back = wbuffer_new( write );
write->x = 0;
write->y = 0;
write->write_fn = write_fn;
write->a = a;
write->b = b;
vips_get_tile_size( im,
&write->tile_width, &write->tile_height, &write->nlines );
}
static void
write_free( Write *write )
{
IM_FREEF( wbuffer_free, write->buf );
IM_FREEF( wbuffer_free, write->buf_back );
}
int int
im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b )
{ {
Write write; Write write;
int result; int result;
write.im = im;
write.buf = wbuffer_new( &write );
write.buf_back = wbuffer_new( &write );
write.x = 0;
write.y = 0;
write.write_fn = write_fn;
write.a = a;
write.b = b;
vips_get_tile_size( im,
&write.tile_width, &write.tile_height, &write.nlines );
if( im__start_eval( im ) ) if( im__start_eval( im ) )
return( -1 ); return( -1 );
write_init( &write, im, write_fn, a, b );
result = 0; result = 0;
if( !write.buf || if( !write.buf ||
!write.buf_back || !write.buf_back ||
wbuffer_position( write.buf, 0, write.nlines ) || wbuffer_position( write.buf, 0, write.nlines ) ||
vips_threadpool_run( im, vips_threadpool_run( im,
wbuffer_allocate_fn, wbuffer_work_fn, wbuffer_allocate_fn,
wbuffer_work_fn,
wbuffer_progress_fn,
&write, NULL, NULL ) ) &write, NULL, NULL ) )
result = -1; result = -1;
@ -440,10 +473,7 @@ im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b )
im__end_eval( im ); im__end_eval( im );
/* Free buffers ... this will wait for the final write to finish too. write_free( &write );
*/
wbuffer_free( write.buf );
wbuffer_free( write.buf_back );
return( result ); return( result );
} }

View File

@ -136,54 +136,49 @@ thread_free( VipsThread *thr )
* from the main thread if threading is off. * from the main thread if threading is off.
*/ */
static void static void
thread_loop( VipsThread *thr ) thread_work_unit( VipsThread *thr )
{ {
VipsThreadpool *pool = thr->pool; VipsThreadpool *pool = thr->pool;
for(;;) { /* Ask for a work unit.
/* Ask for a work unit. */
*/ g_mutex_lock( pool->allocate_lock );
g_mutex_lock( pool->allocate_lock ); if( !pool->stop ) {
if( !pool->stop ) { if( pool->allocate( thr,
if( pool->allocate( thr, pool->a, pool->b, pool->c, &pool->stop ) ) {
pool->a, pool->b, pool->c, &pool->stop ) ) thr->error = TRUE;
thr->error = -1; pool->error = TRUE;
} }
g_mutex_unlock( pool->allocate_lock );
if( pool->stop || pool->kill )
break;
if( thr->exit || thr->error )
break;
#ifdef TIME_THREAD
/* Note start time.
*/
if( thr->btime && thr->tpos < IM_TBUF_SIZE )
thr->btime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
#endif /*TIME_THREAD*/
/* Loop once.
*/
if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) )
thr->error = 1;
if( pool->stop || pool->kill )
break;
if( thr->exit || thr->error )
break;
#ifdef TIME_THREAD
/* Note stop time.
*/
if( thr->etime && thr->tpos < IM_TBUF_SIZE ) {
thr->etime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
thr->tpos += 1;
}
#endif /*TIME_THREAD*/
} }
g_mutex_unlock( pool->allocate_lock );
if( pool->stop || pool->error )
return;
#ifdef TIME_THREAD
/* Note start time.
*/
if( thr->btime && thr->tpos < IM_TBUF_SIZE )
thr->btime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
#endif /*TIME_THREAD*/
/* Process a work unit.
*/
if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) ) {
thr->error = TRUE;
pool->error = TRUE;
}
#ifdef TIME_THREAD
/* Note stop time.
*/
if( thr->etime && thr->tpos < IM_TBUF_SIZE ) {
thr->etime[thr->tpos] =
g_timer_elapsed( thread_timer, NULL );
thr->tpos += 1;
}
#endif /*TIME_THREAD*/
} }
#ifdef HAVE_THREADS #ifdef HAVE_THREADS
@ -202,9 +197,16 @@ thread_main_loop( void *a )
*/ */
im__region_take_ownership( thr->reg ); im__region_take_ownership( thr->reg );
/* Do all the work we can. /* Process work units! Always tick, even if we are stopping, so the
* main thread will wake up for exit.
*/ */
thread_loop( thr ); for(;;) {
thread_work_unit( thr );
im_semaphore_up( &pool->tick );
if( pool->stop || pool->error )
break;
}
/* We are exiting: tell the main thread. /* We are exiting: tell the main thread.
*/ */
@ -321,6 +323,8 @@ vips_threadpool_free( VipsThreadpool *pool )
threadpool_kill_threads( pool ); threadpool_kill_threads( pool );
IM_FREEF( g_mutex_free, pool->allocate_lock ); IM_FREEF( g_mutex_free, pool->allocate_lock );
im_semaphore_destroy( &pool->finish );
im_semaphore_destroy( &pool->tick );
pool->zombie = 1; pool->zombie = 1;
return( 0 ); return( 0 );
@ -354,8 +358,9 @@ vips_threadpool_new( VipsImage *im )
pool->nthr = im_concurrency_get(); pool->nthr = im_concurrency_get();
pool->thr = NULL; pool->thr = NULL;
im_semaphore_init( &pool->finish, 0, "finish" ); im_semaphore_init( &pool->finish, 0, "finish" );
pool->kill = FALSE; im_semaphore_init( &pool->tick, 0, "tick" );
pool->stop = FALSE; pool->stop = FALSE;
pool->error = FALSE;
pool->zombie = FALSE; pool->zombie = FALSE;
/* Attach tidy-up callback. /* Attach tidy-up callback.
@ -403,7 +408,9 @@ threadpool_create_threads( VipsThreadpool *pool )
int int
vips_threadpool_run( VipsImage *im, vips_threadpool_run( VipsImage *im,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, VipsThreadpoolAllocate allocate,
VipsThreadpoolWork work,
VipsThreadpoolProgress progress,
void *a, void *b, void *c ) void *a, void *b, void *c )
{ {
VipsThreadpool *pool; VipsThreadpool *pool;
@ -430,29 +437,35 @@ vips_threadpool_run( VipsImage *im,
return( -1 ); return( -1 );
} }
for(;;) {
#ifdef HAVE_THREADS #ifdef HAVE_THREADS
/* Wait for a tick from a worker.
*/
im_semaphore_down( &pool->tick );
#else
/* No threads, do the work ourselves in the main thread.
*/
thread_work_unit( pool->thr[0] );
#endif /*HAVE_THREADS*/
if( pool->stop || pool->error )
break;
if( progress &&
progress( pool->a, pool->b, pool->c ) )
pool->error = TRUE;
if( pool->stop || pool->error )
break;
}
/* Wait for them all to hit finish. /* Wait for them all to hit finish.
*/ */
im_semaphore_downn( &pool->finish, pool->nthr ); im_semaphore_downn( &pool->finish, pool->nthr );
#else
/* No threads, do the work ourselves in the main thread.
*/
thread_loop( pool->thr[0] );
#endif /*HAVE_THREADS*/
/* Test for error. /* Return 0 for success.
*/ */
result = 0; result = pool->error ? -1 : 0;
if( pool->kill ||
pool->im->kill )
result = -1;
else {
int i;
for( i = 0; i < pool->nthr; i++ )
if( pool->thr[i]->error )
result = -1;
}
vips_threadpool_free( pool ); vips_threadpool_free( pool );