new sinkmemory works, more png fixing

This commit is contained in:
John Cupitt 2012-02-17 17:12:51 +00:00
parent ff7d02a979
commit fcf052ab09
6 changed files with 146 additions and 234 deletions

View File

@ -313,14 +313,19 @@ png2vips_interlace( Read *read, VipsImage *out )
printf( "png2vips_interlace: reading whole image\n" ); printf( "png2vips_interlace: reading whole image\n" );
#endif /*DEBUG*/ #endif /*DEBUG*/
if( !(read->row_pointer = VIPS_ARRAY( NULL, out->Ysize, png_bytep )) || if( vips_image_write_prepare( out ) )
vips_image_write_prepare( out ) ) return( -1 );
if( setjmp( png_jmpbuf( read->pPng ) ) )
return( -1 );
if( !(read->row_pointer = VIPS_ARRAY( NULL, out->Ysize, png_bytep )) )
return( -1 ); return( -1 );
for( y = 0; y < out->Ysize; y++ ) for( y = 0; y < out->Ysize; y++ )
read->row_pointer[y] = VIPS_IMAGE_ADDR( out, 0, y ); read->row_pointer[y] = VIPS_IMAGE_ADDR( out, 0, y );
if( setjmp( png_jmpbuf( read->pPng ) ) ) png_set_interlace_handling( read->pPng );
return( -1 );
png_read_image( read->pPng, read->row_pointer ); png_read_image( read->pPng, read->row_pointer );
return( 0 ); return( 0 );

View File

@ -58,6 +58,7 @@ int vips_sink_screen( VipsImage *in, VipsImage *out, VipsImage *mask,
VipsSinkNotify notify, void *a ); VipsSinkNotify notify, void *a );
int vips_sink_memory( VipsImage *im ); int vips_sink_memory( VipsImage *im );
int vips_sink_memory2( VipsImage *image );
void *vips_start_one( VipsImage *out, void *a, void *b ); void *vips_start_one( VipsImage *out, void *a, void *b );
int vips_stop_one( void *seq, void *a, void *b ); int vips_stop_one( void *seq, void *a, void *b );

View File

@ -15,6 +15,7 @@ libiofuncs_la_SOURCES = \
sink.h \ sink.h \
sink.c \ sink.c \
sinkmemory.c \ sinkmemory.c \
sinkmemory2.c \
sinkdisc.c \ sinkdisc.c \
sinkscreen.c \ sinkscreen.c \
memory.c \ memory.c \

View File

