start cleaning up

This commit is contained in:
John Cupitt 2019-10-21 10:57:20 +01:00
parent 4a311a5cdd
commit d49e816641
5 changed files with 1328 additions and 1025 deletions

View File

@ -88,29 +88,29 @@ GType vips_stream_get_type( void );
const char *vips_stream_filename( VipsStream *stream );
#define VIPS_TYPE_STREAM_INPUT (vips_stream_input_get_type())
#define VIPS_STREAM_INPUT( obj ) \
#define VIPS_TYPE_STREAMI (vips_streami_get_type())
#define VIPS_STREAMI( obj ) \
(G_TYPE_CHECK_INSTANCE_CAST( (obj), \
VIPS_TYPE_STREAM_INPUT, VipsStreamInput ))
#define VIPS_STREAM_INPUT_CLASS( klass ) \
VIPS_TYPE_STREAMI, VipsStreami ))
#define VIPS_STREAMI_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_CAST( (klass), \
VIPS_TYPE_STREAM_INPUT, VipsStreamInputClass))
#define VIPS_IS_STREAM_INPUT( obj ) \
(G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_STREAM_INPUT ))
#define VIPS_IS_STREAM_INPUT_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_STREAM_INPUT ))
#define VIPS_STREAM_INPUT_GET_CLASS( obj ) \
VIPS_TYPE_STREAMI, VipsStreamiClass))
#define VIPS_IS_STREAMI( obj ) \
(G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_STREAMI ))
#define VIPS_IS_STREAMI_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_STREAMI ))
#define VIPS_STREAMI_GET_CLASS( obj ) \
(G_TYPE_INSTANCE_GET_CLASS( (obj), \
VIPS_TYPE_STREAM_INPUT, VipsStreamInputClass ))
VIPS_TYPE_STREAMI, VipsStreamiClass ))
/* Read from something like a socket, file or memory area and present the data
* with a simple seek / read interface.
* with a unified seek / read / map interface.
*
* During the header phase, we save data from unseekable streams in a buffer
* so readers can rewind and read again. We don't buffer data during the
* decode stage.
*/
typedef struct _VipsStreamInput {
typedef struct _VipsStreami {
VipsStream parent_object;
/* We have two phases:
@ -124,25 +124,29 @@ typedef struct _VipsStreamInput {
*/
gboolean decode;
/* TRUE if this descriptor supports lseek(). If not, then we save data
* read during the header phase in a buffer.
/* TRUE if this input is something like a pipe. These don't support
* stream or map -- all you can do is read() bytes sequentially.
*
* If you attempt to map or get the size of a pipe-style input, it'll
* get read entirely into memory. Seeks will cause read up to the seek
* point.
*/
gboolean seekable;
gboolean is_pipe;
/* TRUE if this descriptor supports mmap(). If not, then we have to
* read() the whole stream if the loader needs the entire image.
/* The current read point and length.
*
* length is -1 for is_pipe sources.
*
* off_t can be 32 bits on some platforms, so make sure we have a
* full 64.
*/
gboolean mappable;
gint64 read_position;
gint64 length;
/*< private >*/
/* The current read point. off_t can be 32 bits on some platforms, so
* make sure we have a full 64.
*/
gint64 read_position;
/* Save data read during header phase here. If we rewind and try
* again, serve data from this until it runs out.
/* For is_pipe sources, save data read during header phase here. If
* we rewind and try again, serve data from this until it runs out.
*/
GByteArray *header_bytes;
@ -154,17 +158,17 @@ typedef struct _VipsStreamInput {
*/
VipsBlob *blob;
/* If we've mmaped the file, the base and length of the mapped area.
/* If we've mmaped the file, the base of the mapped area. Use @length
* for the length.
*/
const void *baseaddr;
size_t length;
} VipsStreamInput;
} VipsStreami;
typedef struct _VipsStreamInputClass {
typedef struct _VipsStreamiClass {
VipsStreamClass parent_class;
/* Subclasses can define these to implement other input methods.
/* Subclasses can define these to implement other streami methods.
*/
/* Read up to N bytes from the stream into the supplied buffer,
@ -172,7 +176,7 @@ typedef struct _VipsStreamInputClass {
*
* -1 on error, 0 on EOF.
*/
ssize_t (*read)( VipsStreamInput *, unsigned char *, size_t );
ssize_t (*read)( VipsStreami *, void *, size_t );
/* Map the entire stream into memory, for example with mmap(). Return
* the base and size of the mapped area.
@ -182,11 +186,11 @@ typedef struct _VipsStreamInputClass {
*
* NULL on error.
*/
const void *(*map)( VipsStreamInput *, size_t * );
const void *(*map)( VipsStreami *, size_t * );
/* Seek to a certain position, args exactly as lseek(2).
*/
gint64 (*seek)( VipsStreamInput *, gint64 offset, int );
gint64 (*seek)( VipsStreami *, gint64 offset, int );
/* Shut down anything that can safely restarted. For example, if
* there's a fd that supports lseek(), it can be closed, since later
@ -195,48 +199,45 @@ typedef struct _VipsStreamInputClass {
*
* Non-restartable shutdown shuld be in _finalize().
*/
void (*minimise)( VipsStreamInput * );
void (*minimise)( VipsStreami * );
} VipsStreamInputClass;
} VipsStreamiClass;
GType vips_stream_input_get_type( void );
GType vips_streami_get_type( void );
VipsStreamInput *vips_stream_input_new_from_descriptor( int descriptor );
VipsStreamInput *vips_stream_input_new_from_filename( const char *filename );
VipsStreamInput *vips_stream_input_new_from_blob( VipsBlob *blob );
VipsStreamInput *vips_stream_input_new_from_memory( const void *data,
size_t size );
VipsStreamInput *vips_stream_input_new_from_options( const char *options );
VipsStreami *vips_streami_new_from_descriptor( int descriptor );
VipsStreami *vips_streami_new_from_filename( const char *filename );
VipsStreami *vips_streami_new_from_blob( VipsBlob *blob );
VipsStreami *vips_streami_new_from_memory( const void *data, size_t size );
VipsStreami *vips_streami_new_from_options( const char *options );
ssize_t vips_stream_input_read( VipsStreamInput *input,
unsigned char *data, size_t length );
const void *vips_stream_input_map( VipsStreamInput *input, size_t *length );
gint64 vips_stream_input_seek( VipsStreamInput *input,
gint64 offset, int whence );
int vips_stream_input_rewind( VipsStreamInput *input );
void vips_stream_input_minimise( VipsStreamInput *input );
int vips_stream_input_decode( VipsStreamInput *input );
unsigned char *vips_stream_input_sniff( VipsStreamInput *input, size_t length );
gint64 vips_stream_input_size( VipsStreamInput *input );
ssize_t vips_streami_read( VipsStreami *streami, void *data, size_t length );
const void *vips_streami_map( VipsStreami *streami, size_t *length );
gint64 vips_streami_seek( VipsStreami *streami, gint64 offset, int whence );
int vips_streami_rewind( VipsStreami *streami );
void vips_streami_minimise( VipsStreami *streami );
int vips_streami_decode( VipsStreami *streami );
unsigned char *vips_streami_sniff( VipsStreami *streami, size_t length );
gint64 vips_streami_size( VipsStreami *streami );
#define VIPS_TYPE_STREAM_OUTPUT (vips_stream_output_get_type())
#define VIPS_STREAM_OUTPUT( obj ) \
#define VIPS_TYPE_STREAMO (vips_streamo_get_type())
#define VIPS_STREAMO( obj ) \
(G_TYPE_CHECK_INSTANCE_CAST( (obj), \
VIPS_TYPE_STREAM_OUTPUT, VipsStreamOutput ))
#define VIPS_STREAM_OUTPUT_CLASS( klass ) \
VIPS_TYPE_STREAMO, VipsStreamo ))
#define VIPS_STREAMO_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_CAST( (klass), \
VIPS_TYPE_STREAM_OUTPUT, VipsStreamOutputClass))
#define VIPS_IS_STREAM_OUTPUT( obj ) \
(G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_STREAM_OUTPUT ))
#define VIPS_IS_STREAM_OUTPUT_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_STREAM_OUTPUT ))
#define VIPS_STREAM_OUTPUT_GET_CLASS( obj ) \
VIPS_TYPE_STREAMO, VipsStreamoClass))
#define VIPS_IS_STREAMO( obj ) \
(G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_STREAMO ))
#define VIPS_IS_STREAMO_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_STREAMO ))
#define VIPS_STREAMO_GET_CLASS( obj ) \
(G_TYPE_INSTANCE_GET_CLASS( (obj), \
VIPS_TYPE_STREAM_OUTPUT, VipsStreamOutputClass ))
VIPS_TYPE_STREAMO, VipsStreamoClass ))
/* Output to something like a socket, pipe or memory area.
*/
typedef struct _VipsStreamOutput {
typedef struct _VipsStreamo {
VipsStream parent_object;
/*< private >*/
@ -249,30 +250,29 @@ typedef struct _VipsStreamOutput {
*/
VipsBlob *blob;
} VipsStreamOutput;
} VipsStreamo;
typedef struct _VipsStreamOutputClass {
typedef struct _VipsStreamoClass {
VipsStreamClass parent_class;
/* If defined, output some bytes with this. Otherwise use write().
*/
ssize_t (*write)( VipsStreamOutput *, const unsigned char *, size_t );
ssize_t (*write)( VipsStreamo *, const void *, size_t );
/* A complete output image has been generated, so do any clearing up,
* eg. copy the bytes we saved in memory to the output blob.
* eg. copy the bytes we saved in memory to the stream blob.
*/
void (*finish)( VipsStreamOutput * );
void (*finish)( VipsStreamo * );
} VipsStreamOutputClass;
} VipsStreamoClass;
GType vips_stream_output_get_type( void );
GType vips_streamo_get_type( void );
VipsStreamOutput *vips_stream_output_new_from_descriptor( int descriptor );
VipsStreamOutput *vips_stream_output_new_from_filename( const char *filename );
VipsStreamOutput *vips_stream_output_new_memory( void );
int vips_stream_output_write( VipsStreamOutput *output,
const unsigned char *data, size_t length );
void vips_stream_output_finish( VipsStreamOutput *output );
VipsStreamo *vips_streamo_new_from_descriptor( int descriptor );
VipsStreamo *vips_streamo_new_from_filename( const char *filename );
VipsStreamo *vips_streamo_new_memory( void );
int vips_streamo_write( VipsStreamo *streamo, const void *data, size_t length );
void vips_streamo_finish( VipsStreamo *streamo );
#ifdef __cplusplus
}

