From 4499fc92189905fde71139822d784ab7819b181c Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 14 Dec 2011 19:05:56 +0300 Subject: * Finally get rid of stupid savepoints system and migrate to async events logic completely Fix lua redis library. --- src/controller.c | 50 +++++++++--------- src/dns.c | 13 +++-- src/events.c | 126 ++++++++++++++++++++++++++++++---------------- src/events.h | 50 +++++++++++++++--- src/filter.c | 55 +------------------- src/lua/lua_http.c | 14 ------ src/lua/lua_redis.c | 25 ++++----- src/lua/lua_task.c | 10 ---- src/main.h | 10 ---- src/plugins/fuzzy_check.c | 14 ------ src/plugins/spf.c | 7 --- src/plugins/surbl.c | 69 +------------------------ src/smtp.c | 2 +- src/spf.c | 20 +------- src/worker.c | 38 +++++++++----- 15 files changed, 202 insertions(+), 301 deletions(-) diff --git a/src/controller.c b/src/controller.c index 206e17f82..fc26b36d9 100644 --- a/src/controller.c +++ b/src/controller.c @@ -832,6 +832,26 @@ process_normal_command (const gchar *line) return NULL; } +static void +fin_learn_task (void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + + /* XXX: this is bad logic in fact */ + /* Process all statfiles */ + process_statfiles (task); + /* Call post filters */ + lua_call_post_filters (task); + task->state = WRITE_REPLY; + + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); + } +} + static gboolean controller_read_socket (f_str_t * in, void *arg) { @@ -943,7 +963,6 @@ controller_read_socket (f_str_t * in, void *arg) r = process_message (task); if (r == -1) { msg_warn ("processing of message failed"); - destroy_session (task->s); session->state = STATE_REPLY; r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF); if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) { @@ -951,7 +970,8 @@ controller_read_socket (f_str_t * in, void *arg) } return FALSE; } - + /* Set up async session */ + task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task); r = process_filters (task); if (r == -1) { session->state = STATE_REPLY; @@ -961,34 +981,12 @@ controller_read_socket (f_str_t * in, void *arg) return FALSE; } } - else if (r == 0) { + else { session->state = STATE_LEARN_SPAM; task->dispatcher = session->dispatcher; session->learn_task = task; rspamd_dispatcher_pause (session->dispatcher); } - else { - lua_call_post_filters (task); - session->state = STATE_REPLY; - - if (! learn_task_spam (session->learn_classifier, task, session->in_class, &err)) { - if (err) { - i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, learn classifier error: %s" CRLF END, err->message); - g_error_free (err); - } - else { - i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, unknown learn classifier error" CRLF END); - } - } - else { - i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END); - } - - destroy_session (task->s); - if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) { - return FALSE; - } - } break; case STATE_WEIGHTS: session->learn_buf = in; @@ -1189,7 +1187,7 @@ accept_socket (gint fd, short what, void *arg) io_tv->tv_sec = ctx->timeout / 1000; io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000; - new_session->s = new_async_session (new_session->session_pool, free_session, new_session); + new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session); new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket, controller_write_socket, controller_err_socket, io_tv, (void *)new_session); diff --git a/src/dns.c b/src/dns.c index be0d2ad7d..1d868eb58 100644 --- a/src/dns.c +++ b/src/dns.c @@ -1220,6 +1220,7 @@ dns_read_cb (gint fd, short what, void *arg) } upstream_ok (&rep->request->server->up, rep->request->time); rep->request->func (rep, rep->request->arg); + remove_normal_event (req->session, dns_fin_cb, req); } } } @@ -1239,11 +1240,11 @@ dns_timer_cb (gint fd, short what, void *arg) rep->request = req; rep->code = DNS_RC_SERVFAIL; upstream_fail (&rep->request->server->up, rep->request->time); - remove_normal_event (req->session, dns_fin_cb, req); dns_check_throttling (req->resolver); req->resolver->errors ++; req->func (rep, req->arg); + remove_normal_event (req->session, dns_fin_cb, req); return; } @@ -1262,8 +1263,9 @@ dns_timer_cb (gint fd, short what, void *arg) rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply)); rep->request = req; rep->code = DNS_RC_SERVFAIL; - remove_normal_event (req->session, dns_fin_cb, req); + req->func (rep, req->arg); + remove_normal_event (req->session, dns_fin_cb, req); return; } @@ -1277,8 +1279,9 @@ dns_timer_cb (gint fd, short what, void *arg) rep->request = req; rep->code = DNS_RC_SERVFAIL; upstream_fail (&rep->request->server->up, rep->request->time); - remove_normal_event (req->session, dns_fin_cb, req); + req->func (rep, req->arg); + remove_normal_event (req->session, dns_fin_cb, req); return; } @@ -1288,9 +1291,9 @@ dns_timer_cb (gint fd, short what, void *arg) rep = memory_pool_alloc0 (req->pool, sizeof (struct rspamd_dns_reply)); rep->request = req; rep->code = DNS_RC_SERVFAIL; - remove_normal_event (req->session, dns_fin_cb, req); upstream_fail (&rep->request->server->up, rep->request->time); req->func (rep, req->arg); + remove_normal_event (req->session, dns_fin_cb, req); return; } evtimer_add (&req->timer_event, &req->tv); @@ -1303,6 +1306,8 @@ dns_retransmit_handler (gint fd, short what, void *arg) struct rspamd_dns_reply *rep; gint r; + remove_normal_event (req->session, (event_finalizer_t)event_del, &req->io_event); + if (what == EV_WRITE) { /* Retransmit dns request */ req->retransmits ++; diff --git a/src/events.c b/src/events.c index f91e3e847..6d848ca2d 100644 --- a/src/events.c +++ b/src/events.c @@ -26,20 +26,53 @@ #include "main.h" #include "events.h" +#undef RSPAMD_EVENTS_DEBUG + +static gboolean +rspamd_event_equal (gconstpointer a, gconstpointer b) +{ + const struct rspamd_async_event *ev1 = a, *ev2 = b; + + if (ev1->fin == ev2->fin) { + return ev1->user_data == ev2->user_data; + } + + return FALSE; +} + +static guint +rspamd_event_hash (gconstpointer a) +{ + const struct rspamd_async_event *ev = a; + guint h = 0, i; + gchar *p; + + p = (gchar *)ev->user_data; + for (i = 0; i < sizeof (gpointer); i ++) { + h ^= *p; + h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24); + p ++; + } + + return h; +} struct rspamd_async_session * -new_async_session (memory_pool_t * pool, event_finalizer_t fin, void *user_data) +new_async_session (memory_pool_t * pool, event_finalizer_t fin, event_finalizer_t cleanup, void *user_data) { struct rspamd_async_session *new; new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session)); new->pool = pool; new->fin = fin; + new->cleanup = cleanup; new->user_data = user_data; new->wanna_die = FALSE; - new->events = g_queue_new (); + new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); + new->forced_events = g_queue_new (); - memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->events); + memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events); + memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events); return new; } @@ -57,7 +90,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi if (forced) { /* For forced events try first to increase its reference */ - cur = session->events->head; + cur = session->forced_events->head; while (cur) { ev = cur->data; if (ev->forced && ev->fin == fin) { @@ -73,7 +106,10 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi new->user_data = user_data; new->forced = forced; new->ref = 1; - g_queue_push_head (session->events, new); + g_hash_table_insert (session->events, new, new); +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events)); +#endif } void @@ -87,56 +123,64 @@ remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin return; } - cur = session->events->head; + cur = session->forced_events->head; while (cur) { ev = cur->data; if (ev->forced && ev->fin == fin) { ev->ref--; if (ev->ref == 0) { - g_queue_delete_link (session->events, cur); + g_queue_delete_link (session->forced_events, cur); } break; } cur = g_list_next (cur); } - if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->events) == 0) { + check_session_pending (session); + + if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) { /* Call session destroy after all forced events are ready */ - session->fin (session->user_data); + session->cleanup (session->user_data); } } void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud) { - struct rspamd_async_event *ev; - GList *cur; + struct rspamd_async_event search_ev; if (session == NULL) { msg_info ("session is NULL"); return; } - cur = session->events->head; - while (cur) { - ev = cur->data; - if (ev->fin == fin && ev->user_data == ud && !ev->forced) { - g_queue_delete_link (session->events, cur); - if (ev->fin) { - ev->fin (ev->user_data); - } - break; - } - cur = g_list_next (cur); + search_ev.fin = fin; + search_ev.user_data = ud; + if (g_hash_table_remove (session->events, &search_ev)) { + fin (ud); + } + +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("removed event: %p, pending %d events", ud, g_hash_table_size (session->events)); +#endif + + check_session_pending (session); +} + +static void +rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) +{ + struct rspamd_async_event *ev = v; + + /* Call event's finalizer */ + if (ev->fin != NULL) { + ev->fin (ev->user_data); } } gboolean destroy_session (struct rspamd_async_session *session) { - struct rspamd_async_event *ev; - GList *cur, *tmp; - if (session == NULL) { msg_info ("session is NULL"); return FALSE; @@ -144,30 +188,28 @@ destroy_session (struct rspamd_async_session *session) session->wanna_die = TRUE; - cur = session->events->head; + g_hash_table_foreach (session->events, rspamd_session_destroy, session); - while (cur) { - ev = cur->data; - if (!ev->forced) { - if (ev->fin != NULL) { - ev->fin (ev->user_data); - } - tmp = cur; - cur = g_list_next (cur); - g_queue_delete_link (session->events, tmp); - } - else { - /* Do nothing with forced callbacks */ - cur = g_list_next (cur); + if (g_queue_get_length (session->forced_events) == 0) { + if (session->cleanup != NULL) { + session->cleanup (session->user_data); } + return TRUE; } - if (g_queue_get_length (session->events) == 0) { + return FALSE; +} + +gboolean +check_session_pending (struct rspamd_async_session *session) +{ + if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) { if (session->fin != NULL) { session->fin (session->user_data); } - return TRUE; + /* No more events */ + return FALSE; } - return FALSE; + return TRUE; } diff --git a/src/events.h b/src/events.h index 3715b4d66..434c39d80 100644 --- a/src/events.h +++ b/src/events.h @@ -16,18 +16,49 @@ struct rspamd_async_event { struct rspamd_async_session { event_finalizer_t fin; - GQueue *events; + event_finalizer_t cleanup; + GHashTable *events; + GQueue *forced_events; void *user_data; memory_pool_t *pool; gboolean wanna_die; }; -/* Makes new async session */ -struct rspamd_async_session *new_async_session (memory_pool_t *pool, event_finalizer_t fin, void *user_data); -/* Insert event into session */ -void register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced); -/* Must be called by forced events to call session destructor properly */ +/** + * Make new async session + * @param pool pool to alloc memory from + * @param fin a callback called when no events are found in session + * @param cleanup a callback called when session is forcefully destroyed + * @param user_data abstract user data + * @return + */ +struct rspamd_async_session *new_async_session (memory_pool_t *pool, + event_finalizer_t fin, event_finalizer_t cleanup, void *user_data); + +/** + * Insert new event to the session + * @param session session object + * @param fin finalizer callback + * @param user_data abstract user_data + * @param forced session cannot be destroyed until forced event are still in it + */ +void register_async_event (struct rspamd_async_session *session, + event_finalizer_t fin, void *user_data, gboolean forced); + + +/** + * Remove forced event + * @param session session object + * @param fin destructor function + */ void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin); + +/** + * Remove normal event + * @param session session object + * @param fin final callback + * @param ud user data object + */ void remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud); /** @@ -36,4 +67,11 @@ void remove_normal_event (struct rspamd_async_session *session, event_finalizer_ */ gboolean destroy_session (struct rspamd_async_session *session); +/** + * Check session for events pending and call fin callback if no events are pending + * @param session session object + * @return TRUE if session has pending events + */ +gboolean check_session_pending (struct rspamd_async_session *session); + #endif /* RSPAMD_EVENTS_H */ diff --git a/src/filter.c b/src/filter.c index f4048955d..562f705e5 100644 --- a/src/filter.c +++ b/src/filter.c @@ -208,48 +208,6 @@ check_metric_is_spam (struct worker_task *task, struct metric *metric) return FALSE; } -static gint -continue_process_filters (struct worker_task *task) -{ - GList *cur; - gpointer item = task->save.item; - struct metric *metric; - - while (call_symbol_callback (task, task->cfg->cache, &item)) { - cur = task->cfg->metrics_list; - while (cur) { - metric = cur->data; - /* call_filter_by_name (task, filt->func_name, filt->type, SCRIPT_HEADER); */ - if (task->save.saved) { - task->save.entry = cur; - task->save.item = item; - return 0; - } - else if (!task->pass_all_filters && - metric->action == METRIC_ACTION_REJECT && - check_metric_is_spam (task, metric)) { - goto end; - } - cur = g_list_next (cur); - } - } - -end: - /* Process all statfiles */ - process_statfiles (task); - /* Call post filters */ - lua_call_post_filters (task); - task->state = WRITE_REPLY; - - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_dispatcher_restore (task->dispatcher); - } - return 1; -} - gint process_filters (struct worker_task *task) { @@ -257,10 +215,6 @@ process_filters (struct worker_task *task) struct metric *metric; gpointer item = NULL; - if (task->save.saved) { - task->save.saved = 0; - return continue_process_filters (task); - } /* Check skip */ if (check_skip (task->cfg->views, task)) { task->is_skipped = TRUE; @@ -282,15 +236,10 @@ process_filters (struct worker_task *task) cur = task->cfg->metrics_list; while (cur) { metric = cur->data; - if (task->save.saved) { - task->save.entry = cur; - task->save.item = item; - return 0; - } - else if (!task->pass_all_filters && + if (!task->pass_all_filters && metric->action == METRIC_ACTION_REJECT && check_metric_is_spam (task, metric)) { - task->state = WRITE_REPLY; + check_session_pending (task->s); return 1; } cur = g_list_next (cur); diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 3764c96cc..16f237a36 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -103,13 +103,6 @@ lua_http_push_error (gint code, struct lua_http_ud *ud) ud->parser_state = 3; remove_normal_event (ud->task->s, lua_http_fin, ud); - - ud->task->save.saved--; - if (ud->task->save.saved == 0) { - /* Call other filters */ - ud->task->save.saved = 1; - process_filters (ud->task); - } } static void @@ -151,12 +144,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud) } remove_normal_event (ud->task->s, lua_http_fin, ud); - ud->task->save.saved--; - if (ud->task->save.saved == 0) { - /* Call other filters */ - ud->task->save.saved = 1; - process_filters (ud->task); - } } /* @@ -384,7 +371,6 @@ lua_http_make_request_common (lua_State *L, struct worker_task *task, const gcha if (make_dns_request (task->resolver, task->s, task->task_pool, lua_http_dns_callback, ud, DNS_REQUEST_A, hostname)) { task->dns_requests ++; - task->save.saved++; } return 0; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index de8bff89b..49a7e9279 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -82,15 +82,8 @@ lua_redis_fin (void *arg) struct lua_redis_userdata *ud = arg; if (ud->ctx) { - msg_info ("hui"); redisAsyncFree (ud->ctx); luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); - /* - ud->task->save.saved--; - if (ud->task->save.saved == 0) { - ud->task->save.saved = 1; - process_filters (ud->task); - }*/ } } @@ -112,7 +105,7 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean *ptask = ud->task; /* String of error */ lua_pushstring (ud->L, err); - /* Data */ + /* Data is nil */ lua_pushnil (ud->L); if (lua_pcall (ud->L, 3, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); @@ -138,8 +131,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud) lua_setclass (ud->L, "rspamd{task}", -1); *ptask = ud->task; - /* String of error */ - lua_pushstring (ud->L, ud->ctx->errstr); + /* Error is nil */ + lua_pushnil (ud->L); /* Data */ lua_pushlstring (ud->L, r->str, r->len); @@ -162,8 +155,6 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) redisReply *reply = r; struct lua_redis_userdata *ud = priv; - msg_info ("in callback: err: %d, r: %p", c->err, r); - if (c->err == 0) { if (r != NULL) { lua_redis_push_data (reply, ud); @@ -173,7 +164,12 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) } } else { - lua_redis_push_error (c->errstr, ud, TRUE); + if (c->err == REDIS_ERR_IO) { + lua_redis_push_error (strerror (errno), ud, TRUE); + } + else { + lua_redis_push_error (c->errstr, ud, TRUE); + } } } /** @@ -191,7 +187,6 @@ lua_redis_make_request_real (struct lua_redis_userdata *ud) } else { register_async_event (ud->task->s, lua_redis_fin, ud, FALSE); - ud->task->save.saved ++; } redisLibeventAttach (ud->ctx, ud->task->ev_base); /* Make a request now */ @@ -279,7 +274,7 @@ lua_redis_make_request (lua_State *L) /* Now get remaining args */ ud->args_num = lua_gettop (L) - 5; ud->args = memory_pool_alloc (task->task_pool, ud->args_num * sizeof (f_str_t)); - for (i = 0; i < ud->args_num - 1; i ++) { + for (i = 0; i < ud->args_num; i ++) { tmp = lua_tolstring (L, i + 6, &ud->args[i].len); /* Make a copy of argument */ ud->args[i].begin = memory_pool_alloc (task->task_pool, ud->args[i].len); diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index 478039149..19de0f7c8 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -538,13 +538,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) if (lua_pcall (cd->L, 5, 0, 0) != 0) { msg_info ("call to %s failed: %s", cd->callback, lua_tostring (cd->L, -1)); } - - cd->task->save.saved--; - if (cd->task->save.saved == 0) { - /* Call other filters */ - cd->task->save.saved = 1; - process_filters (cd->task); - } } static gint @@ -584,7 +577,6 @@ lua_task_resolve_dns_a (lua_State * L) } if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_A, cd->to_resolve)) { task->dns_requests ++; - task->save.saved++; } } return 0; @@ -626,7 +618,6 @@ lua_task_resolve_dns_txt (lua_State * L) } if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_TXT, cd->to_resolve)) { task->dns_requests ++; - task->save.saved++; } } return 0; @@ -671,7 +662,6 @@ lua_task_resolve_dns_ptr (lua_State * L) if (make_dns_request (task->resolver, task->s, task->task_pool, lua_dns_callback, (void *)cd, DNS_REQUEST_PTR, ina)) { task->dns_requests ++; - task->save.saved++; } } return 0; diff --git a/src/main.h b/src/main.h index 59a5f3c30..8a680451a 100644 --- a/src/main.h +++ b/src/main.h @@ -107,15 +107,6 @@ struct counter_data { gint number; }; -/** - * Save point object for delayed filters processing - */ -struct save_point { - GList *entry; /**< pointer to saved metric */ - void *item; /**< pointer to saved item */ - guint saved; /**< how much time we have delayed processing */ -}; - /** * Structure to point exception in text from processing */ @@ -235,7 +226,6 @@ struct worker_task { GList *messages; /**< list of messages that would be reported */ GHashTable *re_cache; /**< cache for matched or not matched regexps */ struct config_file *cfg; /**< pointer to config object */ - struct save_point save; /**< save point for delayed processing */ gchar *last_error; /**< last error */ gint error_code; /**< code of last error */ memory_pool_t *task_pool; /**< memory pool for task */ diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index f330e7ecd..f7191117b 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -522,13 +522,6 @@ fuzzy_io_callback (gint fd, short what, void *arg) msg_err ("got error on IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno)); ok: remove_normal_event (session->task->s, fuzzy_io_fin, session); - - session->task->save.saved--; - if (session->task->save.saved == 0) { - /* Call other filters */ - session->task->save.saved = 1; - process_filters (session->task); - } } static void @@ -601,12 +594,6 @@ fuzzy_learn_callback (gint fd, short what, void *arg) } ok: remove_normal_event (session->session->s, fuzzy_learn_fin, session); - - (*session->saved)--; - if (*session->saved == 0) { - session->session->state = STATE_REPLY; - session->session->dispatcher->write_callback (session->session); - } } static inline void @@ -642,7 +629,6 @@ register_fuzzy_call (struct worker_task *task, fuzzy_hash_t *h) session->server = selected; event_add (&session->ev, &session->tv); register_async_event (task->s, fuzzy_io_fin, session, FALSE); - task->save.saved++; } } } diff --git a/src/plugins/spf.c b/src/plugins/spf.c index e9b1531ac..12ec80e67 100644 --- a/src/plugins/spf.c +++ b/src/plugins/spf.c @@ -227,13 +227,6 @@ spf_plugin_callback (struct spf_record *record, struct worker_task *task) } spf_check_list (l, task); } - - if (task->save.saved == 0) { - /* Call other filters */ - task->save.saved = 1; - /* Note that here task MAY be destroyed */ - process_filters (task); - } } diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index b848d2a30..b1f51b7fc 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -634,7 +634,6 @@ make_surbl_requests (struct uri *url, struct worker_task *task, debug_task ("send surbl dns request %s", surbl_req); if (make_dns_request (task->resolver, task->s, task->task_pool, dns_callback, (void *)param, DNS_REQUEST_A, surbl_req)) { task->dns_requests ++; - param->task->save.saved++; } } else if (err != NULL && err->code != WHITELIST_ERROR) { @@ -703,13 +702,6 @@ dns_callback (struct rspamd_dns_reply *reply, gpointer arg) else { debug_task ("<%s> domain [%s] is not in surbl %s", param->task->message_id, param->host_resolve, param->suffix->suffix); } - - param->task->save.saved--; - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } } static void @@ -723,12 +715,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data) if (error != OK) { msg_info ("memcached returned error %s on CONNECT stage", memc_strerror (error)); memc_close_ctx (param->ctx); - param->task->save.saved--; - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } } else { memc_get (param->ctx, param->ctx->param); @@ -738,12 +724,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data) if (error != OK) { msg_info ("memcached returned error %s on READ stage", memc_strerror (error)); memc_close_ctx (param->ctx); - param->task->save.saved--; - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } } else { url_count = (gint *)param->ctx->param->buf; @@ -764,12 +744,6 @@ memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data) msg_info ("memcached returned error %s on WRITE stage", memc_strerror (error)); } memc_close_ctx (param->ctx); - param->task->save.saved--; - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } make_surbl_requests (param->url, param->task, param->suffix, FALSE); break; default: @@ -862,14 +836,6 @@ redirector_callback (gint fd, short what, void *arg) msg_err ("write failed %s to %s", strerror (errno), param->redirector->name); upstream_fail (¶m->redirector->up, param->task->tv.tv_sec); remove_normal_event (param->task->s, free_redirector_session, param); - - param->task->save.saved--; - make_surbl_requests (param->url, param->task, param->suffix, FALSE); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } return; } param->state = STATE_READ; @@ -880,13 +846,6 @@ redirector_callback (gint fd, short what, void *arg) upstream_fail (¶m->redirector->up, param->task->tv.tv_sec); remove_normal_event (param->task->s, free_redirector_session, param); - param->task->save.saved--; - make_surbl_requests (param->url, param->task, param->suffix, FALSE); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } return; } break; @@ -896,14 +855,8 @@ redirector_callback (gint fd, short what, void *arg) if (r <= 0) { msg_err ("read failed: %s from %s", strerror (errno), param->redirector->name); upstream_fail (¶m->redirector->up, param->task->tv.tv_sec); - remove_normal_event (param->task->s, free_redirector_session, param); - param->task->save.saved--; make_surbl_requests (param->url, param->task, param->suffix, FALSE); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } + remove_normal_event (param->task->s, free_redirector_session, param); return; } @@ -925,29 +878,12 @@ redirector_callback (gint fd, short what, void *arg) } upstream_ok (¶m->redirector->up, param->task->tv.tv_sec); remove_normal_event (param->task->s, free_redirector_session, param); - - param->task->save.saved--; - make_surbl_requests (param->url, param->task, param->suffix, FALSE); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } - } else { msg_info ("<%s> reading redirector %s timed out, while waiting for read", param->redirector->name, param->task->message_id); upstream_fail (¶m->redirector->up, param->task->tv.tv_sec); remove_normal_event (param->task->s, free_redirector_session, param); - - param->task->save.saved--; - make_surbl_requests (param->url, param->task, param->suffix, FALSE); - if (param->task->save.saved == 0) { - /* Call other filters */ - param->task->save.saved = 1; - process_filters (param->task); - } } break; } @@ -975,7 +911,6 @@ register_redirector_call (struct uri *url, struct worker_task *task, if (s == -1) { msg_info ("<%s> cannot create tcp socket failed: %s", task->message_id, strerror (errno)); - task->save.saved--; make_surbl_requests (url, task, suffix, FALSE); return; } @@ -1036,7 +971,6 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data) insert_result (param->task, surbl_module_ctx->redirector_symbol, 1, g_list_prepend (NULL, red_domain)); } register_redirector_call (url, param->task, param->suffix, red_domain); - param->task->save.saved++; return FALSE; } } @@ -1047,7 +981,6 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data) else { if (param->task->worker->srv->cfg->memcached_servers_num > 0) { register_memcached_call (url, param->task, param->suffix); - param->task->save.saved++; } else { make_surbl_requests (url, param->task, param->suffix, FALSE); diff --git a/src/smtp.c b/src/smtp.c index 0c88df3b0..8f3706a6b 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -700,7 +700,7 @@ accept_socket (gint fd, short what, void *arg) /* Resolve client's addr */ /* Set up async session */ - session->s = new_async_session (session->pool, free_smtp_session, session); + session->s = new_async_session (session->pool, NULL, free_smtp_session, session); session->state = SMTP_STATE_RESOLVE_REVERSE; if (! make_dns_request (session->resolver, session->s, session->pool, smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) { diff --git a/src/spf.c b/src/spf.c index ef64551b7..137ca0f5e 100644 --- a/src/spf.c +++ b/src/spf.c @@ -341,7 +341,6 @@ spf_record_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) /* Now resolve A record for this MX */ if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, elt_data->mx.name)) { task->dns_requests ++; - task->save.saved++; } } else if (reply->type == DNS_REQUEST_A) { @@ -460,11 +459,6 @@ spf_record_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) break; } } - - cb->rec->task->save.saved--; - if (cb->rec->task->save.saved == 0 && cb->rec->callback) { - cb->rec->callback (cb->rec, cb->rec->task); - } } static gboolean @@ -494,7 +488,6 @@ parse_spf_a (struct worker_task *task, const gchar *begin, struct spf_record *re cb->in_include = rec->in_include; if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) { task->dns_requests ++; - task->save.saved++; return TRUE; } @@ -542,7 +535,6 @@ parse_spf_mx (struct worker_task *task, const gchar *begin, struct spf_record *r cb->in_include = rec->in_include; if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_MX, host)) { task->dns_requests ++; - task->save.saved++; return TRUE; } @@ -600,8 +592,7 @@ parse_spf_include (struct worker_task *task, const gchar *begin, struct spf_reco domain = memory_pool_strdup (task->task_pool, begin); if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) { task->dns_requests ++; - task->save.saved++; - + return TRUE; } @@ -640,7 +631,6 @@ parse_spf_redirect (struct worker_task *task, const gchar *begin, struct spf_rec domain = memory_pool_strdup (task->task_pool, begin); if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_TXT, domain)) { task->dns_requests ++; - task->save.saved++; return TRUE; } @@ -672,7 +662,6 @@ parse_spf_exists (struct worker_task *task, const gchar *begin, struct spf_recor if (make_dns_request (task->resolver, task->s, task->task_pool, spf_record_dns_callback, (void *)cb, DNS_REQUEST_A, host)) { task->dns_requests ++; - task->save.saved++; return TRUE; } @@ -1164,11 +1153,6 @@ spf_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) cur = g_list_next (cur); } } - - rec->task->save.saved--; - if (rec->task->save.saved == 0 && rec->callback) { - rec->callback (rec, rec->task); - } } gchar * @@ -1236,7 +1220,6 @@ resolve_spf (struct worker_task *task, spf_cb_t callback) if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) { task->dns_requests ++; - task->save.saved++; return TRUE; } } @@ -1266,7 +1249,6 @@ resolve_spf (struct worker_task *task, spf_cb_t callback) rec->sender_domain = rec->cur_domain; if (make_dns_request (task->resolver, task->s, task->task_pool, spf_dns_callback, (void *)rec, DNS_REQUEST_TXT, rec->cur_domain)) { task->dns_requests ++; - task->save.saved++; return TRUE; } } diff --git a/src/worker.c b/src/worker.c index 24cb0dd69..2411906b0 100644 --- a/src/worker.c +++ b/src/worker.c @@ -228,8 +228,6 @@ construct_task (struct rspamd_worker *worker) memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_tree_destroy, new_task->urls); - new_task->s = - new_async_session (new_task->task_pool, free_task_hard, new_task); new_task->sock = -1; new_task->is_mime = TRUE; @@ -489,16 +487,6 @@ read_socket (f_str_t * in, void *arg) task->state = WRITE_ERROR; return write_socket (task); } - else if (r == 0) { - task->state = WAIT_FILTER; - rspamd_dispatcher_pause (task->dispatcher); - } - else { - process_statfiles (task); - lua_call_post_filters (task); - task->state = WRITE_REPLY; - return write_socket (task); - } } break; case WRITE_REPLY: @@ -593,6 +581,28 @@ err_socket (GError * err, void *arg) destroy_session (task->s); } +/* + * Called if all filters are processed + */ +static void +fin_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + + /* Process all statfiles */ + process_statfiles (task); + /* Call post filters */ + lua_call_post_filters (task); + task->state = WRITE_REPLY; + + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); + } +} + /* * Reduce number of tasks proceeded */ @@ -670,6 +680,10 @@ accept_socket (gint fd, short what, void *arg) ctx->tasks ++; memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); + /* Set up async session */ + new_task->s = + new_async_session (new_task->task_pool, fin_task, free_task_hard, new_task); + /* Init custom filters */ #ifndef BUILD_STATIC if (ctx->is_custom) { -- cgit v1.2.3