diff options
Diffstat (limited to 'src/events.c')
-rw-r--r-- | src/events.c | 74 |
1 files changed, 52 insertions, 22 deletions
diff --git a/src/events.c b/src/events.c index b658f48a9..0d831104c 100644 --- a/src/events.c +++ b/src/events.c @@ -56,10 +56,18 @@ event_mutex_free (gpointer data) g_mutex_free (mtx); } + +static void +event_cond_free (gpointer data) +{ + GCond *cond = data; + + g_cond_free (cond); +} #endif struct rspamd_async_session * -new_async_session (memory_pool_t * pool, event_finalizer_t fin, +new_async_session (memory_pool_t * pool, session_finalizer_t fin, event_finalizer_t restore, event_finalizer_t cleanup, void *user_data) { struct rspamd_async_session *new; @@ -74,10 +82,14 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) new->mtx = g_mutex_new (); + new->cond = g_cond_new (); memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx); + memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond); #else new->mtx = memory_pool_alloc (pool, sizeof (GMutex)); g_mutex_init (new->mtx); + new->cond = memory_pool_alloc (pool, sizeof (GCond)); + g_cond_init (new->cond); #endif new->threads = 0; @@ -96,6 +108,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi return; } + g_mutex_lock (session->mtx); new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event)); new->fin = fin; new->user_data = user_data; @@ -106,6 +119,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events), g_quark_to_string (subsystem)); #endif + g_mutex_unlock (session->mtx); } void @@ -118,6 +132,7 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin return; } + g_mutex_lock (session->mtx); /* Search for event */ search_ev.fin = fin; search_ev.user_data = ud; @@ -130,13 +145,12 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin /* Remove event */ fin (ud); } - - + g_mutex_unlock (session->mtx); check_session_pending (session); } -static void +static gboolean rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) { struct rspamd_async_event *ev = v; @@ -145,6 +159,8 @@ rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) if (ev->fin != NULL) { ev->fin (ev->user_data); } + + return TRUE; } gboolean @@ -155,36 +171,47 @@ destroy_session (struct rspamd_async_session *session) return FALSE; } + g_mutex_lock (session->mtx); + if (session->threads > 0) { + /* Wait for conditional variable to finish processing */ + g_mutex_unlock (session->mtx); + g_cond_wait (session->cond, session->mtx); + } + session->wanna_die = TRUE; - g_hash_table_foreach (session->events, rspamd_session_destroy, session); + g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session); - if (session->threads == 0) { - if (session->cleanup != NULL) { - session->cleanup (session->user_data); - } - return TRUE; - } + /* Mutex can be destroyed here */ + g_mutex_unlock (session->mtx); - return FALSE; + if (session->cleanup != NULL) { + session->cleanup (session->user_data); + } + return TRUE; } gboolean check_session_pending (struct rspamd_async_session *session) { g_mutex_lock (session->mtx); - if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) { + if (session->wanna_die && g_hash_table_size (session->events) == 0) { session->wanna_die = FALSE; - if (session->fin != NULL) { - session->fin (session->user_data); + if (session->threads > 0) { + /* Wait for conditional variable to finish processing */ + g_cond_wait (session->cond, session->mtx); } - /* Check events count again */ - if (g_hash_table_size (session->events) != 0) { - if (session->restore != NULL) { - session->restore (session->user_data); + if (session->fin != NULL) { + if (! session->fin (session->user_data)) { + g_mutex_unlock (session->mtx); + /* Session finished incompletely, perform restoration */ + if (session->restore != NULL) { + session->restore (session->user_data); + /* Call pending once more */ + return check_session_pending (session); + } + return TRUE; } - g_mutex_unlock (session->mtx); - return TRUE; } g_mutex_unlock (session->mtx); return FALSE; @@ -215,7 +242,10 @@ void remove_async_thread (struct rspamd_async_session *session) { if (g_atomic_int_dec_and_test (&session->threads)) { - (void) check_session_pending (session); + /* Signal if there are any sessions waiting */ + g_mutex_lock (session->mtx); + g_cond_signal (session->cond); + g_mutex_unlock (session->mtx); } #ifdef RSPAMD_EVENTS_DEBUG msg_info ("removed thread: pending %d thread", session->threads); |