diff --git a/ChangeLog b/ChangeLog index 7a08d0c9..3e8d1ec0 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,6 @@ +21/3/10 started 7.21.3 +- added progress feedback to threadpool + 16/1/10 started 7.21.2 - "invalidate" is careful to keep images alive, so invalidate callbacks can do im_close() diff --git a/TODO b/TODO index 7c309e39..0012effc 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,6 @@ - make VipsThread opaque ... have a separate VipsThreadState struct holding stuff and only expose that -- add progress feedback ... main thread should wake on every tile? or every - buffer write? or every second? - - try with im_render(), ouch, that will be painful - switch other file sinks over, maybe have a shim layer? diff --git a/libvips/include/vips/threadpool.h b/libvips/include/vips/threadpool.h index 2f9faacd..a004c9ad 100644 --- a/libvips/include/vips/threadpool.h +++ b/libvips/include/vips/threadpool.h @@ -88,6 +88,11 @@ typedef int (*VipsThreadpoolAllocate)( VipsThread *thr, typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg, void *a, void *b, void *c ); +/* A progress function. This is run by the main thread once for every + * allocation. Return an error to kill computation early. + */ +typedef int (*VipsThreadpoolProgress)( void *a, void *b, void *c ); + /* What we track for a group of threads working together. */ typedef struct _VipsThreadpool { @@ -111,9 +116,13 @@ typedef struct _VipsThreadpool { */ im_semaphore_t finish; + /* Workers up this for every loop to make the main thread tick. + */ + im_semaphore_t tick; + /* Set this to abort evaluation early with an error. */ - gboolean kill; + gboolean error; /* Set by Allocate (via an arg) to indicate normal end of computation. */ @@ -125,9 +134,10 @@ typedef struct _VipsThreadpool { gboolean zombie; } VipsThreadpool; - int vips_threadpool_run( VipsImage *im, - VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, + VipsThreadpoolAllocate allocate, + VipsThreadpoolWork work, + VipsThreadpoolProgress progress, void *a, void *b, void *c ); void vips_get_tile_size( VipsImage *im, int *tile_width, int *tile_height, int *nlines ); diff --git a/libvips/iofuncs/im_wbuffer2.c b/libvips/iofuncs/im_wbuffer2.c index aa312bdd..946f829a 100644 --- a/libvips/iofuncs/im_wbuffer2.c +++ b/libvips/iofuncs/im_wbuffer2.c @@ -396,34 +396,67 @@ wbuffer_work_fn( VipsThread *thr, return( 0 ); } +/* Our VipsThreadpoolProgress function ... send some eval progress feedback. + */ +static int +wbuffer_progress_fn( void *a, void *b, void *c ) +{ + Write *write = (Write *) a; + + /* Trigger any eval callbacks on our source image and + * check for errors. + */ + if( im__handle_eval( write->im, + write->tile_width, write->tile_height ) ) + return( -1 ); + + return( 0 ); +} + +static void +write_init( Write *write, + VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) +{ + write->im = im; + write->buf = wbuffer_new( write ); + write->buf_back = wbuffer_new( write ); + write->x = 0; + write->y = 0; + write->write_fn = write_fn; + write->a = a; + write->b = b; + + vips_get_tile_size( im, + &write->tile_width, &write->tile_height, &write->nlines ); +} + +static void +write_free( Write *write ) +{ + IM_FREEF( wbuffer_free, write->buf ); + IM_FREEF( wbuffer_free, write->buf_back ); +} + int im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) { Write write; int result; - write.im = im; - write.buf = wbuffer_new( &write ); - write.buf_back = wbuffer_new( &write ); - write.x = 0; - write.y = 0; - write.write_fn = write_fn; - write.a = a; - write.b = b; - - vips_get_tile_size( im, - &write.tile_width, &write.tile_height, &write.nlines ); - - if( im__start_eval( im ) ) + if( im__start_eval( im ) ) return( -1 ); + write_init( &write, im, write_fn, a, b ); + result = 0; if( !write.buf || !write.buf_back || wbuffer_position( write.buf, 0, write.nlines ) || vips_threadpool_run( im, - wbuffer_allocate_fn, wbuffer_work_fn, + wbuffer_allocate_fn, + wbuffer_work_fn, + wbuffer_progress_fn, &write, NULL, NULL ) ) result = -1; @@ -440,10 +473,7 @@ im_wbuffer2( VipsImage *im, im_wbuffer_fn write_fn, void *a, void *b ) im__end_eval( im ); - /* Free buffers ... this will wait for the final write to finish too. - */ - wbuffer_free( write.buf ); - wbuffer_free( write.buf_back ); + write_free( &write ); return( result ); } diff --git a/libvips/iofuncs/threadpool.c b/libvips/iofuncs/threadpool.c index 738a64c5..4de23b55 100644 --- a/libvips/iofuncs/threadpool.c +++ b/libvips/iofuncs/threadpool.c @@ -133,57 +133,52 @@ thread_free( VipsThread *thr ) } /* The main loop: get some work, do it! Can run from many worker threads, or - * from the main thread if threading is off. + * from the main thread if threading is off. */ static void -thread_loop( VipsThread *thr ) +thread_work_unit( VipsThread *thr ) { VipsThreadpool *pool = thr->pool; - for(;;) { - /* Ask for a work unit. - */ - g_mutex_lock( pool->allocate_lock ); - if( !pool->stop ) { - if( pool->allocate( thr, - pool->a, pool->b, pool->c, &pool->stop ) ) - thr->error = -1; + /* Ask for a work unit. + */ + g_mutex_lock( pool->allocate_lock ); + if( !pool->stop ) { + if( pool->allocate( thr, + pool->a, pool->b, pool->c, &pool->stop ) ) { + thr->error = TRUE; + pool->error = TRUE; } - g_mutex_unlock( pool->allocate_lock ); - - if( pool->stop || pool->kill ) - break; - if( thr->exit || thr->error ) - break; - -#ifdef TIME_THREAD - /* Note start time. - */ - if( thr->btime && thr->tpos < IM_TBUF_SIZE ) - thr->btime[thr->tpos] = - g_timer_elapsed( thread_timer, NULL ); -#endif /*TIME_THREAD*/ - - /* Loop once. - */ - if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) ) - thr->error = 1; - - if( pool->stop || pool->kill ) - break; - if( thr->exit || thr->error ) - break; - -#ifdef TIME_THREAD - /* Note stop time. - */ - if( thr->etime && thr->tpos < IM_TBUF_SIZE ) { - thr->etime[thr->tpos] = - g_timer_elapsed( thread_timer, NULL ); - thr->tpos += 1; - } -#endif /*TIME_THREAD*/ } + g_mutex_unlock( pool->allocate_lock ); + + if( pool->stop || pool->error ) + return; + +#ifdef TIME_THREAD + /* Note start time. + */ + if( thr->btime && thr->tpos < IM_TBUF_SIZE ) + thr->btime[thr->tpos] = + g_timer_elapsed( thread_timer, NULL ); +#endif /*TIME_THREAD*/ + + /* Process a work unit. + */ + if( pool->work( thr, thr->reg, pool->a, pool->b, pool->c ) ) { + thr->error = TRUE; + pool->error = TRUE; + } + +#ifdef TIME_THREAD + /* Note stop time. + */ + if( thr->etime && thr->tpos < IM_TBUF_SIZE ) { + thr->etime[thr->tpos] = + g_timer_elapsed( thread_timer, NULL ); + thr->tpos += 1; + } +#endif /*TIME_THREAD*/ } #ifdef HAVE_THREADS @@ -202,11 +197,18 @@ thread_main_loop( void *a ) */ im__region_take_ownership( thr->reg ); - /* Do all the work we can. + /* Process work units! Always tick, even if we are stopping, so the + * main thread will wake up for exit. */ - thread_loop( thr ); + for(;;) { + thread_work_unit( thr ); + im_semaphore_up( &pool->tick ); - /* We are exiting: tell the main thread. + if( pool->stop || pool->error ) + break; + } + + /* We are exiting: tell the main thread. */ im_semaphore_up( &pool->finish ); @@ -321,6 +323,8 @@ vips_threadpool_free( VipsThreadpool *pool ) threadpool_kill_threads( pool ); IM_FREEF( g_mutex_free, pool->allocate_lock ); + im_semaphore_destroy( &pool->finish ); + im_semaphore_destroy( &pool->tick ); pool->zombie = 1; return( 0 ); @@ -354,8 +358,9 @@ vips_threadpool_new( VipsImage *im ) pool->nthr = im_concurrency_get(); pool->thr = NULL; im_semaphore_init( &pool->finish, 0, "finish" ); - pool->kill = FALSE; + im_semaphore_init( &pool->tick, 0, "tick" ); pool->stop = FALSE; + pool->error = FALSE; pool->zombie = FALSE; /* Attach tidy-up callback. @@ -403,7 +408,9 @@ threadpool_create_threads( VipsThreadpool *pool ) int vips_threadpool_run( VipsImage *im, - VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, + VipsThreadpoolAllocate allocate, + VipsThreadpoolWork work, + VipsThreadpoolProgress progress, void *a, void *b, void *c ) { VipsThreadpool *pool; @@ -430,29 +437,35 @@ vips_threadpool_run( VipsImage *im, return( -1 ); } + for(;;) { #ifdef HAVE_THREADS + /* Wait for a tick from a worker. + */ + im_semaphore_down( &pool->tick ); +#else + /* No threads, do the work ourselves in the main thread. + */ + thread_work_unit( pool->thr[0] ); +#endif /*HAVE_THREADS*/ + + if( pool->stop || pool->error ) + break; + + if( progress && + progress( pool->a, pool->b, pool->c ) ) + pool->error = TRUE; + + if( pool->stop || pool->error ) + break; + } + /* Wait for them all to hit finish. */ im_semaphore_downn( &pool->finish, pool->nthr ); -#else - /* No threads, do the work ourselves in the main thread. - */ - thread_loop( pool->thr[0] ); -#endif /*HAVE_THREADS*/ - /* Test for error. + /* Return 0 for success. */ - result = 0; - if( pool->kill || - pool->im->kill ) - result = -1; - else { - int i; - - for( i = 0; i < pool->nthr; i++ ) - if( pool->thr[i]->error ) - result = -1; - } + result = pool->error ? -1 : 0; vips_threadpool_free( pool );