From 1abb0939d17211cadd1be7b7249cf024f8cfebdf Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 26 May 2015 09:38:47 +0100 Subject: [PATCH] Remove async threads for now. They are anyway broken in the asynchronous world. --- src/libserver/events.c | 137 ++++++++++++----------------------------- src/libserver/events.h | 44 +++---------- 2 files changed, 48 insertions(+), 133 deletions(-) diff --git a/src/libserver/events.c b/src/libserver/events.c index 1b3e845ce..9af0a84a8 100644 --- a/src/libserver/events.c +++ b/src/libserver/events.c @@ -26,6 +26,30 @@ #include "main.h" #include "events.h" +struct rspamd_async_watcher { + event_watcher_t cb; + guint remain; + gpointer ud; +}; + +struct rspamd_async_event { + GQuark subsystem; + event_finalizer_t fin; + void *user_data; + struct rspamd_async_watcher *w; +}; + +struct rspamd_async_session { + session_finalizer_t fin; + event_finalizer_t restore; + event_finalizer_t cleanup; + GHashTable *events; + void *user_data; + rspamd_mempool_t *pool; + struct rspamd_async_watcher *cur_watcher; + guint flags; +}; + static gboolean rspamd_event_equal (gconstpointer a, gconstpointer b) { @@ -42,27 +66,15 @@ static guint rspamd_event_hash (gconstpointer a) { const struct rspamd_async_event *ev = a; + XXH64_state_t st; - return GPOINTER_TO_UINT (ev->user_data); -} + XXH64_reset (&st, rspamd_hash_seed ()); + XXH64_update (&st, ev->user_data, sizeof (gpointer)); + XXH64_update (&st, ev->fin, sizeof (*ev->fin)); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) -static void -event_mutex_free (gpointer data) -{ - GMutex *mtx = data; - - g_mutex_free (mtx); + return XXH64_digest (&st); } -static void -event_cond_free (gpointer data) -{ - GCond *cond = data; - - g_cond_free (cond); -} -#endif struct rspamd_async_session * new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin, @@ -70,36 +82,13 @@ new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin, { struct rspamd_async_session *new; - new = rspamd_mempool_alloc (pool, sizeof (struct rspamd_async_session)); + new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_async_session)); new->pool = pool; new->fin = fin; new->restore = restore; new->cleanup = cleanup; new->user_data = user_data; - new->wanna_die = FALSE; new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - new->mtx = g_mutex_new (); - new->cond = g_cond_new (); - rspamd_mempool_add_destructor (pool, - (rspamd_mempool_destruct_t) event_mutex_free, - new->mtx); - rspamd_mempool_add_destructor (pool, - (rspamd_mempool_destruct_t) event_cond_free, - new->cond); -#else - new->mtx = rspamd_mempool_alloc (pool, sizeof (GMutex)); - g_mutex_init (new->mtx); - new->cond = rspamd_mempool_alloc (pool, sizeof (GCond)); - g_cond_init (new->cond); - rspamd_mempool_add_destructor (pool, - (rspamd_mempool_destruct_t) g_mutex_clear, - new->mtx); - rspamd_mempool_add_destructor (pool, - (rspamd_mempool_destruct_t) g_cond_clear, - new->cond); -#endif - new->threads = 0; rspamd_mempool_add_destructor (pool, (rspamd_mempool_destruct_t) g_hash_table_destroy, @@ -121,21 +110,26 @@ register_async_event (struct rspamd_async_session *session, return; } - g_mutex_lock (session->mtx); new = rspamd_mempool_alloc (session->pool, sizeof (struct rspamd_async_event)); new->fin = fin; new->user_data = user_data; new->subsystem = subsystem; + if (session->cur_watcher) { + new->w = session->cur_watcher; + new->w->remain ++; + } + else { + new->w = NULL; + } + g_hash_table_insert (session->events, new, new); msg_debug ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events), g_quark_to_string (subsystem)); - - g_mutex_unlock (session->mtx); } void @@ -150,7 +144,6 @@ remove_normal_event (struct rspamd_async_session *session, return; } - g_mutex_lock (session->mtx); /* Search for event */ search_ev.fin = fin; search_ev.user_data = ud; @@ -163,7 +156,6 @@ remove_normal_event (struct rspamd_async_session *session, /* Remove event */ fin (ud); } - g_mutex_unlock (session->mtx); check_session_pending (session); } @@ -192,22 +184,10 @@ destroy_session (struct rspamd_async_session *session) return FALSE; } - g_mutex_lock (session->mtx); - if (session->threads > 0) { - /* Wait for conditional variable to finish processing */ - g_mutex_unlock (session->mtx); - g_cond_wait (session->cond, session->mtx); - } - - session->wanna_die = TRUE; - g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session); - /* Mutex can be destroyed here */ - g_mutex_unlock (session->mtx); - if (session->cleanup != NULL) { session->cleanup (session->user_data); } @@ -217,15 +197,8 @@ destroy_session (struct rspamd_async_session *session) gboolean check_session_pending (struct rspamd_async_session *session) { - g_mutex_lock (session->mtx); - if (session->wanna_die && g_hash_table_size (session->events) == 0) { - session->wanna_die = FALSE; - if (session->threads > 0) { - /* Wait for conditional variable to finish processing */ - g_cond_wait (session->cond, session->mtx); - } + if (g_hash_table_size (session->events) == 0) { if (session->fin != NULL) { - g_mutex_unlock (session->mtx); if (!session->fin (session->user_data)) { /* Session finished incompletely, perform restoration */ if (session->restore != NULL) { @@ -239,37 +212,9 @@ check_session_pending (struct rspamd_async_session *session) return FALSE; } } - g_mutex_unlock (session->mtx); + return FALSE; } - g_mutex_unlock (session->mtx); - return TRUE; -} - -/** - * Add new async thread to session - * @param session session object - */ -void -register_async_thread (struct rspamd_async_session *session) -{ - g_atomic_int_inc (&session->threads); - msg_debug ("added thread: pending %d thread", session->threads); -} - -/** - * Remove async thread from session and check whether session can be terminated - * @param session session object - */ -void -remove_async_thread (struct rspamd_async_session *session) -{ - if (g_atomic_int_dec_and_test (&session->threads)) { - /* Signal if there are any sessions waiting */ - g_mutex_lock (session->mtx); - g_cond_signal (session->cond); - g_mutex_unlock (session->mtx); - } - msg_debug ("removed thread: pending %d thread", session->threads); + return TRUE; } diff --git a/src/libserver/events.h b/src/libserver/events.h index e751c1054..442834f98 100644 --- a/src/libserver/events.h +++ b/src/libserver/events.h @@ -29,29 +29,11 @@ #include "mem_pool.h" struct rspamd_async_event; +struct rspamd_async_session; -typedef void (*event_finalizer_t)(void *user_data); -typedef gboolean (*session_finalizer_t)(void *user_data); - -struct rspamd_async_event { - GQuark subsystem; - event_finalizer_t fin; - void *user_data; - guint ref; -}; - -struct rspamd_async_session { - session_finalizer_t fin; - event_finalizer_t restore; - event_finalizer_t cleanup; - GHashTable *events; - void *user_data; - rspamd_mempool_t *pool; - gboolean wanna_die; - guint threads; - GMutex *mtx; - GCond *cond; -}; +typedef void (*event_finalizer_t)(gpointer ud); +typedef void (*event_watcher_t)(guint remain, gboolean terminated, gpointer ud); +typedef gboolean (*session_finalizer_t)(gpointer user_data); /** * Make new async session @@ -64,7 +46,7 @@ struct rspamd_async_session { */ struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool, session_finalizer_t fin, event_finalizer_t restore, - event_finalizer_t cleanup, void *user_data); + event_finalizer_t cleanup, gpointer user_data); /** * Insert new event to the session @@ -74,7 +56,7 @@ struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool, * @param forced unused */ void register_async_event (struct rspamd_async_session *session, - event_finalizer_t fin, void *user_data, GQuark subsystem); + event_finalizer_t fin, gpointer user_data, GQuark subsystem); /** * Remove normal event @@ -84,7 +66,7 @@ void register_async_event (struct rspamd_async_session *session, */ void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, - void *ud); + gpointer ud); /** * Must be called at the end of session, it calls fin functions for all non-forced callbacks @@ -99,16 +81,4 @@ gboolean destroy_session (struct rspamd_async_session *session); */ gboolean check_session_pending (struct rspamd_async_session *session); -/** - * Add new async thread to session - * @param session session object - */ -void register_async_thread (struct rspamd_async_session *session); - -/** - * Remove async thread from session and check whether session can be terminated - * @param session session object - */ -void remove_async_thread (struct rspamd_async_session *session); - #endif /* RSPAMD_EVENTS_H */ -- 2.39.5