From 01f1885f87e3f9ba597b469cbacbd1fd8ce6c433 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Fri, 2 Nov 2007 13:03:31 +0000 Subject: [PATCH] bg write broken out from generate --- libsrc/iofuncs/im_generate.c | 383 +------------------------- libsrc/iofuncs/im_write_buffer.c | 451 +++++++++++++++++++++++++++++++ 2 files changed, 453 insertions(+), 381 deletions(-) create mode 100644 libsrc/iofuncs/im_write_buffer.c diff --git a/libsrc/iofuncs/im_generate.c b/libsrc/iofuncs/im_generate.c index 88276c71..30e8d05c 100644 --- a/libsrc/iofuncs/im_generate.c +++ b/libsrc/iofuncs/im_generate.c @@ -327,386 +327,6 @@ eval_to_memory( im_threadgroup_t *tg, REGION *or ) return( 0 ); } -/* A buffer we are going to write to disc in a background thread. - */ -typedef struct _WriteBuffer { - im_threadgroup_t *tg; /* What makes the pixels */ - REGION *region; /* Pixels */ - Rect area; /* Part of image this region covers */ - im_semaphore_t go; /* Start bg thread loop */ - im_semaphore_t nwrite; /* Number of threads writing to region */ - im_semaphore_t done; /* Bg thread has done write */ - int write_errno; /* Save write errors here */ - GThread *thread; /* BG writer thread */ - gboolean kill; /* Set to ask thread to exit */ -} WriteBuffer; - -static void -wbuffer_free( WriteBuffer *wbuffer ) -{ - /* Is there a thread running this region? Kill it! - */ - if( wbuffer->thread ) { - wbuffer->kill = TRUE; - im_semaphore_up( &wbuffer->go ); - - /* Return value is always NULL (see wbuffer_write_thread). - */ - (void) g_thread_join( wbuffer->thread ); -#ifdef DEBUG_CREATE - printf( "wbuffer_free: g_thread_join()\n" ); -#endif /*DEBUG_CREATE*/ - - wbuffer->thread = NULL; - } - - IM_FREEF( im_region_free, wbuffer->region ); - im_semaphore_destroy( &wbuffer->go ); - im_semaphore_destroy( &wbuffer->nwrite ); - im_semaphore_destroy( &wbuffer->done ); - im_free( wbuffer ); -} - -static void -wbuffer_write( WriteBuffer *wbuffer ) -{ - im_threadgroup_t *tg = wbuffer->tg; - IMAGE *im = tg->im; - REGION *region = wbuffer->region; - Rect *area = &wbuffer->area; - size_t nwritten, count; - void *buf; - - count = region->bpl * area->height; - buf = IM_REGION_ADDR( region, 0, area->top ); - do { - nwritten = write( im->fd, buf, count ); - - /* Write failed? Note in wbuffer errno for the main - * thread to pick up. - */ - if( nwritten == (size_t) -1 ) { - wbuffer->write_errno = errno; - break; - } - - buf = (void *) ((char *) buf + nwritten); - count -= nwritten; - } while( count > 0 ); - -#ifdef DEBUG_IO - printf( "wbuffer_write: %d bytes from wbuffer %p\n", - region->bpl * area->height, wbuffer ); -#endif /*DEBUG_IO*/ -} - -#ifdef HAVE_THREADS -/* Run this as a thread to do a BG write. - */ -static void * -wbuffer_write_thread( void *data ) -{ - WriteBuffer *wbuffer = (WriteBuffer *) data; - - for(;;) { - im_semaphore_down( &wbuffer->go ); - - if( wbuffer->kill ) - break; - - /* Wait for all writer threads to leave this wbuffer. - */ - im_semaphore_downn( &wbuffer->nwrite, 0 ); - - wbuffer_write( wbuffer ); - - /* Signal write complete. - */ - im_semaphore_up( &wbuffer->done ); - } - - return( NULL ); -} -#endif /*HAVE_THREADS*/ - -static WriteBuffer * -wbuffer_new( im_threadgroup_t *tg ) -{ - WriteBuffer *wbuffer; - - if( !(wbuffer = IM_NEW( NULL, WriteBuffer )) ) - return( NULL ); - wbuffer->tg = tg; - wbuffer->region = NULL; - im_semaphore_init( &wbuffer->go, 0, "go" ); - im_semaphore_init( &wbuffer->nwrite, 0, "nwrite" ); - im_semaphore_init( &wbuffer->done, 0, "done" ); - wbuffer->write_errno = 0; - wbuffer->thread = NULL; - wbuffer->kill = FALSE; - - if( !(wbuffer->region = im_region_create( tg->im )) ) { - wbuffer_free( wbuffer ); - return( NULL ); - } - -#ifdef HAVE_THREADS - /* Make this last (picks up parts of wbuffer on startup). - */ - if( !(wbuffer->thread = g_thread_create( wbuffer_write_thread, wbuffer, - TRUE, NULL )) ) { - im_error( "wbuffer_new", _( "unable to create thread" ) ); - wbuffer_free( wbuffer ); - return( NULL ); - } -#endif /*HAVE_THREADS*/ - - return( wbuffer ); -} - -/* At end of work_fn ... need to tell wbuffer write thread that we're done. - */ -static int -wbuffer_work_fn( REGION *region, WriteBuffer *wbuffer ) -{ - im_semaphore_upn( &wbuffer->nwrite, 1 ); - - return( 0 ); -} - -/* Attach a wbuffer to a position. - */ -static int -wbuffer_position( WriteBuffer *wbuffer, - int left, int top, int width, int height ) -{ - Rect image, area; - - image.left = 0; - image.top = 0; - image.width = wbuffer->tg->im->Xsize; - image.height = wbuffer->tg->im->Ysize; - - area.left = left; - area.top = top; - area.width = width; - area.height = height; - - im_rect_intersectrect( &area, &image, &wbuffer->area ); - if( im_region_buffer( wbuffer->region, &wbuffer->area ) ) - return( -1 ); - - /* This should be an exclusive buffer, hopefully. - */ - assert( !wbuffer->region->buffer->done ); - - return( 0 ); -} - -/* Loop over a wbuffer filling it threadily. - */ -static int -wbuffer_fill( WriteBuffer *wbuffer ) -{ - Rect *area = &wbuffer->area; - im_threadgroup_t *tg = wbuffer->tg; - IMAGE *im = tg->im; - Rect image; - - int x, y; - -#ifdef DEBUG_IO - printf( "wbuffer_fill: starting for wbuffer %p at line %d\n", - wbuffer, area->top ); -#endif /*DEBUG_IO*/ - - image.left = 0; - image.top = 0; - image.width = im->Xsize; - image.height = im->Ysize; - - /* Loop over area, sparking threads for all sub-parts in turn. - */ - for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph ) - for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) { - im_thread_t *thr; - Rect pos; - Rect clipped; - - /* thrs appear on idle when the child thread does - * threadgroup_idle_add and hits the 'go' semaphore. - */ - thr = im_threadgroup_get( tg ); - - /* Set the position we want to generate with this - * thread. Clip against the size of the image and the - * space available in or. - */ - pos.left = x; - pos.top = y; - pos.width = tg->pw; - pos.height = tg->ph; - im_rect_intersectrect( &pos, &image, &clipped ); - im_rect_intersectrect( &clipped, area, &clipped ); - - /* Note params. - */ - thr->oreg = wbuffer->region; - thr->pos = clipped; - thr->x = clipped.left; - thr->y = clipped.top; - thr->a = wbuffer; - -#ifdef DEBUG_IO - printf( "wbuffer_fill: starting for tile at %d x %d\n", - x, y ); -#endif /*DEBUG_IO*/ - - /* Add writer to n of writers on wbuffer, set it going. - */ - im_semaphore_upn( &wbuffer->nwrite, -1 ); - im_threadgroup_trigger( thr ); - - /* Trigger any eval callbacks on our source image. - */ - im__handle_eval( tg->im, tg->pw, tg->ph ); - - /* Check for errors. - */ - if( im_threadgroup_iserror( tg ) ) { - /* Don't kill threads yet ... we may want to - * get some error stuff out of them. - */ - im_threadgroup_wait( tg ); - return( -1 ); - } - } - - return( 0 ); -} - -/* Eval to file. - */ -static int -wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) -{ - im_threadgroup_t *tg = b1->tg; - IMAGE *im = tg->im; - int y; - - assert( b1->tg == b2->tg ); - -#ifdef DEBUG_IO - int nstrips; - - nstrips = 0; - printf( "wbuffer_eval_to_file: partial image output to file\n" ); -#endif /*DEBUG_IO*/ - - /* Note we'll be working to fill a contigious area. - */ - tg->inplace = 1; - - /* What threads do at the end of each tile ... decrement the nwrite - * semaphore. - */ - tg->work = (im__work_fn) wbuffer_work_fn; - - /* Fill to in steps, write each to the output. - */ - for( y = 0; y < im->Ysize; y += tg->nlines ) { - /* Attach to this position in image. - */ - if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) ) - return( -1 ); - - /* Spark off threads to fill with data. - */ - if( wbuffer_fill( b1 ) ) - return( -1 ); - - /* We have to keep the ordering on wbuffer writes, so we can't - * have more than one background write going at once. Plus we - * want to make sure write()s don't get interleaved. Wait for - * the previous BG write (if any) to finish. - */ - if( y > 0 ) { - im_semaphore_down( &b2->done ); - - /* Previous write suceeded? - */ - if( b2->write_errno ) { - im_error_system( b2->write_errno, - "im__eval_to_file", - _( "write failed" ) ); - return( -1 ); - } - } - - /* b1 write can go. - */ - im_semaphore_up( &b1->go ); - -#ifndef HAVE_THREADS - /* No threading ... just write. - */ - wbuffer_write( b1 ); -#endif /*HAVE_THREADS*/ - - /* Rotate wbuffers. - */ - { - WriteBuffer *t; - - t = b1; b1 = b2; b2 = t; - } - -#ifdef DEBUG_IO - nstrips++; -#endif /*DEBUG_IO*/ - } - - /* Wait for all threads to finish, check for any errors. - */ - im_threadgroup_wait( tg ); - im_semaphore_down( &b2->done ); - if( im_threadgroup_iserror( tg ) ) - return( -1 ); - if( b1->write_errno || b2->write_errno ) { - im_error_system( - b1->write_errno ? b1->write_errno : b2->write_errno, - "im__eval_to_file", _( "write failed" ) ); - return( -1 ); - } - -#ifdef DEBUG_IO - printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips ); -#endif /*DEBUG_IO*/ - - return( 0 ); -} - -static int -eval_to_file( im_threadgroup_t *tg ) -{ - WriteBuffer *b1, *b2; - - b1 = wbuffer_new( tg ); - b2 = wbuffer_new( tg ); - - if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) ) { - IM_FREEF( wbuffer_free, b1 ); - IM_FREEF( wbuffer_free, b2 ); - - return( -1 ); - } - - wbuffer_free( b1 ); - wbuffer_free( b2 ); - - return( 0 ); -} - /* Attach a generate function to an image. */ int @@ -782,7 +402,8 @@ im_generate( IMAGE *im, return( -1 ); } if( im->dtype == IM_OPENOUT ) - res = eval_to_file( tg ); + res = im_wbuffer( tg, + im_wbuffer_write_vips, NULL, NULL ); else res = eval_to_memory( tg, or ); diff --git a/libsrc/iofuncs/im_write_buffer.c b/libsrc/iofuncs/im_write_buffer.c new file mode 100644 index 00000000..df39cb93 --- /dev/null +++ b/libsrc/iofuncs/im_write_buffer.c @@ -0,0 +1,451 @@ +/* Double-buffered write. + * + * 2/11/07 + * - cut from im_generate + */ + +/* + + This file is part of VIPS. + + VIPS is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + */ + +/* + + These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk + + */ + +/* +#define DEBUG + */ + +#ifdef HAVE_CONFIG_H +#include +#endif /*HAVE_CONFIG_H*/ +#include + +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif /*HAVE_UNISTD_H*/ + +#include +#include +#include + +#ifdef WITH_DMALLOC +#include +#endif /*WITH_DMALLOC*/ + +/* A buffer we are going to write to disc in a background thread. + */ +typedef struct _WriteBuffer { + im_threadgroup_t *tg; /* What makes the pixels */ + REGION *region; /* Pixels */ + Rect area; /* Part of image this region covers */ + im_semaphore_t go; /* Start bg thread loop */ + im_semaphore_t nwrite; /* Number of threads writing to region */ + im_semaphore_t done; /* Bg thread has done write */ + int write_errno; /* Save write errors here */ + GThread *thread; /* BG writer thread */ + gboolean kill; /* Set to ask thread to exit */ + im_wbuffer_fn write_fn; /* BG write with this */ + void *a; /* Client data */ + void *b; +} WriteBuffer; + +/* A write function for VIPS images. Just write() the pixel data. + */ +int +im_wbuffer_write_vips( REGION *region, Rect *area, void *a, void *b ) +{ + size_t nwritten, count; + void *buf; + + count = region->bpl * area->height; + buf = IM_REGION_ADDR( region, 0, area->top ); + do { + nwritten = write( region->im->fd, buf, count ); + + /* Write failed? Note in wbuffer errno for the main + * thread to pick up. + */ + if( nwritten == (size_t) -1 ) + return( errno ); + + buf = (void *) ((char *) buf + nwritten); + count -= nwritten; + } while( count > 0 ); + + return( 0 ); +} + +static void +wbuffer_free( WriteBuffer *wbuffer ) +{ + /* Is there a thread running this region? Kill it! + */ + if( wbuffer->thread ) { + wbuffer->kill = TRUE; + im_semaphore_up( &wbuffer->go ); + + /* Return value is always NULL (see wbuffer_write_thread). + */ + (void) g_thread_join( wbuffer->thread ); +#ifdef DEBUG_CREATE + printf( "wbuffer_free: g_thread_join()\n" ); +#endif /*DEBUG_CREATE*/ + + wbuffer->thread = NULL; + } + + IM_FREEF( im_region_free, wbuffer->region ); + im_semaphore_destroy( &wbuffer->go ); + im_semaphore_destroy( &wbuffer->nwrite ); + im_semaphore_destroy( &wbuffer->done ); + im_free( wbuffer ); +} + +static void +wbuffer_write( WriteBuffer *wbuffer ) +{ + wbuffer->write_errno = wbuffer->write_fn( wbuffer->region, + &wbuffer->area, wbuffer->a, wbuffer->b ); + +#ifdef DEBUG + printf( "wbuffer_write: %d bytes from wbuffer %p\n", + region->bpl * area->height, wbuffer ); +#endif /*DEBUG*/ +} + +#ifdef HAVE_THREADS +/* Run this as a thread to do a BG write. + */ +static void * +wbuffer_write_thread( void *data ) +{ + WriteBuffer *wbuffer = (WriteBuffer *) data; + + for(;;) { + im_semaphore_down( &wbuffer->go ); + + if( wbuffer->kill ) + break; + + /* Wait for all writer threads to leave this wbuffer. + */ + im_semaphore_downn( &wbuffer->nwrite, 0 ); + + wbuffer_write( wbuffer ); + + /* Signal write complete. + */ + im_semaphore_up( &wbuffer->done ); + } + + return( NULL ); +} +#endif /*HAVE_THREADS*/ + +static WriteBuffer * +wbuffer_new( im_threadgroup_t *tg, im_wbuffer_fn write_fn, void *a, void *b ) +{ + WriteBuffer *wbuffer; + + if( !(wbuffer = IM_NEW( NULL, WriteBuffer )) ) + return( NULL ); + wbuffer->tg = tg; + wbuffer->region = NULL; + im_semaphore_init( &wbuffer->go, 0, "go" ); + im_semaphore_init( &wbuffer->nwrite, 0, "nwrite" ); + im_semaphore_init( &wbuffer->done, 0, "done" ); + wbuffer->write_errno = 0; + wbuffer->thread = NULL; + wbuffer->kill = FALSE; + wbuffer->write_fn = write_fn; + wbuffer->a = a; + wbuffer->b = b; + + if( !(wbuffer->region = im_region_create( tg->im )) ) { + wbuffer_free( wbuffer ); + return( NULL ); + } + +#ifdef HAVE_THREADS + /* Make this last (picks up parts of wbuffer on startup). + */ + if( !(wbuffer->thread = g_thread_create( wbuffer_write_thread, wbuffer, + TRUE, NULL )) ) { + im_error( "wbuffer_new", _( "unable to create thread" ) ); + wbuffer_free( wbuffer ); + return( NULL ); + } +#endif /*HAVE_THREADS*/ + + return( wbuffer ); +} + +/* At end of work_fn ... need to tell wbuffer write thread that we're done. + */ +static int +wbuffer_work_fn( REGION *region, WriteBuffer *wbuffer ) +{ + im_semaphore_upn( &wbuffer->nwrite, 1 ); + + return( 0 ); +} + +/* Attach a wbuffer to a position. + */ +static int +wbuffer_position( WriteBuffer *wbuffer, + int left, int top, int width, int height ) +{ + Rect image, area; + + image.left = 0; + image.top = 0; + image.width = wbuffer->tg->im->Xsize; + image.height = wbuffer->tg->im->Ysize; + + area.left = left; + area.top = top; + area.width = width; + area.height = height; + + im_rect_intersectrect( &area, &image, &wbuffer->area ); + if( im_region_buffer( wbuffer->region, &wbuffer->area ) ) + return( -1 ); + + /* This should be an exclusive buffer, hopefully. + */ + assert( !wbuffer->region->buffer->done ); + + return( 0 ); +} + +/* Loop over a wbuffer filling it threadily. + */ +static int +wbuffer_fill( WriteBuffer *wbuffer ) +{ + Rect *area = &wbuffer->area; + im_threadgroup_t *tg = wbuffer->tg; + IMAGE *im = tg->im; + Rect image; + + int x, y; + +#ifdef DEBUG + printf( "wbuffer_fill: starting for wbuffer %p at line %d\n", + wbuffer, area->top ); +#endif /*DEBUG*/ + + image.left = 0; + image.top = 0; + image.width = im->Xsize; + image.height = im->Ysize; + + /* Loop over area, sparking threads for all sub-parts in turn. + */ + for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph ) + for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) { + im_thread_t *thr; + Rect pos; + Rect clipped; + + /* thrs appear on idle when the child thread does + * threadgroup_idle_add and hits the 'go' semaphore. + */ + thr = im_threadgroup_get( tg ); + + /* Set the position we want to generate with this + * thread. Clip against the size of the image and the + * space available in or. + */ + pos.left = x; + pos.top = y; + pos.width = tg->pw; + pos.height = tg->ph; + im_rect_intersectrect( &pos, &image, &clipped ); + im_rect_intersectrect( &clipped, area, &clipped ); + + /* Note params. + */ + thr->oreg = wbuffer->region; + thr->pos = clipped; + thr->x = clipped.left; + thr->y = clipped.top; + thr->a = wbuffer; + +#ifdef DEBUG + printf( "wbuffer_fill: starting for tile at %d x %d\n", + x, y ); +#endif /*DEBUG*/ + + /* Add writer to n of writers on wbuffer, set it going. + */ + im_semaphore_upn( &wbuffer->nwrite, -1 ); + im_threadgroup_trigger( thr ); + + /* Trigger any eval callbacks on our source image. + */ + im__handle_eval( tg->im, tg->pw, tg->ph ); + + /* Check for errors. + */ + if( im_threadgroup_iserror( tg ) ) { + /* Don't kill threads yet ... we may want to + * get some error stuff out of them. + */ + im_threadgroup_wait( tg ); + return( -1 ); + } + } + + return( 0 ); +} + +/* Eval to file. + */ +static int +wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) +{ + im_threadgroup_t *tg = b1->tg; + IMAGE *im = tg->im; + int y; + + assert( b1->tg == b2->tg ); + +#ifdef DEBUG + int nstrips; + + nstrips = 0; + printf( "wbuffer_eval_to_file: partial image output to file\n" ); +#endif /*DEBUG*/ + + /* Note we'll be working to fill a contigious area. + */ + tg->inplace = 1; + + /* What threads do at the end of each tile ... decrement the nwrite + * semaphore. + */ + tg->work = (im__work_fn) wbuffer_work_fn; + + /* Fill to in steps, write each to the output. + */ + for( y = 0; y < im->Ysize; y += tg->nlines ) { + /* Attach to this position in image. + */ + if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) ) + return( -1 ); + + /* Spark off threads to fill with data. + */ + if( wbuffer_fill( b1 ) ) + return( -1 ); + + /* We have to keep the ordering on wbuffer writes, so we can't + * have more than one background write going at once. Plus we + * want to make sure write()s don't get interleaved. Wait for + * the previous BG write (if any) to finish. + */ + if( y > 0 ) { + im_semaphore_down( &b2->done ); + + /* Previous write suceeded? + */ + if( b2->write_errno ) { + im_error_system( b2->write_errno, + "im__eval_to_file", + _( "write failed" ) ); + return( -1 ); + } + } + + /* b1 write can go. + */ + im_semaphore_up( &b1->go ); + +#ifndef HAVE_THREADS + /* No threading ... just write. + */ + wbuffer_write( b1 ); +#endif /*HAVE_THREADS*/ + + /* Rotate wbuffers. + */ + { + WriteBuffer *t; + + t = b1; b1 = b2; b2 = t; + } + +#ifdef DEBUG + nstrips++; +#endif /*DEBUG*/ + } + + /* Wait for all threads to finish, check for any errors. + */ + im_threadgroup_wait( tg ); + im_semaphore_down( &b2->done ); + if( im_threadgroup_iserror( tg ) ) + return( -1 ); + if( b1->write_errno || b2->write_errno ) { + im_error_system( + b1->write_errno ? b1->write_errno : b2->write_errno, + "im__eval_to_file", _( "write failed" ) ); + return( -1 ); + } + +#ifdef DEBUG + printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips ); +#endif /*DEBUG*/ + + return( 0 ); +} + +int +im_wbuffer( im_threadgroup_t *tg, + im_wbuffer_fn write_fn, void *a, void *b ) +{ + WriteBuffer *b1, *b2; + + b1 = wbuffer_new( tg, write_fn, a, b ); + b2 = wbuffer_new( tg, write_fn, a, b ); + + if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) ) { + IM_FREEF( wbuffer_free, b1 ); + IM_FREEF( wbuffer_free, b2 ); + + return( -1 ); + } + + wbuffer_free( b1 ); + wbuffer_free( b2 ); + + return( 0 ); +}