From 91b070392167b8ae1047045d09c03930dc2c0e02 Mon Sep 17 00:00:00 2001 From: John Cupitt Date: Thu, 21 Nov 2019 17:54:10 +0000 Subject: [PATCH] add VipsStreamou An output stream subclass you can easily connect to other destinations. --- libvips/include/vips/stream.h | 49 ++++++++- libvips/iofuncs/Makefile.am | 1 + libvips/iofuncs/init.c | 2 + libvips/iofuncs/streamiu.c | 22 ++-- libvips/iofuncs/streamo.c | 55 +++++----- libvips/iofuncs/streamou.c | 194 ++++++++++++++++++++++++++++++++++ 6 files changed, 283 insertions(+), 40 deletions(-) create mode 100644 libvips/iofuncs/streamou.c diff --git a/libvips/include/vips/stream.h b/libvips/include/vips/stream.h index d6055718..066b11dc 100644 --- a/libvips/include/vips/stream.h +++ b/libvips/include/vips/stream.h @@ -273,7 +273,9 @@ VipsStreamiu *vips_streamiu_new( void ); (G_TYPE_INSTANCE_GET_CLASS( (obj), \ VIPS_TYPE_STREAMO, VipsStreamoClass )) -#define VIPS_STREAMO_BUFFER_SIZE (4096) +/* PNG writes in 8kb chunks, so we need to be a little larger than that. + */ +#define VIPS_STREAMO_BUFFER_SIZE (8500) /* Output to something like a socket, pipe or memory area. */ @@ -282,13 +284,17 @@ typedef struct _VipsStreamo { /*< private >*/ + /* This stream should write to memory. + */ + gboolean memory; + /* The stream has been finished and can no longer be written. */ gboolean finished; /* Write memory output here. */ - GByteArray *memory; + GByteArray *memory_buffer; /* And return memory via this blob. */ @@ -337,6 +343,45 @@ int vips_streamo_writef( VipsStreamo *streamo, const char *fmt, ... ) __attribute__((format(printf, 2, 3))); int vips_streamo_write_amp( VipsStreamo *streamo, const char *str ); +#define VIPS_TYPE_STREAMOU (vips_streamou_get_type()) +#define VIPS_STREAMOU( obj ) \ + (G_TYPE_CHECK_INSTANCE_CAST( (obj), \ + VIPS_TYPE_STREAMOU, VipsStreamou )) +#define VIPS_STREAMOU_CLASS( klass ) \ + (G_TYPE_CHECK_CLASS_CAST( (klass), \ + VIPS_TYPE_STREAMOU, VipsStreamouClass)) +#define VIPS_IS_STREAMOU( obj ) \ + (G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_STREAMOU )) +#define VIPS_IS_STREAMOU_CLASS( klass ) \ + (G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_STREAMOU )) +#define VIPS_STREAMOU_GET_CLASS( obj ) \ + (G_TYPE_INSTANCE_GET_CLASS( (obj), \ + VIPS_TYPE_STREAMOU, VipsStreamouClass )) + +#define VIPS_STREAMOU_BUFFER_SIZE (4096) + +/* Output to something like a socket, pipe or memory area. + */ +typedef struct _VipsStreamou { + VipsStreamo parent_object; + +} VipsStreamou; + +typedef struct _VipsStreamouClass { + VipsStreamoClass parent_class; + + /* The action signals clients can use to implement write and finish. + * We must use gint64 everywhere since there's no G_TYPE_SIZE. + */ + + gint64 (*write)( VipsStreamou *, const void *, gint64 ); + void (*finish)( VipsStreamou * ); + +} VipsStreamouClass; + +GType vips_streamou_get_type( void ); +VipsStreamou *vips_streamou_new( void ); + #ifdef __cplusplus } #endif /*__cplusplus*/ diff --git a/libvips/iofuncs/Makefile.am b/libvips/iofuncs/Makefile.am index 76957617..9005ead0 100644 --- a/libvips/iofuncs/Makefile.am +++ b/libvips/iofuncs/Makefile.am @@ -5,6 +5,7 @@ libiofuncs_la_SOURCES = \ streami.c \ streamiu.c \ streamo.c \ + streamou.c \ bufis.c \ dbuf.c \ reorder.c \ diff --git a/libvips/iofuncs/init.c b/libvips/iofuncs/init.c index 4fc72969..a072fcb0 100644 --- a/libvips/iofuncs/init.c +++ b/libvips/iofuncs/init.c @@ -333,6 +333,7 @@ vips_init( const char *argv0 ) extern GType vips_streami_get_type( void ); extern GType vips_streamiu_get_type( void ); extern GType vips_streamo_get_type( void ); + extern GType vips_streamou_get_type( void ); static gboolean started = FALSE; static gboolean done = FALSE; @@ -450,6 +451,7 @@ vips_init( const char *argv0 ) (void) vips_streami_get_type(); (void) vips_streamiu_get_type(); (void) vips_streamo_get_type(); + (void) vips_streamou_get_type(); vips__meta_init_types(); vips__interpolate_init(); im__format_init(); diff --git a/libvips/iofuncs/streamiu.c b/libvips/iofuncs/streamiu.c index c8f64b17..dbfbb26c 100644 --- a/libvips/iofuncs/streamiu.c +++ b/libvips/iofuncs/streamiu.c @@ -1,4 +1,4 @@ -/* A Streamiu subclass with signals you can easily hook up to other input +/* A Streami subclass with signals you can easily hook up to other input * sources. * * J.Cupitt, 21/11/19 @@ -31,11 +31,6 @@ */ -/* TODO - * - * - can we map and then close the fd? how about on Windows? - */ - /* #define VIPS_DEBUG */ @@ -77,9 +72,9 @@ static guint vips_streamiu_signals[SIG_LAST] = { 0 }; static ssize_t vips_streamiu_read_real( VipsStreami *streami, - void *data, size_t length ) + void *buffer, size_t length ) { - ssize_t bytes_read; + gint64 bytes_read; VIPS_DEBUG_MSG( "vips_streamiu_read_real:\n" ); @@ -88,7 +83,7 @@ vips_streamiu_read_real( VipsStreami *streami, bytes_read = 0; g_signal_emit( streami, vips_streamiu_signals[SIG_READ], 0, - data, length, &bytes_read ); + buffer, length, &bytes_read ); VIPS_DEBUG_MSG( " %zd\n", bytes_read ); @@ -151,13 +146,12 @@ vips_streamiu_class_init( VipsStreamiuClass *class ) /** * VipsStreamiu::read: * @streamiu: the stream being operated on - * @buffer: %gint64, seek offset - * @size: %gint, seek origin + * @buffer: %gpointer, buffer to fill + * @size: %gint64, size of buffer * - * This signal is emitted to seek the stream. The handler should - * change the stream position appropriately. + * This signal is emitted to read bytes from the source into @buffer. * - * Returns: the new seek position. + * Returns: the number of bytes read. */ vips_streamiu_signals[SIG_READ] = g_signal_new( "read", G_TYPE_FROM_CLASS( class ), diff --git a/libvips/iofuncs/streamo.c b/libvips/iofuncs/streamo.c index e5849a5e..274798f4 100644 --- a/libvips/iofuncs/streamo.c +++ b/libvips/iofuncs/streamo.c @@ -31,12 +31,6 @@ */ -/* TODO - * - * - test we can really change all behaviour in the subclass ... add callbacks - * as well to make it simpler for language bindings - */ - /* #define VIPS_DEBUG */ @@ -93,7 +87,7 @@ vips_streamo_finalize( GObject *gobject ) VIPS_DEBUG_MSG( "vips_streamo_finalize:\n" ); - VIPS_FREEF( g_byte_array_unref, streamo->memory ); + VIPS_FREEF( g_byte_array_unref, streamo->memory_buffer ); if( streamo->blob ) { vips_area_unref( VIPS_AREA( streamo->blob ) ); streamo->blob = NULL; @@ -120,7 +114,7 @@ vips_streamo_build( VipsObject *object ) return( -1 ); } - if( vips_object_argument_isset( object, "filename" ) ) { + if( stream->filename ) { const char *filename = stream->filename; int fd; @@ -141,8 +135,8 @@ vips_streamo_build( VipsObject *object ) stream->descriptor = dup( stream->descriptor ); stream->close_descriptor = stream->descriptor; } - else { - streamo->memory = g_byte_array_new(); + else if( streamo->memory ) { + streamo->memory_buffer = g_byte_array_new(); } return( 0 ); @@ -182,10 +176,17 @@ vips_streamo_class_init( VipsStreamoClass *class ) class->write = vips_streamo_write_real; class->finish = vips_streamo_finish_real; + VIPS_ARG_BOOL( class, "memory", 3, + _( "Memory" ), + _( "File descriptor should output to memory" ), + VIPS_ARGUMENT_OPTIONAL_INPUT, + G_STRUCT_OFFSET( VipsStreamo, memory ), + FALSE ); + /* SET_ALWAYS means that blob is set by C and the obj system is not * involved in creation or destruction. It can be read at any time. */ - VIPS_ARG_BOXED( class, "blob", 3, + VIPS_ARG_BOXED( class, "blob", 4, _( "Blob" ), _( "Blob to save to" ), VIPS_ARGUMENT_SET_ALWAYS, @@ -277,7 +278,9 @@ vips_streamo_new_to_memory( void ) VIPS_DEBUG_MSG( "vips_streamo_new_to_memory:\n" ); - streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO, NULL ) ); + streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO, + "memory", TRUE, + NULL ) ); if( vips_object_build( VIPS_OBJECT( streamo ) ) ) { VIPS_UNREF( streamo ); @@ -293,11 +296,13 @@ vips_streamo_write_unbuffered( VipsStreamo *streamo, { VipsStreamoClass *class = VIPS_STREAMO_GET_CLASS( streamo ); + VIPS_DEBUG_MSG( "vips_streamo_write_unbuffered:\n" ); + if( streamo->finished ) return( 0 ); - if( streamo->memory ) - g_byte_array_append( streamo->memory, data, length ); + if( streamo->memory_buffer ) + g_byte_array_append( streamo->memory_buffer, data, length ); else while( length > 0 ) { ssize_t n; @@ -328,6 +333,8 @@ vips_streamo_flush( VipsStreamo *streamo ) g_assert( streamo->write_point >= 0 ); g_assert( streamo->write_point <= VIPS_STREAMO_BUFFER_SIZE ); + VIPS_DEBUG_MSG( "vips_streamo_flush:\n" ); + if( streamo->write_point > 0 ) { if( vips_streamo_write_unbuffered( streamo, streamo->output_buffer, streamo->write_point ) ) @@ -397,13 +404,13 @@ vips_streamo_finish( VipsStreamo *streamo ) /* Move the stream buffer into the blob so it can be read out. */ - if( streamo->memory ) { + if( streamo->memory_buffer ) { unsigned char *data; size_t length; - length = streamo->memory->len; - data = g_byte_array_free( streamo->memory, FALSE ); - streamo->memory = NULL; + length = streamo->memory_buffer->len; + data = g_byte_array_free( streamo->memory_buffer, FALSE ); + streamo->memory_buffer = NULL; vips_blob_set( streamo->blob, (VipsCallbackFn) g_free, data, length ); } @@ -436,22 +443,22 @@ vips_streamo_steal( VipsStreamo *streamo, size_t *length ) (void) vips_streamo_flush( streamo ); - if( !streamo->memory || + if( !streamo->memory_buffer || streamo->finished ) { if( length ) - *length = streamo->memory->len; + *length = streamo->memory_buffer->len; return( NULL ); } if( length ) - *length = streamo->memory->len; - data = g_byte_array_free( streamo->memory, FALSE ); - streamo->memory = NULL; + *length = streamo->memory_buffer->len; + data = g_byte_array_free( streamo->memory_buffer, FALSE ); + streamo->memory_buffer = NULL; /* We must have a valid byte array or finish will fail. */ - streamo->memory = g_byte_array_new(); + streamo->memory_buffer = g_byte_array_new(); vips_streamo_finish( streamo ); diff --git a/libvips/iofuncs/streamou.c b/libvips/iofuncs/streamou.c new file mode 100644 index 00000000..eff3d7b0 --- /dev/null +++ b/libvips/iofuncs/streamou.c @@ -0,0 +1,194 @@ +/* A Streamo subclass with signals you can easily hook up to other output + * sources. + * + * J.Cupitt, 21/11/19 + */ + +/* + + This file is part of VIPS. + + VIPS is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301 USA + + */ + +/* + + These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk + + */ + +/* +#define VIPS_DEBUG + */ + +#ifdef HAVE_CONFIG_H +#include +#endif /*HAVE_CONFIG_H*/ +#include + +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif /*HAVE_UNISTD_H*/ +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "vipsmarshal.h" + +G_DEFINE_TYPE( VipsStreamou, vips_streamou, VIPS_TYPE_STREAMO ); + +/* Our signals. + */ +enum { + SIG_WRITE, + SIG_FINISH, + SIG_LAST +}; + +static guint vips_streamou_signals[SIG_LAST] = { 0 }; + +static ssize_t +vips_streamou_write_real( VipsStreamo *streamo, + const void *data, size_t length ) +{ + gint64 bytes_written; + + VIPS_DEBUG_MSG( "vips_streamou_write_real:\n" ); + + /* Return value if no attached handler. + */ + bytes_written = 0; + + g_signal_emit( streamo, vips_streamou_signals[SIG_WRITE], 0, + data, length, &bytes_written ); + + VIPS_DEBUG_MSG( " %zd\n", bytes_written ); + + return( bytes_written ); +} + +static void +vips_streamou_finish_real( VipsStreamo *streamo ) +{ + VIPS_DEBUG_MSG( "vips_streamou_seek_real:\n" ); + + g_signal_emit( streamo, vips_streamou_signals[SIG_FINISH], 0 ); +} + +static gint64 +vips_streamou_write_signal_real( VipsStreamou *streamou, + const void *data, gint64 length ) +{ + VIPS_DEBUG_MSG( "vips_streamou_write_signal_real:\n" ); + + return( 0 ); +} + +static void +vips_streamou_finish_signal_real( VipsStreamou *streamou ) +{ + VIPS_DEBUG_MSG( "vips_streamou_finish_signal_real:\n" ); +} + +static void +vips_streamou_class_init( VipsStreamouClass *class ) +{ + VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class ); + VipsStreamoClass *streamo_class = VIPS_STREAMO_CLASS( class ); + + object_class->nickname = "streamou"; + object_class->description = _( "input stream" ); + + streamo_class->write = vips_streamou_write_real; + streamo_class->finish = vips_streamou_finish_real; + + class->write = vips_streamou_write_signal_real; + class->finish = vips_streamou_finish_signal_real; + + /** + * VipsStreamou::write: + * @streamou: the stream being operated on + * @data: %pointer, bytes to write + * @length: %gint64, number of bytes + * + * This signal is emitted to write bytes to the stream. + * + * Returns: the number of bytes written. + */ + vips_streamou_signals[SIG_WRITE] = g_signal_new( "write", + G_TYPE_FROM_CLASS( class ), + G_SIGNAL_ACTION, + G_STRUCT_OFFSET( VipsStreamouClass, write ), + NULL, NULL, + vips_INT64__POINTER_INT64, + G_TYPE_INT64, 2, + G_TYPE_POINTER, G_TYPE_INT64 ); + + /** + * VipsStreamou::finish: + * @streamou: the stream being operated on + * + * This signal is emitted at the end of write. The stream should do + * any finishing necessary. + */ + vips_streamou_signals[SIG_FINISH] = g_signal_new( "finish", + G_TYPE_FROM_CLASS( class ), + G_SIGNAL_ACTION, + G_STRUCT_OFFSET( VipsStreamouClass, finish ), + NULL, NULL, + g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0 ); + +} + +static void +vips_streamou_init( VipsStreamou *streamou ) +{ +} + +/** + * vips_streamou_new: + * + * Create a #VipsStreamou. Attach signals to implement write and finish. + * + * Returns: a new #VipsStreamou + */ +VipsStreamou * +vips_streamou_new( void ) +{ + VipsStreamou *streamou; + + VIPS_DEBUG_MSG( "vips_streamou_new:\n" ); + + streamou = VIPS_STREAMOU( g_object_new( VIPS_TYPE_STREAMOU, NULL ) ); + + if( vips_object_build( VIPS_OBJECT( streamou ) ) ) { + VIPS_UNREF( streamou ); + return( NULL ); + } + + return( streamou ); +}