add VipsStreamou

An output stream subclass you can easily connect to other destinations.
This commit is contained in:
John Cupitt 2019-11-21 17:54:10 +00:00
parent 6423721951
commit 91b0703921
6 changed files with 283 additions and 40 deletions

View File

@ -273,7 +273,9 @@ VipsStreamiu *vips_streamiu_new( void );
(G_TYPE_INSTANCE_GET_CLASS( (obj), \
VIPS_TYPE_STREAMO, VipsStreamoClass ))
#define VIPS_STREAMO_BUFFER_SIZE (4096)
/* PNG writes in 8kb chunks, so we need to be a little larger than that.
*/
#define VIPS_STREAMO_BUFFER_SIZE (8500)
/* Output to something like a socket, pipe or memory area.
*/
@ -282,13 +284,17 @@ typedef struct _VipsStreamo {
/*< private >*/
/* This stream should write to memory.
*/
gboolean memory;
/* The stream has been finished and can no longer be written.
*/
gboolean finished;
/* Write memory output here.
*/
GByteArray *memory;
GByteArray *memory_buffer;
/* And return memory via this blob.
*/
@ -337,6 +343,45 @@ int vips_streamo_writef( VipsStreamo *streamo, const char *fmt, ... )
__attribute__((format(printf, 2, 3)));
int vips_streamo_write_amp( VipsStreamo *streamo, const char *str );
#define VIPS_TYPE_STREAMOU (vips_streamou_get_type())
#define VIPS_STREAMOU( obj ) \
(G_TYPE_CHECK_INSTANCE_CAST( (obj), \
VIPS_TYPE_STREAMOU, VipsStreamou ))
#define VIPS_STREAMOU_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_CAST( (klass), \
VIPS_TYPE_STREAMOU, VipsStreamouClass))
#define VIPS_IS_STREAMOU( obj ) \
(G_TYPE_CHECK_INSTANCE_TYPE( (obj), VIPS_TYPE_STREAMOU ))
#define VIPS_IS_STREAMOU_CLASS( klass ) \
(G_TYPE_CHECK_CLASS_TYPE( (klass), VIPS_TYPE_STREAMOU ))
#define VIPS_STREAMOU_GET_CLASS( obj ) \
(G_TYPE_INSTANCE_GET_CLASS( (obj), \
VIPS_TYPE_STREAMOU, VipsStreamouClass ))
#define VIPS_STREAMOU_BUFFER_SIZE (4096)
/* Output to something like a socket, pipe or memory area.
*/
typedef struct _VipsStreamou {
VipsStreamo parent_object;
} VipsStreamou;
typedef struct _VipsStreamouClass {
VipsStreamoClass parent_class;
/* The action signals clients can use to implement write and finish.
* We must use gint64 everywhere since there's no G_TYPE_SIZE.
*/
gint64 (*write)( VipsStreamou *, const void *, gint64 );
void (*finish)( VipsStreamou * );
} VipsStreamouClass;
GType vips_streamou_get_type( void );
VipsStreamou *vips_streamou_new( void );
#ifdef __cplusplus
}
#endif /*__cplusplus*/

View File

@ -5,6 +5,7 @@ libiofuncs_la_SOURCES = \
streami.c \
streamiu.c \
streamo.c \
streamou.c \
bufis.c \
dbuf.c \
reorder.c \

View File

@ -333,6 +333,7 @@ vips_init( const char *argv0 )
extern GType vips_streami_get_type( void );
extern GType vips_streamiu_get_type( void );
extern GType vips_streamo_get_type( void );
extern GType vips_streamou_get_type( void );
static gboolean started = FALSE;
static gboolean done = FALSE;
@ -450,6 +451,7 @@ vips_init( const char *argv0 )
(void) vips_streami_get_type();
(void) vips_streamiu_get_type();
(void) vips_streamo_get_type();
(void) vips_streamou_get_type();
vips__meta_init_types();
vips__interpolate_init();
im__format_init();

View File

