aboutsummaryrefslogtreecommitdiffstats
path: root/src/events.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-12-14 19:05:56 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-12-14 19:05:56 +0300
commit4499fc92189905fde71139822d784ab7819b181c (patch)
tree4057f60b1af82b52649d05f79a804543c57ae6f4 /src/events.c
parent7072f8548414cf0e88babd7556f1f893c93ba3dc (diff)
downloadrspamd-4499fc92189905fde71139822d784ab7819b181c.tar.gz
rspamd-4499fc92189905fde71139822d784ab7819b181c.zip
* Finally get rid of stupid savepoints system and migrate to async events logic completely
Fix lua redis library.
Diffstat (limited to 'src/events.c')
-rw-r--r--src/events.c126
1 files changed, 84 insertions, 42 deletions
diff --git a/src/events.c b/src/events.c
index f91e3e847..6d848ca2d 100644
--- a/src/events.c
+++ b/src/events.c
@@ -26,20 +26,53 @@
#include "main.h"
#include "events.h"
+#undef RSPAMD_EVENTS_DEBUG
+
+static gboolean
+rspamd_event_equal (gconstpointer a, gconstpointer b)
+{
+ const struct rspamd_async_event *ev1 = a, *ev2 = b;
+
+ if (ev1->fin == ev2->fin) {
+ return ev1->user_data == ev2->user_data;
+ }
+
+ return FALSE;
+}
+
+static guint
+rspamd_event_hash (gconstpointer a)
+{
+ const struct rspamd_async_event *ev = a;
+ guint h = 0, i;
+ gchar *p;
+
+ p = (gchar *)ev->user_data;
+ for (i = 0; i < sizeof (gpointer); i ++) {
+ h ^= *p;
+ h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
+ p ++;
+ }
+
+ return h;
+}
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
+ new->cleanup = cleanup;
new->user_data = user_data;
new->wanna_die = FALSE;
- new->events = g_queue_new ();
+ new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+ new->forced_events = g_queue_new ();
- memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->events);
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events);
return new;
}
@@ -57,7 +90,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
if (forced) {
/* For forced events try first to increase its reference */
- cur = session->events->head;
+ cur = session->forced_events->head;
while (cur) {
ev = cur->data;
if (ev->forced && ev->fin == fin) {
@@ -73,7 +106,10 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
new->user_data = user_data;
new->forced = forced;
new->ref = 1;
- g_queue_push_head (session->events, new);
+ g_hash_table_insert (session->events, new, new);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+#endif
}
void
@@ -87,56 +123,64 @@ remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin
return;
}
- cur = session->events->head;
+ cur = session->forced_events->head;
while (cur) {
ev = cur->data;
if (ev->forced && ev->fin == fin) {
ev->ref--;
if (ev->ref == 0) {
- g_queue_delete_link (session->events, cur);
+ g_queue_delete_link (session->forced_events, cur);
}
break;
}
cur = g_list_next (cur);
}
- if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+ check_session_pending (session);
+
+ if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) {
/* Call session destroy after all forced events are ready */
- session->fin (session->user_data);
+ session->cleanup (session->user_data);
}
}
void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
- struct rspamd_async_event *ev;
- GList *cur;
+ struct rspamd_async_event search_ev;
if (session == NULL) {
msg_info ("session is NULL");
return;
}
- cur = session->events->head;
- while (cur) {
- ev = cur->data;
- if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
- g_queue_delete_link (session->events, cur);
- if (ev->fin) {
- ev->fin (ev->user_data);
- }
- break;
- }
- cur = g_list_next (cur);
+ search_ev.fin = fin;
+ search_ev.user_data = ud;
+ if (g_hash_table_remove (session->events, &search_ev)) {
+ fin (ud);
+ }
+
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
+#endif
+
+ check_session_pending (session);
+}
+
+static void
+rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
+{
+ struct rspamd_async_event *ev = v;
+
+ /* Call event's finalizer */
+ if (ev->fin != NULL) {
+ ev->fin (ev->user_data);
}
}
gboolean
destroy_session (struct rspamd_async_session *session)
{
- struct rspamd_async_event *ev;
- GList *cur, *tmp;
-
if (session == NULL) {
msg_info ("session is NULL");
return FALSE;
@@ -144,30 +188,28 @@ destroy_session (struct rspamd_async_session *session)
session->wanna_die = TRUE;
- cur = session->events->head;
+ g_hash_table_foreach (session->events, rspamd_session_destroy, session);
- while (cur) {
- ev = cur->data;
- if (!ev->forced) {
- if (ev->fin != NULL) {
- ev->fin (ev->user_data);
- }
- tmp = cur;
- cur = g_list_next (cur);
- g_queue_delete_link (session->events, tmp);
- }
- else {
- /* Do nothing with forced callbacks */
- cur = g_list_next (cur);
+ if (g_queue_get_length (session->forced_events) == 0) {
+ if (session->cleanup != NULL) {
+ session->cleanup (session->user_data);
}
+ return TRUE;
}
- if (g_queue_get_length (session->events) == 0) {
+ return FALSE;
+}
+
+gboolean
+check_session_pending (struct rspamd_async_session *session)
+{
+ if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) {
if (session->fin != NULL) {
session->fin (session->user_data);
}
- return TRUE;
+ /* No more events */
+ return FALSE;
}
- return FALSE;
+ return TRUE;
}