]> source.dussan.org Git - rspamd.git/commitdiff
* Finally get rid of stupid savepoints system and migrate to async events logic compl...
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 14 Dec 2011 16:05:56 +0000 (19:05 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 14 Dec 2011 16:05:56 +0000 (19:05 +0300)
Fix lua redis library.

15 files changed:
src/controller.c
src/dns.c
src/events.c
src/events.h
src/filter.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/lua/lua_task.c
src/main.h
src/plugins/fuzzy_check.c
src/plugins/spf.c
src/plugins/surbl.c
src/smtp.c
src/spf.c
src/worker.c

index 206e17f82991cbfb594c248a2f7f712ee28afc24..fc26b36d974fc8c8909fbaca73020f01b25d2684 100644 (file)
@@ -832,6 +832,26 @@ process_normal_command (const gchar *line)
        return NULL;
 }
 
+static void
+fin_learn_task (void *arg)
+{
+       struct worker_task             *task = (struct worker_task *)arg;
+
+       /* XXX: this is bad logic in fact */
+       /* Process all statfiles */
+       process_statfiles (task);
+       /* Call post filters */
+       lua_call_post_filters (task);
+       task->state = WRITE_REPLY;
+
+       if (task->fin_callback) {
+               task->fin_callback (task->fin_arg);
+       }
+       else {
+               rspamd_dispatcher_restore (task->dispatcher);
+       }
+}
+
 static                          gboolean
 controller_read_socket (f_str_t * in, void *arg)
 {
@@ -943,7 +963,6 @@ controller_read_socket (f_str_t * in, void *arg)
                r = process_message (task);
                if (r == -1) {
                        msg_warn ("processing of message failed");
-                       destroy_session (task->s);
                        session->state = STATE_REPLY;
                        r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
                        if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
@@ -951,7 +970,8 @@ controller_read_socket (f_str_t * in, void *arg)
                        }
                        return FALSE;
                }
-
+               /* Set up async session */
+               task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
                r = process_filters (task);
                if (r == -1) {
                        session->state = STATE_REPLY;
@@ -961,34 +981,12 @@ controller_read_socket (f_str_t * in, void *arg)
                                return FALSE;
                        }
                }
-               else if (r == 0) {
+               else {
                        session->state = STATE_LEARN_SPAM;
                        task->dispatcher = session->dispatcher;
                        session->learn_task = task;
                        rspamd_dispatcher_pause (session->dispatcher);
                }
-               else {
-                       lua_call_post_filters (task);
-                       session->state = STATE_REPLY;
-
-                       if (! learn_task_spam (session->learn_classifier, task, session->in_class, &err)) {
-                               if (err) {
-                                       i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, learn classifier error: %s" CRLF END, err->message);
-                                       g_error_free (err);
-                               }
-                               else {
-                                       i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, unknown learn classifier error" CRLF END);
-                               }
-                       }
-                       else {
-                               i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
-                       }
-
-                       destroy_session (task->s);
-                       if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
-                               return FALSE;
-                       }
-               }
                break;
        case STATE_WEIGHTS:
                session->learn_buf = in;
@@ -1189,7 +1187,7 @@ accept_socket (gint fd, short what, void *arg)
        io_tv->tv_sec = ctx->timeout / 1000;
        io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
 
-       new_session->s = new_async_session (new_session->session_pool, free_session, new_session);
+       new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
 
        new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
                        controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
index be0d2ad7d724f1b7115ca70cd5f7901e172b969e..1d868eb5806d24fa8a2c9e7700341976e500f684 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -1220,6 +1220,7 @@ dns_read_cb (gint fd, short what, void *arg)
                        }
                        upstream_ok (&rep->request->server->up, rep->request->time);
                        rep->request->func (rep, rep->request->arg);
+                       remove_normal_event (req->session, dns_fin_cb, req);
                }
        }
 }
@@ -1239,11 +1240,11 @@ dns_timer_cb (gint fd, short what, void *arg)
                rep->request = req;
                rep->code = DNS_RC_SERVFAIL;
                upstream_fail (&rep->request->server->up, rep->request->time);
-               remove_normal_event (req->session, dns_fin_cb, req);
                dns_check_throttling (req->resolver);
                req->resolver->errors ++;
 
                req->func (rep, req->arg);
+               remove_normal_event (req->session, dns_fin_cb, req);
 
                return;
        }
@@ -1262,8 +1263,9 @@ dns_timer_cb (gint fd, short what, void *arg)
                rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
                rep->request = req;
                rep->code = DNS_RC_SERVFAIL;
-               remove_normal_event (req->session, dns_fin_cb, req);
+
                req->func (rep, req->arg);
