Remove mutex lock for VipsThreadStartFn

vips_{avg,deviate,hough,max,min,stats} are the only arithmetic functions
that do not require a mutex on the _start and/or _stop function. All
other arithmetic functions still needs this, so move it to sink instead.
This commit is contained in:
Kleis Auke Wolthuizen 2020-02-03 14:54:01 +01:00
parent 80e0cc3d12
commit 4144049174
2 changed files with 34 additions and 12 deletions

View File

@ -73,6 +73,10 @@ typedef struct _Sink {
*/ */
VipsImage *t; VipsImage *t;
/* Mutex for serialising calls to VipsStartFn and VipsStopFn.
*/
GMutex *sslock;
/* Call params. /* Call params.
*/ */
VipsStartFn start_fn; VipsStartFn start_fn;
@ -249,9 +253,21 @@ static int
sink_call_stop( Sink *sink, SinkThreadState *state ) sink_call_stop( Sink *sink, SinkThreadState *state )
{ {
if( state->seq && sink->stop_fn ) { if( state->seq && sink->stop_fn ) {
int result;
VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state ); VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state );
if( sink->stop_fn( state->seq, sink->a, sink->b ) ) { VIPS_GATE_START( "sink_call_stop: wait" );
g_mutex_lock( sink->sslock );
VIPS_GATE_STOP( "sink_call_stop: wait" );
result = sink->stop_fn( state->seq, sink->a, sink->b );
g_mutex_unlock( sink->sslock );
if( result ) {
SinkBase *sink_base = (SinkBase *) sink; SinkBase *sink_base = (SinkBase *) sink;
vips_error( "vips_sink", vips_error( "vips_sink",
@ -286,7 +302,15 @@ sink_call_start( Sink *sink, SinkThreadState *state )
if( !state->seq && sink->start_fn ) { if( !state->seq && sink->start_fn ) {
VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state ); VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state );
state->seq = sink->start_fn( sink->t, sink->a, sink->b ); VIPS_GATE_START( "sink_call_start: wait" );
g_mutex_lock( sink->sslock );
VIPS_GATE_STOP( "sink_call_start: wait" );
state->seq = sink->start_fn( sink->t, sink->a, sink->b );
g_mutex_unlock( sink->sslock );
if( !state->seq ) { if( !state->seq ) {
SinkBase *sink_base = (SinkBase *) sink; SinkBase *sink_base = (SinkBase *) sink;
@ -346,6 +370,7 @@ vips_sink_thread_state_new( VipsImage *im, void *a )
static void static void
sink_free( Sink *sink ) sink_free( Sink *sink )
{ {
VIPS_FREEF( vips_g_mutex_free, sink->sslock );
VIPS_FREEF( sink_area_free, sink->area ); VIPS_FREEF( sink_area_free, sink->area );
VIPS_FREEF( sink_area_free, sink->old_area ); VIPS_FREEF( sink_area_free, sink->old_area );
VIPS_FREEF( g_object_unref, sink->t ); VIPS_FREEF( g_object_unref, sink->t );
@ -381,6 +406,7 @@ sink_init( Sink *sink,
vips_sink_base_init( &sink->sink_base, image ); vips_sink_base_init( &sink->sink_base, image );
sink->t = NULL; sink->t = NULL;
sink->sslock = vips_g_mutex_new();
sink->start_fn = start_fn; sink->start_fn = start_fn;
sink->generate_fn = generate_fn; sink->generate_fn = generate_fn;
sink->stop_fn = stop_fn; sink->stop_fn = stop_fn;

View File

@ -25,6 +25,7 @@
* - free threadpool earlier * - free threadpool earlier
* 02/02/20 kleisauke * 02/02/20 kleisauke
* - reuse threads by using GLib's threadpool * - reuse threads by using GLib's threadpool
* - remove mutex lock for VipsThreadStartFn
*/ */
/* /*
@ -666,11 +667,8 @@ vips_task_run( gpointer data, gpointer user_data )
VIPS_GATE_START( "vips_task_run: thread" ); VIPS_GATE_START( "vips_task_run: thread" );
// TODO: Could we move this to vips_task_work_unit()?
g_mutex_lock( task->allocate_lock );
if( !(state = task->start( task->im, task->a )) ) if( !(state = task->start( task->im, task->a )) )
task->error = TRUE; task->error = TRUE;
g_mutex_unlock( task->allocate_lock );
/* Process work units! Always tick, even if we are stopping, so the /* Process work units! Always tick, even if we are stopping, so the
* main thread will wake up for exit. * main thread will wake up for exit.
@ -686,9 +684,7 @@ vips_task_run( gpointer data, gpointer user_data )
break; break;
} }
g_mutex_lock( task->allocate_lock );
VIPS_FREEF( g_object_unref, state ); VIPS_FREEF( g_object_unref, state );
g_mutex_unlock( task->allocate_lock );
/* We are exiting: tell the main thread. /* We are exiting: tell the main thread.
*/ */
@ -815,8 +811,8 @@ vips_threadpool_push( const char *name, GFunc func, gpointer data )
* is allocated to it to build the per-thread state. Per-thread state is used * is allocated to it to build the per-thread state. Per-thread state is used
* by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate. * by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate.
* *
* #VipsThreadState is a subclass of #VipsObject. Start functions are called * #VipsThreadState is a subclass of #VipsObject. Start functions can be
* from allocate, that is, they are single-threaded. * executed concurrently.
* *
* See also: vips_threadpool_run(). * See also: vips_threadpool_run().
* *
@ -899,9 +895,9 @@ vips_threadpool_push( const char *name, GFunc func, gpointer data )
* The object returned by @start must be an instance of a subclass of * The object returned by @start must be an instance of a subclass of
* #VipsThreadState. Use this to communicate between @allocate and @work. * #VipsThreadState. Use this to communicate between @allocate and @work.
* *
* @allocate and @start are always single-threaded (so they can write to the * @allocate is always single-threaded (so it can write to the
* per-pool state), whereas @work can be executed concurrently. @progress is * per-pool state), whereas @start and @work can be executed concurrently.
* always called by * @progress is always called by
* the main thread (ie. the thread which called vips_threadpool_run()). * the main thread (ie. the thread which called vips_threadpool_run()).
* *
* See also: vips_concurrency_set(). * See also: vips_concurrency_set().