@ -1,4 +1,4 @@
/* A Streamiu subclass with signals you can easily hook up to other input
/* A Streami subclass with signals you can easily hook up to other input
* sources.
*
* J.Cupitt, 21/11/19
@ -31,11 +31,6 @@
*/
/* TODO
*
* - can we map and then close the fd? how about on Windows?
*/
/*
#define VIPS_DEBUG
*/
@ -77,9 +72,9 @@ static guint vips_streamiu_signals[SIG_LAST] = { 0 };
static ssize_t
vips_streamiu_read_real( VipsStreami *streami,
void *data, size_t length )
void *buffer, size_t length )
{
ssize_t bytes_read;
gint64 bytes_read;
VIPS_DEBUG_MSG( "vips_streamiu_read_real:\n" );
@ -88,7 +83,7 @@ vips_streamiu_read_real( VipsStreami *streami,
bytes_read = 0;
g_signal_emit( streami, vips_streamiu_signals[SIG_READ], 0,
data, length, &bytes_read );
buffer, length, &bytes_read );
VIPS_DEBUG_MSG( " %zd\n", bytes_read );
@ -151,13 +146,12 @@ vips_streamiu_class_init( VipsStreamiuClass *class )
/**
* VipsStreamiu::read:
* @streamiu: the stream being operated on
* @buffer: %gint64, seek offset
* @size: %gint, seek origin
* @buffer: %gpointer, buffer to fill
* @size: %gint64, size of buffer
*
* This signal is emitted to seek the stream. The handler should
* change the stream position appropriately.
* This signal is emitted to read bytes from the source into @buffer.
*
* Returns: the new seek position.
* Returns: the number of bytes read.
*/
vips_streamiu_signals[SIG_READ] = g_signal_new( "read",
G_TYPE_FROM_CLASS( class ),

View File

@ -31,12 +31,6 @@
*/
/* TODO
*
* - test we can really change all behaviour in the subclass ... add callbacks
* as well to make it simpler for language bindings
*/
/*
#define VIPS_DEBUG
*/
@ -93,7 +87,7 @@ vips_streamo_finalize( GObject *gobject )
VIPS_DEBUG_MSG( "vips_streamo_finalize:\n" );
VIPS_FREEF( g_byte_array_unref, streamo->memory );
VIPS_FREEF( g_byte_array_unref, streamo->memory_buffer );
if( streamo->blob ) {
vips_area_unref( VIPS_AREA( streamo->blob ) );
streamo->blob = NULL;
@ -120,7 +114,7 @@ vips_streamo_build( VipsObject *object )
return( -1 );
}
if( vips_object_argument_isset( object, "filename" ) ) {
if( stream->filename ) {
const char *filename = stream->filename;
int fd;
@ -141,8 +135,8 @@ vips_streamo_build( VipsObject *object )
stream->descriptor = dup( stream->descriptor );
stream->close_descriptor = stream->descriptor;
}
else {
streamo->memory = g_byte_array_new();
else if( streamo->memory ) {
streamo->memory_buffer = g_byte_array_new();
}
return( 0 );
@ -182,10 +176,17 @@ vips_streamo_class_init( VipsStreamoClass *class )
class->write = vips_streamo_write_real;
class->finish = vips_streamo_finish_real;
VIPS_ARG_BOOL( class, "memory", 3,
_( "Memory" ),
_( "File descriptor should output to memory" ),
VIPS_ARGUMENT_OPTIONAL_INPUT,
G_STRUCT_OFFSET( VipsStreamo, memory ),
FALSE );
/* SET_ALWAYS means that blob is set by C and the obj system is not
* involved in creation or destruction. It can be read at any time.
*/
VIPS_ARG_BOXED( class, "blob", 3,
VIPS_ARG_BOXED( class, "blob", 4,
_( "Blob" ),
_( "Blob to save to" ),
VIPS_ARGUMENT_SET_ALWAYS,
@ -277,7 +278,9 @@ vips_streamo_new_to_memory( void )
VIPS_DEBUG_MSG( "vips_streamo_new_to_memory:\n" );
streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO, NULL ) );
streamo = VIPS_STREAMO( g_object_new( VIPS_TYPE_STREAMO,
"memory", TRUE,
NULL ) );
if( vips_object_build( VIPS_OBJECT( streamo ) ) ) {
VIPS_UNREF( streamo );
@ -293,11 +296,13 @@ vips_streamo_write_unbuffered( VipsStreamo *streamo,
{
VipsStreamoClass *class = VIPS_STREAMO_GET_CLASS( streamo );
VIPS_DEBUG_MSG( "vips_streamo_write_unbuffered:\n" );
if( streamo->finished )
return( 0 );
if( streamo->memory )
g_byte_array_append( streamo->memory, data, length );
if( streamo->memory_buffer )
g_byte_array_append( streamo->memory_buffer, data, length );
else
while( length > 0 ) {
ssize_t n;
@ -328,6 +333,8 @@ vips_streamo_flush( VipsStreamo *streamo )
g_assert( streamo->write_point >= 0 );
g_assert( streamo->write_point <= VIPS_STREAMO_BUFFER_SIZE );
VIPS_DEBUG_MSG( "vips_streamo_flush:\n" );
if( streamo->write_point > 0 ) {
if( vips_streamo_write_unbuffered( streamo,
streamo->output_buffer, streamo->write_point ) )
@ -397,13 +404,13 @@ vips_streamo_finish( VipsStreamo *streamo )
/* Move the stream buffer into the blob so it can be read out.
*/
if( streamo->memory ) {
if( streamo->memory_buffer ) {
unsigned char *data;
size_t length;
length = streamo->memory->len;
data = g_byte_array_free( streamo->memory, FALSE );
streamo->memory = NULL;
length = streamo->memory_buffer->len;
data = g_byte_array_free( streamo->memory_buffer, FALSE );
streamo->memory_buffer = NULL;
vips_blob_set( streamo->blob,
(VipsCallbackFn) g_free, data, length );
}
@ -436,22 +443,22 @@ vips_streamo_steal( VipsStreamo *streamo, size_t *length )
(void) vips_streamo_flush( streamo );
if( !streamo->memory ||
if( !streamo->memory_buffer ||
streamo->finished ) {
if( length )
*length = streamo->memory->len;
*length = streamo->memory_buffer->len;
return( NULL );
}
if( length )
*length = streamo->memory->len;
data = g_byte_array_free( streamo->memory, FALSE );
streamo->memory = NULL;
*length = streamo->memory_buffer->len;
data = g_byte_array_free( streamo->memory_buffer, FALSE );
streamo->memory_buffer = NULL;
/* We must have a valid byte array or finish will fail.
*/
streamo->memory = g_byte_array_new();
streamo->memory_buffer = g_byte_array_new();
vips_streamo_finish( streamo );

194
libvips/iofuncs/streamou.c Normal file
View File

@ -0,0 +1,194 @@
/* A Streamo subclass with signals you can easily hook up to other output
* sources.
*
* J.Cupitt, 21/11/19
*/
/*
This file is part of VIPS.
VIPS is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA
*/
/*
These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
*/
/*
#define VIPS_DEBUG
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <vips/intl.h>
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/debug.h>
#include "vipsmarshal.h"
G_DEFINE_TYPE( VipsStreamou, vips_streamou, VIPS_TYPE_STREAMO );
/* Our signals.
*/
enum {
SIG_WRITE,
SIG_FINISH,
SIG_LAST
};
static guint vips_streamou_signals[SIG_LAST] = { 0 };
static ssize_t
vips_streamou_write_real( VipsStreamo *streamo,
const void *data, size_t length )
{
gint64 bytes_written;
VIPS_DEBUG_MSG( "vips_streamou_write_real:\n" );
/* Return value if no attached handler.
*/
bytes_written = 0;
g_signal_emit( streamo, vips_streamou_signals[SIG_WRITE], 0,
data, length, &bytes_written );
VIPS_DEBUG_MSG( " %zd\n", bytes_written );
return( bytes_written );
}
static void
vips_streamou_finish_real( VipsStreamo *streamo )
{
VIPS_DEBUG_MSG( "vips_streamou_seek_real:\n" );
g_signal_emit( streamo, vips_streamou_signals[SIG_FINISH], 0 );
}
static gint64
vips_streamou_write_signal_real( VipsStreamou *streamou,
const void *data, gint64 length )
{
VIPS_DEBUG_MSG( "vips_streamou_write_signal_real:\n" );
return( 0 );
}
static void
vips_streamou_finish_signal_real( VipsStreamou *streamou )
{
VIPS_DEBUG_MSG( "vips_streamou_finish_signal_real:\n" );
}
static void
vips_streamou_class_init( VipsStreamouClass *class )
{
VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
VipsStreamoClass *streamo_class = VIPS_STREAMO_CLASS( class );
object_class->nickname = "streamou";
object_class->description = _( "input stream" );
streamo_class->write = vips_streamou_write_real;
streamo_class->finish = vips_streamou_finish_real;
class->write = vips_streamou_write_signal_real;
class->finish = vips_streamou_finish_signal_real;
/**
* VipsStreamou::write:
* @streamou: the stream being operated on
* @data: %pointer, bytes to write
* @length: %gint64, number of bytes
*
* This signal is emitted to write bytes to the stream.
*
* Returns: the number of bytes written.
*/
vips_streamou_signals[SIG_WRITE] = g_signal_new( "write",
G_TYPE_FROM_CLASS( class ),
G_SIGNAL_ACTION,
G_STRUCT_OFFSET( VipsStreamouClass, write ),
NULL, NULL,
vips_INT64__POINTER_INT64,
G_TYPE_INT64, 2,
G_TYPE_POINTER, G_TYPE_INT64 );
/**
* VipsStreamou::finish:
* @streamou: the stream being operated on
*
* This signal is emitted at the end of write. The stream should do
* any finishing necessary.
*/
vips_streamou_signals[SIG_FINISH] = g_signal_new( "finish",
G_TYPE_FROM_CLASS( class ),
G_SIGNAL_ACTION,
G_STRUCT_OFFSET( VipsStreamouClass, finish ),
NULL, NULL,
g_cclosure_marshal_VOID__VOID,
G_TYPE_NONE, 0 );
}
static void
vips_streamou_init( VipsStreamou *streamou )
{
}
/**
* vips_streamou_new:
*
* Create a #VipsStreamou. Attach signals to implement write and finish.
*
* Returns: a new #VipsStreamou
*/
VipsStreamou *
vips_streamou_new( void )
{
VipsStreamou *streamou;
VIPS_DEBUG_MSG( "vips_streamou_new:\n" );
streamou = VIPS_STREAMOU( g_object_new( VIPS_TYPE_STREAMOU, NULL ) );
if( vips_object_build( VIPS_OBJECT( streamou ) ) ) {
VIPS_UNREF( streamou );
return( NULL );
}
return( streamou );
}