aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 14:34:08 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 19:43:32 +0100
commitb849942bc86cf57d4ae4e57a676ed0f6a057cad2 (patch)
tree41b187bb0f23590d39574a93e1ca2930f8d570ea
parent5bdc64980877fdc234eba3419a3a596eddc0dc7f (diff)
downloadrspamd-b849942bc86cf57d4ae4e57a676ed0f6a057cad2.tar.gz
rspamd-b849942bc86cf57d4ae4e57a676ed0f6a057cad2.zip
[Project] Kill async watchers as they are way too complex
-rw-r--r--src/libserver/events.c259
-rw-r--r--src/libserver/events.h61
2 files changed, 5 insertions, 315 deletions
diff --git a/src/libserver/events.c b/src/libserver/events.c
index 325da4452..c938945d2 100644
--- a/src/libserver/events.c
+++ b/src/libserver/events.c
@@ -19,11 +19,9 @@
#include "events.h"
#include "cryptobox.h"
-#define RSPAMD_SESSION_FLAG_WATCHING (1 << 0)
#define RSPAMD_SESSION_FLAG_DESTROYING (1 << 1)
#define RSPAMD_SESSION_FLAG_CLEANUP (1 << 2)
-#define RSPAMD_SESSION_IS_WATCHING(s) ((s)->flags & RSPAMD_SESSION_FLAG_WATCHING)
#define RSPAMD_SESSION_CAN_ADD_EVENT(s) (!((s)->flags & (RSPAMD_SESSION_FLAG_DESTROYING|RSPAMD_SESSION_FLAG_CLEANUP)))
#define msg_err_session(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
@@ -48,23 +46,11 @@ INIT_LOG_MODULE(events)
/* Average symbols count to optimize hash allocation */
static struct rspamd_counter_data events_count;
-struct rspamd_watch_stack {
- event_watcher_t cb;
- gpointer ud;
- struct rspamd_watch_stack *next;
-};
-
-struct rspamd_async_watcher {
- struct rspamd_watch_stack *st;
- guint remain;
- gint id;
-};
struct rspamd_async_event {
GQuark subsystem;
event_finalizer_t fin;
void *user_data;
- struct rspamd_async_watcher *w;
};
static guint rspamd_event_hash (gconstpointer a);
@@ -85,7 +71,6 @@ struct rspamd_async_session {
khash_t(rspamd_events_hash) *events;
void *user_data;
rspamd_mempool_t *pool;
- struct rspamd_async_watcher *cur_watcher;
guint flags;
};
@@ -161,7 +146,6 @@ rspamd_session_create (rspamd_mempool_t * pool,
struct rspamd_async_event *
rspamd_session_add_event (struct rspamd_async_session *session,
- struct rspamd_async_watcher *w,
event_finalizer_t fin,
gpointer user_data,
GQuark subsystem)
@@ -188,37 +172,11 @@ rspamd_session_add_event (struct rspamd_async_session *session,
new_event->user_data = user_data;
new_event->subsystem = subsystem;
- if (w == NULL) {
- if (RSPAMD_SESSION_IS_WATCHING (session)) {
- new_event->w = session->cur_watcher;
- new_event->w->remain++;
- msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, watcher: %d (%d)",
- user_data,
- kh_size (session->events),
- g_quark_to_string (subsystem),
- new_event->w->id,
- new_event->w->remain);
- } else {
- new_event->w = NULL;
- msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, no watcher!",
- user_data,
- kh_size (session->events),
- g_quark_to_string (subsystem));
- }
- }
- else {
- new_event->w = w;
- new_event->w->remain++;
- msg_debug_session ("added event: %p, pending %d events, "
- "subsystem: %s, explicit watcher: %d (%d)",
- user_data,
- kh_size (session->events),
- g_quark_to_string (subsystem),
- new_event->w->id,
- new_event->w->remain);
- }
+ msg_debug_session ("added event: %p, pending %d events, "
+ "subsystem: %s",
+ user_data,
+ kh_size (session->events),
+ g_quark_to_string (subsystem));
kh_put (rspamd_events_hash, session->events, new_event, &ret);
g_assert (ret > 0);
@@ -226,19 +184,6 @@ rspamd_session_add_event (struct rspamd_async_session *session,
return new_event;
}
-static inline void
-rspamd_session_call_watcher_stack (struct rspamd_async_session *session,
- struct rspamd_async_watcher *w)
-{
- struct rspamd_watch_stack *st;
-
- LL_FOREACH (w->st, st) {
- st->cb (session->user_data, st->ud);
- }
-
- w->st = NULL;
-}
-
void
rspamd_session_remove_event (struct rspamd_async_session *session,
event_finalizer_t fin,
@@ -282,27 +227,6 @@ rspamd_session_remove_event (struct rspamd_async_session *session,
/* Remove event */
fin (ud);
- /* Call watcher if needed */
- if (found_ev->w) {
- msg_debug_session ("removed event: %p, subsystem: %s, "
- "pending %d events, watcher: %d (%d pending)", ud,
- g_quark_to_string (found_ev->subsystem),
- kh_size (session->events),
- found_ev->w->id, found_ev->w->remain - 1);
-
- if (found_ev->w->remain > 0) {
- if (--found_ev->w->remain == 0) {
- rspamd_session_call_watcher_stack (session, found_ev->w);
- }
- }
- }
- else {
- msg_debug_session ("removed event: %p, subsystem: %s, "
- "pending %d events, no watcher!", ud,
- g_quark_to_string (found_ev->subsystem),
- kh_size (session->events));
- }
-
rspamd_session_pending (session);
}
@@ -381,65 +305,6 @@ rspamd_session_pending (struct rspamd_async_session *session)
return ret;
}
-void
-rspamd_session_watch_start (struct rspamd_async_session *session,
- gint id,
- event_watcher_t cb,
- gpointer ud)
-{
- struct rspamd_watch_stack *st_elt;
-
- g_assert (session != NULL);
- g_assert (!RSPAMD_SESSION_IS_WATCHING (session));
-
- if (session->cur_watcher == NULL) {
- session->cur_watcher = rspamd_mempool_alloc0 (session->pool,
- sizeof (*session->cur_watcher));
-
- st_elt = rspamd_mempool_alloc (session->pool, sizeof (*st_elt));
- st_elt->cb = cb;
- st_elt->ud = ud;
- LL_PREPEND (session->cur_watcher->st, st_elt);
- }
- else {
- if (session->cur_watcher->st) {
- /* Reuse the existing (empty) watcher */
- session->cur_watcher->st->cb = cb;
- session->cur_watcher->st->ud = ud;
- }
- else {
- st_elt = rspamd_mempool_alloc (session->pool, sizeof (*st_elt));
- st_elt->cb = cb;
- st_elt->ud = ud;
- LL_PREPEND (session->cur_watcher->st, st_elt);
- }
- }
-
- session->cur_watcher->id = id;
- session->flags |= RSPAMD_SESSION_FLAG_WATCHING;
-}
-
-guint
-rspamd_session_watch_stop (struct rspamd_async_session *session)
-{
- guint remain;
-
- g_assert (session != NULL);
- g_assert (RSPAMD_SESSION_IS_WATCHING (session));
-
- remain = session->cur_watcher->remain;
-
- if (remain > 0) {
- /* Avoid reusing */
- session->cur_watcher = NULL;
- }
-
- session->flags &= ~RSPAMD_SESSION_FLAG_WATCHING;
-
- return remain;
-}
-
-
guint
rspamd_session_events_pending (struct rspamd_async_session *session)
{
@@ -450,123 +315,9 @@ rspamd_session_events_pending (struct rspamd_async_session *session)
npending = kh_size (session->events);
msg_debug_session ("pending %d events", npending);
- if (RSPAMD_SESSION_IS_WATCHING (session)) {
- npending += session->cur_watcher->remain;
- msg_debug_session ("pending %d watchers, id: %d",
- session->cur_watcher->remain, session->cur_watcher->id);
- }
-
return npending;
}
-inline void
-rspamd_session_watcher_push_callback (struct rspamd_async_session *session,
- struct rspamd_async_watcher *w,
- event_watcher_t cb,
- gpointer ud)
-{
- struct rspamd_watch_stack *st;
-
- g_assert (session != NULL);
-
- if (w == NULL) {
- if (RSPAMD_SESSION_IS_WATCHING (session)) {
- w = session->cur_watcher;
- }
- else {
- return;
- }
- }
-
- if (w) {
- w->remain ++;
- msg_debug_session ("push session, watcher: %d, %d events",
- w->id,
- w->remain);
-
- if (cb) {
- st = rspamd_mempool_alloc (session->pool, sizeof (*st));
- st->cb = cb;
- st->ud = ud;
-
- LL_PREPEND (w->st, st);
- }
- }
-}
-
-void
-rspamd_session_watcher_push (struct rspamd_async_session *session)
-{
- rspamd_session_watcher_push_callback (session, NULL, NULL, NULL);
-}
-
-void
-rspamd_session_watcher_push_specific (struct rspamd_async_session *session,
- struct rspamd_async_watcher *w)
-{
- rspamd_session_watcher_push_callback (session, w, NULL, NULL);
-}
-
-void
-rspamd_session_watcher_pop (struct rspamd_async_session *session,
- struct rspamd_async_watcher *w)
-{
- g_assert (session != NULL);
-
- if (w && w->remain > 0) {
- msg_debug_session ("pop session, watcher: %d, %d events", w->id,
- w->remain);
- w->remain --;
-
- if (w->remain == 0) {
- rspamd_session_call_watcher_stack (session, w);
- }
- }
-}
-
-struct rspamd_async_watcher*
-rspamd_session_get_watcher (struct rspamd_async_session *session)
-{
- g_assert (session != NULL);
-
- if (RSPAMD_SESSION_IS_WATCHING (session)) {
- return session->cur_watcher;
- }
- else {
- return NULL;
- }
-}
-
-struct rspamd_async_watcher*
-rspamd_session_replace_watcher (struct rspamd_async_session *s,
- struct rspamd_async_watcher *w)
-{
- struct rspamd_async_watcher *res = NULL;
-
- g_assert (s != NULL);
-
- if (s->cur_watcher) {
- res = s->cur_watcher;
-
- if (!w) {
- /* We remove watching, so clear watching flag as well */
- s->flags &= ~RSPAMD_SESSION_FLAG_WATCHING;
-
- }
-
- s->cur_watcher = w;
- }
- else {
- if (w) {
- s->flags |= RSPAMD_SESSION_FLAG_WATCHING;
- }
-
- s->cur_watcher = w;
- }
-
- return res;
-}
-
rspamd_mempool_t *
rspamd_session_mempool (struct rspamd_async_session *session)
{
diff --git a/src/libserver/events.h b/src/libserver/events.h
index 68895d38d..85d4e48e2 100644
--- a/src/libserver/events.h
+++ b/src/libserver/events.h
@@ -21,10 +21,8 @@
struct rspamd_async_event;
struct rspamd_async_session;
-struct rspamd_async_watcher;
typedef void (*event_finalizer_t)(gpointer ud);
-typedef void (*event_watcher_t)(gpointer session_data, gpointer ud);
typedef gboolean (*session_finalizer_t)(gpointer user_data);
/**
@@ -49,7 +47,6 @@ struct rspamd_async_session * rspamd_session_create (rspamd_mempool_t *pool,
*/
struct rspamd_async_event *
rspamd_session_add_event (struct rspamd_async_session *session,
- struct rspamd_async_watcher *w,
event_finalizer_t fin,
gpointer user_data,
GQuark subsystem);
@@ -96,64 +93,6 @@ gboolean rspamd_session_pending (struct rspamd_async_session *session);
*/
guint rspamd_session_events_pending (struct rspamd_async_session *session);
-/**
- * Start watching for events in the session, so the specified watcher will be added
- * to all subsequent events until `rspamd_session_watch_stop` is called
- * @param s session object
- * @param cb watcher callback that is called when all events watched are destroyed
- * @param ud opaque data for the callback
- */
-void rspamd_session_watch_start (struct rspamd_async_session *s,
- gint id,
- event_watcher_t cb,
- gpointer ud);
-
-/**
- * Stop watching mode, if no events are watched since the last `rspamd_session_watch_start`,
- * then the watcher is silently ignored
- * @param s session
- * @return number of events watched
- */
-guint rspamd_session_watch_stop (struct rspamd_async_session *s);
-
-/**
- * Create a fake event just for event watcher
- * @param s
- */
-void rspamd_session_watcher_push (struct rspamd_async_session *s);
-
-/**
- * Push callback to the watcher specified
- */
-void rspamd_session_watcher_push_callback (struct rspamd_async_session *s,
- struct rspamd_async_watcher *w,
- event_watcher_t cb,
- gpointer ud);
-
-/**
- * Increase refcount for a specific watcher
- */
-void rspamd_session_watcher_push_specific (struct rspamd_async_session *s,
- struct rspamd_async_watcher *w);
-
-/**
- * Remove a fake event from a watcher
- * @param s
- */
-void rspamd_session_watcher_pop (struct rspamd_async_session *s,
- struct rspamd_async_watcher *w);
-
-/**
- * Returns the current watcher for events session
- * @param s
- * @return
- */
-struct rspamd_async_watcher* rspamd_session_get_watcher (
- struct rspamd_async_session *s);
-
-struct rspamd_async_watcher* rspamd_session_replace_watcher (
- struct rspamd_async_session *s,
- struct rspamd_async_watcher *w);
/**
* Returns TRUE if an async session is currently destroying