From 030718405057a921f41917db104878973dba1f47 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Wed, 9 Oct 2019 17:42:06 +0100 Subject: [PATCH] new input stream done needs testing ... also needs minimise support, and sniffing --- libvips/include/vips/stream.h | 26 ++- libvips/iofuncs/stream.c | 366 +++++++++++++++++----------------- 2 files changed, 195 insertions(+), 197 deletions(-) diff --git a/libvips/include/vips/stream.h b/libvips/include/vips/stream.h index 8dc905f7..221a2ccb 100644 --- a/libvips/include/vips/stream.h +++ b/libvips/include/vips/stream.h @@ -58,16 +58,24 @@ typedef struct _VipsStream { VipsObject parent_object; /*< private >*/ - + /* Read/write this fd if connected to a system pipe/socket. Override * ::read() and ::write() to do something else. */ int descriptor; + /* A descriptor we close with vips_tracked_close(). + */ + int tracked_descriptor; + + /* A descriptor we close with close(). + */ + int close_descriptor; + /* If descriptor is a file, the filename we opened. Handy for error * messages. */ - const char *filename; + char *filename; } VipsStream; @@ -111,15 +119,15 @@ typedef struct _VipsStreamInput { * is an unseekable source) so that we can rewind and try again, if * necessary. * - * Once we reach decode pahse, we no longer support seek and the + * Once we reach decode phase, we no longer support rewind and the * buffer of saved data is discarded. */ gboolean decode; - /* TRUE is this input source suports seek. If not, then we save data - * read during header phase in a buffer so we can rewind. + /* TRUE if this input source supports rewind. If not, then we save data + * read during header phase in a buffer. */ - gboolean seekable; + gboolean rewindable; /*< private >*/ @@ -130,7 +138,7 @@ typedef struct _VipsStreamInput { /* Save data read during header phase here. If we rewind and try * again, serve data from this until it runs out. */ - GByteArray *buffer; + GByteArray *header_bytes; /* For a memory source, the blob we read from. 
*/ @@ -148,7 +156,7 @@ typedef struct _VipsStreamInputClass { /* Subclasses can define these to implement other input methods. */ ssize_t (*read)( VipsStreamInput *, unsigned char *, size_t ); - void (*rewind)( VipsStreamInput * ); + int (*rewind)( VipsStreamInput * ); } VipsStreamInputClass; @@ -157,7 +165,7 @@ GType vips_stream_input_get_type( void ); VipsStreamInput *vips_stream_input_new_from_descriptor( int descriptor ); VipsStreamInput *vips_stream_input_new_from_filename( const char *filename ); VipsStreamInput *vips_stream_input_new_from_blob( VipsBlob *blob ); -VipsStreamInput *vips_stream_input_new_from_buffer( void *buf, size_t len ); +VipsStreamInput *vips_stream_input_new_from_memory( void *data, size_t size ); ssize_t vips_stream_input_read( VipsStreamInput *input, unsigned char *buffer, size_t length ); diff --git a/libvips/iofuncs/stream.c b/libvips/iofuncs/stream.c index af74d140..62a43803 100644 --- a/libvips/iofuncs/stream.c +++ b/libvips/iofuncs/stream.c @@ -110,10 +110,16 @@ vips_stream_finalize( GObject *gobject ) VIPS_DEBUG_MSG( "\n" ); #endif /*VIPS_DEBUG*/ - if( stream->descriptor >= 0 ) { - vips_tracked_close( stream->descriptor ); - stream->descriptor = -1; + if( stream->close_descriptor >= 0 ) { + close( stream->close_descriptor ); + stream->close_descriptor = -1; } + + if( stream->tracked_descriptor >= 0 ) { + vips_tracked_close( stream->tracked_descriptor ); + stream->tracked_descriptor = -1; + } + VIPS_FREE( stream->filename ); G_OBJECT_CLASS( vips_stream_parent_class )->finalize( gobject ); @@ -148,29 +154,23 @@ static void vips_stream_init( VipsStream *stream ) { stream->descriptor = -1; + stream->tracked_descriptor = -1; + stream->close_descriptor = -1; } -void -vips_stream_attach( VipsStream *stream ) -{ -#ifdef VIPS_DEBUG - VIPS_DEBUG_MSG( "vips_stream_attach: " ); - vips_object_print_name( VIPS_OBJECT( stream ) ); - VIPS_DEBUG_MSG( "\n" ); -#endif /*VIPS_DEBUG*/ - - g_assert( !stream->attached ); - stream->attached = TRUE; -} 
+#define STREAM_NAME( STREAM ) \ + (VIPS_STREAM( STREAM )->filename ? \ + VIPS_STREAM( STREAM )->filename : \ + VIPS_OBJECT( STREAM )->nickname) G_DEFINE_TYPE( VipsStreamInput, vips_stream_input, VIPS_TYPE_STREAM ); static void vips_stream_input_finalize( GObject *gobject ) { - VipsStreamInput *stream = (VipsStreamInput *) gobject; + VipsStreamInput *input = VIPS_STREAM_INPUT( gobject ); - VIPS_FREE( stream->buffer ); + VIPS_FREEF( g_byte_array_unref, input->header_bytes ); G_OBJECT_CLASS( vips_stream_input_parent_class )->finalize( gobject ); } @@ -178,35 +178,103 @@ vips_stream_input_finalize( GObject *gobject ) static int vips_stream_input_build( VipsObject *object ) { - VipsStreamInput *stream = VIPS_STREAM_INPUT( object ); + VipsStream *stream = VIPS_STREAM( object ); + VipsStreamInput *input = VIPS_STREAM_INPUT( object ); - VIPS_DEBUG_MSG( "vips_stream_input_build: %p\n", stream ); + VIPS_DEBUG_MSG( "vips_stream_input_build: %p\n", input ); if( VIPS_OBJECT_CLASS( vips_stream_input_parent_class )-> build( object ) ) return( -1 ); if( vips_object_argument_isset( object, "filename" ) && - !vips_object_argument_isset( object, "descriptor" ) ) { - const char *filename = VIPS_STREAM( stream )->filename; + vips_object_argument_isset( object, "descriptor" ) ) { + vips_error( STREAM_NAME( stream ), + "%s", _( "don't set 'filename' and 'descriptor'" ) ); + return( -1 ); + } + if( vips_object_argument_isset( object, "filename" ) ) { int fd; - if( (fd = vips_tracked_open( filename, MODE_READ )) == -1 ) { - vips_error_system( errno, filename, + if( (fd = vips_tracked_open( stream->filename, + MODE_READ )) == -1 ) { + vips_error_system( errno, STREAM_NAME( stream ), "%s", _( "unable to open for read" ) ); return( -1 ); } - g_object_set( object, "descriptor", fd, NULL ); + stream->tracked_descriptor = fd; + stream->descriptor = fd; + input->rewindable = TRUE; } - g_assert( !stream->buffer ); - g_assert( stream->buffer_size > 0 && - stream->buffer_size < 1000000 ); - 
stream->buffer = g_new0( unsigned char, stream->buffer_size ); - stream->next_byte = NULL; - stream->bytes_available = 0; + if( vips_object_argument_isset( object, "descriptor" ) ) + stream->close_descriptor = stream->descriptor; + + if( vips_object_argument_isset( object, "blob" ) ) + input->rewindable = TRUE; + + /* We will need a save buffer for unrewindable streams. + */ + if( !input->rewindable ) + input->header_bytes = g_byte_array_new(); + + return( 0 ); +} + +static ssize_t +vips_stream_input_read_real( VipsStreamInput *input, + unsigned char *buffer, size_t length ) +{ + VipsStream *stream = VIPS_STREAM( input ); + + if( input->blob ) { + VipsArea *area = (VipsArea *) input->blob; + ssize_t available = VIPS_MIN( length, + area->length - input->read_position ); + + if( available <= 0 ) + return( 0 ); + + memcpy( buffer, (unsigned char *) area->data + input->read_position, available ); /* copy from the current read offset, not the start of the blob */ + + return( available ); + } + else if( stream->descriptor != -1 ) { /* -1 means unset; 0 is a valid fd */ + return( read( stream->descriptor, buffer, length ) ); + } + else { + g_assert( 0 ); + return( -1 ); + } +} + +static int +vips_stream_input_rewind_real( VipsStreamInput *input ) +{ + VipsStream *stream = VIPS_STREAM( input ); + + if( input->decode ) { + vips_error( STREAM_NAME( stream ), + "%s", _( "can't rewind after decode begins" ) ); + return( -1 ); + } + + if( input->rewindable && + stream->descriptor != -1 ) { + off_t new_pos; + + new_pos = lseek( stream->descriptor, 0, SEEK_SET ); + if( new_pos == -1 ) { + vips_error_system( errno, STREAM_NAME( stream ), + "%s", _( "unable to rewind" ) ); + return( -1 ); /* report the failed seek to the caller */ + } + } + + input->read_position = 0; + input->eof = FALSE; return( 0 ); } @@ -223,94 +291,87 @@ vips_stream_input_class_init( VipsStreamInputClass *class ) vobject_class->build = vips_stream_input_build; - VIPS_ARG_INT( class, "buffer_size", 2, - _( "Buffer size" ), - _( "Size of input buffer" ), - VIPS_ARGUMENT_OPTIONAL_INPUT, - G_STRUCT_OFFSET( VipsStreamInput, buffer_size ), - 1, 10000000, 4096 ); + class->read = vips_stream_input_read_real; + 
class->rewind = vips_stream_input_rewind_real; VIPS_ARG_BOXED( class, "blob", 3, _( "Blob" ), - _( "Memory area to read from" ), + _( "blob to load from" ), VIPS_ARGUMENT_OPTIONAL_INPUT, G_STRUCT_OFFSET( VipsStreamInput, blob ), VIPS_TYPE_BLOB ); + + VIPS_ARG_BOOL( class, "rewindable", 4, + _( "rewindable" ), + _( "'descriptor' supports rewind" ), + VIPS_ARGUMENT_OPTIONAL_INPUT, + G_STRUCT_OFFSET( VipsStreamInput, rewindable ), + FALSE ); + } static void -vips_stream_input_init( VipsStreamInput *stream ) +vips_stream_input_init( VipsStreamInput *input ) { - stream->buffer_size = 4096; } /** * vips_stream_input_new_from_descriptor: * @descriptor: read from this file descriptor * - * Create a stream attached to a file descriptor. @descriptor is closed when - * the #VipsStream is finalized. - * - * #VipsStream s start out empty, you need to call - * vips_stream_input_refill() to fill them with bytes. - * - * See also: vips_stream_input_refill(). + * Create an input stream attached to a file descriptor. @descriptor is + * closed with close() when the #VipsStream is finalized. * * Returns: a new #VipsStream */ VipsStreamInput * vips_stream_input_new_from_descriptor( int descriptor ) { - VipsStreamInput *stream; + VipsStreamInput *input; VIPS_DEBUG_MSG( "vips_stream_input_new_from_descriptor: %d\n", descriptor ); - stream = VIPS_STREAM_INPUT( + input = VIPS_STREAM_INPUT( g_object_new( VIPS_TYPE_STREAM_INPUT, "descriptor", descriptor, NULL ) ); - if( vips_object_build( VIPS_OBJECT( stream ) ) ) { - VIPS_UNREF( stream ); + if( vips_object_build( VIPS_OBJECT( input ) ) ) { + VIPS_UNREF( input ); return( NULL ); } - return( stream ); + return( input ); } /** * vips_stream_input_new_from_filename: * @descriptor: read from this filename * - * Create a stream attached to a file. - * - * #VipsStream s start out empty, you need to call - * vips_stream_input_refill() to fill them with bytes. - * - * See also: vips_stream_input_refill(). 
+ * Create an input stream attached to a file. * * Returns: a new #VipsStream */ VipsStreamInput * vips_stream_input_new_from_filename( const char *filename ) { - VipsStreamInput *stream; + VipsStreamInput *input; VIPS_DEBUG_MSG( "vips_stream_input_new_from_filename: %s\n", filename ); - stream = VIPS_STREAM_INPUT( + input = VIPS_STREAM_INPUT( g_object_new( VIPS_TYPE_STREAM_INPUT, "filename", filename, NULL ) ); - if( vips_object_build( VIPS_OBJECT( stream ) ) ) { - VIPS_UNREF( stream ); + if( vips_object_build( VIPS_OBJECT( input ) ) ) { + VIPS_UNREF( input ); return( NULL ); } - return( stream ); + return( input ); } /** @@ -347,7 +408,7 @@ vips_stream_input_new_from_blob( VipsBlob *blob ) } /** - * vips_stream_input_new_from_buffer: + * vips_stream_input_new_from_memory: * @buf: memory area to load * @len: size of memory area * @@ -363,17 +424,17 @@ vips_stream_input_new_from_blob( VipsBlob *blob ) * Returns: a new #VipsStream */ VipsStreamInput * -vips_stream_input_new_from_buffer( void *buf, size_t len ) +vips_stream_input_new_from_memory( void *data, size_t size ) { VipsStreamInput *stream; VipsBlob *blob; - VIPS_DEBUG_MSG( "vips_stream_input_new_from_buffer: %p, len = %zd\n", - buf, len ); + VIPS_DEBUG_MSG( "vips_stream_input_new_from_memory: %p, size = %zu\n", + data, size ); /* We don't take a copy of the data or free it. 
*/ - blob = vips_blob_new( NULL, buf, len ); + blob = vips_blob_new( NULL, data, size ); stream = vips_stream_input_new_from_blob( blob ); @@ -382,147 +443,85 @@ vips_stream_input_new_from_buffer( void *buf, size_t len ) return( stream ); } -static ssize_t -vips_stream_input_read( VipsStreamInput *stream, - unsigned char *buffer, size_t buffer_size ) +ssize_t +vips_stream_input_read( VipsStreamInput *input, + unsigned char *buffer, size_t length ) { - VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( stream ); + VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( input ); - ssize_t len; + ssize_t bytes_read; - if( class->read ) - len = class->read( stream, buffer, buffer_size ); - else if( VIPS_STREAM( stream )->descriptor > 0 ) - len = read( VIPS_STREAM( stream )->descriptor, - buffer, buffer_size ); - else - g_assert( 0 ); + bytes_read = 0; - return( len ); -} - -/** - * vips_stream_input_refill: - * @stream: fill the stream buffer - * - * Reads data into the stream buffer. - * - * Returns: 0 on success, -1 on error or EOF. - */ -int -vips_stream_input_refill( VipsStreamInput *stream ) -{ - ssize_t len; - - /* If this is a memory source, we just set next_byte on the first - * call and we're done. - * - * If we're not attached, we can read even when the buffer isn't - * empty. Just move the unused bytes down and top up. - * - * If we're attached, we don't own the next_byte and bytes_available - * values (they are run by the load library) so we can't do this. - * - * We need to be able to refill the unattached buffer so we can do - * file format sniffing. + /* Are we serving from header_bytes? Get what we can from there. */ - if( stream->blob ) { - /* On the first call we read the whole of the input blob. On - * the second call, we EOF. 
- */ - if( stream->next_byte ) - len = 0; - else - stream->next_byte = (void *) - vips_blob_get( stream->blob, (size_t *) &len ); - } - else if( !VIPS_STREAM( stream )->attached ) { - memmove( stream->buffer, stream->next_byte, - stream->bytes_available ); - stream->next_byte = stream->buffer; + if( input->header_bytes && + input->read_position < input->header_bytes->len ) { /* unread saved bytes remain */ + ssize_t available; - len = vips_stream_input_read( stream, - stream->next_byte, - stream->buffer_size - stream->bytes_available ); - } - else { - len = vips_stream_input_read( stream, - stream->buffer, stream->buffer_size ); - stream->next_byte = stream->buffer; - - /* This is incremented below, after we check the return value. - */ - stream->bytes_available = 0; + available = VIPS_MIN( length, + input->header_bytes->len - input->read_position ); + memcpy( buffer, + input->header_bytes->data + input->read_position, + available ); + input->read_position += available; + buffer += available; + length -= available; + bytes_read += available; } -#ifdef VIPS_DEBUG - if( len > 0 ) - VIPS_DEBUG_MSG( "vips_stream_input_refill: " - "read %zd bytes\n", len ); -#endif /*VIPS_DEBUG*/ + /* Any more bytes required? Call the read() method. + */ + if( length > 0 ) { + ssize_t read; - if( len <= 0 ) { - stream->eof = TRUE; - - if( len < 0 ) - vips_error_system( errno, "read", + if( (read = class->read( input, buffer, length )) == -1 ) { + vips_error_system( errno, STREAM_NAME( input ), "%s", _( "read error" ) ); + return( -1 ); + } + if( read == 0 ) + input->eof = TRUE; - return( -1 ); + /* If we are in header mode and there's a save buffer, we need + * to store this new data in case we rewind. 
+ */ + if( input->header_bytes && + !input->decode && + read > 0 ) + g_byte_array_append( input->header_bytes, + buffer, read ); + + input->read_position += read; + bytes_read += read; } - stream->bytes_available += len; - - return( 0 ); + return( bytes_read ); } gboolean -vips_stream_input_eof( VipsStreamInput *stream ) +vips_stream_input_eof( VipsStreamInput *input ) { - if( !stream->eof && - stream->bytes_available == 0 && - !VIPS_STREAM( stream )->attached ) - vips_stream_input_refill( stream ); - - return( stream->eof ); + return( input->eof ); } -void -vips_stream_input_detach( VipsStreamInput *stream, - unsigned char *next_byte, size_t bytes_available ) +void +vips_stream_input_decode( VipsStreamInput *input ) { - VIPS_DEBUG_MSG( "vips_stream_input_detach:\n" ); - - g_assert( VIPS_STREAM( stream )->attached ); - VIPS_STREAM( stream )->attached = FALSE; - - stream->next_byte = next_byte; - stream->bytes_available = bytes_available; + input->decode = TRUE; + VIPS_FREEF( g_byte_array_unref, input->header_bytes ); } /** * vips_stream_input_sniff: * @bytes: number of bytes to sniff * - * Return a pointer to the start of the next @bytes bytes. This can only be - * used in detached mode. + * Return a pointer to the first few bytes of the file. 
*/ unsigned char * -vips_stream_input_sniff( VipsStreamInput *stream, int bytes ) +vips_stream_input_sniff( VipsStreamInput *stream, size_t length ) { - if( VIPS_STREAM( stream )->attached ) { - g_warning( "%s", _( "cannot sniff attached streams" ) ); - return( NULL ); - } - - while( stream->bytes_available < bytes ) - if( vips_stream_input_refill( stream ) ) { - g_warning( "%s", - _( "not enough bytes in stream to sniff" ) ); - return( NULL ); - } - - return( stream->next_byte ); + return( NULL ); } G_DEFINE_TYPE( VipsStreamOutput, vips_stream_output, VIPS_TYPE_STREAM ); @@ -634,15 +633,6 @@ vips_stream_output_new_from_filename( const char *filename ) return( stream ); } -void -vips_stream_output_detach( VipsStreamOutput *stream ) -{ - VIPS_DEBUG_MSG( "vips_stream_output_detach:\n" ); - - g_assert( VIPS_STREAM( stream )->attached ); - VIPS_STREAM( stream )->attached = FALSE; -} - int vips_stream_output_write( VipsStreamOutput *stream, const unsigned char *buffer, size_t buffer_size )