View File

@ -2,6 +2,8 @@ noinst_LTLIBRARIES = libiofuncs.la
libiofuncs_la_SOURCES = \
stream.c \
streami.c \
streamo.c \
dbuf.c \
reorder.c \
vipsmarshal.h \

View File

@ -31,26 +31,6 @@
*/
/* TODO
*
* - catch out of range seeks
* - some sanity thing to prevent endless streams filling memory?
* - filename encoding
* - seek END and _size need thinking about with pipes ... perhaps we should
* force-read the whole thing in
* - _size() needs to be a vfunc too (libtiff needs it)
* - only fetch size once
* - revise logic once we have all the use cases nailed down ... split to:
* + base class, just contain interface and perhaps sniff support
* + memory input subclass
* + mappable input subclass,
* + seekable file input subclass
* + pipe input subclass (uses header cache)
* - need to be able to set seekable and mappable via constructor
* - test we can really change all behaviour in the subclass ... add callbacks
* as well to make it simpler for language bindings
*/
/*
*/
#define VIPS_DEBUG
@ -76,28 +56,6 @@
#include <vips/internal.h>
#include <vips/debug.h>
/* Try to make an O_BINARY ... sometimes need the leading '_'.
*/
#ifdef BINARY_OPEN
#ifndef O_BINARY
#ifdef _O_BINARY
#define O_BINARY _O_BINARY
#endif /*_O_BINARY*/
#endif /*!O_BINARY*/
#endif /*BINARY_OPEN*/
/* If we have O_BINARY, add it to a mode flags set.
*/
#ifdef O_BINARY
#define BINARYIZE(M) ((M) | O_BINARY)
#else /*!O_BINARY*/
#define BINARYIZE(M) (M)
#endif /*O_BINARY*/
#define MODE_READ BINARYIZE (O_RDONLY)
#define MODE_READWRITE BINARYIZE (O_RDWR)
#define MODE_WRITE BINARYIZE (O_WRONLY | O_CREAT | O_TRUNC)
/**
* SECTION: stream
* @short_description: a source/sink of bytes, perhaps a network socket
@ -115,17 +73,12 @@
/**
* VipsStream:
*
* A #VipsStream is a source of bytes for something like jpeg loading. It can
* be connected to a network socket, for example.
* A #VipsStream is a source or sink of bytes for something like jpeg loading.
* It can be connected to a network socket, for example.
*/
G_DEFINE_ABSTRACT_TYPE( VipsStream, vips_stream, VIPS_TYPE_OBJECT );
#define STREAM_NAME( STREAM ) \
(VIPS_STREAM( STREAM )->filename ? \
VIPS_STREAM( STREAM )->filename : \
VIPS_OBJECT( STREAM )->nickname)
static void
vips_stream_close( VipsStream *stream )
{
@ -197,903 +150,9 @@ vips_stream_init( VipsStream *stream )
}
const char *
vips_stream_filename( VipsStream *stream )
vips_stream_name( VipsStream *stream )
{
return( stream->filename );
}
G_DEFINE_TYPE( VipsStreamInput, vips_stream_input, VIPS_TYPE_STREAM );
static void
vips_stream_input_finalize( GObject *gobject )
{
VipsStreamInput *input = VIPS_STREAM_INPUT( gobject );
VIPS_FREEF( g_byte_array_unref, input->header_bytes );
VIPS_FREEF( g_byte_array_unref, input->sniff );
G_OBJECT_CLASS( vips_stream_input_parent_class )->finalize( gobject );
}
static int
vips_stream_input_open( VipsStreamInput *input )
{
VipsStream *stream = VIPS_STREAM( input );
if( stream->descriptor == -1 &&
stream->tracked_descriptor == -1 &&
stream->filename ) {
int fd;
if( (fd = vips_tracked_open( stream->filename,
MODE_READ )) == -1 ) {
vips_error_system( errno, STREAM_NAME( stream ),
"%s", _( "unable to open for read" ) );
return( -1 );
}
stream->tracked_descriptor = fd;
stream->descriptor = fd;
VIPS_DEBUG_MSG( "vips_stream_input_open: "
"restoring read position %zd\n", input->read_position );
if( vips__seek( stream->descriptor,
input->read_position, SEEK_SET ) == -1 )
return( -1 );
}
return( 0 );
}
static int
vips_stream_input_build( VipsObject *object )
{
VipsStream *stream = VIPS_STREAM( object );
VipsStreamInput *input = VIPS_STREAM_INPUT( object );
VIPS_DEBUG_MSG( "vips_stream_input_build: %p\n", input );
if( VIPS_OBJECT_CLASS( vips_stream_input_parent_class )->
build( object ) )
return( -1 );
if( vips_object_argument_isset( object, "filename" ) &&
vips_object_argument_isset( object, "descriptor" ) ) {
vips_error( STREAM_NAME( stream ),
"%s", _( "don't set 'filename' and 'descriptor'" ) );
return( -1 );
}
if( vips_object_argument_isset( object, "filename" ) &&
vips_stream_input_open( input ) )
return( -1 );
if( vips_object_argument_isset( object, "descriptor" ) ) {
stream->descriptor = dup( stream->descriptor );
stream->close_descriptor = stream->descriptor;
}
/* If there's a descriptor for input, test its properties.
*/
if( stream->descriptor != -1 ) {
/* Do +=0 on the current position. This fails for pipes, at
* least on linux.
*/
if( vips__seek( stream->descriptor, 0, SEEK_CUR ) != -1 )
input->seekable = TRUE;
if( vips__mmap_supported( stream->descriptor ) )
input->mappable = TRUE;
}
if( vips_object_argument_isset( object, "blob" ) )
input->seekable = TRUE;
/* Need to save the header if the source is not seekable.
*/
if( !input->seekable )
input->header_bytes = g_byte_array_new();
/* We always want a sniff buffer.
*/
input->sniff = g_byte_array_new();
return( 0 );
}
static ssize_t
vips_stream_input_read_real( VipsStreamInput *input,
unsigned char *data, size_t length )
{
VipsStream *stream = VIPS_STREAM( input );
VIPS_DEBUG_MSG( "vips_stream_input_read_real:\n" );
if( input->blob ) {
VipsArea *area = VIPS_AREA( input->blob );
ssize_t available = VIPS_MIN( length,
area->length - input->read_position );
if( available <= 0 )
return( 0 );
memcpy( data, area->data + input->read_position, available );
return( available );
}
else if( stream->descriptor != -1 ) {
return( read( stream->descriptor, data, length ) );
}
else {
g_assert( 0 );
return( -1 );
}
}
static const void *
vips_stream_input_map_real( VipsStreamInput *input, size_t *length )
{
VipsStream *stream = VIPS_STREAM( input );
gint64 file_length;
const void *file_baseaddr;
if( (file_length = vips_file_length( stream->descriptor )) < 0 )
return( NULL );
if( !(file_baseaddr = vips__mmap( stream->descriptor,
FALSE, file_length, 0 )) )
return( NULL );
if( length )
*length = file_length;
return( file_baseaddr );
}
static gint64
vips_stream_input_seek_real( VipsStreamInput *input, gint64 offset, int whence )
{
VipsStream *stream = VIPS_STREAM( input );
VIPS_DEBUG_MSG( "vips_stream_input_seek_real:\n" );
if( !input->seekable ||
stream->descriptor == -1 ) {
vips_error( STREAM_NAME( stream ), "%s", _( "not seekable" ) );
return( -1 );
}
return( vips__seek( stream->descriptor, offset, whence ) );
}
static void
vips_stream_input_minimise_real( VipsStreamInput *input )
{
VipsStream *stream = VIPS_STREAM( input );
VIPS_DEBUG_MSG( "vips_stream_input_minimise_real:\n" );
if( stream->filename &&
stream->descriptor != -1 &&
input->seekable )
vips_stream_close( stream );
}
static void
vips_stream_input_class_init( VipsStreamInputClass *class )
{
GObjectClass *gobject_class = G_OBJECT_CLASS( class );
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
gobject_class->finalize = vips_stream_input_finalize;
gobject_class->set_property = vips_object_set_property;
gobject_class->get_property = vips_object_get_property;
object_class->nickname = "stream_input";
object_class->description = _( "input stream" );
object_class->build = vips_stream_input_build;
class->read = vips_stream_input_read_real;
class->map = vips_stream_input_map_real;
class->seek = vips_stream_input_seek_real;
class->minimise = vips_stream_input_minimise_real;
VIPS_ARG_BOXED( class, "blob", 3,
_( "Blob" ),
_( "blob to load from" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsStreamInput, blob ),
VIPS_TYPE_BLOB );
}
static void
vips_stream_input_init( VipsStreamInput *input )
{
input->length = -1;
}
/**
* vips_stream_input_new_from_descriptor:
* @descriptor: read from this file descriptor
*
* Create an input stream attached to a file descriptor. @descriptor is
* closed with close() when the #VipsStream is finalized.
*
* Returns: a new #VipsStream
*/
VipsStreamInput *
vips_stream_input_new_from_descriptor( int descriptor )
{
VipsStreamInput *input;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_descriptor: %d\n",
descriptor );
input = VIPS_STREAM_INPUT(
g_object_new( VIPS_TYPE_STREAM_INPUT,
"descriptor", descriptor,
NULL ) );
if( vips_object_build( VIPS_OBJECT( input ) ) ) {
VIPS_UNREF( input );
return( NULL );
}
return( input );
}
/**
* vips_stream_input_new_from_filename:
* @descriptor: read from this filename
*
* Create an input stream attached to a file.
*
* Returns: a new #VipsStream
*/
VipsStreamInput *
vips_stream_input_new_from_filename( const char *filename )
{
VipsStreamInput *input;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_filename: %s\n",
filename );
input = VIPS_STREAM_INPUT(
g_object_new( VIPS_TYPE_STREAM_INPUT,
"filename", filename,
NULL ) );
if( vips_object_build( VIPS_OBJECT( input ) ) ) {
VIPS_UNREF( input );
return( NULL );
}
return( input );
}
/**
* vips_stream_input_new_from_blob:
* @blob: memory area to load
*
* Create a stream attached to an area of memory.
*
* Returns: a new #VipsStream
*/
VipsStreamInput *
vips_stream_input_new_from_blob( VipsBlob *blob )
{
VipsStreamInput *input;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_blob: %p\n", blob );
input = VIPS_STREAM_INPUT(
g_object_new( VIPS_TYPE_STREAM_INPUT,
"blob", blob,
NULL ) );
if( vips_object_build( VIPS_OBJECT( input ) ) ) {
VIPS_UNREF( input );
return( NULL );
}
return( input );
}
/**
* vips_stream_input_new_from_memory:
* @data: memory area to load
* @length: size of memory area
*
* Create a stream attached to an area of memory.
*
* You must not free @data while the stream is active.
*
* Returns: a new #VipsStream
*/
VipsStreamInput *
vips_stream_input_new_from_memory( const void *data, size_t length )
{
VipsStreamInput *input;
VipsBlob *blob;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_buffer: "
"%p, length = %zd\n", data, length );
/* We don't take a copy of the data or free it.
*/
blob = vips_blob_new( NULL, data, length );
input = vips_stream_input_new_from_blob( blob );
vips_area_unref( VIPS_AREA( blob ) );
return( input );
}
/**
* vips_stream_input_new_from_options:
* @options: option string
*
* Create a stream from an option string.
*
* Returns: a new #VipsStream
*/
VipsStreamInput *
vips_stream_input_new_from_options( const char *options )
{
VipsStreamInput *input;
VIPS_DEBUG_MSG( "vips_stream_input_new_from_options: %s\n", options );
input = VIPS_STREAM_INPUT(
g_object_new( VIPS_TYPE_STREAM_INPUT, NULL ) );
if( vips_object_set_from_string( VIPS_OBJECT( input ), options ) ||
vips_object_build( VIPS_OBJECT( input ) ) ) {
VIPS_UNREF( input );
return( NULL );
}
return( input );
}
ssize_t
vips_stream_input_read( VipsStreamInput *input,
unsigned char *buffer, size_t length )
{
VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( input );
ssize_t bytes_read;
VIPS_DEBUG_MSG( "vips_stream_input_read:\n" );
bytes_read = 0;
/* Are we serving from header_bytes? Get what we can from there.
*/
if( input->header_bytes &&
input->read_position < input->header_bytes->len ) {
ssize_t available;
available = VIPS_MIN( length,
input->header_bytes->len - input->read_position );
memcpy( buffer,
input->header_bytes->data + input->read_position,
available );
input->read_position += available;
buffer += available;
length -= available;
bytes_read += available;
VIPS_DEBUG_MSG( " %zd bytes from cache\n", available );
}
/* Any more bytes required? Call the read() method.
*/
if( length > 0 ) {
ssize_t n;
if( (n = class->read( input, buffer, length )) == -1 ) {
vips_error_system( errno, STREAM_NAME( input ),
"%s", _( "read error" ) );
return( -1 );
}
/* We need to save bytes if we're in header mode and we can't
* seek or map.
*/
if( input->header_bytes &&
(!input->seekable || !input->mappable) &&
!input->decode &&
n > 0 )
g_byte_array_append( input->header_bytes,
buffer, n );
input->read_position += n;
bytes_read += n;
VIPS_DEBUG_MSG( " %zd bytes from read()\n", n );
}
VIPS_DEBUG_MSG( " %zd bytes total\n", bytes_read );
return( bytes_read );
}
const void *
vips_stream_input_map( VipsStreamInput *input, size_t *length )
{
VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( input );
unsigned char buffer[4096];
VIPS_DEBUG_MSG( "vips_stream_input_map:\n" );
/* Memory source ... easy!
*/
if( input->blob ) {
VIPS_DEBUG_MSG( " memory source\n" );
return( vips_blob_get( input->blob, length ) );
}
/* An input that supports mmap.
*/
if( input->mappable ) {
VIPS_DEBUG_MSG( " mmaping source\n" );
if( !input->baseaddr ) {
input->baseaddr = class->map( input, &input->length );
if( !input->baseaddr )
return( NULL );
}
if( length )
*length = input->length;
return( input->baseaddr );
}
/* Have to read() the whole thing. header_bytes will keep a copy of
* the file.
*/
VIPS_DEBUG_MSG( " read() of entire source\n" );
if( vips_stream_input_rewind( input ) )
return( NULL );
while( vips_stream_input_read( input, buffer, 4096 ) > 0 )
;
if( length )
*length = input->header_bytes->len;
return( input->header_bytes->data );
}
gint64
vips_stream_input_seek( VipsStreamInput *input, gint64 offset, int whence )
{
VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( input );
gint64 new_pos;
VIPS_DEBUG_MSG( "vips_stream_input_seek:\n" );
switch( whence ) {
case SEEK_SET:
new_pos = offset;
break;
case SEEK_CUR:
new_pos = input->read_position + offset;
break;
case SEEK_END:
/* TODO .. I don't think any loader needs SEEK_END, and
* implementing it on pipes would force us to read the whole
* thing in.
*/
default:
vips_error( STREAM_NAME( input ), "%s", _( "bad 'whence'" ) );
return( -1 );
break;
}
if( !input->seekable ) {
/* We can seek on non-seekable streams during the header phase.
*/
if( input->decode ) {
vips_error( STREAM_NAME( input ),
"%s", _( "can't rewind after decode begins" ) );
return( -1 );
}
g_assert( input->header_bytes );
/* We may not have read up to the new position.
*/
while( input->read_position < new_pos ) {
unsigned char buffer[4096];
ssize_t read;
read = vips_stream_input_read( input, buffer, 4096 );
if( read < 0 )
return( -1 );
if( read == 0 ) {
}
}
}
else if( input->blob ||
input->baseaddr ) {
/* Memory streams and mapped streams don't need a seek.
*/
;
}
else {
if( (new_pos = class->seek( input, offset, whence )) == -1 )
return( -1 );
}
input->read_position = new_pos;
return( new_pos );
}
int
vips_stream_input_rewind( VipsStreamInput *input )
{
VIPS_DEBUG_MSG( "vips_stream_input_rewind:\n" );
if( vips_stream_input_seek( input, 0, SEEK_SET ) != 0 )
return( -1 );
return( 0 );
}
void
vips_stream_input_minimise( VipsStreamInput *input )
{
VipsStreamInputClass *class = VIPS_STREAM_INPUT_GET_CLASS( input );
class->minimise( input );
}
int
vips_stream_input_decode( VipsStreamInput *input )
{
VIPS_DEBUG_MSG( "vips_stream_input_decode:\n" );
/* We have finished reading the header. We can discard the bytes we
* saved.
*/
if( !input->decode ) {
input->decode = TRUE;
VIPS_FREEF( g_byte_array_unref, input->header_bytes );
VIPS_FREEF( g_byte_array_unref, input->sniff );
}
/* Make sure we are open, in case we've been minimised.
*/
if( vips_stream_input_open( input ) )
return( -1 );
return( 0 );
}
/**
* vips_stream_input_sniff:
* @input: sniff this stream
* @length: number of bytes to sniff
*
* Return a pointer to the first few bytes of the file.
*/
unsigned char *
vips_stream_input_sniff( VipsStreamInput *input, size_t length )
{
ssize_t n;
unsigned char *q;
VIPS_DEBUG_MSG( "vips_stream_input_sniff: %zd bytes\n", length );
if( vips_stream_input_rewind( input ) )
return( NULL );
g_byte_array_set_size( input->sniff, length );
for( q = input->sniff->data; length > 0; length -= n, q += n )
if( (n = vips_stream_input_read( input, q, length )) == -1 ||
n == 0 )
return( NULL );
return( input->sniff->data );
}
gint64
vips_stream_input_size( VipsStreamInput *input )
{
VipsStream *stream = VIPS_STREAM( input );
gint64 size;
if( stream->descriptor >= 0 &&
(size = vips_file_length( stream->descriptor )) >= 0 )
return( size );
else if( input->blob )
return( VIPS_AREA( input->blob )->length );
else if( input->header_bytes )
return( input->header_bytes->len );
else
return( -1 );
}
G_DEFINE_TYPE( VipsStreamOutput, vips_stream_output, VIPS_TYPE_STREAM );
static void
vips_stream_output_finalize( GObject *gobject )
{
VipsStreamOutput *output = VIPS_STREAM_OUTPUT( gobject );
VIPS_DEBUG_MSG( "vips_stream_output_finalize:\n" );
VIPS_FREEF( g_byte_array_unref, output->memory );
if( output->blob ) {
vips_area_unref( VIPS_AREA( output->blob ) );
output->blob = NULL;
}
G_OBJECT_CLASS( vips_stream_output_parent_class )->finalize( gobject );
}
static int
vips_stream_output_build( VipsObject *object )
{
VipsStream *stream = VIPS_STREAM( object );
VipsStreamOutput *output = VIPS_STREAM_OUTPUT( object );
VIPS_DEBUG_MSG( "vips_stream_output_build: %p\n", output );
if( VIPS_OBJECT_CLASS( vips_stream_output_parent_class )->
build( object ) )
return( -1 );
if( vips_object_argument_isset( object, "filename" ) &&
vips_object_argument_isset( object, "descriptor" ) ) {
vips_error( STREAM_NAME( stream ),
"%s", _( "don't set 'filename' and 'descriptor'" ) );
return( -1 );
}
if( vips_object_argument_isset( object, "filename" ) ) {
const char *filename = stream->filename;
int fd;
/* 0644 is rw user, r group and other.
*/
if( (fd = vips_tracked_open( filename,
MODE_WRITE, 0644 )) == -1 ) {
vips_error_system( errno, STREAM_NAME( stream ),
"%s", _( "unable to open for write" ) );
return( -1 );
}
stream->tracked_descriptor = fd;
stream->descriptor = fd;
}
else if( vips_object_argument_isset( object, "descriptor" ) ) {
stream->descriptor = dup( stream->descriptor );
stream->close_descriptor = stream->descriptor;
}
else {
output->memory = g_byte_array_new();
}
return( 0 );
}
static ssize_t
vips_stream_output_write_real( VipsStreamOutput *output,
const unsigned char *data, size_t length )
{
VipsStream *stream = VIPS_STREAM( output );
ssize_t len;
VIPS_DEBUG_MSG( "vips_stream_output_write_real: %zd bytes\n", length );
if( output->memory ) {
g_byte_array_append( output->memory, data, length );
len = length;
}
else
len = write( stream->descriptor, data, length );
return( len );
}
static void
vips_stream_output_finish_real( VipsStreamOutput *output )
{
VIPS_DEBUG_MSG( "vips_stream_output_finish_real:\n" );
/* Move the output buffer into the blob so it can be read out.
*/
if( output->memory ) {
unsigned char *data;
size_t length;
length = output->memory->len;
data = g_byte_array_free( output->memory, FALSE );
output->memory = NULL;
vips_blob_set( output->blob,
(VipsCallbackFn) g_free, data, length );
}
}
static void
vips_stream_output_class_init( VipsStreamOutputClass *class )
{
GObjectClass *gobject_class = G_OBJECT_CLASS( class );
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
gobject_class->finalize = vips_stream_output_finalize;
gobject_class->set_property = vips_object_set_property;
gobject_class->get_property = vips_object_get_property;
object_class->nickname = "stream_output";
object_class->description = _( "output stream" );
object_class->build = vips_stream_output_build;
class->write = vips_stream_output_write_real;
class->finish = vips_stream_output_finish_real;
/* SET_ALWAYS means that blob is set by C and the obj system is not
* involved in creation or destruction. It can be read at any time.
*/
VIPS_ARG_BOXED( class, "blob", 3,
_( "Blob" ),
_( "Blob to save to" ),
VIPS_ARGUMENT_SET_ALWAYS,
G_STRUCT_OFFSET( VipsStreamOutput, blob ),
VIPS_TYPE_BLOB );
}
static void
vips_stream_output_init( VipsStreamOutput *output )
{
output->blob = vips_blob_new( NULL, NULL, 0 );
}
/**
* vips_stream_output_new_from_descriptor:
* @descriptor: write to this file descriptor
*
* Create a stream attached to a file descriptor.
* @descriptor is closed when
* the #VipsStream is finalized.
*
* See also: vips_stream_output_write().
*
* Returns: a new #VipsStream
*/
VipsStreamOutput *
vips_stream_output_new_from_descriptor( int descriptor )
{
VipsStreamOutput *stream;
VIPS_DEBUG_MSG( "vips_stream_output_new_from_descriptor: %d\n",
descriptor );
stream = VIPS_STREAM_OUTPUT(
g_object_new( VIPS_TYPE_STREAM_OUTPUT,
"descriptor", descriptor,
NULL ) );
if( vips_object_build( VIPS_OBJECT( stream ) ) ) {
VIPS_UNREF( stream );
return( NULL );
}
return( stream );
}
/**
* vips_stream_output_new_from_filename:
* @filename: write to this file
*
* Create a stream attached to a file.
*
* See also: vips_stream_output_write().
*
* Returns: a new #VipsStream
*/
VipsStreamOutput *
vips_stream_output_new_from_filename( const char *filename )
{
VipsStreamOutput *stream;
VIPS_DEBUG_MSG( "vips_stream_output_new_from_filename: %s\n",
filename );
stream = VIPS_STREAM_OUTPUT(
g_object_new( VIPS_TYPE_STREAM_OUTPUT,
"filename", filename,
NULL ) );
if( vips_object_build( VIPS_OBJECT( stream ) ) ) {
VIPS_UNREF( stream );
return( NULL );
}
return( stream );
}
/**
* vips_stream_output_new_memory:
*
* Create a stream which will output to a memory area. Read from @blob to get
* memory output.
*
* See also: vips_stream_output_write().
*
* Returns: a new #VipsStream
*/
VipsStreamOutput *
vips_stream_output_new_memory( void )
{
VipsStreamOutput *stream;
VIPS_DEBUG_MSG( "vips_stream_output_new_memory:\n" );
stream = VIPS_STREAM_OUTPUT(
g_object_new( VIPS_TYPE_STREAM_OUTPUT,
NULL ) );
if( vips_object_build( VIPS_OBJECT( stream ) ) ) {
VIPS_UNREF( stream );
return( NULL );
}
return( stream );
}
int
vips_stream_output_write( VipsStreamOutput *output,
const unsigned char *data, size_t length )
{
VipsStreamOutputClass *class = VIPS_STREAM_OUTPUT_GET_CLASS( output );
VIPS_DEBUG_MSG( "vips_stream_output_write: %zd bytes\n", length );
while( length > 0 ) {
ssize_t n;
n = class->write( output, data, length );
/* n == 0 isn't strictly an error, but we treat it as one to
* make sure we don't get stuck in this loop.
*/
if( n <= 0 ) {
vips_error_system( errno, STREAM_NAME( output ),
"%s", _( "write error" ) );
return( -1 );
}
length -= n;
data += n;
}
return( 0 );
}
void
vips_stream_output_finish( VipsStreamOutput *output )
{
VipsStreamOutputClass *class = VIPS_STREAM_OUTPUT_GET_CLASS( output );
VIPS_DEBUG_MSG( "vips_stream_output_finish:\n" );
class->finish( output );
return( VIPS_STREAM( STREAM )->filename ?
VIPS_STREAM( STREAM )->filename :
VIPS_OBJECT( STREAM )->nickname );
}

