From 2d708971163dc99f9c29cc47e7d4f56a3af882c5 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 31 Jan 2012 21:24:32 +0400 Subject: [PATCH] Rework events library slightly: - forced events are no longer checked or created - add async threads to prevent session to be destroyed till all threads are finished --- src/events.c | 81 +++++++++++++++----------------------------- src/events.h | 25 ++++++++------ src/plugins/regexp.c | 2 ++ src/smtp.c | 3 +- 4 files changed, 45 insertions(+), 66 deletions(-) diff --git a/src/events.c b/src/events.c index eb1e26094..bf47bb1f8 100644 --- a/src/events.c +++ b/src/events.c @@ -62,10 +62,9 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, new->user_data = user_data; new->wanna_die = FALSE; new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); - new->forced_events = g_queue_new (); + new->threads = 0; memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events); - memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events); return new; } @@ -73,70 +72,23 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced) { - struct rspamd_async_event *new, *ev; - GList *cur; + struct rspamd_async_event *new; if (session == NULL) { msg_info ("session is NULL"); return; } - if (forced) { - /* For forced events try first to increase its reference */ - cur = session->forced_events->head; - while (cur) { - ev = cur->data; - if (ev->forced && ev->fin == fin) { - ev->ref++; - return; - } - cur = g_list_next (cur); - } - } - new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event)); new->fin = fin; new->user_data = user_data; - new->forced = forced; - new->ref = 1; + g_hash_table_insert (session->events, new, new); #ifdef RSPAMD_EVENTS_DEBUG msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events)); #endif } -void -remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin) -{ - struct rspamd_async_event *ev; - GList *cur; - - if (session == NULL) { - msg_info ("session is NULL"); - return; - } - - cur = session->forced_events->head; - while (cur) { - ev = cur->data; - if (ev->forced && ev->fin == fin) { - ev->ref--; - if (ev->ref == 0) { - g_queue_delete_link (session->forced_events, cur); - } - break; - } - cur = g_list_next (cur); - } - - check_session_pending (session); - - if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) { - /* Call session destroy after all forced events are ready */ - session->cleanup (session->user_data); - } -} - void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) { @@ -183,7 +135,7 @@ destroy_session (struct rspamd_async_session *session) g_hash_table_foreach (session->events, rspamd_session_destroy, session); - if (g_queue_get_length (session->forced_events) == 0) { + if (session->threads == 0) { if (session->cleanup != NULL) { session->cleanup (session->user_data); } @@ -196,7 +148,7 @@ destroy_session (struct rspamd_async_session *session) gboolean check_session_pending (struct rspamd_async_session *session) { - if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) { + if (session->threads == 0 && g_hash_table_size (session->events) == 0) { if (session->fin != NULL) { session->fin (session->user_data); } @@ -212,3 +164,26 @@ check_session_pending (struct rspamd_async_session *session) return TRUE; } + + +/** + * Add new async thread to session + * @param session session object + */ +void +register_async_thread (struct rspamd_async_session *session) +{ + g_atomic_int_inc (&session->threads); +} + +/** + * Remove async thread from session and check whether session can be terminated + * @param session session object + */ +void +remove_async_thread (struct rspamd_async_session *session) +{ + if (g_atomic_int_dec_and_test (&session->threads)) { + (void) check_session_pending (session); + } +} diff --git a/src/events.h b/src/events.h index 2aba3eb2e..138920060 100644 --- a/src/events.h +++ b/src/events.h @@ -10,7 +10,6 @@ typedef void (*event_finalizer_t)(void *user_data); struct rspamd_async_event { event_finalizer_t fin; void *user_data; - gboolean forced; guint ref; }; @@ -19,10 +18,10 @@ struct rspamd_async_session { event_finalizer_t restore; event_finalizer_t cleanup; GHashTable *events; - GQueue *forced_events; void *user_data; memory_pool_t *pool; gboolean wanna_die; + guint threads; }; /** @@ -43,19 +42,11 @@ struct rspamd_async_session *new_async_session (memory_pool_t *pool, * @param session session object * @param fin finalizer callback * @param user_data abstract user_data - * @param forced session cannot be destroyed until forced event are still in it + * @param forced unused */ void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced); - -/** - * Remove forced event - * @param session session object - * @param fin destructor function - */ -void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin); - /** * Remove normal event * @param session session object @@ -77,4 +68,16 @@ gboolean destroy_session (struct rspamd_async_session *session); */ gboolean check_session_pending (struct rspamd_async_session *session); +/** + * Add new async thread to session + * @param session session object + */ +void register_async_thread (struct rspamd_async_session *session); + +/** + * Remove async thread from session and check whether session can be terminated + * @param session session object + */ +void remove_async_thread (struct rspamd_async_session *session); + #endif /* RSPAMD_EVENTS_H */ diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 902078d29..9d2210c56 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -1227,6 +1227,7 @@ process_regexp_item_threaded (gpointer data, gpointer user_data) g_mutex_unlock (workers_mtx); } } + remove_async_thread (ud->task->s); } static void @@ -1240,6 +1241,7 @@ process_regexp_item (struct worker_task *task, void *user_data) thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud)); thr_ud->item = item; thr_ud->task = task; + register_async_thread (task->s); g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL); } else { diff --git a/src/smtp.c b/src/smtp.c index 2b5236cc6..46df67b36 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -589,8 +589,7 @@ smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg) gint res = 0; union rspamd_reply_element *elt; GList *cur; - - remove_forced_event (session->s, (event_finalizer_t)smtp_dns_cb); + switch (session->state) { case SMTP_STATE_RESOLVE_REVERSE: /* Parse reverse reply and start resolve of this ip */ -- 2.39.5