diff options
Diffstat (limited to 'src/libserver/events.c')
-rw-r--r-- | src/libserver/events.c | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/src/libserver/events.c b/src/libserver/events.c new file mode 100644 index 000000000..85843fd05 --- /dev/null +++ b/src/libserver/events.c @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2009-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "events.h" + +static gboolean +rspamd_event_equal (gconstpointer a, gconstpointer b) +{ + const struct rspamd_async_event *ev1 = a, *ev2 = b; + + if (ev1->fin == ev2->fin) { + return ev1->user_data == ev2->user_data; + } + + return FALSE; +} + +static guint +rspamd_event_hash (gconstpointer a) +{ + const struct rspamd_async_event *ev = a; + + return GPOINTER_TO_UINT (ev->user_data); +} + +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) +static void +event_mutex_free (gpointer data) +{ + GMutex *mtx = data; + + g_mutex_free (mtx); +} + +static void +event_cond_free (gpointer data) +{ + GCond *cond = data; + + g_cond_free (cond); +} +#endif + +struct rspamd_async_session * +new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin, + event_finalizer_t restore, event_finalizer_t cleanup, void *user_data) +{ + struct rspamd_async_session *new; + + new = rspamd_mempool_alloc (pool, sizeof (struct rspamd_async_session)); + new->pool = pool; + new->fin = fin; + new->restore = restore; + new->cleanup = cleanup; + new->user_data = user_data; + new->wanna_die = FALSE; + new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + new->mtx = g_mutex_new (); + new->cond = g_cond_new (); + rspamd_mempool_add_destructor (pool, (rspamd_mempool_destruct_t) event_mutex_free, new->mtx); + rspamd_mempool_add_destructor (pool, (rspamd_mempool_destruct_t) event_cond_free, new->cond); +#else + new->mtx = rspamd_mempool_alloc (pool, sizeof (GMutex)); + g_mutex_init (new->mtx); + new->cond = rspamd_mempool_alloc (pool, sizeof (GCond)); + g_cond_init (new->cond); + rspamd_mempool_add_destructor (pool, (rspamd_mempool_destruct_t) g_mutex_clear, new->mtx); + rspamd_mempool_add_destructor (pool, (rspamd_mempool_destruct_t) g_cond_clear, new->cond); +#endif + new->threads = 0; + + rspamd_mempool_add_destructor (pool, (rspamd_mempool_destruct_t) g_hash_table_destroy, new->events); + + return new; +} + +void +register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, GQuark subsystem) +{ + struct rspamd_async_event *new; + + if (session == NULL) { + msg_info ("session is NULL"); + return; + } + + g_mutex_lock (session->mtx); + new = rspamd_mempool_alloc (session->pool, sizeof (struct rspamd_async_event)); + new->fin = fin; + new->user_data = user_data; + new->subsystem = subsystem; + + g_hash_table_insert (session->events, new, new); + + msg_debug ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events), + g_quark_to_string (subsystem)); + + g_mutex_unlock (session->mtx); +} + +void +remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) +{ + struct rspamd_async_event search_ev, *found_ev; + + if (session == NULL) { + msg_info ("session is NULL"); + return; + } + + g_mutex_lock (session->mtx); + /* Search for event */ + search_ev.fin = fin; + search_ev.user_data = ud; + if ((found_ev = g_hash_table_lookup (session->events, &search_ev)) != NULL) { + g_hash_table_remove (session->events, found_ev); + msg_debug ("removed event: %p, subsystem: %s, pending %d events", ud, + g_quark_to_string (found_ev->subsystem), g_hash_table_size (session->events)); + /* Remove event */ + fin (ud); + } + g_mutex_unlock (session->mtx); + + check_session_pending (session); +} + +static gboolean +rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) +{ + struct rspamd_async_event *ev = v; + + /* Call event's finalizer */ + if (ev->fin != NULL) { + ev->fin (ev->user_data); + } + + return TRUE; +} + +gboolean +destroy_session (struct rspamd_async_session *session) +{ + if (session == NULL) { + msg_info ("session is NULL"); + return FALSE; + } + + g_mutex_lock (session->mtx); + if (session->threads > 0) { + /* Wait for conditional variable to finish processing */ + g_mutex_unlock (session->mtx); + g_cond_wait (session->cond, session->mtx); + } + + session->wanna_die = TRUE; + + g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session); + + /* Mutex can be destroyed here */ + g_mutex_unlock (session->mtx); + + if (session->cleanup != NULL) { + session->cleanup (session->user_data); + } + return TRUE; +} + +gboolean +check_session_pending (struct rspamd_async_session *session) +{ + g_mutex_lock (session->mtx); + if (session->wanna_die && g_hash_table_size (session->events) == 0) { + session->wanna_die = FALSE; + if (session->threads > 0) { + /* Wait for conditional variable to finish processing */ + g_cond_wait (session->cond, session->mtx); + } + if (session->fin != NULL) { + g_mutex_unlock (session->mtx); + if (! session->fin (session->user_data)) { + /* Session finished incompletely, perform restoration */ + if (session->restore != NULL) { + session->restore (session->user_data); + /* Call pending once more */ + return check_session_pending (session); + } + return TRUE; + } + else { + return FALSE; + } + } + g_mutex_unlock (session->mtx); + return FALSE; + } + g_mutex_unlock (session->mtx); + return TRUE; +} + + +/** + * Add new async thread to session + * @param session session object + */ +void +register_async_thread (struct rspamd_async_session *session) +{ + g_atomic_int_inc (&session->threads); + msg_debug ("added thread: pending %d thread", session->threads); +} + +/** + * Remove async thread from session and check whether session can be terminated + * @param session session object + */ +void +remove_async_thread (struct rspamd_async_session *session) +{ + if (g_atomic_int_dec_and_test (&session->threads)) { + /* Signal if there are any sessions waiting */ + g_mutex_lock (session->mtx); + g_cond_signal (session->cond); + g_mutex_unlock (session->mtx); + } + msg_debug ("removed thread: pending %d thread", session->threads); +} |