+               remove_normal_event (req->session, dns_fin_cb, req);
                return;
        }
        
@@ -1277,8 +1279,9 @@ dns_timer_cb (gint fd, short what, void *arg)
                rep->request = req;
                rep->code = DNS_RC_SERVFAIL;
                upstream_fail (&rep->request->server->up, rep->request->time);
-               remove_normal_event (req->session, dns_fin_cb, req);
+
                req->func (rep, req->arg);
+               remove_normal_event (req->session, dns_fin_cb, req);
 
                return;
        }
@@ -1288,9 +1291,9 @@ dns_timer_cb (gint fd, short what, void *arg)
                rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply));
                rep->request = req;
                rep->code = DNS_RC_SERVFAIL;
-               remove_normal_event (req->session, dns_fin_cb, req);
                upstream_fail (&rep->request->server->up, rep->request->time);
                req->func (rep, req->arg);
+               remove_normal_event (req->session, dns_fin_cb, req);
                return;
        }
        evtimer_add (&req->timer_event, &req->tv);
@@ -1303,6 +1306,8 @@ dns_retransmit_handler (gint fd, short what, void *arg)
        struct rspamd_dns_reply *rep;
        gint r;
 
+       remove_normal_event (req->session, (event_finalizer_t)event_del, &req->io_event);
+
        if (what == EV_WRITE) {
                /* Retransmit dns request */
                req->retransmits ++;
index f91e3e847d0881a6796c743eab824db3fa29cdfc..6d848ca2d0ccf32af1374f090274a6789d640eb6 100644 (file)
 #include "main.h"
 #include "events.h"
 
+#undef RSPAMD_EVENTS_DEBUG
+
+static gboolean
+rspamd_event_equal (gconstpointer a, gconstpointer b)
+{
+       const struct rspamd_async_event  *ev1 = a, *ev2 = b;
+
+       if (ev1->fin == ev2->fin) {
+               return ev1->user_data == ev2->user_data;
+       }
+
+       return FALSE;
+}
+
+static guint
+rspamd_event_hash (gconstpointer a)
+{
+       const struct rspamd_async_event  *ev = a;
+       guint                             h = 0, i;
+       gchar                            *p;
+
+       p = (gchar *)ev->user_data;
+       for (i = 0; i < sizeof (gpointer); i ++) {
+               h ^= *p;
+               h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
+               p ++;
+       }
+
+       return h;
+}
 
 struct rspamd_async_session    *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin, void *user_data)
+new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data)
 {
        struct rspamd_async_session    *new;
 
        new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
        new->pool = pool;
        new->fin = fin;
+       new->cleanup = cleanup;
        new->user_data = user_data;
        new->wanna_die = FALSE;
-       new->events = g_queue_new ();
+       new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+       new->forced_events = g_queue_new ();
 
-       memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->events);
+       memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
+       memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events);
 
        return new;
 }
