This commit is contained in:
John Cupitt 2010-03-19 14:54:12 +00:00
parent b0c8ddc1df
commit dedbe06ea4
4 changed files with 592 additions and 47 deletions

View File

@ -72,13 +72,14 @@ typedef struct {
/* A work function. This does a unit of work (eg. processing a tile or /* A work function. This does a unit of work (eg. processing a tile or
* whatever). * whatever).
*/ */
typedef int (*VipsThreadpoolWork)( VipsThread *thr, typedef int (*VipsThreadpoolWork)( VipsThread *thr, REGION *reg,
REGION *, void *, void *, void * ); void *a, void *b, void *c );
/* A work allocate function. This is run single-threaded by a worker to /* A work allocate function. This is run single-threaded by a worker to
* set up a new work unit. Return TRUE if computation is all done. * set up a new work unit.
*/ */
typedef gboolean (*VipsThreadpoolAllocate)( VipsThread *thr ); typedef int (*VipsThreadpoolAllocate)( VipsThread *thr,
void *a, void *b, void *c );
/* What we track for a group of threads working together. /* What we track for a group of threads working together.
*/ */
@ -86,9 +87,7 @@ typedef struct _VipsThreadpool {
/* All private. /* All private.
*/ */
/*< private >*/ /*< private >*/
IMAGE *im; /* Image we are calculating */ VipsImage *im; /* Image we are calculating */
int pw, ph; /* Tile size */
int nlines; /* Scanlines-at-once we prefer for iteration */
/* Do a unit of work (runs in parallel) and allocate a unit of work /* Do a unit of work (runs in parallel) and allocate a unit of work
* (serial). Plus the mutex we use to serialize work allocation. * (serial). Plus the mutex we use to serialize work allocation.
@ -118,6 +117,8 @@ typedef struct _VipsThreadpool {
int vips_threadpool_run( VipsImage *im, int vips_threadpool_run( VipsImage *im,
VipsThreadpoolAllocate allocate, VipsThreadpoolWork work, VipsThreadpoolAllocate allocate, VipsThreadpoolWork work,
void *a, void *b, void *c ); void *a, void *b, void *c );
void vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *nlines );
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -39,7 +39,6 @@ libiofuncs_la_SOURCES = \
rect.c \ rect.c \
semaphore.c \ semaphore.c \
threadgroup.c \ threadgroup.c \
threadpool.c \
util.c \ util.c \
im_init_world.c \ im_init_world.c \
buf.c \ buf.c \

View File

@ -0,0 +1,535 @@
/* Double-buffered write.
*
* 19/3/10
* - from im_wbuffer.c
* - move on top of VipsThreadpool, instead of im_threadgroup_t
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
*/
#define DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#include <vips/threadpool.h>
#ifdef WITH_DMALLOC
#include <dmalloc.h>
#endif /*WITH_DMALLOC*/
/* A buffer we are going to write to disc in a background thread.
*/
typedef struct _WriteBuffer {
struct _Write *write;
REGION *region; /* Pixels */
Rect area; /* Part of image this region covers */
im_semaphore_t go; /* Start bg thread loop */
im_semaphore_t nwrite; /* Number of threads writing to region */
im_semaphore_t done; /* Bg thread has done write */
int write_errno; /* Save write errors here */
GThread *thread; /* BG writer thread */
gboolean kill; /* Set to ask thread to exit */
im_wbuffer_fn write_fn; /* BG write with this */
void *a; /* Client data */
void *b;
} WriteBuffer;
/* Per-call state.
*/
typedef struct _Write {
VipsImage *im;
/* We are current writing tiles to buf, buf_back is in the hands of
* the bg write thread.
*/
WriteBuffer *buf;
WriteBuffer *buf_back;
/* The position we're at in buf.
*/
int x;
int y;
/* The tilesize we've picked.
*/
int tile_width;
int tile_height;
int nlines;
} Write;
static void
wbuffer_free( WriteBuffer *wbuffer )
{
/* Is there a thread running this region? Kill it!
*/
if( wbuffer->thread ) {
wbuffer->kill = TRUE;
im_semaphore_up( &wbuffer->go );
/* Return value is always NULL (see wbuffer_write_thread).
*/
(void) g_thread_join( wbuffer->thread );
#ifdef DEBUG_CREATE
printf( "wbuffer_free: g_thread_join()\n" );
#endif /*DEBUG_CREATE*/
wbuffer->thread = NULL;
}
IM_FREEF( im_region_free, wbuffer->region );
im_semaphore_destroy( &wbuffer->go );
im_semaphore_destroy( &wbuffer->nwrite );
im_semaphore_destroy( &wbuffer->done );
im_free( wbuffer );
}
static void
wbuffer_write( WriteBuffer *wbuffer )
{
wbuffer->write_errno = wbuffer->write_fn( wbuffer->region,
&wbuffer->area, wbuffer->a, wbuffer->b );
#ifdef DEBUG
printf( "wbuffer_write: %d bytes from wbuffer %p\n",
wbuffer->region->bpl * wbuffer->area.height, wbuffer );
#endif /*DEBUG*/
}
#ifdef HAVE_THREADS
/* Run this as a thread to do a BG write.
*/
static void *
wbuffer_write_thread( void *data )
{
WriteBuffer *wbuffer = (WriteBuffer *) data;
for(;;) {
im_semaphore_down( &wbuffer->go );
if( wbuffer->kill )
break;
/* Wait for all writer threads to leave this wbuffer.
*/
im_semaphore_downn( &wbuffer->nwrite, 0 );
wbuffer_write( wbuffer );
/* Signal write complete.
*/
im_semaphore_up( &wbuffer->done );
}
return( NULL );
}
#endif /*HAVE_THREADS*/
static WriteBuffer *
wbuffer_new( Write *write, im_wbuffer_fn write_fn, void *a, void *b )
{
WriteBuffer *wbuffer;
if( !(wbuffer = IM_NEW( NULL, WriteBuffer )) )
return( NULL );
wbuffer->write = write;
wbuffer->region = NULL;
im_semaphore_init( &wbuffer->go, 0, "go" );
im_semaphore_init( &wbuffer->nwrite, 0, "nwrite" );
im_semaphore_init( &wbuffer->done, 0, "done" );
wbuffer->write_errno = 0;
wbuffer->thread = NULL;
wbuffer->kill = FALSE;
wbuffer->write_fn = write_fn;
wbuffer->a = a;
wbuffer->b = b;
if( !(wbuffer->region = im_region_create( write->im )) ) {
wbuffer_free( wbuffer );
return( NULL );
}
#ifdef HAVE_THREADS
/* Make this last (picks up parts of wbuffer on startup).
*/
if( !(wbuffer->thread = g_thread_create( wbuffer_write_thread, wbuffer,
TRUE, NULL )) ) {
im_error( "wbuffer_new", "%s", _( "unable to create thread" ) );
wbuffer_free( wbuffer );
return( NULL );
}
#endif /*HAVE_THREADS*/
return( wbuffer );
}
/* Our VipsThreadpoolWork function ... generate a tile and tell the wbuffer
* write thread that we're done.
*/
static int
wbuffer_work_fn( VipsThread *thr,
REGION *reg, void *a, void *b, void *c )
{
WriteBuffer *wbuffer = (WriteBuffer *) a;
Write *write = wbuffer->write;
if( im_prepare_to( reg, wbuffer->reg,
&thr->pos, thr->pos.left, thr->pos.top ) )
return( -1 );
im_semaphore_up( &wbuffer->nwrite );
return( 0 );
}
/* Move a wbuffer to a position.
*/
static int
wbuffer_position( WriteBuffer *wbuffer, int top, int height )
{
Rect image, area;
image.left = 0;
image.top = 0;
image.width = wbuffer->write->im->Xsize;
image.height = wbuffer->write->im->Ysize;
area.left = 0;
area.top = top;
area.width = wbuffer->write->im->Xsize;
area.height = height;
im_rect_intersectrect( &area, &image, &wbuffer->area );
if( im_region_buffer( wbuffer->region, &wbuffer->area ) )
return( -1 );
/* This should be an exclusive buffer, hopefully.
*/
g_assert( !wbuffer->region->buffer->done );
return( 0 );
}
/* Our VipsThreadpoolAllocate function ... move the thread to the next tile
* that needs doing. If no buffer is available (the bg writer hasn't yet
* finished with it), we block. If all tiles are done, we return FALSE to end
* iteration.
*/
static gboolean
wbuffer_allocate_fn( VipsThread *thr, void *a, void *b, void *c )
{
WriteBuffer *wbuffer = (WriteBuffer *) a;
Write *write = wbuffer->write;
Rect image;
Rect tile;
/* Is the current x/y position OK? New line or maybe new buffer or
* maybe all done.
*/
if( write->x > write->buf->area.width ) {
write->x = 0;
write->y += write->tile_height;
if( write->y > IM_RECT_BOTTOM( &write->buf->area ) ) {
/* Buffer full. Block until the other buffer has been
* written.
*/
if( write->buf->area.top > 0 ) {
im_semaphore_down( &write->buf_back->done );
/* Previous write suceeded?
*/
if( write->buf_back->write_errno ) {
thr->err = write->buf_back->write_errno;
return( -1 );
}
}
/* Start writing this buffer.
*/
im_semaphore_up( &write->buf->go );
/* Swap buffers.
*/
{
WriteBuffer *t;
t = write->buf;
write->buf = write->buf_back;
write->buf_back = t;
}
/* End of image?
*/
if( write->buf_back->area.top + write->nlines >
write->im->Ysize )
return( -1 );
/* Position buf below buf_back.
*/
if( wbuffer_position( write->buf,
write->buf_back->area.top + write->nlines,
write->nlines ) )
return( -1 );
}
}
image.left = 0;
image.top = 0;
image.width = write->im->Xsize;
image.height = write->im->Ysize;
tile.left = write->x;
tile.top = write->y;
tile.width = write->tile_width;
tile.height = write->tile_height;
im_rect_intersectrect( &image, &tile, &thr->pos );
write->x += write->tile_width;
return( 0 );
}
/* Loop over a wbuffer filling it threadily.
*/
static int
wbuffer_fill( WriteBuffer *wbuffer )
{
Rect *area = &wbuffer->area;
im_threadgroup_t *tg = wbuffer->tg;
IMAGE *im = tg->im;
Rect image;
int x, y;
#ifdef DEBUG
printf( "wbuffer_fill: starting for wbuffer %p at line %d\n",
wbuffer, area->top );
#endif /*DEBUG*/
image.left = 0;
image.top = 0;
image.width = im->Xsize;
image.height = im->Ysize;
/* Loop over area, sparking threads for all sub-parts in turn.
*/
for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph )
for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) {
im_thread_t *thr;
Rect pos;
Rect clipped;
/* thrs appear on idle when the child thread does
* threadgroup_idle_add and hits the 'go' semaphore.
*/
thr = im_threadgroup_get( tg );
/* Set the position we want to generate with this
* thread. Clip against the size of the image and the
* space available in or.
*/
pos.left = x;
pos.top = y;
pos.width = tg->pw;
pos.height = tg->ph;
im_rect_intersectrect( &pos, &image, &clipped );
im_rect_intersectrect( &clipped, area, &clipped );
/* Note params.
*/
thr->oreg = wbuffer->region;
thr->pos = clipped;
thr->x = clipped.left;
thr->y = clipped.top;
thr->a = wbuffer;
#ifdef DEBUG
printf( "wbuffer_fill: starting for tile at %d x %d\n",
x, y );
#endif /*DEBUG*/
/* Add writer to n of writers on wbuffer, set it going.
*/
im_semaphore_upn( &wbuffer->nwrite, -1 );
im_threadgroup_trigger( thr );
/* Trigger any eval callbacks on our source image and
* check for errors.
*/
if( im__handle_eval( tg->im, tg->pw, tg->ph ) ||
im_threadgroup_iserror( tg ) ) {
/* Don't kill threads yet ... we may want to
* get some error stuff out of them.
*/
im_threadgroup_wait( tg );
return( -1 );
}
}
return( 0 );
}
/* Eval to file.
*/
static int
wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 )
{
im_threadgroup_t *tg = b1->tg;
IMAGE *im = tg->im;
int y;
assert( b1->tg == b2->tg );
#ifdef DEBUG
int nstrips;
nstrips = 0;
printf( "wbuffer_eval_to_file: partial image output to file\n" );
#endif /*DEBUG*/
/* What threads do at the end of each tile ... decrement the nwrite
* semaphore.
*/
tg->work = wbuffer_work_fn;
/* Fill to in steps, write each to the output.
*/
for( y = 0; y < im->Ysize; y += tg->nlines ) {
/* Attach to this position in image.
*/
if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) )
return( -1 );
/* Spark off threads to fill with data.
*/
if( wbuffer_fill( b1 ) )
return( -1 );
/* We have to keep the ordering on wbuffer writes, so we can't
* have more than one background write going at once. Plus we
* want to make sure write()s don't get interleaved. Wait for
* the previous BG write (if any) to finish.
*/
if( y > 0 ) {
im_semaphore_down( &b2->done );
/* Previous write suceeded?
*/
if( b2->write_errno ) {
im_error_system( b2->write_errno,
"im__eval_to_file",
"%s", _( "write failed" ) );
return( -1 );
}
}
/* b1 write can go.
*/
im_semaphore_up( &b1->go );
#ifndef HAVE_THREADS
/* No threading ... just write.
*/
wbuffer_write( b1 );
#endif /*HAVE_THREADS*/
/* Rotate wbuffers.
*/
{
WriteBuffer *t;
t = b1; b1 = b2; b2 = t;
}
#ifdef DEBUG
nstrips++;
#endif /*DEBUG*/
}
/* Wait for all threads to finish, check for any errors.
*/
im_threadgroup_wait( tg );
im_semaphore_down( &b2->done );
if( im_threadgroup_iserror( tg ) )
return( -1 );
if( b1->write_errno || b2->write_errno ) {
im_error_system(
b1->write_errno ? b1->write_errno : b2->write_errno,
"im__eval_to_file", "%s", _( "write failed" ) );
return( -1 );
}
#ifdef DEBUG
printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips );
#endif /*DEBUG*/
return( 0 );
}
int
im_wbuffer( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b )
{
WriteBuffer *b1, *b2;
int result;
if( im__start_eval( tg->im ) )
return( -1 );
result = 0;
b1 = wbuffer_new( tg, write_fn, a, b );
b2 = wbuffer_new( tg, write_fn, a, b );
if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) )
result = -1;
im__end_eval( tg->im );
wbuffer_free( b1 );
wbuffer_free( b2 );
return( result );
}

