aboutsummaryrefslogtreecommitdiffstats
path: root/src/events.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/events.c')
-rw-r--r--src/events.c74
1 files changed, 52 insertions, 22 deletions
diff --git a/src/events.c b/src/events.c
index b658f48a9..0d831104c 100644
--- a/src/events.c
+++ b/src/events.c
@@ -56,10 +56,18 @@ event_mutex_free (gpointer data)
g_mutex_free (mtx);
}
+
+static void
+event_cond_free (gpointer data)
+{
+ GCond *cond = data;
+
+ g_cond_free (cond);
+}
#endif
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+new_async_session (memory_pool_t * pool, session_finalizer_t fin,
event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
@@ -74,10 +82,14 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->mtx = g_mutex_new ();
+ new->cond = g_cond_new ();
memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx);
+ memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond);
#else
new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
g_mutex_init (new->mtx);
+ new->cond = memory_pool_alloc (pool, sizeof (GCond));
+ g_cond_init (new->cond);
#endif
new->threads = 0;
@@ -96,6 +108,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
return;
}
+ g_mutex_lock (session->mtx);
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
@@ -106,6 +119,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
g_quark_to_string (subsystem));
#endif
+ g_mutex_unlock (session->mtx);
}
void
@@ -118,6 +132,7 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
return;
}
+ g_mutex_lock (session->mtx);
/* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
@@ -130,13 +145,12 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
/* Remove event */
fin (ud);
}
-
-
+ g_mutex_unlock (session->mtx);
check_session_pending (session);
}
-static void
+static gboolean
rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
{
struct rspamd_async_event *ev = v;
@@ -145,6 +159,8 @@ rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
if (ev->fin != NULL) {
ev->fin (ev->user_data);
}
+
+ return TRUE;
}
gboolean
@@ -155,36 +171,47 @@ destroy_session (struct rspamd_async_session *session)
return FALSE;
}
+ g_mutex_lock (session->mtx);
+ if (session->threads > 0) {
+ /* Wait for conditional variable to finish processing */
+ g_mutex_unlock (session->mtx);
+ g_cond_wait (session->cond, session->mtx);
+ }
+
session->wanna_die = TRUE;
- g_hash_table_foreach (session->events, rspamd_session_destroy, session);
+ g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session);
- if (session->threads == 0) {
- if (session->cleanup != NULL) {
- session->cleanup (session->user_data);
- }
- return TRUE;
- }
+ /* Mutex can be destroyed here */
+ g_mutex_unlock (session->mtx);
- return FALSE;
+ if (session->cleanup != NULL) {
+ session->cleanup (session->user_data);
+ }
+ return TRUE;
}
gboolean
check_session_pending (struct rspamd_async_session *session)
{
g_mutex_lock (session->mtx);
- if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ if (session->wanna_die && g_hash_table_size (session->events) == 0) {
session->wanna_die = FALSE;
- if (session->fin != NULL) {
- session->fin (session->user_data);
+ if (session->threads > 0) {
+ /* Wait for conditional variable to finish processing */
+ g_cond_wait (session->cond, session->mtx);
}
- /* Check events count again */
- if (g_hash_table_size (session->events) != 0) {
- if (session->restore != NULL) {
- session->restore (session->user_data);
+ if (session->fin != NULL) {
+ if (! session->fin (session->user_data)) {
+ g_mutex_unlock (session->mtx);
+ /* Session finished incompletely, perform restoration */
+ if (session->restore != NULL) {
+ session->restore (session->user_data);
+ /* Call pending once more */
+ return check_session_pending (session);
+ }
+ return TRUE;
}
- g_mutex_unlock (session->mtx);
- return TRUE;
}
g_mutex_unlock (session->mtx);
return FALSE;
@@ -215,7 +242,10 @@ void
remove_async_thread (struct rspamd_async_session *session)
{
if (g_atomic_int_dec_and_test (&session->threads)) {
- (void) check_session_pending (session);
+ /* Signal if there are any sessions waiting */
+ g_mutex_lock (session->mtx);
+ g_cond_signal (session->cond);
+ g_mutex_unlock (session->mtx);
}
#ifdef RSPAMD_EVENTS_DEBUG
msg_info ("removed thread: pending %d thread", session->threads);