@@ -57,7 +90,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
 
        if (forced) {
                /* For forced events try first to increase its reference */
-               cur = session->events->head;
+               cur = session->forced_events->head;
                while (cur) {
                        ev = cur->data;
                        if (ev->forced && ev->fin == fin) {
@@ -73,7 +106,10 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
        new->user_data = user_data;
        new->forced = forced;
        new->ref = 1;
-       g_queue_push_head (session->events, new);
+       g_hash_table_insert (session->events, new, new);
+#ifdef RSPAMD_EVENTS_DEBUG
+       msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
+#endif
 }
 
 void
@@ -87,56 +123,64 @@ remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin
                return;
        }
 
-       cur = session->events->head;
+       cur = session->forced_events->head;
        while (cur) {
                ev = cur->data;
                if (ev->forced && ev->fin == fin) {
                        ev->ref--;
                        if (ev->ref == 0) {
-                               g_queue_delete_link (session->events, cur);
+                               g_queue_delete_link (session->forced_events, cur);
                        }
                        break;
                }
                cur = g_list_next (cur);
        }
 
-       if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) {
+       check_session_pending (session);
+
+       if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) {
                /* Call session destroy after all forced events are ready */
-               session->fin (session->user_data);
+               session->cleanup (session->user_data);
        }
 }
 
 void
 remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
 {
-       struct rspamd_async_event      *ev;
-       GList                          *cur;
+       struct rspamd_async_event       search_ev;
 
        if (session == NULL) {
                msg_info ("session is NULL");
                return;
        }
 
-       cur = session->events->head;
-       while (cur) {
-               ev = cur->data;
-               if (ev->fin == fin && ev->user_data == ud && !ev->forced) {
-                       g_queue_delete_link (session->events, cur);
-                       if (ev->fin) {
-                               ev->fin (ev->user_data);
-                       }
-                       break;
-               }
-               cur = g_list_next (cur);
+       search_ev.fin = fin;
+       search_ev.user_data = ud;
+       if (g_hash_table_remove (session->events, &search_ev)) {
+               fin (ud);
+       }
+
+#ifdef RSPAMD_EVENTS_DEBUG
+       msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events));
+#endif
+
+       check_session_pending (session);
+}
+
+static void
+rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
+{
+       struct rspamd_async_event      *ev = v;
+
+       /* Call event's finalizer */
+       if (ev->fin != NULL) {
+               ev->fin (ev->user_data);
        }
 }
 
 gboolean
 destroy_session (struct rspamd_async_session *session)
 {
-       struct rspamd_async_event      *ev;
-       GList                          *cur, *tmp;
-
        if (session == NULL) {
                msg_info ("session is NULL");
                return FALSE;
@@ -144,30 +188,28 @@ destroy_session (struct rspamd_async_session *session)
 
        session->wanna_die = TRUE;
 
-       cur = session->events->head;
+       g_hash_table_foreach (session->events, rspamd_session_destroy, session);
 
-       while (cur) {
-               ev = cur->data;
-               if (!ev->forced) {
-                       if (ev->fin != NULL) {
-                               ev->fin (ev->user_data);
-                       }
-                       tmp = cur;
-                       cur = g_list_next (cur);
-                       g_queue_delete_link (session->events, tmp);
-               }
-               else {
-                       /* Do nothing with forced callbacks */
-                       cur = g_list_next (cur);
+       if (g_queue_get_length (session->forced_events) == 0) {
+               if (session->cleanup != NULL) {
+                       session->cleanup (session->user_data);
                }
+               return TRUE;
        }
 
-       if (g_queue_get_length (session->events) == 0) {
+       return FALSE;
+}
+
+gboolean
+check_session_pending (struct rspamd_async_session *session)
+{
+       if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) {
                if (session->fin != NULL) {
                        session->fin (session->user_data);
                }
-               return TRUE;
+               /* No more events */
+               return FALSE;
        }
 
-       return FALSE;
+       return TRUE;
 }
index 3715b4d66ad5d81f3c6ba0f135a1714c6146b4d1..434c39d803db950c4d021ab95bc161262d895d4d 100644 (file)
@@ -16,18 +16,49 @@ struct rspamd_async_event {
 
 struct rspamd_async_session {
        event_finalizer_t fin;
-       GQueue *events;
+       event_finalizer_t cleanup;
+       GHashTable *events;
+       GQueue *forced_events;
        void *user_data;
        memory_pool_t *pool;
        gboolean wanna_die;
 };
 
-/* Makes new async session */
-struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data);
-/* Insert event into session */
-void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced);
-/* Must be called by forced events to call session destructor properly */
+/**
+ * Make new async session
+ * @param pool pool to alloc memory from
+ * @param fin a callback called when no events are found in session
+ * @param cleanup a callback called when session is forcefully destroyed
+ * @param user_data abstract user data
+ * @return
+ */
+struct rspamd_async_session *new_async_session (memory_pool_t *pool,
+               event_finalizer_t fin, event_finalizer_t cleanup, void *user_data);
+
+/**
+ * Insert new event to the session
+ * @param session session object
+ * @param fin finalizer callback
+ * @param user_data abstract user_data
+ * @param forced session cannot be destroyed until forced event are still in it
+ */
+void register_async_event (struct rspamd_async_session *session,
+               event_finalizer_t fin, void *user_data, gboolean forced);
+
+
+/**
+ * Remove forced event
+ * @param session session object
+ * @param fin destructor function
+ */
 void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
+
+/**
+ * Remove normal event
+ * @param session session object
+ * @param fin final callback
+ * @param ud user data object
+ */
 void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud); 
 
 /**
@@ -36,4 +67,11 @@ void remove_normal_event (struct rspamd_async_session *session, event_finalizer_
  */
 gboolean destroy_session (struct rspamd_async_session *session);
 
+/**
+ * Check session for events pending and call fin callback if no events are pending
+ * @param session session object
+ * @return TRUE if session has pending events
+ */
+gboolean check_session_pending (struct rspamd_async_session *session);
+
 #endif /* RSPAMD_EVENTS_H */
index f4048955d8d7f1af5950494677d5a821a1b58b2c..562f705e5dfde961a788307e97b74df44f038689 100644 (file)
@@ -208,48 +208,6 @@ check_metric_is_spam (struct worker_task *task, struct metric *metric)
        return FALSE;
 }
 
