diff options
-rw-r--r-- | src/libserver/events.c | 259 | ||||
-rw-r--r-- | src/libserver/events.h | 61 |
2 files changed, 5 insertions, 315 deletions
diff --git a/src/libserver/events.c b/src/libserver/events.c index 325da4452..c938945d2 100644 --- a/src/libserver/events.c +++ b/src/libserver/events.c @@ -19,11 +19,9 @@ #include "events.h" #include "cryptobox.h" -#define RSPAMD_SESSION_FLAG_WATCHING (1 << 0) #define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1) #define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2) -#define RSPAMD_SESSION_IS_WATCHING(s) ((s)->flags & RSPAMD_SESSION_FLAG_WATCHING) #define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING|RSPAMD_SESSION_FLAG_CLEANUP))) #define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ @@ -48,23 +46,11 @@ INIT_LOG_MODULE(events) /* Average symbols count to optimize hash allocation */ static struct rspamd_counter_data events_count; -struct rspamd_watch_stack { - event_watcher_t cb; - gpointer ud; - struct rspamd_watch_stack *next; -}; - -struct rspamd_async_watcher { - struct rspamd_watch_stack *st; - guint remain; - gint id; -}; struct rspamd_async_event { GQuark subsystem; event_finalizer_t fin; void *user_data; - struct rspamd_async_watcher *w; }; static guint rspamd_event_hash (gconstpointer a); @@ -85,7 +71,6 @@ struct rspamd_async_session { khash_t(rspamd_events_hash) *events; void *user_data; rspamd_mempool_t *pool; - struct rspamd_async_watcher *cur_watcher; guint flags; }; @@ -161,7 +146,6 @@ rspamd_session_create (rspamd_mempool_t * pool, struct rspamd_async_event * rspamd_session_add_event (struct rspamd_async_session *session, - struct rspamd_async_watcher *w, event_finalizer_t fin, gpointer user_data, GQuark subsystem) @@ -188,37 +172,11 @@ rspamd_session_add_event (struct rspamd_async_session *session, new_event->user_data = user_data; new_event->subsystem = subsystem; - if (w == NULL) { - if (RSPAMD_SESSION_IS_WATCHING (session)) { - new_event->w = session->cur_watcher; - new_event->w->remain++; - msg_debug_session ("added event: %p, pending %d events, " - "subsystem: %s, watcher: %d (%d)", - user_data, - kh_size (session->events), - g_quark_to_string (subsystem), - new_event->w->id, - new_event->w->remain); - } else { - new_event->w = NULL; - msg_debug_session ("added event: %p, pending %d events, " - "subsystem: %s, no watcher!", - user_data, - kh_size (session->events), - g_quark_to_string (subsystem)); - } - } - else { - new_event->w = w; - new_event->w->remain++; - msg_debug_session ("added event: %p, pending %d events, " - "subsystem: %s, explicit watcher: %d (%d)", - user_data, - kh_size (session->events), - g_quark_to_string (subsystem), - new_event->w->id, - new_event->w->remain); - } + msg_debug_session ("added event: %p, pending %d events, " + "subsystem: %s", + user_data, + kh_size (session->events), + g_quark_to_string (subsystem)); kh_put (rspamd_events_hash, session->events, new_event, &ret); g_assert (ret > 0); @@ -226,19 +184,6 @@ rspamd_session_add_event (struct rspamd_async_session *session, return new_event; } -static inline void -rspamd_session_call_watcher_stack (struct rspamd_async_session *session, - struct rspamd_async_watcher *w) -{ - struct rspamd_watch_stack *st; - - LL_FOREACH (w->st, st) { - st->cb (session->user_data, st->ud); - } - - w->st = NULL; -} - void rspamd_session_remove_event (struct rspamd_async_session *session, event_finalizer_t fin, @@ -282,27 +227,6 @@ rspamd_session_remove_event (struct rspamd_async_session *session, /* Remove event */ fin (ud); - /* Call watcher if needed */ - if (found_ev->w) { - msg_debug_session ("removed event: %p, subsystem: %s, " - "pending %d events, watcher: %d (%d pending)", ud, - g_quark_to_string (found_ev->subsystem), - kh_size (session->events), - found_ev->w->id, found_ev->w->remain - 1); - - if (found_ev->w->remain > 0) { - if (--found_ev->w->remain == 0) { - rspamd_session_call_watcher_stack (session, found_ev->w); - } - } - } - else { - msg_debug_session ("removed event: %p, subsystem: %s, " - "pending %d events, no watcher!", ud, - g_quark_to_string (found_ev->subsystem), - kh_size (session->events)); - } - rspamd_session_pending (session); } @@ -381,65 +305,6 @@ rspamd_session_pending (struct rspamd_async_session *session) return ret; } -void -rspamd_session_watch_start (struct rspamd_async_session *session, - gint id, - event_watcher_t cb, - gpointer ud) -{ - struct rspamd_watch_stack *st_elt; - - g_assert (session != NULL); - g_assert (!RSPAMD_SESSION_IS_WATCHING (session)); - - if (session->cur_watcher == NULL) { - session->cur_watcher = rspamd_mempool_alloc0 (session->pool, - sizeof (*session->cur_watcher)); - - st_elt = rspamd_mempool_alloc (session->pool, sizeof (*st_elt)); - st_elt->cb = cb; - st_elt->ud = ud; - LL_PREPEND (session->cur_watcher->st, st_elt); - } - else { - if (session->cur_watcher->st) { - /* Reuse the existing (empty) watcher */ - session->cur_watcher->st->cb = cb; - session->cur_watcher->st->ud = ud; - } - else { - st_elt = rspamd_mempool_alloc (session->pool, sizeof (*st_elt)); - st_elt->cb = cb; - st_elt->ud = ud; - LL_PREPEND (session->cur_watcher->st, st_elt); - } - } - - session->cur_watcher->id = id; - session->flags |= RSPAMD_SESSION_FLAG_WATCHING; -} - -guint -rspamd_session_watch_stop (struct rspamd_async_session *session) -{ - guint remain; - - g_assert (session != NULL); - g_assert (RSPAMD_SESSION_IS_WATCHING (session)); - - remain = session->cur_watcher->remain; - - if (remain > 0) { - /* Avoid reusing */ - session->cur_watcher = NULL; - } - - session->flags &= ~RSPAMD_SESSION_FLAG_WATCHING; - - return remain; -} - - guint rspamd_session_events_pending (struct rspamd_async_session *session) { @@ -450,123 +315,9 @@ rspamd_session_events_pending (struct rspamd_async_session *session) npending = kh_size (session->events); msg_debug_session ("pending %d events", npending); - if (RSPAMD_SESSION_IS_WATCHING (session)) { - npending += session->cur_watcher->remain; - msg_debug_session ("pending %d watchers, id: %d", - session->cur_watcher->remain, session->cur_watcher->id); - } - return npending; } -inline void -rspamd_session_watcher_push_callback (struct rspamd_async_session *session, - struct rspamd_async_watcher *w, - event_watcher_t cb, - gpointer ud) -{ - struct rspamd_watch_stack *st; - - g_assert (session != NULL); - - if (w == NULL) { - if (RSPAMD_SESSION_IS_WATCHING (session)) { - w = session->cur_watcher; - } - else { - return; - } - } - - if (w) { - w->remain ++; - msg_debug_session ("push session, watcher: %d, %d events", - w->id, - w->remain); - - if (cb) { - st = rspamd_mempool_alloc (session->pool, sizeof (*st)); - st->cb = cb; - st->ud = ud; - - LL_PREPEND (w->st, st); - } - } -} - -void -rspamd_session_watcher_push (struct rspamd_async_session *session) -{ - rspamd_session_watcher_push_callback (session, NULL, NULL, NULL); -} - -void -rspamd_session_watcher_push_specific (struct rspamd_async_session *session, - struct rspamd_async_watcher *w) -{ - rspamd_session_watcher_push_callback (session, w, NULL, NULL); -} - -void -rspamd_session_watcher_pop (struct rspamd_async_session *session, - struct rspamd_async_watcher *w) -{ - g_assert (session != NULL); - - if (w && w->remain > 0) { - msg_debug_session ("pop session, watcher: %d, %d events", w->id, - w->remain); - w->remain --; - - if (w->remain == 0) { - rspamd_session_call_watcher_stack (session, w); - } - } -} - -struct rspamd_async_watcher* -rspamd_session_get_watcher (struct rspamd_async_session *session) -{ - g_assert (session != NULL); - - if (RSPAMD_SESSION_IS_WATCHING (session)) { - return session->cur_watcher; - } - else { - return NULL; - } -} - -struct rspamd_async_watcher* -rspamd_session_replace_watcher (struct rspamd_async_session *s, - struct rspamd_async_watcher *w) -{ - struct rspamd_async_watcher *res = NULL; - - g_assert (s != NULL); - - if (s->cur_watcher) { - res = s->cur_watcher; - - if (!w) { - /* We remove watching, so clear watching flag as well */ - s->flags &= ~RSPAMD_SESSION_FLAG_WATCHING; - - } - - s->cur_watcher = w; - } - else { - if (w) { - s->flags |= RSPAMD_SESSION_FLAG_WATCHING; - } - - s->cur_watcher = w; - } - - return res; -} - rspamd_mempool_t * rspamd_session_mempool (struct rspamd_async_session *session) { diff --git a/src/libserver/events.h b/src/libserver/events.h index 68895d38d..85d4e48e2 100644 --- a/src/libserver/events.h +++ b/src/libserver/events.h @@ -21,10 +21,8 @@ struct rspamd_async_event; struct rspamd_async_session; -struct rspamd_async_watcher; typedef void (*event_finalizer_t)(gpointer ud); -typedef void (*event_watcher_t)(gpointer session_data, gpointer ud); typedef gboolean (*session_finalizer_t)(gpointer user_data); /** @@ -49,7 +47,6 @@ struct rspamd_async_session * rspamd_session_create (rspamd_mempool_t *pool, */ struct rspamd_async_event * rspamd_session_add_event (struct rspamd_async_session *session, - struct rspamd_async_watcher *w, event_finalizer_t fin, gpointer user_data, GQuark subsystem); @@ -96,64 +93,6 @@ gboolean rspamd_session_pending (struct rspamd_async_session *session); */ guint rspamd_session_events_pending (struct rspamd_async_session *session); -/** - * Start watching for events in the session, so the specified watcher will be added - * to all subsequent events until `rspamd_session_watch_stop` is called - * @param s session object - * @param cb watcher callback that is called when all events watched are destroyed - * @param ud opaque data for the callback - */ -void rspamd_session_watch_start (struct rspamd_async_session *s, - gint id, - event_watcher_t cb, - gpointer ud); - -/** - * Stop watching mode, if no events are watched since the last `rspamd_session_watch_start`, - * then the watcher is silently ignored - * @param s session - * @return number of events watched - */ -guint rspamd_session_watch_stop (struct rspamd_async_session *s); - -/** - * Create a fake event just for event watcher - * @param s - */ -void rspamd_session_watcher_push (struct rspamd_async_session *s); - -/** - * Push callback to the watcher specified - */ -void rspamd_session_watcher_push_callback (struct rspamd_async_session *s, - struct rspamd_async_watcher *w, - event_watcher_t cb, - gpointer ud); - -/** - * Increase refcount for a specific watcher - */ -void rspamd_session_watcher_push_specific (struct rspamd_async_session *s, - struct rspamd_async_watcher *w); - -/** - * Remove a fake event from a watcher - * @param s - */ -void rspamd_session_watcher_pop (struct rspamd_async_session *s, - struct rspamd_async_watcher *w); - -/** - * Returns the current watcher for events session - * @param s - * @return - */ -struct rspamd_async_watcher* rspamd_session_get_watcher ( - struct rspamd_async_session *s); - -struct rspamd_async_watcher* rspamd_session_replace_watcher ( - struct rspamd_async_session *s, - struct rspamd_async_watcher *w); /** * Returns TRUE if an async session is currently destroying |