*/
#include "config.h"
#include "rspamd.h"
+#include "contrib/uthash/utlist.h"
#include "events.h"
#include "cryptobox.h"
G_STRFUNC, \
__VA_ARGS__)
-struct rspamd_async_watcher {
+struct rspamd_watch_stack {
event_watcher_t cb;
gpointer ud;
+ struct rspamd_watch_stack *next;
+};
+
+struct rspamd_async_watcher {
+ struct rspamd_watch_stack *st;
guint remain;
gint id;
};
g_quark_to_string (subsystem));
}
+static inline void
+rspamd_session_call_watcher_stack (struct rspamd_async_session *session,
+ struct rspamd_async_watcher *w)
+{
+ struct rspamd_watch_stack *st;
+
+ LL_FOREACH (w->st, st) {
+ st->cb (session->user_data, st->ud);
+ }
+
+ w->st = NULL;
+}
+
void
rspamd_session_remove_event (struct rspamd_async_session *session,
event_finalizer_t fin,
if (found_ev->w) {
if (found_ev->w->remain > 0) {
if (--found_ev->w->remain == 0) {
- found_ev->w->cb (session->user_data, found_ev->w->ud);
+ rspamd_session_call_watcher_stack (session, found_ev->w);
}
}
}
event_watcher_t cb,
gpointer ud)
{
+ struct rspamd_watch_stack *st_elt;
+
g_assert (session != NULL);
g_assert (!RSPAMD_SESSION_IS_WATCHING (session));
if (session->cur_watcher == NULL) {
- session->cur_watcher = rspamd_mempool_alloc (session->pool,
+ session->cur_watcher = rspamd_mempool_alloc0 (session->pool,
sizeof (*session->cur_watcher));
+ session->flags |= RSPAMD_SESSION_FLAG_WATCHING;
}
- session->cur_watcher->cb = cb;
- session->cur_watcher->remain = 0;
- session->cur_watcher->ud = ud;
+ st_elt = rspamd_mempool_alloc (session->pool, sizeof (*st_elt));
+ st_elt->cb = cb;
+ st_elt->ud = ud;
+ LL_PREPEND (session->cur_watcher->st, st_elt);
+
session->cur_watcher->id = id;
- session->flags |= RSPAMD_SESSION_FLAG_WATCHING;
}
guint
return npending;
}
-void
-rspamd_session_watcher_push (struct rspamd_async_session *session)
+inline void
+rspamd_session_watcher_push_callback (struct rspamd_async_session *session,
+ struct rspamd_async_watcher *w,
+ event_watcher_t cb,
+ gpointer ud)
{
+ struct rspamd_watch_stack *st;
+
g_assert (session != NULL);
- if (RSPAMD_SESSION_IS_WATCHING (session)) {
- session->cur_watcher->remain ++;
+ if (w == NULL) {
+ if (RSPAMD_SESSION_IS_WATCHING (session)) {
+ w = session->cur_watcher;
+ }
+ else {
+ return;
+ }
+ }
+
+ if (w) {
+ w->remain ++;
msg_debug_session ("push session, watcher: %d, %d events",
- session->cur_watcher->id,
- session->cur_watcher->remain);
+ w->id,
+ w->remain);
+
+ if (cb) {
+ st = rspamd_mempool_alloc (session->pool, sizeof (*st));
+ st->cb = cb;
+ st->ud = ud;
+
+ LL_PREPEND (w->st, st);
+ }
}
}
+void
+rspamd_session_watcher_push (struct rspamd_async_session *session)
+{
+ rspamd_session_watcher_push_callback (session, NULL, NULL, NULL);
+}
+
void
rspamd_session_watcher_push_specific (struct rspamd_async_session *session,
struct rspamd_async_watcher *w)
{
- g_assert (session != NULL);
-
- if (w) {
- w->remain ++;
- msg_debug_session ("push specific, watcher: %d, %d events",
- w->id,
- w->remain);
- }
+ rspamd_session_watcher_push_callback (session, w, NULL, NULL);
}
void
w->remain);
if (--w->remain == 0) {
- w->cb (session->user_data, w->ud);
+ rspamd_session_call_watcher_stack (session, w);
}
}
}