-static gint
-continue_process_filters (struct worker_task *task)
-{
-       GList                          *cur;
-       gpointer                        item = task->save.item;
-       struct metric                  *metric;
-
-       while (call_symbol_callback (task, task->cfg->cache, &item)) {
-               cur = task->cfg->metrics_list;
-               while (cur) {
-                       metric = cur->data;
-                       /* call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); */
-                       if (task->save.saved) {
-                               task->save.entry = cur;
-                               task->save.item = item;
-                               return 0;
-                       }
-                       else if (!task->pass_all_filters && 
-                                               metric->action == METRIC_ACTION_REJECT && 
-                                               check_metric_is_spam (task, metric)) {
-                               goto end;
-                       }
-                       cur = g_list_next (cur);
-               }
-       }
-
-end:
-       /* Process all statfiles */
-       process_statfiles (task);
-       /* Call post filters */
-       lua_call_post_filters (task);
-       task->state = WRITE_REPLY;
-
-       if (task->fin_callback) {
-               task->fin_callback (task->fin_arg);
-       }
-       else {
-               rspamd_dispatcher_restore (task->dispatcher);
-       }
-       return 1;
-}
-
 gint
 process_filters (struct worker_task *task)
 {
@@ -257,10 +215,6 @@ process_filters (struct worker_task *task)
        struct metric                  *metric;
        gpointer                        item = NULL;
 
-       if (task->save.saved) {
-               task->save.saved = 0;
-               return continue_process_filters (task);
-       }
        /* Check skip */
        if (check_skip (task->cfg->views, task)) {
                task->is_skipped = TRUE;
@@ -282,15 +236,10 @@ process_filters (struct worker_task *task)
                cur = task->cfg->metrics_list;
                while (cur) {
                        metric = cur->data;
-                       if (task->save.saved) {
-                               task->save.entry = cur;
-                               task->save.item = item;
-                               return 0;
-                       }
-                       else if (!task->pass_all_filters && 
+                       if (!task->pass_all_filters &&
                                                metric->action == METRIC_ACTION_REJECT && 
                                                check_metric_is_spam (task, metric)) {
-                               task->state = WRITE_REPLY;
+                               check_session_pending (task->s);
                                return 1;
                        }
                        cur = g_list_next (cur);
index 3764c96ccbeaf5f418ad593cfbea964f6645f72d..16f237a369a70fe52ad376affa456547b4e868d5 100644 (file)
@@ -103,13 +103,6 @@ lua_http_push_error (gint code, struct lua_http_ud *ud)
 
        ud->parser_state = 3;
        remove_normal_event (ud->task->s, lua_http_fin, ud);
-
-       ud->task->save.saved--;
-       if (ud->task->save.saved == 0) {
-               /* Call other filters */
-               ud->task->save.saved = 1;
-               process_filters (ud->task);
-       }
 }
 
 static void
@@ -151,12 +144,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud)
        }
 
        remove_normal_event (ud->task->s, lua_http_fin, ud);
-       ud->task->save.saved--;
-       if (ud->task->save.saved == 0) {
-               /* Call other filters */
-               ud->task->save.saved = 1;
-               process_filters (ud->task);
-       }
 }
 
 /*
@@ -384,7 +371,6 @@ lua_http_make_request_common (lua_State *L, struct worker_task *task, const gcha
        if (make_dns_request (task->resolver, task->s, task->task_pool, lua_http_dns_callback, ud,
                        DNS_REQUEST_A, hostname)) {
                task->dns_requests ++;
-               task->save.saved++;
        }
 
        return 0;
index de8bff89b2855f271b60a5b717618118a220e083..49a7e92799943cee05ee004c5b320d544ebd926b 100644 (file)
@@ -82,15 +82,8 @@ lua_redis_fin (void *arg)
        struct lua_redis_userdata                       *ud = arg;
 
        if (ud->ctx) {
-               msg_info ("hui");
                redisAsyncFree (ud->ctx);
                luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
-               /*
-               ud->task->save.saved--;
-               if (ud->task->save.saved == 0) {
-                       ud->task->save.saved = 1;
-                       process_filters (ud->task);
-               }*/
        }
 }
 
@@ -112,7 +105,7 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
        *ptask = ud->task;
        /* String of error */
        lua_pushstring (ud->L, err);
