diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-01-31 21:24:32 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-01-31 21:24:32 +0400 |
commit | 2d708971163dc99f9c29cc47e7d4f56a3af882c5 (patch) | |
tree | 91d29c6ac7727405e01f065eabe062431302483c /src/events.c | |
parent | 3958296431df16a30550047103008c1324fa331a (diff) | |
download | rspamd-2d708971163dc99f9c29cc47e7d4f56a3af882c5.tar.gz rspamd-2d708971163dc99f9c29cc47e7d4f56a3af882c5.zip |
Rework events library slightly:
- forced events are no longer checked or created
- add async threads to prevent session to be destroyed till all threads are finished
Diffstat (limited to 'src/events.c')
-rw-r--r-- | src/events.c | 81 |
1 files changed, 28 insertions, 53 deletions
diff --git a/src/events.c b/src/events.c index eb1e26094..bf47bb1f8 100644 --- a/src/events.c +++ b/src/events.c @@ -62,10 +62,9 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, new->user_data = user_data; new->wanna_die = FALSE; new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); - new->forced_events = g_queue_new (); + new->threads = 0; memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events); - memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events); return new; } @@ -73,32 +72,17 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced) { - struct rspamd_async_event *new, *ev; - GList *cur; + struct rspamd_async_event *new; if (session == NULL) { msg_info ("session is NULL"); return; } - if (forced) { - /* For forced events try first to increase its reference */ - cur = session->forced_events->head; - while (cur) { - ev = cur->data; - if (ev->forced && ev->fin == fin) { - ev->ref++; - return; - } - cur = g_list_next (cur); - } - } - new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event)); new->fin = fin; new->user_data = user_data; - new->forced = forced; - new->ref = 1; + g_hash_table_insert (session->events, new, new); #ifdef RSPAMD_EVENTS_DEBUG msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events)); @@ -106,38 +90,6 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi } void -remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin) -{ - struct rspamd_async_event *ev; - GList *cur; - - if (session == NULL) { - msg_info ("session is NULL"); - return; - } - - cur = session->forced_events->head; - while (cur) { - ev = cur->data; - if (ev->forced && ev->fin == fin) { - ev->ref--; - if (ev->ref == 0) { - g_queue_delete_link (session->forced_events, cur); - } - break; - } - cur = g_list_next (cur); - } - - check_session_pending (session); - - if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) { - /* Call session destroy after all forced events are ready */ - session->cleanup (session->user_data); - } -} - -void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) { struct rspamd_async_event search_ev; @@ -183,7 +135,7 @@ destroy_session (struct rspamd_async_session *session) g_hash_table_foreach (session->events, rspamd_session_destroy, session); - if (g_queue_get_length (session->forced_events) == 0) { + if (session->threads == 0) { if (session->cleanup != NULL) { session->cleanup (session->user_data); } @@ -196,7 +148,7 @@ destroy_session (struct rspamd_async_session *session) gboolean check_session_pending (struct rspamd_async_session *session) { - if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) { + if (session->threads == 0 && g_hash_table_size (session->events) == 0) { if (session->fin != NULL) { session->fin (session->user_data); } @@ -212,3 +164,26 @@ check_session_pending (struct rspamd_async_session *session) return TRUE; } + + +/** + * Add new async thread to session + * @param session session object + */ +void +register_async_thread (struct rspamd_async_session *session) +{ + g_atomic_int_inc (&session->threads); +} + +/** + * Remove async thread from session and check whether session can be terminated + * @param session session object + */ +void +remove_async_thread (struct rspamd_async_session *session) +{ + if (g_atomic_int_dec_and_test (&session->threads)) { + (void) check_session_pending (session); + } +} |