View File

@ -173,7 +173,8 @@ thread_main_loop( void *a )
if( !pool->stop ) { if( !pool->stop ) {
gboolean morework; gboolean morework;
morework = pool->allocate( thr ); morework = pool->allocate( thr,
pool->a, pool->b, pool->c );
if( !morework ) if( !morework )
pool->stop = TRUE; pool->stop = TRUE;
} }
@ -356,8 +357,7 @@ vips_threadpool_new( VipsImage *im )
pool->allocate = NULL; pool->allocate = NULL;
pool->work = NULL; pool->work = NULL;
pool->allocate_lock = g_mutex_new(); pool->allocate_lock = g_mutex_new();
if( (pool->nthr = im_concurrency_get()) < 0 ) pool->nthr = im_concurrency_get();
return( NULL );
pool->thr = NULL; pool->thr = NULL;
im_semaphore_init( &pool->finish, 0, "finish" ); im_semaphore_init( &pool->finish, 0, "finish" );
pool->kill = FALSE; pool->kill = FALSE;
@ -365,37 +365,6 @@ vips_threadpool_new( VipsImage *im )
pool->progress = FALSE; pool->progress = FALSE;
pool->zombie = FALSE; pool->zombie = FALSE;
/* Pick a render geometry.
*/
switch( pool->im->dhint ) {
case IM_SMALLTILE:
pool->pw = im__tile_width;
pool->ph = im__tile_height;
/* Enough lines of tiles that we can expect to be able to keep
* nthr busy.
*/
pool->nlines = pool->ph * (1 + pool->nthr /
IM_MAX( 1, pool->im->Xsize / pool->pw ));
break;
case IM_FATSTRIP:
pool->pw = pool->im->Xsize;
pool->ph = im__fatstrip_height;
pool->nlines = pool->ph * pool->nthr * 2;
break;
case IM_ANY:
case IM_THINSTRIP:
pool->pw = pool->im->Xsize;
pool->ph = im__thinstrip_height;
pool->nlines = pool->ph * pool->nthr * 2;
break;
default:
g_assert( 0 );
}
/* Attach tidy-up callback. /* Attach tidy-up callback.
*/ */
if( im_add_close_callback( im, if( im_add_close_callback( im,
@ -404,11 +373,6 @@ vips_threadpool_new( VipsImage *im )
return( NULL ); return( NULL );
} }
#ifdef DEBUG_IO
printf( "vips_threadpool_new: %d by %d patches, "
"groups of %d scanlines\n", pool->pw, pool->ph, pool->nlines );
#endif /*DEBUG_IO*/
#ifdef DEBUG_IO #ifdef DEBUG_IO
printf( "vips_threadpool_new: \"%s\" (%p), with %d threads\n", printf( "vips_threadpool_new: \"%s\" (%p), with %d threads\n",
im->filename, pool, pool->nthr ); im->filename, pool, pool->nthr );
@ -515,3 +479,49 @@ vips_threadpool_run( VipsImage *im,
return( result ); return( result );
} }
/* Pick a tile size and buffer size.
*/
void
vips_get_tile_size( VipsImage *im,
int *tile_width, int *tile_height, int *nlines )
{
const int nthr = im_get_concurrency();
/* Pick a render geometry.
*/
switch( pool->im->dhint ) {
case IM_SMALLTILE:
*tile_width = im__tile_width;
*tile_height = im__tile_height;
/* Enough lines of tiles that we can expect to be able to keep
* nthr busy.
*/
*nlines = *tile_height *
(1 + nthr / IM_MAX( 1, im->Xsize / *tile_width ));
break;
case IM_FATSTRIP:
*tile_width = im->Xsize;
*tile_height = im__fatstrip_height;
*nlines = *tile_height * nthr * 2;
break;
case IM_ANY:
case IM_THINSTRIP:
*tile_width = im->Xsize;
*tile_height = im__thinstrip_height;
*nlines = *tile_height * nthr * 2;
break;
default:
g_assert( 0 );
}
#ifdef DEBUG_IO
printf( "vips_get_tile_size: %d by %d patches, "
"groups of %d scanlines\n",
*tile_width, *tile_height, *nlines );
#endif /*DEBUG_IO*/
}