-       /* Data */
+       /* Data is nil */
        lua_pushnil (ud->L);
        if (lua_pcall (ud->L, 3, 0, 0) != 0) {
                msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -138,8 +131,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
        lua_setclass (ud->L, "rspamd{task}", -1);
 
        *ptask = ud->task;
-       /* String of error */
-       lua_pushstring (ud->L, ud->ctx->errstr);
+       /* Error is nil */
+       lua_pushnil (ud->L);
        /* Data */
        lua_pushlstring (ud->L, r->str, r->len);
 
@@ -162,8 +155,6 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
        redisReply                                                      *reply = r;
        struct lua_redis_userdata                       *ud = priv;
 
-       msg_info ("in callback: err: %d, r: %p", c->err, r);
-
        if (c->err == 0) {
                if (r != NULL) {
                        lua_redis_push_data (reply, ud);
@@ -173,7 +164,12 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
                }
        }
        else {
-               lua_redis_push_error (c->errstr, ud, TRUE);
+               if (c->err == REDIS_ERR_IO) {
+                       lua_redis_push_error (strerror (errno), ud, TRUE);
+               }
+               else {
+                       lua_redis_push_error (c->errstr, ud, TRUE);
+               }
        }
 }
 /**
@@ -191,7 +187,6 @@ lua_redis_make_request_real (struct lua_redis_userdata *ud)
        }
        else {
                register_async_event (ud->task->s, lua_redis_fin, ud, FALSE);
-               ud->task->save.saved ++;
        }
        redisLibeventAttach (ud->ctx, ud->task->ev_base);
        /* Make a request now */
@@ -279,7 +274,7 @@ lua_redis_make_request (lua_State *L)
                        /* Now get remaining args */
                        ud->args_num = lua_gettop (L) - 5;
                        ud->args = memory_pool_alloc (task->task_pool, ud->args_num * sizeof (f_str_t));
-                       for (i = 0; i < ud->args_num - 1; i ++) {
+                       for (i = 0; i < ud->args_num; i ++) {
                                tmp = lua_tolstring (L, i + 6, &ud->args[i].len);
                                /* Make a copy of argument */
                                ud->args[i].begin = memory_pool_alloc (task->task_pool, ud->args[i].len);
index 4780391495262e0650f7b91d793cfb0ef85e65b6..19de0f7c8799c46d8873cdfd48667ea48ad79ca7 100644 (file)
@@ -538,13 +538,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
        if (lua_pcall (cd->L, 5, 0, 0) != 0) {
                msg_info ("call to %s failed: %s", cd->callback, lua_tostring (cd->L, -1));
        }
-
-       cd->task->save.saved--;
-       if (cd->task->save.saved == 0) {
-               /* Call other filters */
-               cd->task->save.saved = 1;
-               process_filters (cd->task);
-       }
 }
 
 static gint
@@ -584,7 +577,6 @@ lua_task_resolve_dns_a (lua_State * L)
                }
                if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_A, cd->to_resolve)) {
                        task->dns_requests ++;
-                       task->save.saved++;
                }
        }
        return 0;
@@ -626,7 +618,6 @@ lua_task_resolve_dns_txt (lua_State * L)
                }
                if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_TXT, cd->to_resolve)) {
                        task->dns_requests ++;
-                       task->save.saved++;
                }
        }
        return 0;
@@ -671,7 +662,6 @@ lua_task_resolve_dns_ptr (lua_State * L)
                if (make_dns_request (task->resolver, task->s, task->task_pool,
                                lua_dns_callback, (void *)cd, DNS_REQUEST_PTR, ina)) {
                        task->dns_requests ++;
-                       task->save.saved++;
                }
        }
        return 0;
index 59a5f3c30467807220a9b31d344f10ea98b65f93..8a680451a0d6d4dab936f27515f292df6760d0dc 100644 (file)
@@ -107,15 +107,6 @@ struct counter_data {
        gint number;
 };
 
-/**
- * Save point object for delayed filters processing
- */
-struct save_point {
-       GList *entry;                                                                                           /**< pointer to saved metric                                            */
-       void *item;                                                                                                     /**< pointer to saved item                                                      */
-       guint saved;                                                                                    /**< how much time we have delayed processing           */
-};
-
 /**
  * Structure to point exception in text from processing
  */
