summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/controller.c50
-rw-r--r--src/dns.c13
-rw-r--r--src/events.c126
-rw-r--r--src/events.h50
-rw-r--r--src/filter.c55
-rw-r--r--src/lua/lua_http.c14
-rw-r--r--src/lua/lua_redis.c25
-rw-r--r--src/lua/lua_task.c10
-rw-r--r--src/main.h10
-rw-r--r--src/plugins/fuzzy_check.c14
-rw-r--r--src/plugins/spf.c7
-rw-r--r--src/plugins/surbl.c69
-rw-r--r--src/smtp.c2
-rw-r--r--src/spf.c20
-rw-r--r--src/worker.c38
15 files changed, 202 insertions, 301 deletions
diff --git a/src/controller.c b/src/controller.c
index 206e17f82..fc26b36d9 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -832,6 +832,26 @@ process_normal_command (const gchar *line)
return NULL;
}
+static void
+fin_learn_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *)arg;
+
+ /* XXX: this is bad logic in fact */
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
+ task->state = WRITE_REPLY;
+
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
+}
+
static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
@@ -943,7 +963,6 @@ controller_read_socket (f_str_t * in, void *arg)
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
- destroy_session (task->s);
session->state = STATE_REPLY;
r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
@@ -951,7 +970,8 @@ controller_read_socket (f_str_t * in, void *arg)
}
return FALSE;
}
-
+ /* Set up async session */
+ task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
r = process_filters (task);
if (r == -1) {
session->state = STATE_REPLY;
@@ -961,34 +981,12 @@ controller_read_socket (f_str_t * in, void *arg)
return FALSE;
}
}
- else if (r == 0) {
+ else {
session->state = STATE_LEARN_SPAM;
task->dispatcher = session->dispatcher;
session->learn_task = task;
rspamd_dispatcher_pause (session->dispatcher);
}
- else {
- lua_call_post_filters (task);
- session->state = STATE_REPLY;
-
- if (! learn_task_spam (session->learn_classifier, task, session->in_class, &err)) {
- if (err) {
- i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, learn classifier error: %s" CRLF END, err->message);
- g_error_free (err);
- }
- else {
- i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, unknown learn classifier error" CRLF END);
- }
- }
- else {
- i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
- }
-
- destroy_session (task->s);
- if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
- return FALSE;
- }
- }
break;
case STATE_WEIGHTS:
session->learn_buf = in;
@@ -1189,7 +1187,7 @@ accept_socket (gint fd, short what, void *arg)
io_tv->tv_sec = ctx->timeout / 1000;
io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
- new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+ new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
diff --git a/src/dns.c b/src/dns.c
index be0d2ad7d..1d868eb58 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -1220,6 +1220,7 @@ dns_read_cb (gint fd, short what, void *arg)
}
upstream_ok (&rep->request->server->up, rep->request->time);
rep->request->func (rep, rep->request->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
}
}
}
@@ -1239,11 +1240,11 @@ dns_timer_cb (gint fd, short what, void *arg)
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
upstream_fail (&rep->request->server->up, rep->request->time);
- remove_normal_event (req->session, dns_fin_cb, req);
dns_check_throttling (req->resolver);
req->resolver->errors ++;
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
@@ -1262,8 +1263,9 @@ dns_timer_cb (gint fd, short what, void *arg)
rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
- remove_normal_event (req->session, dns_fin_cb, req);
+
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
@@ -1277,8 +1279,9 @@ dns_timer_cb (gint fd, short what, void *arg)
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
upstream_fail (&rep->request->server->up, rep->request->time);
- remove_normal_event (req->session, dns_fin_cb, req);
+
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
@@ -1288,9 +1291,9 @@ dns_timer_cb (gint fd, short what, void *arg)
rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
rep->request = req;
rep->code = DNS_RC_SERVFAIL;
- remove_normal_event (req->session, dns_fin_cb, req);
upstream_fail (&rep->request->server->up, rep->request->time);
req->func (rep, req->arg);
+ remove_normal_event (req->session, dns_fin_cb, req);
return;
}
evtimer_add (&req->timer_event, &req->tv);
@@ -1303,6 +1306,8 @@ dns_retransmit_handler (gint fd, short what, void *arg)
struct rspamd_dns_reply *rep;
gint r;
+ remove_normal_event (req->session, (event_finalizer_t)event_del, &req->io_event);
+
if (what == EV_WRITE) {
/* Retransmit dns request */
req->retransmits ++;
diff --git a/src/events.c b/src/events.c
index f91e3e847..6d848ca2d 100644
--- a/src/events.c
+++ b/src/events.c
@@ -26,20 +26,53 @@
#include "main.h"
#include "events.h"
+#undef RSPAMD_EVENTS_DEBUG
+
+static gboolean
+rspamd_event_equal (gconstpointer a, gconstpointer b)
+{
+ const struct rspamd_async_event *ev1 = a, *ev2 = b;
+
+ if (ev1->fin == ev2->fin) {
+ return ev1->user_data == ev2->user_data;
+ }
+
+ return FALSE;
+}
+
+static guint
+rspamd_event_hash (gconstpointer a)
+{
+ const struct rspamd_async_event *ev = a;
+ guint h = 0, i;
+ gchar *p;
+
+ p = (gchar *)ev->user_data;
+ for (i = 0; i < sizeof (gpointer); i ++) {
+ h ^= *p;
+ h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
+ p ++;
+ }
+
+ return h;
+}
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
+ new->cleanup = cleanup;
new->user_data = user_data;
new->wanna_die = FALSE;
- new->events = g_queue_new ();
+ new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+ new->forced_events = g_queue_new ();
- memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->events);
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events);
return new;
}
@@ -57,7 +90,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
if (forced) {
/* For forced events try first to increase its reference */
- cur = session->events->head;
+ cur = session->forced_events->head;
while (cur) {
ev = cur->data;
if (ev->forced && ev->fin == fin) {
@@ -73,7 +106,10 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
new->user_data = user_data;
new->forced = forced;
new->ref = 1;
- g_queue_push_head (session->events, new);
+ g_hash_table_insert (session->events, new, new);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+#endif
}
void
@@ -87,56 +123,64 @@ remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin
return;
}
- cur = session->events->head;
+ cur = session->forced_events->head;
while (cur) {
ev = cur->data;
if (ev->forced && ev->fin == fin) {
ev->ref--;
if (ev->ref == 0) {
- g_queue_delete_link (session->events, cur);
+ g_queue_delete_link (session->forced_events, cur);
}
break;
}
cur = g_list_next (cur);
}
- if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+ check_session_pending (session);
+
+ if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) {
/* Call session destroy after all forced events are ready */
- session->fin (session->user_data);
+ session->cleanup (session->user_data);
}
}
void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
- struct rspamd_async_event *ev;
- GList *cur;
+ struct rspamd_async_event search_ev;
if (session == NULL) {
msg_info ("session is NULL");
return;
}
- cur = session->events->head;
- while (cur) {
- ev = cur->data;
- if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
- g_queue_delete_link (session->events, cur);
- if (ev->fin) {
- ev->fin (ev->user_data);
- }
- break;
- }
- cur = g_list_next (cur);
+ search_ev.fin = fin;
+ search_ev.user_data = ud;
+ if (g_hash_table_remove (session->events, &search_ev)) {
+ fin (ud);
+ }
+
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
+#endif
+
+ check_session_pending (session);
+}
+
+static void
+rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
+{
+ struct rspamd_async_event *ev = v;
+
+ /* Call event's finalizer */
+ if (ev->fin != NULL) {
+ ev->fin (ev->user_data);
}
}
gboolean
destroy_session (struct rspamd_async_session *session)
{
- struct rspamd_async_event *ev;
- GList *cur, *tmp;
-
if (session == NULL) {
msg_info ("session is NULL");
return FALSE;
@@ -144,30 +188,28 @@ destroy_session (struct rspamd_async_session *session)
session->wanna_die = TRUE;
- cur = session->events->head;
+ g_hash_table_foreach (session->events, rspamd_session_destroy, session);
- while (cur) {
- ev = cur->data;
- if (!ev->forced) {
- if (ev->fin != NULL) {
- ev->fin (ev->user_data);
- }
- tmp = cur;
- cur = g_list_next (cur);
- g_queue_delete_link (session->events, tmp);
- }
- else {
- /* Do nothing with forced callbacks */
- cur = g_list_next (cur);
+ if (g_queue_get_length (session->forced_events) == 0) {
+ if (session->cleanup != NULL) {
+ session->cleanup (session->user_data);
}
+ return TRUE;
}
- if (g_queue_get_length (session->events) == 0) {
+ return FALSE;
+}
+
+gboolean
+check_session_pending (struct rspamd_async_session *session)
+{
+ if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) {
if (session->fin != NULL) {
session->fin (session->user_data);
}
- return TRUE;
+ /* No more events */
+ return FALSE;
}
- return FALSE;
+ return TRUE;
}
diff --git a/src/events.h b/src/events.h
index 3715b4d66..434c39d80 100644
--- a/src/events.h
+++ b/src/events.h
@@ -16,18 +16,49 @@ struct rspamd_async_event {
struct rspamd_async_session {
event_finalizer_t fin;
- GQueue *events;
+ event_finalizer_t cleanup;
+ GHashTable *events;
+ GQueue *forced_events;
void *user_data;
memory_pool_t *pool;
gboolean wanna_die;
};
-/* Makes new async session */
-struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data);
-/* Insert event into session */
-void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced);
-/* Must be called by forced events to call session destructor properly */
+/**
+ * Make new async session
+ * @param pool pool to alloc memory from
+ * @param fin a callback called when no events are found in session
+ * @param cleanup a callback called when session is forcefully destroyed
+ * @param user_data abstract user data
+ * @return
+ */
+struct rspamd_async_session *new_async_session (memory_pool_t *pool,
+ event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
+
+/**
+ * Insert new event to the session
+ * @param session session object
+ * @param fin finalizer callback
+ * @param user_data abstract user_data
+ * @param forced session cannot be destroyed until forced event are still in it
+ */
+void register_async_event (struct rspamd_async_session *session,
+ event_finalizer_t fin, void *user_data, gboolean forced);
+
+
+/**
+ * Remove forced event
+ * @param session session object
+ * @param fin destructor function
+ */
void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
+
+/**
+ * Remove normal event
+ * @param session session object
+ * @param fin final callback
+ * @param ud user data object
+ */
void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud);
/**
@@ -36,4 +67,11 @@ void remove_normal_event (struct rspamd_async_session *session, event_finalizer_
*/
gboolean destroy_session (struct rspamd_async_session *session);
+/**
+ * Check session for events pending and call fin callback if no events are pending
+ * @param session session object
+ * @return TRUE if session has pending events
+ */
+gboolean check_session_pending (struct rspamd_async_session *session);
+
#endif /* RSPAMD_EVENTS_H */
diff --git a/src/filter.c b/src/filter.c
index f4048955d..562f705e5 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -208,48 +208,6 @@ check_metric_is_spam (struct worker_task *task, struct metric *metric)
return FALSE;
}
-static gint
-continue_process_filters (struct worker_task *task)
-{
- GList *cur;
- gpointer item = task->save.item;
- struct metric *metric;
-
- while (call_symbol_callback (task, task->cfg->cache, &item)) {
- cur = task->cfg->metrics_list;
- while (cur) {
- metric = cur->data;
- /* call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); */
- if (task->save.saved) {
- task->save.entry = cur;
- task->save.item = item;
- return 0;
- }
- else if (!task->pass_all_filters &&
- metric->action == METRIC_ACTION_REJECT &&
- check_metric_is_spam (task, metric)) {
- goto end;
- }
- cur = g_list_next (cur);
- }
- }
-
-end:
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
-
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
- }
- return 1;
-}
-
gint
process_filters (struct worker_task *task)
{
@@ -257,10 +215,6 @@ process_filters (struct worker_task *task)
struct metric *metric;
gpointer item = NULL;
- if (task->save.saved) {
- task->save.saved = 0;
- return continue_process_filters (task);
- }
/* Check skip */
if (check_skip (task->cfg->views, task)) {
task->is_skipped = TRUE;
@@ -282,15 +236,10 @@ process_filters (struct worker_task *task)
cur = task->cfg->metrics_list;
while (cur) {
metric = cur->data;
- if (task->save.saved) {
- task->save.entry = cur;
- task->save.item = item;
- return 0;
- }
- else if (!task->pass_all_filters &&
+ if (!task->pass_all_filters &&
metric->action == METRIC_ACTION_REJECT &&
check_metric_is_spam (task, metric)) {
- task->state = WRITE_REPLY;
+ check_session_pending (task->s);
return 1;
}
cur = g_list_next (cur);
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index 3764c96cc..16f237a36 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -103,13 +103,6 @@ lua_http_push_error (gint code, struct lua_http_ud *ud)
ud->parser_state = 3;
remove_normal_event (ud->task->s, lua_http_fin, ud);
-
- ud->task->save.saved--;
- if (ud->task->save.saved == 0) {
- /* Call other filters */
- ud->task->save.saved = 1;
- process_filters (ud->task);
- }
}
static void
@@ -151,12 +144,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud)
}
remove_normal_event (ud->task->s, lua_http_fin, ud);
- ud->task->save.saved--;
- if (ud->task->save.saved == 0) {
- /* Call other filters */
- ud->task->save.saved = 1;
- process_filters (ud->task);
- }
}
/*
@@ -384,7 +371,6 @@ lua_http_make_request_common (lua_State *L, struct worker_task *task, const gcha
if (make_dns_request (task->resolver, task->s, task->task_pool, lua_http_dns_callback, ud,
DNS_REQUEST_A, hostname)) {
task->dns_requests ++;
- task->save.saved++;
}
return 0;
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index de8bff89b..49a7e9279 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -82,15 +82,8 @@ lua_redis_fin (void *arg)
struct lua_redis_userdata *ud = arg;
if (ud->ctx) {
- msg_info ("hui");
redisAsyncFree (ud->ctx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
- /*
- ud->task->save.saved--;
- if (ud->task->save.saved == 0) {
- ud->task->save.saved = 1;
- process_filters (ud->task);
- }*/
}
}
@@ -112,7 +105,7 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
*ptask = ud->task;
/* String of error */
lua_pushstring (ud->L, err);
- /* Data */
+ /* Data is nil */
lua_pushnil (ud->L);
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -138,8 +131,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
lua_setclass (ud->L, "rspamd{task}", -1);
*ptask = ud->task;
- /* String of error */
- lua_pushstring (ud->L, ud->ctx->errstr);
+ /* Error is nil */
+ lua_pushnil (ud->L);
/* Data */
lua_pushlstring (ud->L, r->str, r->len);
@@ -162,8 +155,6 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
redisReply *reply = r;
struct lua_redis_userdata *ud = priv;
- msg_info ("in callback: err: %d, r: %p", c->err, r);
-
if (c->err == 0) {
if (r != NULL) {
lua_redis_push_data (reply, ud);
@@ -173,7 +164,12 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
}
}
else {
- lua_redis_push_error (c->errstr, ud, TRUE);
+ if (c->err == REDIS_ERR_IO) {
+ lua_redis_push_error (strerror (errno), ud, TRUE);
+ }
+ else {
+ lua_redis_push_error (c->errstr, ud, TRUE);
+ }
}
}
/**
@@ -191,7 +187,6 @@ lua_redis_make_request_real (struct lua_redis_userdata *ud)
}
else {
register_async_event (ud->task->s, lua_redis_fin, ud, FALSE);
- ud->task->save.saved ++;
}
redisLibeventAttach (ud->ctx, ud->task->ev_base);
/* Make a request now */
@@ -279,7 +274,7 @@ lua_redis_make_request (lua_State *L)
/* Now get remaining args */
ud->args_num = lua_gettop (L) - 5;
ud->args = memory_pool_alloc (task->task_pool, ud->args_num * sizeof (f_str_t));
- for (i = 0; i < ud->args_num - 1; i ++) {
+ for (i = 0; i < ud->args_num; i ++) {
tmp = lua_tolstring (L, i + 6, &ud->args[i].len);
/* Make a copy of argument */
ud->args[i].begin = memory_pool_alloc (task->task_pool, ud->args[i].len);
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index 478039149..19de0f7c8 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -538,13 +538,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
if (lua_pcall (cd->L, 5, 0, 0) != 0) {
msg_info ("call to %s failed: %s", cd->callback, lua_tostring (cd->L, -1));
}
-
- cd->task->save.saved--;
- if (cd->task->save.saved == 0) {
- /* Call other filters */
- cd->task->save.saved = 1;
- process_filters (cd->task);
- }
}
static gint
@@ -584,7 +577,6 @@ lua_task_resolve_dns_a (lua_State * L)
}
if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_A, cd->to_resolve)) {
task->dns_requests ++;
- task->save.saved++;
}
}
return 0;
@@ -626,7 +618,6 @@ lua_task_resolve_dns_txt (lua_State * L)
}
if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_TXT, cd->to_resolve)) {
task->dns_requests ++;
- task->save.saved++;
}
}
return 0;
@@ -671,7 +662,6 @@ lua_task_resolve_dns_ptr (lua_State * L)
if (make_dns_request (task->resolver, task->s, task->task_pool,
lua_dns_callback, (void *)cd, DNS_REQUEST_PTR, ina)) {
task->dns_requests ++;
- task->save.saved++;
}
}
return 0;
diff --git a/src/main.h b/src/main.h
index 59a5f3c30..8a680451a 100644
--- a/src/main.h
+++ b/src/main.h
@@ -108,15 +108,6 @@ struct counter_data {
};
/**
- * Save point object for delayed filters processing
- */
-struct save_point {
- GList *entry; /**< pointer to saved metric */
- void *item; /**< pointer to saved item */
- guint saved; /**< how much time we have delayed processing */
-};
-
-/**
* Structure to point exception in text from processing
*/
struct process_exception {
@@ -235,7 +226,6 @@ struct worker_task {
GList *messages; /**< list of messages that would be reported */
GHashTable *re_cache; /**< cache for matched or not matched regexps */
struct config_file *cfg; /**< pointer to config object */
- struct save_point save; /**< save point for delayed processing */
gchar *last_error; /**< last error */
gint error_code; /**< code of last error */
memory_pool_t *task_pool; /**< memory pool for task */
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index f330e7ecd..f7191117b 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -522,13 +522,6 @@ fuzzy_io_callback (gint fd, short what, void *arg)
msg_err ("got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
ok:
remove_normal_event (session->task->s, fuzzy_io_fin, session);
-
- session->task->save.saved--;
- if (session->task->save.saved == 0) {
- /* Call other filters */
- session->task->save.saved = 1;
- process_filters (session->task);
- }
}
static void
@@ -601,12 +594,6 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
}
ok:
remove_normal_event (session->session->s, fuzzy_learn_fin, session);
-
- (*session->saved)--;
- if (*session->saved == 0) {
- session->session->state = STATE_REPLY;
- session->session->dispatcher->write_callback (session->session);
- }
}
static inline void
@@ -642,7 +629,6 @@ register_fuzzy_call (struct worker_task *task, fuzzy_hash_t *h)
session->server = selected;
event_add (&session->ev, &session->tv);
register_async_event (task->s, fuzzy_io_fin, session, FALSE);
- task->save.saved++;
}
}
}
diff --git a/src/plugins/spf.c b/src/plugins/spf.c
index e9b1531ac..12ec80e67 100644
--- a/src/plugins/spf.c
+++ b/src/plugins/spf.c
@@ -227,13 +227,6 @@ spf_plugin_callback (struct spf_record *record, struct worker_task *task)
}
spf_check_list (l, task);
}
-
- if (task->save.saved == 0) {
- /* Call other filters */
- task->save.saved = 1;
- /* Note that here task MAY be destroyed */
- process_filters (task);
- }
}
diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c
index b848d2a30..b1f51b7fc 100644
--- a/src/plugins/surbl.c
+++ b/src/plugins/surbl.c
@@ -634,7 +634,6 @@ make_surbl_requests (struct uri *url, struct worker_task *task,
debug_task ("send surbl dns request %s", surbl_req);
if (make_dns_request (task->resolver, task->s, task->task_pool, dns_callback, (void *)param, DNS_REQUEST_A, surbl_req)) {
task->dns_requests ++;
- param->task->save.saved++;
}
}
else if (err != NULL && err->code != WHITELIST_ERROR) {
@@ -703,13 +702,6 @@ dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
else {
debug_task ("<%s> domain [%s] is not in surbl %s", param->task->message_id, param->host_resolve, param->suffix->suffix);
}
-
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
static void
@@ -723,12 +715,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
if (error != OK) {
msg_info ("memcached returned error %s on CONNECT stage", memc_strerror (error));
memc_close_ctx (param->ctx);
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
else {
memc_get (param->ctx, param->ctx->param);
@@ -738,12 +724,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
if (error != OK) {
msg_info ("memcached returned error %s on READ stage", memc_strerror (error));
memc_close_ctx (param->ctx);
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
else {
url_count = (gint *)param->ctx->param->buf;
@@ -764,12 +744,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
msg_info ("memcached returned error %s on WRITE stage", memc_strerror (error));
}
memc_close_ctx (param->ctx);
- param->task->save.saved--;
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
make_surbl_requests (param->url, param->task, param->suffix, FALSE);
break;
default:
@@ -862,14 +836,6 @@ redirector_callback (gint fd, short what, void *arg)
msg_err ("write failed %s to %s", strerror (errno), param->redirector->name);
upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
-
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
return;
}
param->state = STATE_READ;
@@ -880,13 +846,6 @@ redirector_callback (gint fd, short what, void *arg)
upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
return;
}
break;
@@ -896,14 +855,8 @@ redirector_callback (gint fd, short what, void *arg)
if (r <= 0) {
msg_err ("read failed: %s from %s", strerror (errno), param->redirector->name);
upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
- remove_normal_event (param->task->s, free_redirector_session, param);
- param->task->save.saved--;
make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
+ remove_normal_event (param->task->s, free_redirector_session, param);
return;
}
@@ -925,29 +878,12 @@ redirector_callback (gint fd, short what, void *arg)
}
upstream_ok (&param->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
-
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
-
}
else {
msg_info ("<%s> reading redirector %s timed out, while waiting for read",
param->redirector->name, param->task->message_id);
upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
remove_normal_event (param->task->s, free_redirector_session, param);
-
- param->task->save.saved--;
- make_surbl_requests (param->url, param->task, param->suffix, FALSE);
- if (param->task->save.saved == 0) {
- /* Call other filters */
- param->task->save.saved = 1;
- process_filters (param->task);
- }
}
break;
}
@@ -975,7 +911,6 @@ register_redirector_call (struct uri *url, struct worker_task *task,
if (s == -1) {
msg_info ("<%s> cannot create tcp socket failed: %s", task->message_id, strerror (errno));
- task->save.saved--;
make_surbl_requests (url, task, suffix, FALSE);
return;
}
@@ -1036,7 +971,6 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data)
insert_result (param->task, surbl_module_ctx->redirector_symbol, 1, g_list_prepend (NULL, red_domain));
}
register_redirector_call (url, param->task, param->suffix, red_domain);
- param->task->save.saved++;
return FALSE;
}
}
@@ -1047,7 +981,6 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data)
else {
if (param->task->worker->srv->cfg->memcached_servers_num > 0) {
register_memcached_call (url, param->task, param->suffix);
- param->task->save.saved++;
}
else {
make_surbl_requests (url, param->task, param->suffix, FALSE);
diff --git a/src/smtp.c b/src/smtp.c
index 0c88df3b0..8f3706a6b 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)
/* Resolve client's addr */
/* Set up async session */
- session->s = new_async_session (session->pool, free_smtp_session, session);
+ session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
session->state = SMTP_STATE_RESOLVE_REVERSE;
if (! make_dns_request (session->resolver, session->s, session->pool,
smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
diff --git a/src/spf.c b/src/spf.c
index ef64551b7..137ca0f5e 100644
--- a/src/spf.c
+++ b/src/spf.c
@@ -341,7 +341,6 @@ spf_record_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
/* Now resolve A record for this MX */
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, elt_data->mx.name)) {
task->dns_requests ++;
- task->save.saved++;
}
}
else if (reply->type == DNS_REQUEST_A) {
@@ -460,11 +459,6 @@ spf_record_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
break;
}
}
-
- cb->rec->task->save.saved--;
- if (cb->rec->task->save.saved == 0 && cb->rec->callback) {
- cb->rec->callback (cb->rec, cb->rec->task);
- }
}
static gboolean
@@ -494,7 +488,6 @@ parse_spf_a (struct worker_task *task, const gchar *begin, struct spf_record *re
cb->in_include = rec->in_include;
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
@@ -542,7 +535,6 @@ parse_spf_mx (struct worker_task *task, const gchar *begin, struct spf_record *r
cb->in_include = rec->in_include;
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_MX, host)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
@@ -600,8 +592,7 @@ parse_spf_include (struct worker_task *task, const gchar *begin, struct spf_reco
domain = memory_pool_strdup (task->task_pool, begin);
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) {
task->dns_requests ++;
- task->save.saved++;
-
+
return TRUE;
}
@@ -640,7 +631,6 @@ parse_spf_redirect (struct worker_task *task, const gchar *begin, struct spf_rec
domain = memory_pool_strdup (task->task_pool, begin);
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
@@ -672,7 +662,6 @@ parse_spf_exists (struct worker_task *task, const gchar *begin, struct spf_recor
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
@@ -1164,11 +1153,6 @@ spf_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
cur = g_list_next (cur);
}
}
-
- rec->task->save.saved--;
- if (rec->task->save.saved == 0 && rec->callback) {
- rec->callback (rec, rec->task);
- }
}
gchar *
@@ -1236,7 +1220,6 @@ resolve_spf (struct worker_task *task, spf_cb_t callback)
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
}
@@ -1266,7 +1249,6 @@ resolve_spf (struct worker_task *task, spf_cb_t callback)
rec->sender_domain = rec->cur_domain;
if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) {
task->dns_requests ++;
- task->save.saved++;
return TRUE;
}
}
diff --git a/src/worker.c b/src/worker.c
index 24cb0dd69..2411906b0 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -228,8 +228,6 @@ construct_task (struct rspamd_worker *worker)
memory_pool_add_destructor (new_task->task_pool,
(pool_destruct_func) g_tree_destroy,
new_task->urls);
- new_task->s =
- new_async_session (new_task->task_pool, free_task_hard, new_task);
new_task->sock = -1;
new_task->is_mime = TRUE;
@@ -489,16 +487,6 @@ read_socket (f_str_t * in, void *arg)
task->state = WRITE_ERROR;
return write_socket (task);
}
- else if (r == 0) {
- task->state = WAIT_FILTER;
- rspamd_dispatcher_pause (task->dispatcher);
- }
- else {
- process_statfiles (task);
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
- return write_socket (task);
- }
}
break;
case WRITE_REPLY:
@@ -594,6 +582,28 @@ err_socket (GError * err, void *arg)
}
/*
+ * Called if all filters are processed
+ */
+static void
+fin_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
+ task->state = WRITE_REPLY;
+
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
+}
+
+/*
* Reduce number of tasks proceeded
*/
static void
@@ -670,6 +680,10 @@ accept_socket (gint fd, short what, void *arg)
ctx->tasks ++;
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
+ /* Set up async session */
+ new_task->s =
+ new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
+
/* Init custom filters */
#ifndef BUILD_STATIC
if (ctx->is_custom) {