new input stream done

needs testing ... also needs minimise support, and sniffing
This commit is contained in:
John Cupitt 2019-10-09 17:42:06 +01:00
parent 372bbc8020
commit 0307184050
2 changed files with 195 additions and 197 deletions

View File

@ -58,16 +58,24 @@ typedef struct _VipsStream {
VipsObject parent_object; VipsObject parent_object;
/*< private >*/ /*< private >*/
/* Read/write this fd if connected to a system pipe/socket. Override /* Read/write this fd if connected to a system pipe/socket. Override
* ::read() and ::write() to do something else. * ::read() and ::write() to do something else.
*/ */
int descriptor; int descriptor;
/* A descriptor we close with vips_tracked_close().
*/
int tracked_descriptor;
/* A descriptor we close with close().
*/
int close_descriptor;
/* If descriptor is a file, the filename we opened. Handy for error /* If descriptor is a file, the filename we opened. Handy for error
* messages. * messages.
*/ */
const char *filename; char *filename;
} VipsStream; } VipsStream;
@ -111,15 +119,15 @@ typedef struct _VipsStreamInput {
* is an unseekable source) so that we can rewind and try again, if * is an unseekable source) so that we can rewind and try again, if
* necessary. * necessary.
* *
* Once we reach decode pahse, we no longer support seek and the * Once we reach decode phase, we no longer support rewind and the
* buffer of saved data is discarded. * buffer of saved data is discarded.
*/ */
gboolean decode; gboolean decode;
/* TRUE is this input source suports seek. If not, then we save data /* TRUE if this input source supports rewind. If not, then we save data
* read during header phase in a buffer so we can rewind. * read during header phase in a buffer.
*/ */
gboolean seekable; gboolean rewindable;
/*< private >*/ /*< private >*/
@ -130,7 +138,7 @@ typedef struct _VipsStreamInput {
/* Save data read during header phase here. If we rewind and try /* Save data read during header phase here. If we rewind and try
* again, serve data from this until it runs out. * again, serve data from this until it runs out.
*/ */
GByteArray *buffer; GByteArray *header_bytes;
/* For a memory source, the blob we read from. /* For a memory source, the blob we read from.
*/ */
@ -148,7 +156,7 @@ typedef struct _VipsStreamInputClass {
/* Subclasses can define these to implement other input methods. /* Subclasses can define these to implement other input methods.
*/ */
ssize_t (*read)( VipsStreamInput *, unsigned char *, size_t ); ssize_t (*read)( VipsStreamInput *, unsigned char *, size_t );
void (*rewind)( VipsStreamInput * ); int (*rewind)( VipsStreamInput * );
} VipsStreamInputClass; } VipsStreamInputClass;
@ -157,7 +165,7 @@ GType vips_stream_input_get_type( void );
VipsStreamInput *vips_stream_input_new_from_descriptor( int descriptor ); VipsStreamInput *vips_stream_input_new_from_descriptor( int descriptor );
VipsStreamInput *vips_stream_input_new_from_filename( const char *filename ); VipsStreamInput *vips_stream_input_new_from_filename( const char *filename );
VipsStreamInput *vips_stream_input_new_from_blob( VipsBlob *blob ); VipsStreamInput *vips_stream_input_new_from_blob( VipsBlob *blob );
VipsStreamInput *vips_stream_input_new_from_buffer( void *buf, size_t len ); VipsStreamInput *vips_stream_input_new_from_memory( void *data, size_t size );
ssize_t vips_stream_input_read( VipsStreamInput *input, ssize_t vips_stream_input_read( VipsStreamInput *input,
unsigned char *buffer, size_t length ); unsigned char *buffer, size_t length );

View File