889
libvips/iofuncs/streami.c Normal file
View File

@ -0,0 +1,889 @@
/* A byte source/sink .. it can be a pipe, file descriptor, memory area,
* socket, node.js stream, etc.
*
* J.Cupitt, 19/6/14
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/* TODO
*
* - catch out of range seeks
* - some sanity thing to prevent endless streams filling memory?
* - filename encoding
* - seek END and _size need thinking about with pipes ... perhaps we should
* force-read the whole thing in
* - _size() needs to be a vfunc too (libtiff needs it)
* - _miminize() is a vfunc, so _open() must be as well
* + perhaps minimise should not be a vfunc? is it really useful for
* subclasses?
* + perhaps make _open() / _close() into vfuncs and don't expose minimise
* - only fetch length once
* - need to be able to set is_pipe via constructor
* - test we can really change all behaviour in the subclass ... add callbacks
* as well to make it simpler for language bindings
*/
/*
*/
#define VIPS_DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/debug.h>
/* Try to make an O_BINARY ... sometimes need the leading '_'.
*/
#ifdef BINARY_OPEN
#ifndef O_BINARY
#ifdef _O_BINARY
#define O_BINARY _O_BINARY
#endif /*_O_BINARY*/
#endif /*!O_BINARY*/
#endif /*BINARY_OPEN*/
/* If we have O_BINARY, add it to a mode flags set.
*/
#ifdef O_BINARY
#define BINARYIZE(M) ((M) | O_BINARY)
#else /*!O_BINARY*/
#define BINARYIZE(M) (M)
#endif /*O_BINARY*/
#define MODE_READ BINARYIZE (O_RDONLY)
#define MODE_READWRITE BINARYIZE (O_RDWR)
#define MODE_WRITE BINARYIZE (O_WRONLY | O_CREAT | O_TRUNC)
G_DEFINE_TYPE( VipsStreami, vips_streami, VIPS_TYPE_STREAM );
static int
vips_streami_sanity( VipsStreami *streami )
{
VipsStream *stream = VIPS_STREAM( streami );
if( streami->blob ) {
VipsArea *area = VIPS_AREA( streami->blob );
/* Not a pipe (can map and seek).
*/
g_assert( !streami->is_pipe );
/* Read position must lie within the buffer.
*/
g_assert( streami->read_position >= 0 );
g_assert( streami->read_position < streami->length );
g_assert( streami->read_position < area->length );
/* No need for header tracking.
*/
g_assert( !streami->header_bytes );
/* Only have sniff during header,
*/
g_assert( streami->decode ||
!streami->sniff );
/* No descriptor or filename.
*/
g_assert( stream->descriptor == -1 );
g_assert( stream->close_descriptor == -1 );
g_assert( stream->tracked_descriptor == -1 );
g_assert( !stream->filename );
}
else if( streami->is_pipe ) {
/* In header, read_position must be within header_bytes.
*/
g_assert( !streami->decode ||
(streami->read_position >= 0 &&
streami->read_position < streami->header_bytes->len) );
/* If we're in the header, we must save bytes we read. If not
* in header, should have no saved bytes.
*/
g_assert( streami->decode ||
streami->header_bytes );
g_assert( !streami->decode ||
!streami->header_bytes );
/* After we're done with the header, the sniff buffer should
* be gone.
*/
g_assert( !streami->decode ||
!streami->sniff );
/* No length available.
*/
g_assert( streami->length == -1 );
}
else {
/* Something that supports seek and map. No need to save
* header bytes.
*/
g_assert( !streami->header_bytes );
/* After we're done with the header, the sniff buffer should
* be gone.
*/
g_assert( !streami->decode ||
!streami->sniff );
/* Have length.
*/
g_assert( streami->length != -1 );
/* Read position must lie within the file.
*/
g_assert( streami->read_position >= 0 );
g_assert( streami->read_position < streami->length );
/* No need for header tracking.
*/
g_assert( !streami->header_bytes );
/* Only have sniff during header,
*/
g_assert( streami->decode ||
!streami->sniff );
/* Supports minimise, so if descriptor is -1, we must have a
* filename we can reopen.
*/
g_assert( stream->descriptor != -1 ||
(stream->filename && stream->descriptor) );
}
}
static void
vips_streami_finalize( GObject *gobject )
{
VipsStreami *streami = VIPS_STREAMI( gobject );
VIPS_FREEF( g_byte_array_unref, streami->header_bytes );
VIPS_FREEF( g_byte_array_unref, streami->sniff );
G_OBJECT_CLASS( vips_streami_parent_class )->finalize( gobject );
}
static int
vips_streami_open( VipsStreami *streami )
{
VipsStream *stream = VIPS_STREAM( streami );
vips_streami_sanity( streami );
if( stream->descriptor == -1 &&
stream->tracked_descriptor == -1 &&
stream->filename ) {
int fd;
if( (fd = vips_tracked_open( stream->filename,
MODE_READ )) == -1 ) {
vips_error_system( errno, vips_stream_name( stream ),
"%s", _( "unable to open for read" ) );
return( -1 );
}
stream->tracked_descriptor = fd;
stream->descriptor = fd;
if( stream->length == -1 &&
(stream->length = vips_file_length( fd )) == -1 )
return( -1 );
VIPS_DEBUG_MSG( "vips_streami_open: "
"restoring read position %zd\n",
streami->read_position );
if( vips__seek( stream->descriptor,
streami->read_position, SEEK_SET ) == -1 )
return( -1 );
}
return( 0 );
}
static int
vips_streami_build( VipsObject *object )
{
VipsStream *stream = VIPS_STREAM( object );
VipsStreami *streami = VIPS_STREAMI( object );
VIPS_DEBUG_MSG( "vips_streami_build: %p\n", streami );
if( VIPS_OBJECT_CLASS( vips_streami_parent_class )->
build( object ) )
return( -1 );
if( vips_object_argument_isset( object, "filename" ) &&
vips_object_argument_isset( object, "descriptor" ) ) {
vips_error( STREAM_NAME( stream ),
"%s", _( "don't set 'filename' and 'descriptor'" ) );
return( -1 );
}
if( vips_object_argument_isset( object, "filename" ) &&
vips_streami_open( streami ) )
return( -1 );
if( vips_object_argument_isset( object, "descriptor" ) ) {
stream->descriptor = dup( stream->descriptor );
stream->close_descriptor = stream->descriptor;
}
/* If there's a descriptor for streami, test its properties.
*/
if( stream->descriptor != -1 ) {
/* Do +=0 on the current position. This fails for pipes, at
* least on linux.
*/
if( vips__seek( stream->descriptor, 0, SEEK_CUR ) != -1 )
streami->is_pipe = TRUE;
}
/* Need to save the header for pipe-style sources.
*/
if( streami->is_pipe )
streami->header_bytes = g_byte_array_new();
/* We always want a sniff buffer.
*/
streami->sniff = g_byte_array_new();
return( 0 );
}
static ssize_t
vips_streami_read_real( VipsStreami *streami, void *data, size_t length )
{
VipsStream *stream = VIPS_STREAM( streami );
VIPS_DEBUG_MSG( "vips_streami_read_real:\n" );
if( streami->blob ) {
VipsArea *area = VIPS_AREA( streami->blob );
ssize_t available = VIPS_MIN( length,
area->length - streami->read_position );
if( available <= 0 )
return( 0 );
memcpy( data, area->data + streami->read_position, available );
return( available );
}
else if( stream->descriptor != -1 ) {
return( read( stream->descriptor, data, length ) );
}
else {
g_assert( 0 );
return( -1 );
}
}
static const void *
vips_streami_map_real( VipsStreami *streami, size_t *length )
{
VipsStream *stream = VIPS_STREAM( streami );
const void *file_baseaddr;
g_assert( streami->length > 0 );
if( !(file_baseaddr = vips__mmap( stream->descriptor,
FALSE, streami->length, 0 )) )
return( NULL );
if( length )
*length = streami->length;
return( file_baseaddr );
}
static gint64
vips_streami_seek_real( VipsStreami *streami, gint64 offset, int whence )
{
VipsStream *stream = VIPS_STREAM( streami );
VIPS_DEBUG_MSG( "vips_streami_seek_real:\n" );
if( streami->is_pipe ||
stream->descriptor == -1 ) {
vips_error( vips_stream_name( stream ),
"%s", _( "not seekable" ) );
return( -1 );
}
return( vips__seek( stream->descriptor, offset, whence ) );
}
static void
vips_streami_minimise_real( VipsStreami *streami )
{
VipsStream *stream = VIPS_STREAM( streami );
VIPS_DEBUG_MSG( "vips_streami_minimise_real:\n" );
if( stream->filename &&
stream->descriptor != -1 &&
!streami->is_pipe )
vips_stream_close( stream );
}
static void
vips_streami_class_init( VipsStreamiClass *class )
{
GObjectClass *gobject_class = G_OBJECT_CLASS( class );
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
gobject_class->finalize = vips_streami_finalize;
gobject_class->set_property = vips_object_set_property;
gobject_class->get_property = vips_object_get_property;
object_class->nickname = "streami";
object_class->description = _( "streami stream" );
object_class->build = vips_streami_build;
class->read = vips_streami_read_real;
class->map = vips_streami_map_real;
class->seek = vips_streami_seek_real;
class->minimise = vips_streami_minimise_real;
VIPS_ARG_BOXED( class, "blob", 3,
_( "Blob" ),
_( "blob to load from" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsStreami, blob ),
VIPS_TYPE_BLOB );
}
static void
vips_streami_init( VipsStreami *streami )
{
streami->length = -1;
}
/**
* vips_streami_new_from_descriptor:
* @descriptor: read from this file descriptor
*
* Create an streami stream attached to a file descriptor. @descriptor is
* closed with close() when the #VipsStream is finalized.
*
* Returns: a new #VipsStream
*/
VipsStreami *
vips_streami_new_from_descriptor( int descriptor )
{
VipsStreami *streami;
VIPS_DEBUG_MSG( "vips_streami_new_from_descriptor: %d\n",
descriptor );
streami = VIPS_STREAMI(
g_object_new( VIPS_TYPE_STREAMI,
"descriptor", descriptor,
NULL ) );
if( vips_object_build( VIPS_OBJECT( streami ) ) ) {
VIPS_UNREF( streami );
return( NULL );
}
vips_streami_sanity( streami );
return( streami );
}
/**
* vips_streami_new_from_filename:
* @descriptor: read from this filename
*
* Create an streami stream attached to a file.
*
* Returns: a new #VipsStream
*/
VipsStreami *
vips_streami_new_from_filename( const char *filename )
{
VipsStreami *streami;
VIPS_DEBUG_MSG( "vips_streami_new_from_filename: %s\n",
filename );
streami = VIPS_STREAMI(
g_object_new( VIPS_TYPE_STREAMI,
"filename", filename,
NULL ) );
if( vips_object_build( VIPS_OBJECT( streami ) ) ) {
VIPS_UNREF( streami );
return( NULL );
}
vips_streami_sanity( streami );
return( streami );
}
/**
* vips_streami_new_from_blob:
* @blob: memory area to load
*
* Create a stream attached to an area of memory.
*
* Returns: a new #VipsStream
*/
VipsStreami *
vips_streami_new_from_blob( VipsBlob *blob )
{
VipsStreami *streami;
VIPS_DEBUG_MSG( "vips_streami_new_from_blob: %p\n", blob );
streami = VIPS_STREAMI(
g_object_new( VIPS_TYPE_STREAMI,
"blob", blob,
NULL ) );
if( vips_object_build( VIPS_OBJECT( streami ) ) ) {
VIPS_UNREF( streami );
return( NULL );
}
vips_streami_sanity( streami );
return( streami );
}
/**
* vips_streami_new_from_memory:
* @data: memory area to load
* @length: size of memory area
*
* Create a stream attached to an area of memory.
*
* You must not free @data while the stream is active.
*
* Returns: a new #VipsStream
*/
VipsStreami *
vips_streami_new_from_memory( const void *data, size_t length )
{
VipsStreami *streami;
VipsBlob *blob;
VIPS_DEBUG_MSG( "vips_streami_new_from_buffer: "
"%p, length = %zd\n", data, length );
/* We don't take a copy of the data or free it.
*/
blob = vips_blob_new( NULL, data, length );
streami = vips_streami_new_from_blob( blob );
vips_area_unref( VIPS_AREA( blob ) );
vips_streami_sanity( streami );
return( streami );
}
/**
* vips_streami_new_from_options:
* @options: option string
*
* Create a stream from an option string.
*
* Returns: a new #VipsStream
*/
VipsStreami *
vips_streami_new_from_options( const char *options )
{
VipsStreami *streami;
VIPS_DEBUG_MSG( "vips_streami_new_from_options: %s\n", options );
streami = VIPS_STREAMI(
g_object_new( VIPS_TYPE_STREAMI, NULL ) );
if( vips_object_set_from_string( VIPS_OBJECT( streami ), options ) ||
vips_object_build( VIPS_OBJECT( streami ) ) ) {
VIPS_UNREF( streami );
return( NULL );
}
vips_streami_sanity( streami );
return( streami );
}
ssize_t
vips_streami_read( VipsStreami *streami, void *buffer, size_t length )
{
VipsStreamiClass *class = VIPS_STREAMI_GET_CLASS( streami );
ssize_t bytes_read;
VIPS_DEBUG_MSG( "vips_streami_read:\n" );
vips_streami_sanity( streami );
bytes_read = 0;
/* Are we serving from header_bytes? Get what we can from there.
*/
if( streami->header_bytes &&
streami->read_position < streami->header_bytes->len ) {
ssize_t available;
available = VIPS_MIN( length,
streami->header_bytes->len - streami->read_position );
memcpy( buffer,
streami->header_bytes->data + streami->read_position,
available );
streami->read_position += available;
buffer += available;
length -= available;
bytes_read += available;
VIPS_DEBUG_MSG( " %zd bytes from cache\n", available );
}
/* Any more bytes required? Call the read() vfunc.
*/
if( length > 0 ) {
ssize_t n;
if( (n = class->read( streami, buffer, length )) == -1 ) {
vips_error_system( errno, STREAM_NAME( streami ),
"%s", _( "read error" ) );
return( -1 );
}
/* We need to save bytes if we're in header mode and we can't
* seek or map.
*/
if( streami->header_bytes &&
streami->is_pipe &&
!streami->decode &&
n > 0 )
g_byte_array_append( streami->header_bytes,
buffer, n );
streami->read_position += n;
bytes_read += n;
VIPS_DEBUG_MSG( " %zd bytes from read()\n", n );
}
VIPS_DEBUG_MSG( " %zd bytes total\n", bytes_read );
vips_streami_sanity( streami );
return( bytes_read );
}
/* Read the entire pipe into memory and turn this into a memory source stream.
*/
static int
vips_streami_read_whole_pipe( VipsStreami *streami )
{
gint64 old_read_position;
unsigned char buffer[4096];
unsigned char *data;
VIPS_DEBUG_MSG( "vips_streami_read_whole_pipe:\n" );
vips_streami_sanity( streami );
old_read_position = streami->read_position;
if( vips_streami_rewind( streami ) )
return( -1 );
/* TODO ... add something to prevent unbounded streams filling memory.
*/
while( vips_streami_read( streami, buffer, 4096 ) > 0 )
;
streami->read_position = old_read_position;
/* Move header_bytes into the memory blob and set up as a memory
* source.
*/
streami->length = streami->header_bytes->len;
data = g_byte_array_free( streami->header_bytes, FALSE );
streami->header_bytes = NULL;
vips_blob_set( streami->blob,
(VipsCallbackFn) g_free, data, streami->length );
vips_stream_close( stream );
streami->is_pipe = FALSE;
vips_streami_sanity( streami );
return( 0 );
}
const void *
vips_streami_map( VipsStreami *streami, size_t *length_out )
{
VipsStreamiClass *class = VIPS_STREAMI_GET_CLASS( streami );
void *data;
ssize_t length;
VIPS_DEBUG_MSG( "vips_streami_map:\n" );
vips_streami_sanity( streami );
/* Memory source ... easy!
*/
if( streami->blob ) {
VIPS_DEBUG_MSG( " memory source\n" );
data = vips_blob_get( streami->blob, &length );
}
else if( streami->is_pipe ) {
if( vips_streami_read_whole_pipe( streami ) )
return( NULL );
data = vips_blob_get( streami->blob, &length );
}
else {
/* A streami that supports mmap.
*/
VIPS_DEBUG_MSG( " mmaping source\n" );
if( !streami->baseaddr &&
!(streami->baseaddr =
class->map( streami, &streami->length )) )
return( NULL );
length = streami->length;
data = streami->baseaddr;
}
if( length_out )
*length_out = length;
vips_streami_sanity( streami );
return( data );
}
gint64
vips_streami_seek( VipsStreami *streami, gint64 offset, int whence )
{
VipsStreamiClass *class = VIPS_STREAMI_GET_CLASS( streami );
gint64 new_pos;
VIPS_DEBUG_MSG( "vips_streami_seek:\n" );
vips_streami_sanity( streami );
switch( whence ) {
case SEEK_SET:
new_pos = offset;
break;
case SEEK_CUR:
new_pos = streami->read_position + offset;
break;
case SEEK_END:
if( streami->length == -1 &&
vips_streami_read_whole_pipe( streami ) )
return( -1 );
new_pos = streami->length + offset;
break;
default:
vips_error( vips_stream_name( VIPS_STREAM( streami ) ),
"%s", _( "bad 'whence'" ) );
return( -1 );
break;
}
if( streami->is_pipe ) {
/* We can seek on non-seekable streams during the header phase.
*/
if( streami->decode ) {
vips_error( STREAM_NAME( streami ),
"%s", _( "can't rewind after decode begins" ) );
return( -1 );
}
g_assert( streami->header_bytes );
/* We may not have read up to the new position.
*/
while( streami->read_position < new_pos ) {
unsigned char buffer[4096];
ssize_t read;
read = vips_streami_read( streami, buffer, 4096 );
if( read < 0 )
return( -1 );
if( read == 0 ) {
}
}
}
else {
if( (new_pos = class->seek( streami, offset, whence )) == -1 )
return( -1 );
}
streami->read_position = new_pos;
vips_streami_sanity( streami );
return( new_pos );
}
int
vips_streami_rewind( VipsStreami *streami )
{
VIPS_DEBUG_MSG( "vips_streami_rewind:\n" );
vips_streami_sanity( streami );
if( vips_streami_seek( streami, 0, SEEK_SET ) != 0 )
return( -1 );
vips_streami_sanity( streami );
return( 0 );
}
void
vips_streami_minimise( VipsStreami *streami )
{
VipsStreamiClass *class = VIPS_STREAMI_GET_CLASS( streami );
vips_streami_sanity( streami );
class->minimise( streami );
vips_streami_sanity( streami );
}
int
vips_streami_decode( VipsStreami *streami )
{
VIPS_DEBUG_MSG( "vips_streami_decode:\n" );
vips_streami_sanity( streami );
/* We have finished reading the header. We can discard the bytes we
* saved.
*/
if( !streami->decode ) {
streami->decode = TRUE;
VIPS_FREEF( g_byte_array_unref, streami->header_bytes );
VIPS_FREEF( g_byte_array_unref, streami->sniff );
}
/* Make sure we are open, in case we've been minimised.
*/
if( vips_streami_open( streami ) )
return( -1 );
vips_streami_sanity( streami );
return( 0 );
}
/**
* vips_streami_sniff:
* @streami: sniff this stream
* @length: number of bytes to sniff
*
* Return a pointer to the first few bytes of the file.
*/
unsigned char *
vips_streami_sniff( VipsStreami *streami, size_t length )
{
ssize_t n;
unsigned char *q;
VIPS_DEBUG_MSG( "vips_streami_sniff: %zd bytes\n", length );
vips_streami_sanity( streami );
if( vips_streami_rewind( streami ) )
return( NULL );
g_byte_array_set_size( streami->sniff, length );
for( q = streami->sniff->data; length > 0; length -= n, q += n )
if( (n = vips_streami_read( streami, q, length )) == -1 ||
n == 0 )
return( NULL );
vips_streami_sanity( streami );
return( streami->sniff->data );
}
gint64
vips_streami_size( VipsStreami *streami )
{
VipsStream *stream = VIPS_STREAM( streami );
gint64 size;
vips_streami_sanity( streami );
if( stream->descriptor >= 0 &&
(size = vips_file_length( stream->descriptor )) >= 0 )
return( size );
else if( streami->blob )
return( VIPS_AREA( streami->blob )->length );
else if( streami->header_bytes )
return( streami->header_bytes->len );
else
return( -1 );
}

