From 848a119faa7ba076bedaade815fe806fed584b27 Mon Sep 17 00:00:00 2001 From: Kleis Auke Wolthuizen Date: Tue, 3 Jan 2023 13:00:35 +0100 Subject: [PATCH] threadset: stop idle threads after a regular timeout (#3253) * Add `vips_semaphore_down_timeout` to public API In preparation for the next commit. * threadset: stop idle threads after a regular timeout In line with the previous behaviour of GLib's threadpool. * threadset: fix a small memory leak * threadset: ensure idle threads are freed on exit GLib threads are spawned with `ref_count == 2`, this reference is decreased during thread exit and associated resources are freed during `g_thread_join()`. However, idle threads that are stopped after a regular timeout are not joined. To fix this, decrease the reference count during `vips_threadset_add()` and increase it just before joining the thread with `g_thread_join()`. See: https://gitlab.gnome.org/GNOME/glib/-/blob/2d5d990c6ac7ff45bcc4485cce95e87687d47595/glib/gthread.c#L522 https://gitlab.gnome.org/GNOME/glib/-/blob/2d5d990c6ac7ff45bcc4485cce95e87687d47595/glib/gthread-posix.c#L1287 * Deprecate `vips_g_thread_join()` in favor of `g_thread_join()` * nit: remove extra semicolon --- libvips/deprecated/rename.c | 6 +++ libvips/include/vips/almostdeprecated.h | 3 ++ libvips/include/vips/semaphore.h | 2 + libvips/include/vips/thread.h | 2 - libvips/iofuncs/semaphore.c | 48 +++++++++++++++++++---- libvips/iofuncs/sinkscreen.c | 2 +- libvips/iofuncs/thread.c | 13 ------- libvips/iofuncs/threadset.c | 52 +++++++++++++++++-------- 8 files changed, 87 insertions(+), 41 deletions(-) diff --git a/libvips/deprecated/rename.c b/libvips/deprecated/rename.c index b312386e..e3bc987e 100644 --- a/libvips/deprecated/rename.c +++ b/libvips/deprecated/rename.c @@ -129,6 +129,12 @@ im_warning( const char *fmt, ... ) va_end( ap ); } +void * +vips_g_thread_join( GThread *thread ) +{ + return( g_thread_join( thread ) ); +} + int im_affine( IMAGE *in, IMAGE *out, double a, double b, double c, double d, double dx, double dy, diff --git a/libvips/include/vips/almostdeprecated.h b/libvips/include/vips/almostdeprecated.h index 0d43ab9e..4084134d 100644 --- a/libvips/include/vips/almostdeprecated.h +++ b/libvips/include/vips/almostdeprecated.h @@ -147,6 +147,9 @@ VIPS_DEPRECATED_FOR(g_warning) void im_warning( const char *fmt, ... ) G_GNUC_PRINTF( 1, 2 ); +VIPS_DEPRECATED_FOR(g_thread_join) +void *vips_g_thread_join( GThread *thread ); + VIPS_DEPRECATED int im_iterate( VipsImage *im, VipsStartFn start, im_generate_fn generate, VipsStopFn stop, diff --git a/libvips/include/vips/semaphore.h b/libvips/include/vips/semaphore.h index 0762aa44..8c9ef0fa 100644 --- a/libvips/include/vips/semaphore.h +++ b/libvips/include/vips/semaphore.h @@ -60,6 +60,8 @@ int vips_semaphore_up( VipsSemaphore *s ); VIPS_API int vips_semaphore_down( VipsSemaphore *s ); VIPS_API +int vips_semaphore_down_timeout( VipsSemaphore *s, gint64 timeout ); +VIPS_API int vips_semaphore_upn( VipsSemaphore *s, int n ); VIPS_API int vips_semaphore_downn( VipsSemaphore *s, int n ); diff --git a/libvips/include/vips/thread.h b/libvips/include/vips/thread.h index ac3cd528..347a7113 100644 --- a/libvips/include/vips/thread.h +++ b/libvips/include/vips/thread.h @@ -55,8 +55,6 @@ void vips_g_cond_free( GCond * ); */ VIPS_API GThread *vips_g_thread_new( const char *, GThreadFunc, gpointer ); -VIPS_API -void *vips_g_thread_join( GThread *thread ); VIPS_API gboolean vips_thread_isvips( void ); diff --git a/libvips/iofuncs/semaphore.c b/libvips/iofuncs/semaphore.c index d627a420..96a67be2 100644 --- a/libvips/iofuncs/semaphore.c +++ b/libvips/iofuncs/semaphore.c @@ -110,37 +110,69 @@ vips_semaphore_up( VipsSemaphore *s ) } /* Wait for sem>n, then subtract n. + * Returns -1 when the monotonic time in @end_time was passed. */ -int -vips_semaphore_downn( VipsSemaphore *s, int n ) +static int +vips__semaphore_downn_until( VipsSemaphore *s, int n, gint64 end_time ) { int value_after_op; - VIPS_GATE_START( "vips_semaphore_downn: wait" ); + VIPS_GATE_START( "vips__semaphore_downn_until: wait" ); g_mutex_lock( s->mutex ); - while( s->v < n ) - g_cond_wait( s->cond, s->mutex ); + while( s->v < n ) { + if( end_time == -1 ) + g_cond_wait( s->cond, s->mutex ); + else if( !g_cond_wait_until( s->cond, s->mutex, end_time ) ) { + /* timeout has passed. + */ + g_mutex_unlock( s->mutex ); + + VIPS_GATE_STOP( "vips__semaphore_downn_until: wait" ); + return( -1 ); + } + } + s->v -= n; value_after_op = s->v; g_mutex_unlock( s->mutex ); #ifdef DEBUG_IO - printf( "vips_semaphore_downn(\"%s\",%d): %d\n", + printf( "vips__semaphore_downn_until(\"%s\",%d): %d\n", s->name, n, value_after_op ); #endif /*DEBUG_IO*/ - VIPS_GATE_STOP( "vips_semaphore_downn: wait" ); + VIPS_GATE_STOP( "vips__semaphore_downn_until: wait" ); return( value_after_op ); } +/* Wait for sem>n, then subtract n. + */ +int +vips_semaphore_downn( VipsSemaphore *s, int n ) +{ + return( vips__semaphore_downn_until( s, n, -1 ) ); +} + /* Wait for sem > 0, then decrement. */ int vips_semaphore_down( VipsSemaphore *s ) { - return( vips_semaphore_downn( s, 1 ) ); + return( vips__semaphore_downn_until( s, 1, -1 ) ); +} + +/* Wait for sem > 0, then decrement. + * Returns -1 when the relative time in @timeout (in microseconds) + * was passed. + */ +int +vips_semaphore_down_timeout( VipsSemaphore *s, gint64 timeout ) +{ + gint64 end_time = g_get_monotonic_time () + timeout; + + return( vips__semaphore_downn_until( s, 1, end_time ) ); } diff --git a/libvips/iofuncs/sinkscreen.c b/libvips/iofuncs/sinkscreen.c index 3728e8d6..ebf34c39 100644 --- a/libvips/iofuncs/sinkscreen.c +++ b/libvips/iofuncs/sinkscreen.c @@ -456,7 +456,7 @@ vips__render_shutdown( void ) vips_semaphore_up( &n_render_dirty_sem ); - (void) vips_g_thread_join( thread ); + (void) g_thread_join( thread ); } else g_mutex_unlock( render_dirty_lock ); diff --git a/libvips/iofuncs/thread.c b/libvips/iofuncs/thread.c index ff2de251..479e6142 100644 --- a/libvips/iofuncs/thread.c +++ b/libvips/iofuncs/thread.c @@ -190,19 +190,6 @@ vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data ) return( thread ); } -void * -vips_g_thread_join( GThread *thread ) -{ - void *result; - - result = g_thread_join( thread ); - - VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n", - thread ); - - return( result ); -} - static int get_num_processors( void ) { diff --git a/libvips/iofuncs/threadset.c b/libvips/iofuncs/threadset.c index 4e9cb10c..06c91c4b 100644 --- a/libvips/iofuncs/threadset.c +++ b/libvips/iofuncs/threadset.c @@ -92,6 +92,11 @@ struct _VipsThreadset { int max_threads; }; +/* The maximum relative time (in microseconds) that a thread waits + * for work before being stopped. + */ +static const gint64 max_idle_time = 15 * G_TIME_SPAN_SECOND; + /* The thread work function. */ static void * @@ -101,9 +106,13 @@ vips_threadset_work( void *pointer ) VipsThreadset *set = member->set; for(;;) { - /* Wait to be given work. + /* Wait for at least 15 seconds to be given work. + */ + if( vips_semaphore_down_timeout( &member->idle, max_idle_time ) == -1 ) + break; + + /* Killed or no task available? Leave this thread. */ - vips_semaphore_down( &member->idle ); if( member->kill || !member->func ) break; @@ -134,9 +143,18 @@ vips_threadset_work( void *pointer ) g_mutex_unlock( set->lock ); } - /* Kill has been requested. We leave this thread on the members - * list so it can be found and joined. + /* Timed-out or kill has been requested ... remove from both free + * and member list. */ + g_mutex_lock( set->lock ); + set->free = g_slist_remove( set->free, member ); + set->members = g_slist_remove( set->members, member ); + set->n_threads -= 1; + g_mutex_unlock( set->lock ); + + vips_semaphore_destroy( &member->idle ); + + VIPS_FREE( member ); return( NULL ); } @@ -168,11 +186,16 @@ vips_threadset_add( VipsThreadset *set ) return( NULL ); } + /* Ensure idle threads are freed on exit, this + * ref is increased before the thread is joined. + */ + g_thread_unref( member->thread ); + g_mutex_lock( set->lock ); set->members = g_slist_prepend( set->members, member ); set->n_threads += 1; set->n_threads_highwater = - VIPS_MAX( set->n_threads_highwater, set->n_threads );; + VIPS_MAX( set->n_threads_highwater, set->n_threads ); g_mutex_unlock( set->lock ); return( member ); @@ -274,20 +297,17 @@ vips_threadset_run( VipsThreadset *set, static void vips_threadset_kill_member( VipsThreadsetMember *member ) { - VipsThreadset *set = member->set; + GThread *thread; + thread = g_thread_ref( member->thread ); member->kill = TRUE; + vips_semaphore_up( &member->idle ); - g_thread_join( member->thread ); - vips_semaphore_destroy( &member->idle ); + (void) g_thread_join( thread ); - g_mutex_lock( set->lock ); - set->free = g_slist_remove( set->free, member ); - set->n_threads -= 1; - g_mutex_unlock( set->lock ); - - VIPS_FREE( member ); + /* member is freed on thread exit. + */ } /** @@ -309,10 +329,8 @@ vips_threadset_free( VipsThreadset *set ) member = NULL; g_mutex_lock( set->lock ); - if( set->members ) { + if( set->members ) member = (VipsThreadsetMember *) set->members->data; - set->members = g_slist_remove( set->members, member ); - } g_mutex_unlock( set->lock ); if( !member )