threadset: stop idle threads after a regular timeout (#3253)

* Add `vips_semaphore_down_timeout` to public API

In preparation for the next commit.

* threadset: stop idle threads after a regular timeout

In line with the previous behaviour of GLib's threadpool.

* threadset: fix a small memory leak

* threadset: ensure idle threads are freed on exit

GLib threads are spawned with `ref_count == 2`, this reference is
decreased during thread exit and associated resources are freed
during `g_thread_join()`. However, idle threads that are stopped
after a regular timeout are not joined. To fix this, decrease the
reference count during `vips_threadset_add()` and increase it just
before joining the thread with `g_thread_join()`.

See:
2d5d990c6a/glib/gthread.c (L522)
2d5d990c6a/glib/gthread-posix.c (L1287)

* Deprecate `vips_g_thread_join()` in favor of `g_thread_join()`

* nit: remove extra semicolon
This commit is contained in:
Kleis Auke Wolthuizen 2023-01-03 13:00:35 +01:00 committed by GitHub
parent 60e18873fd
commit 848a119faa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 87 additions and 41 deletions

View File

@ -129,6 +129,12 @@ im_warning( const char *fmt, ... )
va_end( ap );
}
void *
vips_g_thread_join( GThread *thread )
{
return( g_thread_join( thread ) );
}
int
im_affine( IMAGE *in, IMAGE *out,
double a, double b, double c, double d, double dx, double dy,

View File

@ -147,6 +147,9 @@ VIPS_DEPRECATED_FOR(g_warning)
void im_warning( const char *fmt, ... )
G_GNUC_PRINTF( 1, 2 );
VIPS_DEPRECATED_FOR(g_thread_join)
void *vips_g_thread_join( GThread *thread );
VIPS_DEPRECATED
int im_iterate( VipsImage *im,
VipsStartFn start, im_generate_fn generate, VipsStopFn stop,

View File

@ -60,6 +60,8 @@ int vips_semaphore_up( VipsSemaphore *s );
VIPS_API
int vips_semaphore_down( VipsSemaphore *s );
VIPS_API
int vips_semaphore_down_timeout( VipsSemaphore *s, gint64 timeout );
VIPS_API
int vips_semaphore_upn( VipsSemaphore *s, int n );
VIPS_API
int vips_semaphore_downn( VipsSemaphore *s, int n );

View File

@ -55,8 +55,6 @@ void vips_g_cond_free( GCond * );
*/
VIPS_API
GThread *vips_g_thread_new( const char *, GThreadFunc, gpointer );
VIPS_API
void *vips_g_thread_join( GThread *thread );
VIPS_API
gboolean vips_thread_isvips( void );

View File

@ -110,37 +110,69 @@ vips_semaphore_up( VipsSemaphore *s )
}
/* Wait for sem>n, then subtract n.
* Returns -1 when the monotonic time in @end_time was passed.
*/
int
vips_semaphore_downn( VipsSemaphore *s, int n )
static int
vips__semaphore_downn_until( VipsSemaphore *s, int n, gint64 end_time )
{
int value_after_op;
VIPS_GATE_START( "vips_semaphore_downn: wait" );
VIPS_GATE_START( "vips__semaphore_downn_until: wait" );
g_mutex_lock( s->mutex );
while( s->v < n )
g_cond_wait( s->cond, s->mutex );
while( s->v < n ) {
if( end_time == -1 )
g_cond_wait( s->cond, s->mutex );
else if( !g_cond_wait_until( s->cond, s->mutex, end_time ) ) {
/* timeout has passed.
*/
g_mutex_unlock( s->mutex );
VIPS_GATE_STOP( "vips__semaphore_downn_until: wait" );
return( -1 );
}
}
s->v -= n;
value_after_op = s->v;
g_mutex_unlock( s->mutex );
#ifdef DEBUG_IO
printf( "vips_semaphore_downn(\"%s\",%d): %d\n",
printf( "vips__semaphore_downn_until(\"%s\",%d): %d\n",
s->name, n, value_after_op );
#endif /*DEBUG_IO*/
VIPS_GATE_STOP( "vips_semaphore_downn: wait" );
VIPS_GATE_STOP( "vips__semaphore_downn_until: wait" );
return( value_after_op );
}
/* Wait for sem>n, then subtract n.
*/
int
vips_semaphore_downn( VipsSemaphore *s, int n )
{
return( vips__semaphore_downn_until( s, n, -1 ) );
}
/* Wait for sem > 0, then decrement.
*/
int
vips_semaphore_down( VipsSemaphore *s )
{
return( vips_semaphore_downn( s, 1 ) );
return( vips__semaphore_downn_until( s, 1, -1 ) );
}
/* Wait for sem > 0, then decrement.
* Returns -1 when the relative time in @timeout (in microseconds)
* was passed.
*/
int
vips_semaphore_down_timeout( VipsSemaphore *s, gint64 timeout )
{
gint64 end_time = g_get_monotonic_time () + timeout;
return( vips__semaphore_downn_until( s, 1, end_time ) );
}

View File

@ -456,7 +456,7 @@ vips__render_shutdown( void )
vips_semaphore_up( &n_render_dirty_sem );
(void) vips_g_thread_join( thread );
(void) g_thread_join( thread );
}
else
g_mutex_unlock( render_dirty_lock );

View File

@ -190,19 +190,6 @@ vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data )
return( thread );
}
void *
vips_g_thread_join( GThread *thread )
{
void *result;
result = g_thread_join( thread );
VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n",
thread );
return( result );
}
static int
get_num_processors( void )
{

View File

@ -92,6 +92,11 @@ struct _VipsThreadset {
int max_threads;
};
/* The maximum relative time (in microseconds) that a thread waits
* for work before being stopped.
*/
static const gint64 max_idle_time = 15 * G_TIME_SPAN_SECOND;
/* The thread work function.
*/
static void *
@ -101,9 +106,13 @@ vips_threadset_work( void *pointer )
VipsThreadset *set = member->set;
for(;;) {
/* Wait to be given work.
/* Wait for at least 15 seconds to be given work.
*/
if( vips_semaphore_down_timeout( &member->idle, max_idle_time ) == -1 )
break;
/* Killed or no task available? Leave this thread.
*/
vips_semaphore_down( &member->idle );
if( member->kill ||
!member->func )
break;
@ -134,9 +143,18 @@ vips_threadset_work( void *pointer )
g_mutex_unlock( set->lock );
}
/* Kill has been requested. We leave this thread on the members
* list so it can be found and joined.
/* Timed-out or kill has been requested ... remove from both free
* and member list.
*/
g_mutex_lock( set->lock );
set->free = g_slist_remove( set->free, member );
set->members = g_slist_remove( set->members, member );
set->n_threads -= 1;
g_mutex_unlock( set->lock );
vips_semaphore_destroy( &member->idle );
VIPS_FREE( member );
return( NULL );
}
@ -168,11 +186,16 @@ vips_threadset_add( VipsThreadset *set )
return( NULL );
}
/* Ensure idle threads are freed on exit, this
* ref is increased before the thread is joined.
*/
g_thread_unref( member->thread );
g_mutex_lock( set->lock );
set->members = g_slist_prepend( set->members, member );
set->n_threads += 1;
set->n_threads_highwater =
VIPS_MAX( set->n_threads_highwater, set->n_threads );;
VIPS_MAX( set->n_threads_highwater, set->n_threads );
g_mutex_unlock( set->lock );
return( member );
@ -274,20 +297,17 @@ vips_threadset_run( VipsThreadset *set,
static void
vips_threadset_kill_member( VipsThreadsetMember *member )
{
VipsThreadset *set = member->set;
GThread *thread;
thread = g_thread_ref( member->thread );
member->kill = TRUE;
vips_semaphore_up( &member->idle );
g_thread_join( member->thread );
vips_semaphore_destroy( &member->idle );
(void) g_thread_join( thread );
g_mutex_lock( set->lock );
set->free = g_slist_remove( set->free, member );
set->n_threads -= 1;
g_mutex_unlock( set->lock );
VIPS_FREE( member );
/* member is freed on thread exit.
*/
}
/**
@ -309,10 +329,8 @@ vips_threadset_free( VipsThreadset *set )
member = NULL;
g_mutex_lock( set->lock );
if( set->members ) {
if( set->members )
member = (VipsThreadsetMember *) set->members->data;
set->members = g_slist_remove( set->members, member );
}
g_mutex_unlock( set->lock );
if( !member )