diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-12-14 19:05:56 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-12-14 19:05:56 +0300 |
commit | 4499fc92189905fde71139822d784ab7819b181c (patch) | |
tree | 4057f60b1af82b52649d05f79a804543c57ae6f4 /src/events.c | |
parent | 7072f8548414cf0e88babd7556f1f893c93ba3dc (diff) | |
download | rspamd-4499fc92189905fde71139822d784ab7819b181c.tar.gz rspamd-4499fc92189905fde71139822d784ab7819b181c.zip |
* Finally get rid of stupid savepoints system and migrate to async events logic completely
Fix lua redis library.
Diffstat (limited to 'src/events.c')
-rw-r--r-- | src/events.c | 126 |
1 files changed, 84 insertions, 42 deletions
diff --git a/src/events.c b/src/events.c index f91e3e847..6d848ca2d 100644 --- a/src/events.c +++ b/src/events.c @@ -26,20 +26,53 @@ #include "main.h" #include "events.h" +#undef RSPAMD_EVENTS_DEBUG + +static gboolean +rspamd_event_equal (gconstpointer a, gconstpointer b) +{ + const struct rspamd_async_event *ev1 = a, *ev2 = b; + + if (ev1->fin == ev2->fin) { + return ev1->user_data == ev2->user_data; + } + + return FALSE; +} + +static guint +rspamd_event_hash (gconstpointer a) +{ + const struct rspamd_async_event *ev = a; + guint h = 0, i; + gchar *p; + + p = (gchar *)ev->user_data; + for (i = 0; i < sizeof (gpointer); i ++) { + h ^= *p; + h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + p ++; + } + + return h; +} struct rspamd_async_session * -new_async_session (memory_pool_t * pool, event_finalizer_t fin, void *user_data) +new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data) { struct rspamd_async_session *new; new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session)); new->pool = pool; new->fin = fin; + new->cleanup = cleanup; new->user_data = user_data; new->wanna_die = FALSE; - new->events = g_queue_new (); + new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); + new->forced_events = g_queue_new (); - memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->events); + memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events); + memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events); return new; } @@ -57,7 +90,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi if (forced) { /* For forced events try first to increase its reference */ - cur = session->events->head; + cur = session->forced_events->head; while (cur) { ev = cur->data; if (ev->forced && ev->fin == fin) { @@ -73,7 +106,10 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi new->user_data = user_data; new->forced = forced; new->ref = 1; - g_queue_push_head (session->events, new); + g_hash_table_insert (session->events, new, new); +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events)); +#endif } void @@ -87,56 +123,64 @@ remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin return; } - cur = session->events->head; + cur = session->forced_events->head; while (cur) { ev = cur->data; if (ev->forced && ev->fin == fin) { ev->ref--; if (ev->ref == 0) { - g_queue_delete_link (session->events, cur); + g_queue_delete_link (session->forced_events, cur); } break; } cur = g_list_next (cur); } - if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) { + check_session_pending (session); + + if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) { /* Call session destroy after all forced events are ready */ - session->fin (session->user_data); + session->cleanup (session->user_data); } } void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) { - struct rspamd_async_event *ev; - GList *cur; + struct rspamd_async_event search_ev; if (session == NULL) { msg_info ("session is NULL"); return; } - cur = session->events->head; - while (cur) { - ev = cur->data; - if (ev->fin == fin && ev->user_data == ud && !ev->forced) { - g_queue_delete_link (session->events, cur); - if (ev->fin) { - ev->fin (ev->user_data); - } - break; - } - cur = g_list_next (cur); + search_ev.fin = fin; + search_ev.user_data = ud; + if (g_hash_table_remove (session->events, &search_ev)) { + fin (ud); + } + +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events)); +#endif + + check_session_pending (session); +} + +static void +rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) +{ + struct rspamd_async_event *ev = v; + + /* Call event's finalizer */ + if (ev->fin != NULL) { + ev->fin (ev->user_data); } } gboolean destroy_session (struct rspamd_async_session *session) { - struct rspamd_async_event *ev; - GList *cur, *tmp; - if (session == NULL) { msg_info ("session is NULL"); return FALSE; @@ -144,30 +188,28 @@ destroy_session (struct rspamd_async_session *session) session->wanna_die = TRUE; - cur = session->events->head; + g_hash_table_foreach (session->events, rspamd_session_destroy, session); - while (cur) { - ev = cur->data; - if (!ev->forced) { - if (ev->fin != NULL) { - ev->fin (ev->user_data); - } - tmp = cur; - cur = g_list_next (cur); - g_queue_delete_link (session->events, tmp); - } - else { - /* Do nothing with forced callbacks */ - cur = g_list_next (cur); + if (g_queue_get_length (session->forced_events) == 0) { + if (session->cleanup != NULL) { + session->cleanup (session->user_data); } + return TRUE; } - if (g_queue_get_length (session->events) == 0) { + return FALSE; +} + +gboolean +check_session_pending (struct rspamd_async_session *session) +{ + if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) { if (session->fin != NULL) { session->fin (session->user_data); } - return TRUE; + /* No more events */ + return FALSE; } - return FALSE; + return TRUE; } |