aboutsummaryrefslogtreecommitdiffstats
path: root/src/events.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 22:24:51 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 22:24:51 +0400
commit5c0f36dfe734e46d72e4afdfd71c05353e4df86d (patch)
tree8ed04921ae2a4ee18106b0ca8096641e12ed1c01 /src/events.c
parent2d708971163dc99f9c29cc47e7d4f56a3af882c5 (diff)
downloadrspamd-5c0f36dfe734e46d72e4afdfd71c05353e4df86d.tar.gz
rspamd-5c0f36dfe734e46d72e4afdfd71c05353e4df86d.zip
Fixes to threading (still incomplete).
Diffstat (limited to 'src/events.c')
-rw-r--r--src/events.c21
1 files changed, 19 insertions, 2 deletions
diff --git a/src/events.c b/src/events.c
index bf47bb1f8..0daefc95f 100644
--- a/src/events.c
+++ b/src/events.c
@@ -62,6 +62,13 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new->user_data = user_data;
new->wanna_die = FALSE;
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ new->mtx = g_mutex_new ();
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_free, new->mtx);
+#else
+ new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+ g_mutex_init (new->mtx);
+#endif
new->threads = 0;
memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
@@ -148,7 +155,9 @@ destroy_session (struct rspamd_async_session *session)
gboolean
check_session_pending (struct rspamd_async_session *session)
{
- if (session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ g_mutex_lock (session->mtx);
+ if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ session->wanna_die = FALSE;
if (session->fin != NULL) {
session->fin (session->user_data);
}
@@ -157,11 +166,13 @@ check_session_pending (struct rspamd_async_session *session)
if (session->restore != NULL) {
session->restore (session->user_data);
}
+ g_mutex_unlock (session->mtx);
return TRUE;
}
+ g_mutex_unlock (session->mtx);
return FALSE;
}
-
+ g_mutex_unlock (session->mtx);
return TRUE;
}
@@ -174,6 +185,9 @@ void
register_async_thread (struct rspamd_async_session *session)
{
g_atomic_int_inc (&session->threads);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("added thread: pending %d thread", session->threads);
+#endif
}
/**
@@ -186,4 +200,7 @@ remove_async_thread (struct rspamd_async_session *session)
if (g_atomic_int_dec_and_test (&session->threads)) {
(void) check_session_pending (session);
}
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed thread: pending %d thread", session->threads);
+#endif
}