@ -666,7 +666,8 @@ vips_image_generate( VipsImage *image,
res = vips_sink_disc( image, res = vips_sink_disc( image,
(VipsRegionWrite) write_vips, NULL ); (VipsRegionWrite) write_vips, NULL );
else else
res = vips_sink_memory( image ); // res = vips_sink_memory( image );
res = vips_sink_memory2( image );
/* Error? /* Error?
*/ */

View File

@ -350,13 +350,7 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
/* Swap buffers. /* Swap buffers.
*/ */
{ VIPS_SWAP( WriteBuffer *, write->buf, write->buf_back );
WriteBuffer *t;
t = write->buf;
write->buf = write->buf_back;
write->buf_back = t;
}
/* Position buf at the new y. /* Position buf at the new y.
*/ */
@ -377,6 +371,9 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
tile.width = sink_base->tile_width; tile.width = sink_base->tile_width;
tile.height = sink_base->tile_height; tile.height = sink_base->tile_height;
vips_rect_intersectrect( &image, &tile, &state->pos ); vips_rect_intersectrect( &image, &tile, &state->pos );
/* The thread needs to know which buffer it's writing to.
*/
wstate->buf = write->buf; wstate->buf = write->buf;
VIPS_DEBUG_MSG( " allocated %d x %d:\n", tile.left, tile.top ); VIPS_DEBUG_MSG( " allocated %d x %d:\n", tile.left, tile.top );

View File

@ -35,8 +35,8 @@
*/ */
/* /*
#define VIPS_DEBUG
*/ */
#define VIPS_DEBUG
#ifdef HAVE_CONFIG_H #ifdef HAVE_CONFIG_H
#include <config.h> #include <config.h>
@ -45,13 +45,6 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <vips/vips.h> #include <vips/vips.h>
#include <vips/internal.h> #include <vips/internal.h>
@ -63,32 +56,37 @@
/* A part of the image we are writing. /* A part of the image we are writing.
*/ */
typedef struct _SinkMemoryBuffer { typedef struct _SinkMemoryArea {
struct _SinkMemory *write; struct _SinkMemory *memory;
VipsRegion *region; /* Pixels */ VipsRect rect; /* Part of image this area covers */
VipsSemaphore nwrite; /* Number of threads writing to region */ VipsSemaphore nwrite; /* Number of threads writing to this area */
} SinkMemoryBuffer; } SinkMemoryArea;
/* Per-call state. /* Per-call state.
*/ */
typedef struct _SinkMemory { typedef struct _SinkMemory {
SinkBase sink_base; SinkBase sink_base;
/* We are current writing tiles to buf, we'll delay starting a new /* We are current writing tiles to area, we'll delay starting a new
* buffer if buf_back (the previous position) hasn't completed. * area if old_area (the previous position) hasn't completed.
*/ */
SinkMemoryBuffer *buf; SinkMemoryArea *area;
SinkMemoryBuffer *buf_back; SinkMemoryArea *old_area;
/* A region covering the whole of the output image ... we write to
* this from many workers with vips_region_prepare_to().
*/
VipsRegion *region;
} SinkMemory; } SinkMemory;
/* Our per-thread state ... we need to also track the buffer that pos is /* Our per-thread state ... we need to also track the area that pos is
* supposed to write to. * supposed to write to.
*/ */
typedef struct _SinkMemoryThreadState { typedef struct _SinkMemoryThreadState {
VipsThreadState parent_object; VipsThreadState parent_object;
SinkMemoryBuffer *buf; SinkMemoryArea *area;
} SinkMemoryThreadState; } SinkMemoryThreadState;
typedef struct _SinkMemoryThreadStateClass { typedef struct _SinkMemoryThreadStateClass {
@ -104,167 +102,96 @@ sink_memory_thread_state_class_init( SinkMemoryThreadStateClass *class )
{ {
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class ); VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
object_class->nickname = "writethreadstate"; object_class->nickname = "sinkmemorythreadstate";
object_class->description = _( "per-thread state for sinkmemory" ); object_class->description = _( "per-thread state for sinkmemory" );
} }
static void static void
write_thread_state_init( SinkMemoryThreadState *state ) sink_memory_thread_state_init( SinkMemoryThreadState *state )
{ {
state->buf = NULL;
} }
static VipsThreadState * static VipsThreadState *
write_thread_state_new( VipsImage *im, void *a ) sink_memory_thread_state_new( VipsImage *image, void *a )
{ {
return( VIPS_THREAD_STATE( vips_object_new( return( VIPS_THREAD_STATE( vips_object_new(
write_thread_state_get_type(), sink_memory_thread_state_get_type(),
vips_thread_state_set, im, a ) ) ); vips_thread_state_set, image, a ) ) );
} }
static void static void
wbuffer_free( SinkMemoryBuffer *wbuffer ) sink_memory_area_free( SinkMemoryArea *area )
{ {
VIPS_UNREF( wbuffer->region ); vips_semaphore_destroy( &area->nwrite );
vips_semaphore_destroy( &wbuffer->nwrite ); vips_free( area );
vips_free( wbuffer );
} }
static SinkMemoryBuffer * static SinkMemoryArea *
wbuffer_new( SinkMemory *write ) sink_memory_area_new( SinkMemory *memory )
{ {
SinkMemoryBuffer *wbuffer; SinkMemoryArea *area;
if( !(wbuffer = VIPS_NEW( NULL, SinkMemoryBuffer )) ) if( !(area = VIPS_NEW( NULL, SinkMemoryArea )) )
return( NULL ); return( NULL );
wbuffer->write = write; area->memory = memory;
wbuffer->region = NULL; vips_semaphore_init( &area->nwrite, 0, "nwrite" );
vips_semaphore_init( &wbuffer->nwrite, 0, "nwrite" );
if( !(wbuffer->region = vips_region_new( write->sink_base.im )) ) { return( area );
wbuffer_free( wbuffer );
return( NULL );
}
/* The worker threads need to be able to move the buffers around.
*/
vips__region_no_ownership( wbuffer->region );
#ifdef HAVE_THREADS
/* Make this last (picks up parts of wbuffer on startup).
*/
if( !(wbuffer->thread = g_thread_create( wbuffer_write_thread, wbuffer,
TRUE, NULL )) ) {
vips_error( "wbuffer_new",
"%s", _( "unable to create thread" ) );
wbuffer_free( wbuffer );
return( NULL );
}
#endif /*HAVE_THREADS*/
return( wbuffer );
} }
/* Block until the previous write completes, then write the front buffer. /* Move an area to a position.
*/ */
static int static void
wbuffer_flush( SinkMemory *write ) sink_memory_area_position( SinkMemoryArea *area, int top, int height )
{ {
VIPS_DEBUG_MSG( "wbuffer_flush:\n" ); SinkMemory *memory = area->memory;
/* Block until the other buffer has been written. We have to do this VipsRect all, rect;
* before we can set this buffer writing or we'll lose output ordering.
*/
if( write->buf->area.top > 0 ) {
vips_semaphore_down( &write->buf_back->done );
/* Previous write suceeded? all.left = 0;
*/ all.top = 0;
if( write->buf_back->write_errno ) { all.width = memory->sink_base.im->Xsize;
vips_error_system( write->buf_back->write_errno, all.height = memory->sink_base.im->Ysize;
"wbuffer_write", "%s", _( "write failed" ) );
return( -1 );
}
}
/* Set the background writer going for this buffer. rect.left = 0;
*/ rect.top = top;
#ifdef HAVE_THREADS rect.width = memory->sink_base.im->Xsize;
vips_semaphore_up( &write->buf->go ); rect.height = height;
#else
/* No threads? SinkMemory ourselves synchronously.
*/
wbuffer_write( write->buf );
#endif /*HAVE_THREADS*/
return( 0 ); vips_rect_intersectrect( &all, &rect, &area->rect );
}
/* Move a wbuffer to a position.
*/
static int
wbuffer_position( SinkMemoryBuffer *wbuffer, int top, int height )
{
VipsRect image, area;
int result;
image.left = 0;
image.top = 0;
image.width = wbuffer->write->sink_base.im->Xsize;
image.height = wbuffer->write->sink_base.im->Ysize;
area.left = 0;
area.top = top;
area.width = wbuffer->write->sink_base.im->Xsize;
area.height = height;
vips_rect_intersectrect( &area, &image, &wbuffer->area );
/* The workers take turns to move the buffers.
*/
vips__region_take_ownership( wbuffer->region );
result = vips_region_buffer( wbuffer->region, &wbuffer->area );
vips__region_no_ownership( wbuffer->region );
/* This should be an exclusive buffer, hopefully.
*/
g_assert( !wbuffer->region->buffer->done );
return( result );
} }
/* Our VipsThreadpoolAllocate function ... move the thread to the next tile /* Our VipsThreadpoolAllocate function ... move the thread to the next tile
* that needs doing. If no buffer is available (the bg writer hasn't yet * that needs doing. If we fill the current area, we block until the previous
* finished with it), we block. If all tiles are done, we return FALSE to end * area is finished, then swap areas.
* If all tiles are done, we return FALSE to end
* iteration. * iteration.
*/ */
static gboolean static gboolean
wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop ) sink_memory_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
{ {
SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state; SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state;
SinkMemory *write = (SinkMemory *) a; SinkMemory *memory = (SinkMemory *) a;
SinkBase *sink_base = (SinkBase *) write; SinkBase *sink_base = (SinkBase *) memory;
VipsRect image; VipsRect image;
VipsRect tile; VipsRect tile;
VIPS_DEBUG_MSG( "wbuffer_allocate_fn:\n" ); VIPS_DEBUG_MSG( "sink_memory_area_allocate_fn:\n" );
/* Is the state x/y OK? New line or maybe new buffer or maybe even /* Is the state x/y OK? New line or maybe new buffer or maybe even
* all done. * all done.
*/ */
if( sink_base->x >= write->buf->area.width ) { if( sink_base->x >= memory->area->rect.width ) {
sink_base->x = 0; sink_base->x = 0;
sink_base->y += sink_base->tile_height; sink_base->y += sink_base->tile_height;
if( sink_base->y >= VIPS_RECT_BOTTOM( &write->buf->area ) ) { if( sink_base->y >= VIPS_RECT_BOTTOM( &memory->area->rect ) ) {
/* Block until the last write is done, then set write /* Block until the previous area is done.
* of the front buffer going.
*/ */
if( wbuffer_flush( write ) ) if( memory->area->rect.top > 0 )
return( -1 ); vips_semaphore_downn(
&memory->old_area->nwrite, 0 );
/* End of image? /* End of image?
*/ */
@ -275,19 +202,13 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
/* Swap buffers. /* Swap buffers.
*/ */
{ VIPS_SWAP( SinkMemoryArea *,
SinkMemoryBuffer *t; memory->area, memory->old_area );
t = write->buf;
write->buf = write->buf_back;
write->buf_back = t;
}
/* Position buf at the new y. /* Position buf at the new y.
*/ */
if( wbuffer_position( write->buf, sink_memory_area_position( memory->area,
sink_base->y, sink_base->nlines ) ) sink_base->y, sink_base->nlines );
return( -1 );
} }
} }
@ -302,13 +223,16 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
tile.width = sink_base->tile_width; tile.width = sink_base->tile_width;
tile.height = sink_base->tile_height; tile.height = sink_base->tile_height;
vips_rect_intersectrect( &image, &tile, &state->pos ); vips_rect_intersectrect( &image, &tile, &state->pos );
wstate->buf = write->buf;
/* The thread needs to know which area it's writing to.
*/
wstate->area = memory->area;
VIPS_DEBUG_MSG( " allocated %d x %d:\n", tile.left, tile.top ); VIPS_DEBUG_MSG( " allocated %d x %d:\n", tile.left, tile.top );
/* Add to the number of writers on the buffer. /* Add to the number of writers on the area.
*/ */
vips_semaphore_upn( &write->buf->nwrite, -1 ); vips_semaphore_upn( &memory->area->nwrite, -1 );
/* Move state on. /* Move state on.
*/ */
@ -320,119 +244,102 @@ wbuffer_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
/* Our VipsThreadpoolWork function ... generate a tile! /* Our VipsThreadpoolWork function ... generate a tile!
*/ */
static int static int
wbuffer_work_fn( VipsThreadState *state, void *a ) sink_memory_area_work_fn( VipsThreadState *state, void *a )
{ {
SinkMemory *memory = (SinkMemory *) a;
SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state; SinkMemoryThreadState *wstate = (SinkMemoryThreadState *) state;
SinkMemoryArea *area = wstate->area;
VIPS_DEBUG_MSG( "wbuffer_work_fn: %p %d x %d\n", VIPS_DEBUG_MSG( "sink_memory_area_work_fn: %p %d x %d\n",
state, state->pos.left, state->pos.top ); state, state->pos.left, state->pos.top );
if( vips_region_prepare_to( state->reg, wstate->buf->region, if( vips_region_prepare_to( state->reg, memory->region,
&state->pos, state->pos.left, state->pos.top ) ) { &state->pos, state->pos.left, state->pos.top ) ) {
VIPS_DEBUG_MSG( "wbuffer_work_fn: %p error!\n", state ); VIPS_DEBUG_MSG( "sink_memory_area_work_fn: %p error!\n",
state );
return( -1 ); return( -1 );
} }
VIPS_DEBUG_MSG( "wbuffer_work_fn: %p done\n", state ); VIPS_DEBUG_MSG( "sink_memory_area_work_fn: %p done\n", state );
/* Tell the bg write thread we've left. /* Tell the allocator we're done.
*/ */
vips_semaphore_upn( &wstate->buf->nwrite, 1 ); vips_semaphore_upn( &area->nwrite, 1 );
return( 0 ); return( 0 );
} }
static void static void
write_init( SinkMemory *write, sink_memory_free( SinkMemory *memory )
VipsImage *image, VipsRegionSinkMemory write_fn, void *a )
{ {
vips_sink_base_init( &write->sink_base, image ); VIPS_FREEF( sink_memory_area_free, memory->area );
VIPS_FREEF( sink_memory_area_free, memory->old_area );
write->buf = wbuffer_new( write ); VIPS_UNREF( memory->region );
write->buf_back = wbuffer_new( write );
write->write_fn = write_fn;
write->a = a;
} }
static void static int
write_free( SinkMemory *write ) sink_memory_init( SinkMemory *memory, VipsImage *image )
{ {
VIPS_FREEF( wbuffer_free, write->buf ); VipsRect all;
VIPS_FREEF( wbuffer_free, write->buf_back );
vips_sink_base_init( &memory->sink_base, image );
memory->area = NULL;
memory->old_area = NULL;
all.left = 0;
all.top = 0;
all.width = image->Xsize;
all.height = image->Ysize;
if( !(memory->region = vips_region_new( image )) ||
vips_region_image( memory->region, &all ) ||
!(memory->area = sink_memory_area_new( memory )) ||
!(memory->old_area = sink_memory_area_new( memory )) ) {
sink_memory_free( memory );
return( -1 );
}
return( 0 );
} }
/** /**
* VipsRegionSinkMemory: * vips_sink_memory2:
* @region: get pixels from here * @im: generate this image to memory
* @area: area to write
* @a: client data
* *
* The function should write the pixels in @area from @region. @a is the * Loops over an image, generating it to a memory buffer attached to the
* value passed into vips_discsink(). * image.
* *
* See also: vips_sink_disc(). * See also: vips_sink(), vips_get_tile_size().
* *
* Returns: 0 on success, -1 on error. * Returns: 0 on success, or -1 on error.
*/
/**
* vips_sink_disc:
* @im: image to process
* @write_fn: called for every batch of pixels
* @a: client data
*
* vips_sink_disc() loops over @im, top-to-bottom, generating it in sections.
* As each section is produced, @write_fn is called.
*
* @write_fn is always called single-threaded (though not always from the same
* thread), it's always given image
* sections in top-to-bottom order, and there are never any gaps.
*
* This operation is handy for making image sinks which output to things like
* disc files.
*
* See also: vips_concurrency_set().
*
* Returns: 0 on success, -1 on error.
*/ */
int int
vips_sink_disc( VipsImage *im, VipsRegionSinkMemory write_fn, void *a ) vips_sink_memory2( VipsImage *image )
{ {
SinkMemory write; SinkMemory memory;
int result; int result;
vips_image_preeval( im ); VIPS_DEBUG_MSG( "vips_sink_memory2:\n" );
write_init( &write, im, write_fn, a ); if( sink_memory_init( &memory, image ) )
return( -1 );
vips_image_preeval( image );
result = 0; result = 0;
if( !write.buf || sink_memory_area_position( memory.area, 0, memory.sink_base.nlines );
!write.buf_back || if( vips_threadpool_run( image,
wbuffer_position( write.buf, 0, write.sink_base.nlines ) || sink_memory_thread_state_new,
vips_threadpool_run( im, sink_memory_area_allocate_fn,
write_thread_state_new, sink_memory_area_work_fn,
wbuffer_allocate_fn, vips_sink_base_progress,
wbuffer_work_fn, &memory ) )
vips_sink_base_progress,
&write ) )
result = -1; result = -1;
/* Just before allocate signalled stop, it set write.buf writing. We vips_image_posteval( image );
* need to wait for this write to finish.
*
* We can't just free the buffers (which will wait for the bg threads
* to finish), since the bg thread might see the kill before it gets a
* chance to write.
*
* If the pool exited with an error, write.buf might not have been
* started (if the allocate failed), and in any case, we don't care if
* the final write went through or not.
*/
if( !result )
vips_semaphore_down( &write.buf->done );
vips_image_posteval( im ); sink_memory_free( &memory );
write_free( &write ); VIPS_DEBUG_MSG( "vips_sink_memory2: done\n" );
return( result ); return( result );
} }