@@ -235,7 +226,6 @@ struct worker_task {
        GList *messages;                                                                                        /**< list of messages that would be reported            */
        GHashTable *re_cache;                                                                           /**< cache for matched or not matched regexps           */
        struct config_file *cfg;                                                                        /**< pointer to config object                                           */
-       struct save_point save;                                                                         /**< save point for delayed processing                          */
        gchar *last_error;                                                                                      /**< last error                                                                         */
        gint error_code;                                                                                                /**< code of last error                                                         */
        memory_pool_t *task_pool;                                                                       /**< memory pool for task                                                       */
index f330e7ecdaf853b8b7bf454ce9b17cc326d6829b..f7191117bec5bf516a1a8892c0e6689e47dcfc4b 100644 (file)
@@ -522,13 +522,6 @@ fuzzy_io_callback (gint fd, short what, void *arg)
        msg_err ("got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
   ok:
        remove_normal_event (session->task->s, fuzzy_io_fin, session);
-
-       session->task->save.saved--;
-       if (session->task->save.saved == 0) {
-               /* Call other filters */
-               session->task->save.saved = 1;
-               process_filters (session->task);
-       }
 }
 
 static void
@@ -601,12 +594,6 @@ fuzzy_learn_callback (gint fd, short what, void *arg)
        }
   ok:
        remove_normal_event (session->session->s, fuzzy_learn_fin, session);
-
-       (*session->saved)--;
-       if (*session->saved == 0) {
-               session->session->state = STATE_REPLY;
-               session->session->dispatcher->write_callback (session->session);
-       }
 }
 
 static inline void
@@ -642,7 +629,6 @@ register_fuzzy_call (struct worker_task *task, fuzzy_hash_t *h)
                        session->server = selected;
                        event_add (&session->ev, &session->tv);
                        register_async_event (task->s, fuzzy_io_fin, session, FALSE);
-                       task->save.saved++;
                }
        }
 }
index e9b1531ac3de61b3bcef545b52207f3056354d5d..12ec80e6794a7483ff57ce327cdb582188601ad8 100644 (file)
@@ -227,13 +227,6 @@ spf_plugin_callback (struct spf_record *record, struct worker_task *task)
                }
                spf_check_list (l, task);
        }
-
-       if (task->save.saved == 0) {
-               /* Call other filters */
-               task->save.saved = 1;
-               /* Note that here task MAY be destroyed */
-               process_filters (task);
-       }
 }
 
 
index b848d2a30d8102bb6d8f6050424e2d25bce96d33..b1f51b7fc991b7ca2b99641cf6daa351a401713b 100644 (file)
@@ -634,7 +634,6 @@ make_surbl_requests (struct uri *url, struct worker_task *task,
                        debug_task ("send surbl dns request %s", surbl_req);
                        if (make_dns_request (task->resolver, task->s, task->task_pool, dns_callback, (void *)param, DNS_REQUEST_A, surbl_req)) {
                                task->dns_requests ++;
-                               param->task->save.saved++;
                        }
                }
                else if (err != NULL && err->code != WHITELIST_ERROR) {
@@ -703,13 +702,6 @@ dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
        else {
                debug_task ("<%s> domain [%s] is not in surbl %s", param->task->message_id, param->host_resolve, param->suffix->suffix);
        }
-
-       param->task->save.saved--;
-       if (param->task->save.saved == 0) {
-               /* Call other filters */
-               param->task->save.saved = 1;
-               process_filters (param->task);
-       }
 }
 
 static void
@@ -723,12 +715,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
                if (error != OK) {
                        msg_info ("memcached returned error %s on CONNECT stage", memc_strerror (error));
                        memc_close_ctx (param->ctx);
-                       param->task->save.saved--;
-                       if (param->task->save.saved == 0) {
-                               /* Call other filters */
-                               param->task->save.saved = 1;
-                               process_filters (param->task);
-                       }
                }
                else {
                        memc_get (param->ctx, param->ctx->param);
@@ -738,12 +724,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
                if (error != OK) {
                        msg_info ("memcached returned error %s on READ stage", memc_strerror (error));
                        memc_close_ctx (param->ctx);
-                       param->task->save.saved--;
-                       if (param->task->save.saved == 0) {
-                               /* Call other filters */
-                               param->task->save.saved = 1;
-                               process_filters (param->task);
-                       }
                }
                else {
                        url_count = (gint *)param->ctx->param->buf;
@@ -764,12 +744,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
                        msg_info ("memcached returned error %s on WRITE stage", memc_strerror (error));
                }
                memc_close_ctx (param->ctx);
-               param->task->save.saved--;
-               if (param->task->save.saved == 0) {
-                       /* Call other filters */
-                       param->task->save.saved = 1;
-                       process_filters (param->task);
-               }
                make_surbl_requests (param->url, param->task, param->suffix, FALSE);
                break;
        default:
@@ -862,14 +836,6 @@ redirector_callback (gint fd, short what, void *arg)
                                msg_err ("write failed %s to %s", strerror (errno), param->redirector->name);
                                upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
                                remove_normal_event (param->task->s, free_redirector_session, param);
-
-                               param->task->save.saved--;
-                               make_surbl_requests (param->url, param->task, param->suffix, FALSE);
-                               if (param->task->save.saved == 0) {
-                                       /* Call other filters */
-                                       param->task->save.saved = 1;
-                                       process_filters (param->task);
-                               }
                                return;
                        }
                        param->state = STATE_READ;