@ -110,10 +110,16 @@ vips_stream_finalize( GObject *gobject )
VIPS_DEBUG_MSG( "\n" ); VIPS_DEBUG_MSG( "\n" );
#endif /*VIPS_DEBUG*/ #endif /*VIPS_DEBUG*/
if( stream->descriptor >= 0 ) { if( stream->close_descriptor >= 0 ) {
vips_tracked_close( stream->descriptor ); close( stream->close_descriptor );
stream->descriptor = -1; stream->close_descriptor = -1;
} }
if( stream->tracked_descriptor >= 0 ) {
vips_tracked_close( stream->tracked_descriptor );
stream->tracked_descriptor = -1;
}
VIPS_FREE( stream->filename ); VIPS_FREE( stream->filename );
G_OBJECT_CLASS( vips_stream_parent_class )->finalize( gobject ); G_OBJECT_CLASS( vips_stream_parent_class )->finalize( gobject );
@ -148,29 +154,23 @@ static void
vips_stream_init( VipsStream *stream ) vips_stream_init( VipsStream *stream )
{ {
stream->descriptor = -1; stream->descriptor = -1;
stream->tracked_descriptor = -1;
stream->close_descriptor = -1;
} }
void #define STREAM_NAME( STREAM ) \
vips_stream_attach( VipsStream *stream ) (VIPS_STREAM( STREAM )->filename ? \
{ VIPS_STREAM( STREAM )->filename : \
#ifdef VIPS_DEBUG VIPS_OBJECT( STREAM )->nickname)
VIPS_DEBUG_MSG( "vips_stream_attach: " );
vips_object_print_name( VIPS_OBJECT( stream ) );
VIPS_DEBUG_MSG( "\n" );
#endif /*VIPS_DEBUG*/
g_assert( !stream->attached );
stream->attached = TRUE;
}
G_DEFINE_TYPE( VipsStreamInput, vips_stream_input, VIPS_TYPE_STREAM ); G_DEFINE_TYPE( VipsStreamInput, vips_stream_input, VIPS_TYPE_STREAM );
static void static void
vips_stream_input_finalize( GObject *gobject ) vips_stream_input_finalize( GObject *gobject )
{ {
VipsStreamInput *stream = (VipsStreamInput *) gobject; VipsStreamInput *input = VIPS_STREAM_INPUT( gobject );
VIPS_FREE( stream->buffer ); VIPS_FREEF( g_byte_array_unref, input->header_bytes );
G_OBJECT_CLASS( vips_stream_input_parent_class )->finalize( gobject ); G_OBJECT_CLASS( vips_stream_input_parent_class )->finalize( gobject );
} }
@ -178,35 +178,103 @@ vips_stream_input_finalize( GObject *gobject )
static int static int
vips_stream_input_build( VipsObject *object ) vips_stream_input_build( VipsObject *object )
{ {
VipsStreamInput *stream = VIPS_STREAM_INPUT( object ); VipsStream *stream = VIPS_STREAM( object );
VipsStreamInput *input = VIPS_STREAM_INPUT( object );
VIPS_DEBUG_MSG( "vips_stream_input_build: %p\n", stream ); VIPS_DEBUG_MSG( "vips_stream_input_build: %p\n", input );
if( VIPS_OBJECT_CLASS( vips_stream_input_parent_class )-> if( VIPS_OBJECT_CLASS( vips_stream_input_parent_class )->
build( object ) ) build( object ) )
return( -1 ); return( -1 );
if( vips_object_argument_isset( object, "filename" ) && if( vips_object_argument_isset( object, "filename" ) &&
!vips_object_argument_isset( object, "descriptor" ) ) { vips_object_argument_isset( object, "descriptor" ) ) {
const char *filename = VIPS_STREAM( stream )->filename; vips_error( STREAM_NAME( stream ),
"%s", _( "don't set 'filename' and 'descriptor'" ) );
return( -1 );
}
if( vips_object_argument_isset( object, "filename" ) ) {
int fd; int fd;
if( (fd = vips_tracked_open( filename, MODE_READ )) == -1 ) { if( (fd = vips_tracked_open( stream->filename,
vips_error_system( errno, filename, MODE_READ )) == -1 ) {
vips_error_system( errno, STREAM_NAME( stream ),
"%s", _( "unable to open for read" ) ); "%s", _( "unable to open for read" ) );
return( -1 ); return( -1 );
} }
g_object_set( object, "descriptor", fd, NULL ); stream->tracked_descriptor = fd;
stream->descriptor = fd;
input->rewindable = TRUE;
} }
g_assert( !stream->buffer ); if( vips_object_argument_isset( object, "descriptor" ) )
g_assert( stream->buffer_size > 0 && stream->close_descriptor = stream->descriptor;
stream->buffer_size < 1000000 );
stream->buffer = g_new0( unsigned char, stream->buffer_size ); if( vips_object_argument_isset( object, "blob" ) )
stream->next_byte = NULL; input->rewindable = TRUE;
stream->bytes_available = 0;
/* We will need a save buffer for unrewindable streams.
*/
if( !input->rewindable )
input->header_bytes = g_byte_array_new();
return( 0 );
}
static ssize_t
vips_stream_input_read_real( VipsStreamInput *input,
unsigned char *buffer, size_t length )
{
VipsStream *stream = VIPS_STREAM( input );
if( input->blob ) {
VipsArea *area = (VipsArea *) input->blob;
ssize_t available = VIPS_MIN( length,
area->length - input->read_position );
if( available <= 0 )
return( 0 );
memcpy( buffer, area->data, available );
return( available );
}
else if( stream->descriptor ) {
return( read( stream->descriptor, buffer, length ) );
}
else {
g_assert( 0 );
return( -1 );
}
}
static int
vips_stream_input_rewind_real( VipsStreamInput *input )
{
VipsStream *stream = VIPS_STREAM( input );
if( input->decode ) {
vips_error( STREAM_NAME( stream ),
"%s", _( "can't rewind after decode begins" ) );
return( -1 );
}
if( input->rewindable &&
stream->descriptor != -1 ) {
off_t new_pos;
new_pos = lseek( stream->descriptor, 0, SEEK_SET );
if( new_pos == -1 ) {
vips_error_system( errno, STREAM_NAME( stream ),
"%s", _( "unable to rewind" ) );
return( 0 );
}
}
input->read_position = 0;
input->eof = FALSE;
return( 0 ); return( 0 );
} }
@ -223,94 +291,87 @@ vips_stream_input_class_init( VipsStreamInputClass *class )
vobject_class->build = vips_stream_input_build; vobject_class->build = vips_stream_input_build;
VIPS_ARG_INT( class, "buffer_size", 2, class->read = vips_stream_input_read_real;
_( "Buffer size" ), class->rewind = vips_stream_input_rewind_real;
_( "Size of input buffer" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsStreamInput, buffer_size ),
1, 10000000, 4096 );
VIPS_ARG_BOXED( class, "blob", 3, VIPS_ARG_BOXED( class, "blob", 3,
_( "Blob" ), _( "Blob" ),
_( "Memory area to read from" ), _( "blob to load from" ),
VIPS_ARGUMENT_OPTIONAL_INPUT, VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsStreamInput, blob ), G_STRUCT_OFFSET( VipsStreamInput, blob ),
VIPS_TYPE_BLOB ); VIPS_TYPE_BLOB );
VIPS_ARG_BOOL( class, "rewindable", 4,
_( "rewindable" ),
_( "'descriptor' supports rewind" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsStreamInput, rewindable ),
FALSE );
} }
static void static void
vips_stream_input_init( VipsStreamInput *stream ) vips_stream_input_init( VipsStreamInput *input )
{ {
stream->buffer_size = 4096;
} }
/** /**
* vips_stream_input_new_from_descriptor: * vips_stream_input_new_from_descriptor:
* @descriptor: read from this file descriptor * @descriptor: read from this file descriptor
* *
* Create a stream attached to a file descriptor. @descriptor is closed when * Create an input stream attached to a file descriptor. @descriptor is
* the #VipsStream is finalized. * closed with close() when the #VipsStream is finalized.
*
* #VipsStream s start out empty, you need to call
* vips_stream_input_refill() to fill them with bytes.
*
* See also: vips_stream_input_refill().
* *
* Returns: a new #VipsStream * Returns: a new #VipsStream
*/ */
VipsStreamInput * VipsStreamInput *
vips_stream_input_new_from_descriptor( int descriptor ) vips_stream_input_new_from_descriptor( int descriptor )
{ {
VipsStreamInput *stream; VipsStreamInput *input;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_descriptor: %d\n", VIPS_DEBUG_MSG( "vips_stream_input_new_from_descriptor: %d\n",
descriptor ); descriptor );
stream = VIPS_STREAM_INPUT( input = VIPS_STREAM_INPUT(
g_object_new( VIPS_TYPE_STREAM_INPUT, g_object_new( VIPS_TYPE_STREAM_INPUT,
"descriptor", descriptor, "descriptor", descriptor,
NULL ) ); NULL ) );
if( vips_object_build( VIPS_OBJECT( stream ) ) ) { if( vips_object_build( VIPS_OBJECT( input ) ) ) {
VIPS_UNREF( stream ); VIPS_UNREF( input );
return( NULL ); return( NULL );
} }
return( stream ); return( input );
} }
/** /**
* vips_stream_input_new_from_filename: * vips_stream_input_new_from_filename:
* @filename: read from this filename * @filename: read from this filename
* *
* Create a stream attached to a file. * Create an input stream attached to a file.
*
* #VipsStream s start out empty, you need to call
* vips_stream_input_refill() to fill them with bytes.
*
* See also: vips_stream_input_refill().
* *
* Returns: a new #VipsStream * Returns: a new #VipsStream
*/ */
VipsStreamInput * VipsStreamInput *
vips_stream_input_new_from_filename( const char *filename ) vips_stream_input_new_from_filename( const char *filename )
{ {
VipsStreamInput *stream; VipsStreamInput *input;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_filename: %s\n", VIPS_DEBUG_MSG( "vips_stream_input_new_from_filename: %s\n",
filename ); filename );
stream = VIPS_STREAM_INPUT( input = VIPS_STREAM_INPUT(
g_object_new( VIPS_TYPE_STREAM_INPUT, g_object_new( VIPS_TYPE_STREAM_INPUT,
"filename", filename, "filename", filename,
NULL ) ); NULL ) );
if( vips_object_build( VIPS_OBJECT( stream ) ) ) { if( vips_object_build( VIPS_OBJECT( input ) ) ) {
VIPS_UNREF( stream ); VIPS_UNREF( input );
return( NULL ); return( NULL );
} }
return( stream ); return( input );
} }
/** /**
@ -347,7 +408,7 @@ vips_stream_input_new_from_blob( VipsBlob *blob )
} }
/** /**
* vips_stream_input_new_from_buffer: * vips_stream_input_new_from_memory:
* @data: memory area to load
* @size: size of memory area
* *
@ -363,17 +424,17 @@ vips_stream_input_new_from_blob( VipsBlob *blob )
* Returns: a new #VipsStream * Returns: a new #VipsStream
*/ */
VipsStreamInput * VipsStreamInput *
vips_stream_input_new_from_buffer( void *buf, size_t len ) vips_stream_input_new_from_memory( void *data, size_t size )
{ {
VipsStreamInput *stream; VipsStreamInput *stream;
VipsBlob *blob; VipsBlob *blob;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_buffer: %p, len = %zd\n", VIPS_DEBUG_MSG( "vips_stream_input_new_from_buffer: %p, size = %zd\n",
buf, len ); data, size );
/* We don't take a copy of the data or free it. /* We don't take a copy of the data or free it.
*/ */
blob = vips_blob_new( NULL, buf, len ); blob = vips_blob_new( NULL, data, size );
stream = vips_stream_input_new_from_blob( blob ); stream = vips_stream_input_new_from_blob( blob );
@ -382,147 +443,85 @@ vips_stream_input_new_from_buffer( void *buf, size_t len )
return( stream ); return( stream );
} }
static ssize_t ssize_t
vips_stream_input_read( VipsStreamInput *stream, vips_stream_input_read( VipsStreamInput *input,
unsigned char *buffer, size_t buffer_size ) unsigned char *buffer, size_t length )
{ {
VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( stream ); VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( input );
ssize_t len; ssize_t bytes_read;
if( class->read ) bytes_read = 0;
len = class->read( stream, buffer, buffer_size );
else if( VIPS_STREAM( stream )->descriptor > 0 )
len = read( VIPS_STREAM( stream )->descriptor,
buffer, buffer_size );
else
g_assert( 0 );
return( len ); /* Are we serving from header_bytes? Get what we can from there.
}
/**
* vips_stream_input_refill:
* @stream: fill the stream buffer
*
* Reads data into the stream buffer.
*
* Returns: 0 on success, -1 on error or EOF.
*/
int
vips_stream_input_refill( VipsStreamInput *stream )
{
ssize_t len;
/* If this is a memory source, we just set next_byte on the first
* call and we're done.
*
* If we're not attached, we can read even when the buffer isn't
* empty. Just move the unused bytes down and top up.
*
* If we're attached, we don't own the next_byte and bytes_available
* values (they are run by the load library) so we can't do this.
*
* We need to be able to refill the unattached buffer so we can do
* file format sniffing.
*/ */
if( stream->blob ) { if( input->header_bytes &&
/* On the first call we read the whole of the input blob. On input->header_bytes->len < input->read_position ) {
* the second call, we EOF. ssize_t available;
*/
if( stream->next_byte )
len = 0;
else
stream->next_byte = (void *)
vips_blob_get( stream->blob, (size_t *) &len );
}
else if( !VIPS_STREAM( stream )->attached ) {
memmove( stream->buffer, stream->next_byte,
stream->bytes_available );
stream->next_byte = stream->buffer;
len = vips_stream_input_read( stream, available = VIPS_MIN( length,
stream->next_byte, input->header_bytes->len - input->read_position );
stream->buffer_size - stream->bytes_available ); memcpy( buffer,
} input->header_bytes->data + input->read_position,
else { available );
len = vips_stream_input_read( stream, input->read_position += available;
stream->buffer, stream->buffer_size ); buffer += available;
stream->next_byte = stream->buffer; length -= available;
bytes_read += available;
/* This is incremented below, after we check the return value.
*/
stream->bytes_available = 0;
} }
#ifdef VIPS_DEBUG /* Any more bytes required? Call the read() method.
if( len > 0 ) */
VIPS_DEBUG_MSG( "vips_stream_input_refill: " if( length > 0 ) {
"read %zd bytes\n", len ); ssize_t read;
#endif /*VIPS_DEBUG*/
if( len <= 0 ) { if( (read = class->read( input, buffer, length )) == -1 ) {
stream->eof = TRUE; vips_error_system( errno, STREAM_NAME( input ),
if( len < 0 )
vips_error_system( errno, "read",
"%s", _( "read error" ) ); "%s", _( "read error" ) );
return( -1 );
}
if( read == 0 )
input->eof = TRUE;
return( -1 ); /* If we are in header mode and there's a save buffer, we need
* to store this new data in case we rewind.
*/
if( input->header_bytes &&
!input->decode &&
read > 0 )
g_byte_array_append( input->header_bytes,
buffer, read );
input->read_position += read;
bytes_read += read;
} }
stream->bytes_available += len; return( bytes_read );
return( 0 );
} }
gboolean gboolean
vips_stream_input_eof( VipsStreamInput *stream ) vips_stream_input_eof( VipsStreamInput *input )
{ {
if( !stream->eof && return( input->eof );
stream->bytes_available == 0 &&
!VIPS_STREAM( stream )->attached )
vips_stream_input_refill( stream );
return( stream->eof );
} }
void void
vips_stream_input_detach( VipsStreamInput *stream, vips_stream_input_decode( VipsStreamInput *input )
unsigned char *next_byte, size_t bytes_available )
{ {
VIPS_DEBUG_MSG( "vips_stream_input_detach:\n" ); input->decode = TRUE;
VIPS_FREEF( g_byte_array_unref, input->header_bytes );
g_assert( VIPS_STREAM( stream )->attached );
VIPS_STREAM( stream )->attached = FALSE;
stream->next_byte = next_byte;
stream->bytes_available = bytes_available;
} }
/** /**
* vips_stream_input_sniff: * vips_stream_input_sniff:
* @length: number of bytes to sniff
* *
* Return a pointer to the start of the next @bytes bytes. This can only be * Return a pointer to the first few bytes of the file.
* used in detached mode.
*/ */
unsigned char * unsigned char *
vips_stream_input_sniff( VipsStreamInput *stream, int bytes ) vips_stream_input_sniff( VipsStreamInput *stream, size_t length )
{ {
if( VIPS_STREAM( stream )->attached ) { return( NULL );
g_warning( "%s", _( "cannot sniff attached streams" ) );
return( NULL );
}
while( stream->bytes_available < bytes )
if( vips_stream_input_refill( stream ) ) {
g_warning( "%s",
_( "not enough bytes in stream to sniff" ) );
return( NULL );
}
return( stream->next_byte );
} }
G_DEFINE_TYPE( VipsStreamOutput, vips_stream_output, VIPS_TYPE_STREAM ); G_DEFINE_TYPE( VipsStreamOutput, vips_stream_output, VIPS_TYPE_STREAM );
@ -634,15 +633,6 @@ vips_stream_output_new_from_filename( const char *filename )
return( stream ); return( stream );
} }
void
vips_stream_output_detach( VipsStreamOutput *stream )
{
VIPS_DEBUG_MSG( "vips_stream_output_detach:\n" );
g_assert( VIPS_STREAM( stream )->attached );
VIPS_STREAM( stream )->attached = FALSE;
}
int int
vips_stream_output_write( VipsStreamOutput *stream, vips_stream_output_write( VipsStreamOutput *stream,
const unsigned char *buffer, size_t buffer_size ) const unsigned char *buffer, size_t buffer_size )