/* Manage pipelines of partial images. * * J.Cupitt, 17/4/93. * 1/7/93 JC * - adapted for partial v2 * - ANSIfied * 6/7/93 JC * - im_setupout() conventions clarified - see autorewind in * im_iocheck(). * 20/7/93 JC * - eval callbacks added * 7/9/93 JC * - demand hint mechanism added * 25/10/93 * - asynchronous output mechanisms removed, as no observable speed-up * 9/5/94 * - new thread stuff added, with a define to turn it off * 15/8/94 * - start & stop functions can now be NULL for no-op * 7/10/94 JC * - evalend callback system added * 23/12/94 JC * - IM_ARRAY uses added * 22/2/95 JC * - im_fill_copy() added * - im_region_region() uses modified * 24/4/95 JC & KM * - im_fill_lines() bug removed * 30/8/96 JC * - revised and simplified ... some code shared with im_iterate() * - new im_generate_region() added * 2/3/98 JC * - IM_ANY added * 20/7/99 JC * - tile geometry made into ints for easy tuning * 30/7/99 RP JC * - threads reorganised for POSIX * 29/9/99 JC * - threadgroup stuff added * 15/4/04 * - better how-many-pixels-calculated * 27/11/06 * - merge background write stuff */ /* This file is part of VIPS. VIPS is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk */ /* #define DEBUG_IO */ #ifdef HAVE_CONFIG_H #include #endif /*HAVE_CONFIG_H*/ #include #include #include #include #include #include #include #include #ifdef HAVE_UNISTD_H #include #endif /*HAVE_UNISTD_H*/ #include #include #include #ifdef WITH_DMALLOC #include #endif /*WITH_DMALLOC*/ /* Start and stop functions for one image in, input image is first user data. */ void * im_start_one( IMAGE *out, IMAGE *in, void *dummy ) { return( im_region_create( in ) ); } int im_stop_one( REGION *reg, void *dummy1, void *dummy2 ) { im_region_free( reg ); return( 0 ); } /* Stop and start functions for many images in. First client is pointer to * null-terminated array of input images. */ int im_stop_many( REGION **ar, void *dummy1, void *dummy2 ) { int i; if( ! ar ) return 0; for( i = 0; ar[i]; i++ ) im_region_free( ar[i] ); im_free( (char *) ar ); return( 0 ); } void * im_start_many( IMAGE *out, IMAGE **in, void *dummy ) { int i, n; REGION **ar; /* How many images? */ for( n = 0; in[n]; n++ ) ; /* Alocate space for region array. */ if( !(ar = IM_ARRAY( NULL, n + 1, REGION * )) ) return( NULL ); /* Create a set of regions. */ for( i = 0; i < n; i++ ) if( !(ar[i] = im_region_create( in[i] )) ) { im_stop_many( ar, NULL, NULL ); return( NULL ); } ar[n] = NULL; return( ar ); } /* Convenience function - make a null-terminated array of input images. * Use with im_start_many. */ IMAGE ** im_allocate_input_array( IMAGE *out, ... ) { va_list ap; IMAGE **ar; IMAGE *im; int i, n; /* Count input images. */ va_start( ap, out ); for( n = 0; (im = va_arg( ap, IMAGE * )); n++ ) ; va_end( ap ); /* Allocate array. */ if( !(ar = IM_ARRAY( out, n + 1, IMAGE * )) ) return( NULL ); /* Fill array. */ va_start( ap, out ); for( i = 0; i < n; i++ ) ar[i] = va_arg( ap, IMAGE * ); va_end( ap ); ar[n] = NULL; return( ar ); } /* Loop over a big region, filling it in many small pieces with threads. */ static int eval_to_region( REGION *or, im_threadgroup_t *tg ) { Rect *r = &or->valid; Rect image; int x, y; image.left = 0; image.top = 0; image.width = or->im->Xsize; image.height = or->im->Ysize; /* Note we'll be working to fill a contigious area. */ tg->inplace = 1; /* Loop over or, attaching to all sub-parts in turn. */ for( y = r->top; y < IM_RECT_BOTTOM( r ); y += tg->ph ) for( x = r->left; x < IM_RECT_RIGHT( r ); x += tg->pw ) { im_thread_t *thr; Rect pos; Rect clipped; /* thrs appear on idle when the child thread does * threadgroup_idle_add and hits the 'go' semaphore. */ thr = im_threadgroup_get( tg ); /* Set the position we want to generate with this * thread. Clip against the size of the image and the * space available in or. */ pos.left = x; pos.top = y; pos.width = tg->pw; pos.height = tg->ph; im_rect_intersectrect( &pos, &image, &clipped ); im_rect_intersectrect( &clipped, r, &clipped ); /* Note params and start work. */ thr->oreg = or; thr->pos = clipped; thr->x = clipped.left; thr->y = clipped.top; im_threadgroup_trigger( thr ); /* Trigger any eval callbacks on our source image. */ im__handle_eval( tg->im, tg->pw, tg->ph ); /* Check for errors. */ if( im_threadgroup_iserror( tg ) ) { /* Don't kill threads yet ... we may want to * get some error stuff out of them. */ im_threadgroup_wait( tg ); return( -1 ); } } /* Wait for all threads to hit 'go' again. */ im_threadgroup_wait( tg ); if( im_threadgroup_iserror( tg ) ) return( -1 ); return( 0 ); } /* Output to a memory area. Might be im_setbuf(), im_mmapin()/im_makerw() or * im_mmapinrw(). */ static int eval_to_memory( im_threadgroup_t *tg, REGION *or ) { int y, chunk; IMAGE *im = or->im; #ifdef DEBUG_IO int ntiles = 0; printf( "eval_to_memory: partial image output to memory area\n" ); #endif /*DEBUG_IO*/ /* Choose a chunk size ... 1/100th of the height of the image, about. * This sets the granularity of user feedback on eval progress, but * does not affect mem requirements etc. */ chunk = (im->Ysize / 100) + 1; /* Loop down the output image, evaling each chunk. */ for( y = 0; y < im->Ysize; y += chunk ) { Rect pos; /* Attach or to this position in image. */ pos.left = 0; pos.top = y; pos.width = im->Xsize; pos.height = chunk; if( im_region_image( or, &pos ) ) return( -1 ); /* Ask for evaluation of this area. */ if( eval_to_region( or, tg ) ) return( -1 ); #ifdef DEBUG_IO ntiles++; #endif /*DEBUG_IO*/ } #ifdef DEBUG_IO printf( "eval_to_memory: success! %d patches written\n", ntiles ); #endif /*DEBUG_IO*/ return( 0 ); } /* A buffer we are going to write to disc in a background thread. */ typedef struct _WriteBuffer { im_threadgroup_t *tg; /* What makes the pixels */ REGION *region; /* Pixels */ Rect area; /* Part of image this region covers */ im_semaphore_t go; /* Start bg thread loop */ im_semaphore_t nwrite; /* Number of threads writing to region */ im_semaphore_t done; /* Bg thread has done write */ int write_errno; /* Save write errors here */ GThread *thread; /* BG writer thread */ gboolean kill; /* Set to ask thread to exit */ } WriteBuffer; static void wbuffer_free( WriteBuffer *wbuffer ) { /* Is there a thread running this region? Kill it! */ if( wbuffer->thread ) { wbuffer->kill = TRUE; im_semaphore_up( &wbuffer->go ); /* Return value is always NULL (see wbuffer_write_thread). */ (void) g_thread_join( wbuffer->thread ); #ifdef DEBUG_CREATE printf( "wbuffer_free: g_thread_join()\n" ); #endif /*DEBUG_CREATE*/ wbuffer->thread = NULL; } IM_FREEF( im_region_free, wbuffer->region ); im_semaphore_destroy( &wbuffer->go ); im_semaphore_destroy( &wbuffer->nwrite ); im_semaphore_destroy( &wbuffer->done ); im_free( wbuffer ); } static void wbuffer_write( WriteBuffer *wbuffer ) { im_threadgroup_t *tg = wbuffer->tg; IMAGE *im = tg->im; REGION *region = wbuffer->region; Rect *area = &wbuffer->area; size_t nwritten, count; void *buf; count = region->bpl * area->height; buf = IM_REGION_ADDR( region, 0, area->top ); do { nwritten = write( im->fd, buf, count ); /* Write failed? Note in wbuffer errno for the main * thread to pick up. */ if( nwritten == (size_t) -1 ) { wbuffer->write_errno = errno; break; } buf = (void *) ((char *) buf + nwritten); count -= nwritten; } while( count > 0 ); #ifdef DEBUG_IO printf( "wbuffer_write: %d bytes from wbuffer %p\n", region->bpl * area->height, wbuffer ); #endif /*DEBUG_IO*/ } #ifdef HAVE_THREADS /* Run this as a thread to do a BG write. */ static void * wbuffer_write_thread( void *data ) { WriteBuffer *wbuffer = (WriteBuffer *) data; for(;;) { im_semaphore_down( &wbuffer->go ); if( wbuffer->kill ) break; /* Wait for all writer threads to leave this wbuffer. */ im_semaphore_downn( &wbuffer->nwrite, 0 ); wbuffer_write( wbuffer ); /* Signal write complete. */ im_semaphore_up( &wbuffer->done ); } return( NULL ); } #endif /*HAVE_THREADS*/ static WriteBuffer * wbuffer_new( im_threadgroup_t *tg ) { WriteBuffer *wbuffer; if( !(wbuffer = IM_NEW( NULL, WriteBuffer )) ) return( NULL ); wbuffer->tg = tg; wbuffer->region = NULL; im_semaphore_init( &wbuffer->go, 0, "go" ); im_semaphore_init( &wbuffer->nwrite, 0, "nwrite" ); im_semaphore_init( &wbuffer->done, 0, "done" ); wbuffer->write_errno = 0; wbuffer->thread = NULL; wbuffer->kill = FALSE; if( !(wbuffer->region = im_region_create( tg->im )) ) { wbuffer_free( wbuffer ); return( NULL ); } #ifdef HAVE_THREADS /* Make this last (picks up parts of wbuffer on startup). */ if( !(wbuffer->thread = g_thread_create( wbuffer_write_thread, wbuffer, TRUE, NULL )) ) { im_error( "wbuffer_new", _( "unable to create thread" ) ); wbuffer_free( wbuffer ); return( NULL ); } #endif /*HAVE_THREADS*/ return( wbuffer ); } /* At end of work_fn ... need to tell wbuffer write thread that we're done. */ static int wbuffer_work_fn( REGION *region, WriteBuffer *wbuffer ) { im_semaphore_upn( &wbuffer->nwrite, 1 ); return( 0 ); } /* Attach a wbuffer to a position. */ static int wbuffer_position( WriteBuffer *wbuffer, int left, int top, int width, int height ) { Rect image, area; image.left = 0; image.top = 0; image.width = wbuffer->tg->im->Xsize; image.height = wbuffer->tg->im->Ysize; area.left = left; area.top = top; area.width = width; area.height = height; im_rect_intersectrect( &area, &image, &wbuffer->area ); if( im_region_buffer( wbuffer->region, &wbuffer->area ) ) return( -1 ); /* This should be an exclusive buffer, hopefully. */ assert( !wbuffer->region->buffer->done ); return( 0 ); } /* Loop over a wbuffer filling it threadily. */ static int wbuffer_fill( WriteBuffer *wbuffer ) { Rect *area = &wbuffer->area; im_threadgroup_t *tg = wbuffer->tg; IMAGE *im = tg->im; Rect image; int x, y; #ifdef DEBUG_IO printf( "wbuffer_fill: starting for wbuffer %p at line %d\n", wbuffer, area->top ); #endif /*DEBUG_IO*/ image.left = 0; image.top = 0; image.width = im->Xsize; image.height = im->Ysize; /* Loop over area, sparking threads for all sub-parts in turn. */ for( y = area->top; y < IM_RECT_BOTTOM( area ); y += tg->ph ) for( x = area->left; x < IM_RECT_RIGHT( area ); x += tg->pw ) { im_thread_t *thr; Rect pos; Rect clipped; /* thrs appear on idle when the child thread does * threadgroup_idle_add and hits the 'go' semaphore. */ thr = im_threadgroup_get( tg ); /* Set the position we want to generate with this * thread. Clip against the size of the image and the * space available in or. */ pos.left = x; pos.top = y; pos.width = tg->pw; pos.height = tg->ph; im_rect_intersectrect( &pos, &image, &clipped ); im_rect_intersectrect( &clipped, area, &clipped ); /* Note params. */ thr->oreg = wbuffer->region; thr->pos = clipped; thr->x = clipped.left; thr->y = clipped.top; thr->a = wbuffer; #ifdef DEBUG_IO printf( "wbuffer_fill: starting for tile at %d x %d\n", x, y ); #endif /*DEBUG_IO*/ /* Add writer to n of writers on wbuffer, set it going. */ im_semaphore_upn( &wbuffer->nwrite, -1 ); im_threadgroup_trigger( thr ); /* Trigger any eval callbacks on our source image. */ im__handle_eval( tg->im, tg->pw, tg->ph ); /* Check for errors. */ if( im_threadgroup_iserror( tg ) ) { /* Don't kill threads yet ... we may want to * get some error stuff out of them. */ im_threadgroup_wait( tg ); return( -1 ); } } return( 0 ); } /* Eval to file. */ static int wbuffer_eval_to_file( WriteBuffer *b1, WriteBuffer *b2 ) { im_threadgroup_t *tg = b1->tg; IMAGE *im = tg->im; int y; assert( b1->tg == b2->tg ); #ifdef DEBUG_IO int nstrips; nstrips = 0; printf( "wbuffer_eval_to_file: partial image output to file\n" ); #endif /*DEBUG_IO*/ /* Note we'll be working to fill a contigious area. */ tg->inplace = 1; /* What threads do at the end of each tile ... decrement the nwrite * semaphore. */ tg->work = (im__work_fn) wbuffer_work_fn; /* Fill to in steps, write each to the output. */ for( y = 0; y < im->Ysize; y += tg->nlines ) { /* Attach to this position in image. */ if( wbuffer_position( b1, 0, y, im->Xsize, tg->nlines ) ) return( -1 ); /* Spark off threads to fill with data. */ if( wbuffer_fill( b1 ) ) return( -1 ); /* We have to keep the ordering on wbuffer writes, so we can't * have more than one background write going at once. Plus we * want to make sure write()s don't get interleaved. Wait for * the previous BG write (if any) to finish. */ if( y > 0 ) { im_semaphore_down( &b2->done ); /* Previous write suceeded? */ if( b2->write_errno ) { im_error_system( b2->write_errno, "im__eval_to_file", _( "write failed" ) ); return( -1 ); } } /* b1 write can go. */ im_semaphore_up( &b1->go ); #ifndef HAVE_THREADS /* No threading ... just write. */ wbuffer_write( b1 ); #endif /*HAVE_THREADS*/ /* Rotate wbuffers. */ { WriteBuffer *t; t = b1; b1 = b2; b2 = t; } #ifdef DEBUG_IO nstrips++; #endif /*DEBUG_IO*/ } /* Wait for all threads to finish, check for any errors. */ im_threadgroup_wait( tg ); im_semaphore_down( &b2->done ); if( im_threadgroup_iserror( tg ) ) return( -1 ); if( b1->write_errno || b2->write_errno ) { im_error_system( b1->write_errno ? b1->write_errno : b2->write_errno, "im__eval_to_file", _( "write failed" ) ); return( -1 ); } #ifdef DEBUG_IO printf( "wbuffer_eval_to_file: success! %d strips written\n", nstrips ); #endif /*DEBUG_IO*/ return( 0 ); } static int eval_to_file( im_threadgroup_t *tg ) { WriteBuffer *b1, *b2; b1 = wbuffer_new( tg ); b2 = wbuffer_new( tg ); if( !b1 || !b2 || wbuffer_eval_to_file( b1, b2 ) ) { IM_FREEF( wbuffer_free, b1 ); IM_FREEF( wbuffer_free, b2 ); return( -1 ); } wbuffer_free( b1 ); wbuffer_free( b2 ); return( 0 ); } /* Attach a generate function to an image. */ int im_generate( IMAGE *im, void *(*start_fn)(), int (*gen_fn)(), int (*stop_fn)(), void *a, void *b ) { int res; REGION *or; im_threadgroup_t *tg; if( im_image_sanity( im ) ) return( -1 ); if( im->Xsize <= 0 || im->Ysize <= 0 || im->Bands <= 0 ) { im_error( "im_generate", _( "bad dimensions" ) ); return( -1 ); } /* Look at output type to decide our action. */ switch( im->dtype ) { case IM_PARTIAL: /* Output to partial image. Just attach functions and return. */ if( im->generate || im->start || im->stop ) { im_error( "im_generate", _( "func already attached" ) ); return( -1 ); } im->start = start_fn; im->generate = gen_fn; im->stop = stop_fn; im->client1 = a; im->client2 = b; #ifdef DEBUG_IO printf( "im_generate: attaching partial callbacks\n" ); #endif /*DEBUG_IO*/ break; case IM_SETBUF: case IM_SETBUF_FOREIGN: case IM_MMAPINRW: case IM_OPENOUT: /* Eval now .. sanity check. */ if( im->generate || im->start || im->stop ) { im_error( "im_generate", _( "func already attached" ) ); return( -1 ); } /* Get output ready. */ if( im_setupout( im ) ) return( -1 ); /* Attach callbacks. */ im->start = start_fn; im->generate = gen_fn; im->stop = stop_fn; im->client1 = a; im->client2 = b; /* Evaluate. Two output styles: to memory area (im_setbuf() * or im_mmapinrw()) or to file (im_openout()). */ if( !(or = im_region_create( im )) ) return( -1 ); if( !(tg = im_threadgroup_create( im )) ) { im_region_free( or ); return( -1 ); } if( im->dtype == IM_OPENOUT ) res = eval_to_file( tg ); else res = eval_to_memory( tg, or ); /* Clean up. */ im_threadgroup_free( tg ); im_region_free( or ); /* Evaluation is now complete, with all sequences finished. * Trigger evalend callbacks, then free them to make sure we * don't trigger twice. */ res |= im__trigger_callbacks( im->evalendfns ); IM_FREEF( im_slist_free_all, im->evalendfns ); /* Error? */ if( res ) return( -1 ); break; default: /* Not a known output style. */ im_error( "im_generate", _( "unable to output to a %s image" ), im_dtype2char( im->dtype ) ); return( -1 ); } return( 0 ); } /* Generate a region of pixels ... with threads! Very like im_prepare(), but * threaded and does sub-division. */ int im_prepare_thread( im_threadgroup_t *tg, REGION *or, Rect *r ) { IMAGE *im = or->im; if( im_image_sanity( im ) ) return( -1 ); switch( im->dtype ) { case IM_PARTIAL: if( im_region_fill( or, r, (im_region_fill_fn) eval_to_region, tg ) ) return( -1 ); break; case IM_OPENIN: case IM_SETBUF: case IM_SETBUF_FOREIGN: case IM_MMAPIN: case IM_MMAPINRW: /* Attach to existing buffer. */ if( im_region_image( or, r ) ) return( -1 ); break; default: im_error( "im_prepare_thread", _( "unable to input from a %s " "image" ), im_dtype2char( im->dtype ) ); return( -1 ); } return( 0 ); }