@@ -880,13 +846,6 @@ redirector_callback (gint fd, short what, void *arg)
                        upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
                        remove_normal_event (param->task->s, free_redirector_session, param);
 
-                       param->task->save.saved--;
-                       make_surbl_requests (param->url, param->task, param->suffix, FALSE);
-                       if (param->task->save.saved == 0) {
-                               /* Call other filters */
-                               param->task->save.saved = 1;
-                               process_filters (param->task);
-                       }
                        return;
                }
                break;
@@ -896,14 +855,8 @@ redirector_callback (gint fd, short what, void *arg)
                        if (r <= 0) {
                                msg_err ("read failed: %s from %s", strerror (errno), param->redirector->name);
                                upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
-                               remove_normal_event (param->task->s, free_redirector_session, param);
-                               param->task->save.saved--;
                                make_surbl_requests (param->url, param->task, param->suffix, FALSE);
-                               if (param->task->save.saved == 0) {
-                                       /* Call other filters */
-                                       param->task->save.saved = 1;
-                                       process_filters (param->task);
-                               }
+                               remove_normal_event (param->task->s, free_redirector_session, param);
                                return;
                        }
 
@@ -925,29 +878,12 @@ redirector_callback (gint fd, short what, void *arg)
                        }
                        upstream_ok (&param->redirector->up, param->task->tv.tv_sec);
                        remove_normal_event (param->task->s, free_redirector_session, param);
-
-                       param->task->save.saved--;
-                       make_surbl_requests (param->url, param->task, param->suffix, FALSE);
-                       if (param->task->save.saved == 0) {
-                               /* Call other filters */
-                               param->task->save.saved = 1;
-                               process_filters (param->task);
-                       }
-
                }
                else {
                        msg_info ("<%s> reading redirector %s timed out, while waiting for read",
                                        param->redirector->name, param->task->message_id);
                        upstream_fail (&param->redirector->up, param->task->tv.tv_sec);
                        remove_normal_event (param->task->s, free_redirector_session, param);
-
-                       param->task->save.saved--;
-                       make_surbl_requests (param->url, param->task, param->suffix, FALSE);
-                       if (param->task->save.saved == 0) {
-                               /* Call other filters */
-                               param->task->save.saved = 1;
-                               process_filters (param->task);
-                       }
                }
                break;
        }
@@ -975,7 +911,6 @@ register_redirector_call (struct uri *url, struct worker_task *task,
 
        if (s == -1) {
                msg_info ("<%s> cannot create tcp socket failed: %s", task->message_id, strerror (errno));
-               task->save.saved--;
                make_surbl_requests (url, task, suffix, FALSE);
                return;
        }
@@ -1036,7 +971,6 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data)
                                                        insert_result (param->task, surbl_module_ctx->redirector_symbol, 1, g_list_prepend (NULL, red_domain));
                                                }
                                                register_redirector_call (url, param->task, param->suffix, red_domain);
-                                               param->task->save.saved++;
                                                return FALSE;
                                        }
                                }
@@ -1047,7 +981,6 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data)
        else {
                if (param->task->worker->srv->cfg->memcached_servers_num > 0) {
                        register_memcached_call (url, param->task, param->suffix);
-                       param->task->save.saved++;
                }
                else {
                        make_surbl_requests (url, param->task, param->suffix, FALSE);
index 0c88df3b02a5bcb7946ec8e32d26eae1c9c22664..8f3706a6bf9a2a6e6c109e266c9c920e97102b21 100644 (file)
@@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg)
 
        /* Resolve client's addr */
        /* Set up async session */
-       session->s = new_async_session (session->pool, free_smtp_session, session);
+       session->s = new_async_session (session->pool, NULL, free_smtp_session, session);
        session->state = SMTP_STATE_RESOLVE_REVERSE;
        if (! make_dns_request (session->resolver, session->s, session->pool,
                        smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
index ef64551b748aa1c1cd152c73633dcdfe2121625b..137ca0f5e3153836c1655e05430b67aecfe60d12 100644 (file)
--- a/src/spf.c
+++ b/src/spf.c
@@ -341,7 +341,6 @@ spf_record_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
                                                /* Now resolve A record for this MX */
                                                if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, elt_data->mx.name)) {
                                                        task->dns_requests ++;
-                                                       task->save.saved++;
                                                }
                                        }
                                        else if (reply->type == DNS_REQUEST_A) {
@@ -460,11 +459,6 @@ spf_record_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
                                        break;
                }
        }