353
libvips/iofuncs/streamo.c Normal file
View File

@ -0,0 +1,353 @@
/* A byte source/sink .. it can be a pipe, file descriptor, memory area,
* socket, node.js stream, etc.
*
* J.Cupitt, 19/6/14
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/* TODO
*
* - filename encoding
* - need printf()-style output? eg. for CSV write ... see dbuf.c
* - test we can really change all behaviour in the subclass ... add callbacks
* as well to make it simpler for language bindings
*/
/*
*/
#define VIPS_DEBUG
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/debug.h>
/* Try to make an O_BINARY ... sometimes need the leading '_'.
*/
#ifdef BINARY_OPEN
#ifndef O_BINARY
#ifdef _O_BINARY
#define O_BINARY _O_BINARY
#endif /*_O_BINARY*/
#endif /*!O_BINARY*/
#endif /*BINARY_OPEN*/
/* If we have O_BINARY, add it to a mode flags set.
*/
#ifdef O_BINARY
#define BINARYIZE(M) ((M) | O_BINARY)
#else /*!O_BINARY*/
#define BINARYIZE(M) (M)
#endif /*O_BINARY*/
#define MODE_READ BINARYIZE (O_RDONLY)
#define MODE_READWRITE BINARYIZE (O_RDWR)
#define MODE_WRITE BINARYIZE (O_WRONLY | O_CREAT | O_TRUNC)
G_DEFINE_TYPE( VipsStreamo, vips_streamo, VIPS_TYPE_STREAM );
static void
vips_streamo_finalize( GObject *gobject )
{
VipsStreamo *streamo = VIPS_STREAMO( gobject );
VIPS_DEBUG_MSG( "vips_streamo_finalize:\n" );
VIPS_FREEF( g_byte_array_unref, streamo->memory );
if( streamo->blob ) {
vips_area_unref( VIPS_AREA( streamo->blob ) );
streamo->blob = NULL;
}
G_OBJECT_CLASS( vips_streamo_parent_class )->finalize( gobject );
}
static int
vips_streamo_build( VipsObject *object )
{
VipsStream *stream = VIPS_STREAM( object );
VipsStreamo *streamo = VIPS_STREAMO( object );
VIPS_DEBUG_MSG( "vips_streamo_build: %p\n", stream );
if( VIPS_OBJECT_CLASS( vips_streamo_parent_class )->build( object ) )
return( -1 );
if( vips_object_argument_isset( object, "filename" ) &&
vips_object_argument_isset( object, "descriptor" ) ) {
vips_error( vips_stream_name( stream ),
"%s", _( "don't set 'filename' and 'descriptor'" ) );
return( -1 );
}
if( vips_object_argument_isset( object, "filename" ) ) {
const char *filename = stream->filename;
int fd;
/* 0644 is rw user, r group and other.
*/
if( (fd = vips_tracked_open( filename,
MODE_WRITE, 0644 )) == -1 ) {
vips_error_system( errno, vips_stream_name( stream ),
"%s", _( "unable to open for write" ) );
return( -1 );
}
stream->tracked_descriptor = fd;
stream->descriptor = fd;
}
else if( vips_object_argument_isset( object, "descriptor" ) ) {
stream->descriptor = dup( stream->descriptor );
stream->close_descriptor = stream->descriptor;
}
else {
streamo->memory = g_byte_array_new();
}
return( 0 );
}
static ssize_t
vips_streamo_write_real( VipsStreamo *streamo, const void *data, size_t length )
{
VipsStream *stream = VIPS_STREAM( streamo );
ssize_t len;
VIPS_DEBUG_MSG( "vips_streamo_write_real: %zd bytes\n", length );
if( streamo->memory ) {
g_byte_array_append( streamo->memory, data, length );
len = length;
}
else
len = write( stream->descriptor, data, length );
return( len );
}
static void
vips_streamo_finish_real( VipsStreamo *streamo )
{
VIPS_DEBUG_MSG( "vips_streamo_finish_real:\n" );
/* Move the stream buffer into the blob so it can be read out.
*/
if( streamo->memory ) {
unsigned char *data;
size_t length;
length = streamo->memory->len;
data = g_byte_array_free( streamo->memory, FALSE );
streamo->memory = NULL;
vips_blob_set( streamo->blob,
(VipsCallbackFn) g_free, data, length );
}
}
static void
vips_streamo_class_init( VipsStreamoClass *class )
{
GObjectClass *gobject_class = G_OBJECT_CLASS( class );
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
gobject_class->finalize = vips_streamo_finalize;
gobject_class->set_property = vips_object_set_property;
gobject_class->get_property = vips_object_get_property;
object_class->nickname = "streamo";
object_class->description = _( "stream stream" );
object_class->build = vips_streamo_build;
class->write = vips_streamo_write_real;
class->finish = vips_streamo_finish_real;
/* SET_ALWAYS means that blob is set by C and the obj system is not
* involved in creation or destruction. It can be read at any time.
*/
VIPS_ARG_BOXED( class, "blob", 3,
_( "Blob" ),
_( "Blob to save to" ),
VIPS_ARGUMENT_SET_ALWAYS,
G_STRUCT_OFFSET( VipsStreamo, blob ),
VIPS_TYPE_BLOB );
}
static void
vips_streamo_init( VipsStreamo *streamo )
{
streamo->blob = vips_blob_new( NULL, NULL, 0 );
}
/**
* vips_streamo_new_from_descriptor:
* @descriptor: write to this file descriptor
*
* Create a stream attached to a file descriptor.
* @descriptor is closed when
* the #VipsStream is finalized.
*
* See also: vips_streamo_write().
*
* Returns: a new #VipsStream
*/
VipsStreamo *
vips_streamo_new_from_descriptor( int descriptor )
{
VipsStreamo *streamo;
VIPS_DEBUG_MSG( "vips_streamo_new_from_descriptor: %d\n",
descriptor );
streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO,
"descriptor", descriptor,
NULL ) );
if( vips_object_build( VIPS_OBJECT( streamo ) ) ) {
VIPS_UNREF( streamo );
return( NULL );
}
return( streamo );
}
/**
* vips_streamo_new_from_filename:
* @filename: write to this file
*
* Create a stream attached to a file.
*
* See also: vips_streamo_write().
*
* Returns: a new #VipsStream
*/
VipsStreamo *
vips_streamo_new_from_filename( const char *filename )
{
VipsStreamo *streamo;
VIPS_DEBUG_MSG( "vips_streamo_new_from_filename: %s\n",
filename );
streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO,
"filename", filename,
NULL ) );
if( vips_object_build( VIPS_OBJECT( streamo ) ) ) {
VIPS_UNREF( streamo );
return( NULL );
}
return( streamo );
}
/**
* vips_streamo_new_memory:
*
* Create a stream which will stream to a memory area. Read from @blob to get
* memory stream.
*
* See also: vips_streamo_write().
*
* Returns: a new #VipsStream
*/
VipsStreamo *
vips_streamo_new_memory( void )
{
VipsStreamo *streamo;
VIPS_DEBUG_MSG( "vips_streamo_new_memory:\n" );
streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO, NULL ) );
if( vips_object_build( VIPS_OBJECT( streamo ) ) ) {
VIPS_UNREF( streamo );
return( NULL );
}
return( streamo );
}
int
vips_streamo_write( VipsStreamo *streamo, const void *data, size_t length )
{
VipsStreamoClass *class = VIPS_STREAMO_GET_CLASS( stream );
VIPS_DEBUG_MSG( "vips_streamo_write: %zd bytes\n", length );
while( length > 0 ) {
ssize_t n;
n = class->write( streamo, data, length );
/* n == 0 isn't strictly an error, but we treat it as one to
* make sure we don't get stuck in this loop.
*/
if( n <= 0 ) {
vips_error_system( errno,
vips_stream_name( VIPS_STREAM( streamo ) ),
"%s", _( "write error" ) );
return( -1 );
}
length -= n;
data += n;
}
return( 0 );
}
void
vips_streamo_finish( VipsStreamo *streamo )
{
VipsStreamoClass *class = VIPS_STREAMO_GET_CLASS( streamo );
VIPS_DEBUG_MSG( "vips_streamo_finish:\n" );
class->finish( streamo );
}