-
-       cb->rec->task->save.saved--;
-       if (cb->rec->task->save.saved == 0 && cb->rec->callback) {
-               cb->rec->callback (cb->rec, cb->rec->task);
-       }
 }
 
 static gboolean
@@ -494,7 +488,6 @@ parse_spf_a (struct worker_task *task, const gchar *begin, struct spf_record *re
        cb->in_include = rec->in_include;
        if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) {
                task->dns_requests ++;
-               task->save.saved++;
                
                return TRUE;
        }
@@ -542,7 +535,6 @@ parse_spf_mx (struct worker_task *task, const gchar *begin, struct spf_record *r
        cb->in_include = rec->in_include;
        if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_MX, host)) {
                task->dns_requests ++;
-               task->save.saved++;
                
                return TRUE;
        }
@@ -600,8 +592,7 @@ parse_spf_include (struct worker_task *task, const gchar *begin, struct spf_reco
        domain = memory_pool_strdup (task->task_pool, begin);
        if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) {
                task->dns_requests ++;
-               task->save.saved++;
-               
+
                return TRUE;
        }
 
@@ -640,7 +631,6 @@ parse_spf_redirect (struct worker_task *task, const gchar *begin, struct spf_rec
        domain = memory_pool_strdup (task->task_pool, begin);
        if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) {
                task->dns_requests ++;
-               task->save.saved++;
                
                return TRUE;
        }
@@ -672,7 +662,6 @@ parse_spf_exists (struct worker_task *task, const gchar *begin, struct spf_recor
 
        if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) {
                task->dns_requests ++;
-               task->save.saved++;
                
                return TRUE;
        }
@@ -1164,11 +1153,6 @@ spf_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
                        cur = g_list_next (cur);
                }
        }
-
-       rec->task->save.saved--;
-       if (rec->task->save.saved == 0 && rec->callback) {
-               rec->callback (rec, rec->task);
-       }
 }
 
 gchar *
@@ -1236,7 +1220,6 @@ resolve_spf (struct worker_task *task, spf_cb_t callback)
 
                if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) {
                        task->dns_requests ++;
-                       task->save.saved++;
                        return TRUE;
                }
        }
@@ -1266,7 +1249,6 @@ resolve_spf (struct worker_task *task, spf_cb_t callback)
                        rec->sender_domain = rec->cur_domain;
                        if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) {
                                task->dns_requests ++;
-                               task->save.saved++;
                                return TRUE;
                        }
                }
index 24cb0dd69c658362adc258bdb20eef334a124685..2411906b048c8f161846e768f91d671dcfe644e0 100644 (file)
@@ -228,8 +228,6 @@ construct_task (struct rspamd_worker *worker)
        memory_pool_add_destructor (new_task->task_pool,
                                        (pool_destruct_func) g_tree_destroy,
                                        new_task->urls);
-       new_task->s =
-                       new_async_session (new_task->task_pool, free_task_hard, new_task);
        new_task->sock = -1;
        new_task->is_mime = TRUE;
 
@@ -489,16 +487,6 @@ read_socket (f_str_t * in, void *arg)
                                task->state = WRITE_ERROR;
                                return write_socket (task);
                        }
-                       else if (r == 0) {
-                               task->state = WAIT_FILTER;
-                               rspamd_dispatcher_pause (task->dispatcher);
-                       }
-                       else {
-                               process_statfiles (task);
-                               lua_call_post_filters (task);
-                               task->state = WRITE_REPLY;
-                               return write_socket (task);
-                       }
                }
                break;
        case WRITE_REPLY:
@@ -593,6 +581,28 @@ err_socket (GError * err, void *arg)
        destroy_session (task->s);
 }
 
+/*
+ * Called if all filters are processed
+ */
+static void
+fin_task (void *arg)
+{
+       struct worker_task             *task = (struct worker_task *) arg;
+
+       /* Process all statfiles */
+       process_statfiles (task);
+       /* Call post filters */
+       lua_call_post_filters (task);
+       task->state = WRITE_REPLY;
+
+       if (task->fin_callback) {
+               task->fin_callback (task->fin_arg);
+       }
+       else {
+               rspamd_dispatcher_restore (task->dispatcher);
+       }
+}
+
 /*
  * Reduce number of tasks proceeded
  */
@@ -670,6 +680,10 @@ accept_socket (gint fd, short what, void *arg)
        ctx->tasks ++;
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
 
+       /* Set up async session */
+       new_task->s =
+                               new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task);
+
        /* Init custom filters */
 #ifndef BUILD_STATIC
        if (ctx->is_custom) {