diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-06-02 14:19:55 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-06-02 15:39:37 +0100 |
commit | a645c70082c32d2a60454b8bce30152b4760636b (patch) | |
tree | 39e5bfe8a13469a354f87ad01f6e7a008b9ef29d | |
parent | 029721955b19dd15209814bbe029d9ae749f4ebb (diff) | |
download | rspamd-a645c70082c32d2a60454b8bce30152b4760636b.tar.gz rspamd-a645c70082c32d2a60454b8bce30152b4760636b.zip |
Rework task processing.
-rw-r--r-- | src/libserver/task.c | 195 | ||||
-rw-r--r-- | src/libserver/task.h | 7 | ||||
-rw-r--r-- | src/lmtp.c | 2 | ||||
-rw-r--r-- | src/lua/lua_task.c | 6 | ||||
-rw-r--r-- | src/plugins/dkim_check.c | 2 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 2 | ||||
-rw-r--r-- | src/smtp.c | 2 |
7 files changed, 133 insertions, 83 deletions
diff --git a/src/libserver/task.c b/src/libserver/task.c index c63224415..33a03049f 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -329,47 +329,141 @@ rspamd_task_load_message (struct rspamd_task *task, return TRUE; } +static gint +rspamd_task_select_processing_stage (struct rspamd_task *task, guint stages) +{ + gint st; + + st = ffs (task->processed_stages); + + if (st == -1) { + st = (1 << 0); + } + else { + st = (1 << (st + 1)); + } + + if (stages & st) { + return st; + } + else if (st < RSPAMD_TASK_STAGE_DONE) { + /* We assume that the stage that was not requested is done */ + task->processed_stages |= st; + return rspamd_task_select_processing_stage (task, stages); + } + + /* We are done */ + return RSPAMD_TASK_STAGE_DONE; +} + +static gboolean +rspamd_process_filters (struct rspamd_task *task) +{ + GList *cur; + struct metric *metric; + gpointer item = NULL; + + /* Insert default metric to be sure that it exists all the time */ + /* TODO: make preprocessing only once */ + rspamd_create_metric_result (task, DEFAULT_METRIC); + if (task->settings) { + const ucl_object_t *wl; + + wl = ucl_object_find_key (task->settings, "whitelist"); + if (wl != NULL) { + msg_info ("<%s> is whitelisted", task->message_id); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + return TRUE; + } + } + + /* Process metrics symbols */ + while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) { + /* Check reject actions */ + cur = task->cfg->metrics_list; + while (cur) { + metric = cur->data; + if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) && + metric->actions[METRIC_ACTION_REJECT].score > 0 && + check_metric_is_spam (task, metric)) { + msg_info ("<%s> has already scored more than %.2f, so do not " + "plan any more checks", task->message_id, + metric->actions[METRIC_ACTION_REJECT].score); + return TRUE; + } + cur = g_list_next (cur); + } + } + + return TRUE; +} + gboolean -rspamd_task_process (struct rspamd_task *task, - struct rspamd_http_message *msg, const gchar *start, gsize len, - guint stages) +rspamd_task_process (struct rspamd_task *task, guint stages) { - gint r; + gint st; - if (stages & RSPAMD_TASK_STAGE_READ_MESSAGE) { - /* Process message itself */ - r = process_message (task); - if (r == -1) { - msg_warn ("processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; + if (RSPAMD_TASK_IS_PROCESSED (task)) { + return TRUE; + } + + st = rspamd_task_select_processing_stage (task, stages); + + switch (st) { + case RSPAMD_TASK_STAGE_READ_MESSAGE: + if (!rspamd_message_parse (task)) { return FALSE; } - if (!process_extra_filters) { - task->flags |= RSPAMD_TASK_FLAG_SKIP_EXTRA; - } - if (!process_extra_filters || task->cfg->pre_filters == NULL) { - r = rspamd_process_filters (task); + break; - if (r == -1) { - task->last_error = "filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_REPLY; + case RSPAMD_TASK_STAGE_PRE_FILTERS: + rspamd_lua_call_pre_filters (task); + break; + + case RSPAMD_TASK_STAGE_FILTERS: + if (!rspamd_process_filters (task)) { return FALSE; } + break; - if (RSPAMD_TASK_IS_SKIPPED (task)) { - /* Call write_socket to write reply and exit */ - task->state = WRITE_REPLY; + case RSPAMD_TASK_STAGE_CLASSIFIERS: + if (!rspamd_stat_classify (task, task->cfg->lua_state, &task->err)) { + return FALSE; } + break; + + case RSPAMD_TASK_STAGE_COMPOSITES: + rspamd_make_composites (task); + break; + + case RSPAMD_TASK_STAGE_POST_FILTERS: + rspamd_lua_call_post_filters (task); + break; + + case RSPAMD_TASK_STAGE_DONE: + return TRUE; + + default: + /* TODO: not implemented stage */ + break; + } + + if (RSPAMD_TASK_IS_SKIPPED (task)) { + task->processed_stages |= RSPAMD_TASK_STAGE_DONE; + return TRUE; + } + if (rspamd_session_events_pending (task->s) != 0) { + /* We have events pending, so we consider this stage as incomplete */ + msg_debug ("need more work on stage %d", st); } else { - rspamd_lua_call_pre_filters (task); - /* We want fin_task after pre filters are processed */ - if (rspamd_session_events_pending (task->s) != 0) { - task->state = WAIT_PRE_FILTER; - } + /* Mark the current stage as done and go to the next stage */ + msg_debug ("completed stage %d", st); + task->processed_stages |= st; + + /* Tail recursion */ + return rspamd_task_process (task, stages); } return TRUE; @@ -548,48 +642,3 @@ check_metric_is_spam (struct rspamd_task *task, struct metric *metric) return FALSE; } - -gint -rspamd_process_filters (struct rspamd_task *task) -{ - GList *cur; - struct metric *metric; - gpointer item = NULL; - - /* Insert default metric to be sure that it exists all the time */ - rspamd_create_metric_result (task, DEFAULT_METRIC); - if (task->settings) { - const ucl_object_t *wl; - - wl = ucl_object_find_key (task->settings, "whitelist"); - if (wl != NULL) { - msg_info ("<%s> is whitelisted", task->message_id); - task->flags |= RSPAMD_TASK_FLAG_SKIP; - return 0; - } - } - - /* Process metrics symbols */ - while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) { - /* Check reject actions */ - cur = task->cfg->metrics_list; - while (cur) { - metric = cur->data; - if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) && - metric->actions[METRIC_ACTION_REJECT].score > 0 && - check_metric_is_spam (task, metric)) { - msg_info ("<%s> has already scored more than %.2f, so do not " - "plan any more checks", task->message_id, - metric->actions[METRIC_ACTION_REJECT].score); - return 1; - } - cur = g_list_next (cur); - } - } - - if (rspamd_session_events_pending (task->s) != 0) { - task->state = WAIT_FILTER; - } - - return 1; -} diff --git a/src/libserver/task.h b/src/libserver/task.h index 01a59b98c..d7698a130 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -58,8 +58,9 @@ enum rspamd_task_stage { RSPAMD_TASK_STAGE_PRE_FILTERS = (1 << 3), RSPAMD_TASK_STAGE_FILTERS = (1 << 4), RSPAMD_TASK_STAGE_CLASSIFIERS = (1 << 5), - RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 6), - RSPAMD_TASK_STAGE_WRITE_REPLY = (1 << 7) + RSPAMD_TASK_STAGE_COMPOSITES = (1 << 6), + RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 7), + RSPAMD_TASK_STAGE_DONE = (1 << 8) }; #define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \ @@ -89,6 +90,7 @@ enum rspamd_task_stage { #define RSPAMD_TASK_IS_SKIPPED(task) (((task)->flags & RSPAMD_TASK_FLAG_SKIP)) #define RSPAMD_TASK_IS_JSON(task) (((task)->flags & RSPAMD_TASK_FLAG_JSON)) #define RSPAMD_TASK_IS_SPAMC(task) (((task)->flags & RSPAMD_TASK_FLAG_SPAMC)) +#define RSPAMD_TASK_IS_PROCESSED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_DONE)) typedef gint (*protocol_reply_func)(struct rspamd_task *task); @@ -157,7 +159,6 @@ struct rspamd_task { double time_virtual; struct timeval tv; guint32 scan_milliseconds; /**< how much milliseconds passed */ - guint32 parser_recursion; /**< for avoiding recursion stack overflow */ gboolean (*fin_callback)(struct rspamd_task *task, void *arg); /**< calback for filters finalizing */ void *fin_arg; /**< argument for fin callback */ diff --git a/src/lmtp.c b/src/lmtp.c index 91fbff17e..8adb46677 100644 --- a/src/lmtp.c +++ b/src/lmtp.c @@ -166,7 +166,7 @@ lmtp_read_socket (rspamd_fstring_t * in, void *arg) } break; case READ_MESSAGE: - r = process_message (lmtp->task); + r = rspamd_message_parse (lmtp->task); r = rspamd_process_filters (lmtp->task); if (r == -1) { return FALSE; diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index 8a8f6c640..36f100449 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -71,7 +71,7 @@ LUA_FUNCTION_DEF (task, create_empty); LUA_FUNCTION_DEF (task, create_from_buffer); /* Task methods */ LUA_FUNCTION_DEF (task, get_message); -LUA_FUNCTION_DEF (task, process_message); +LUA_FUNCTION_DEF (task, rspamd_message_parse); /*** * @method task:get_cfg() * Get configuration object for a task. @@ -474,7 +474,7 @@ static const struct luaL_reg tasklib_f[] = { static const struct luaL_reg tasklib_m[] = { LUA_INTERFACE_DEF (task, get_message), LUA_INTERFACE_DEF (task, destroy), - LUA_INTERFACE_DEF (task, process_message), + LUA_INTERFACE_DEF (task, rspamd_message_parse), LUA_INTERFACE_DEF (task, set_cfg), LUA_INTERFACE_DEF (task, get_cfg), LUA_INTERFACE_DEF (task, get_mempool), @@ -623,7 +623,7 @@ lua_task_process_message (lua_State *L) struct rspamd_task *task = lua_check_task (L, 1); if (task != NULL && task->msg.len > 0) { - if (process_message (task) == 0) { + if (rspamd_message_parse (task) == 0) { lua_pushboolean (L, TRUE); } else { diff --git a/src/plugins/dkim_check.c b/src/plugins/dkim_check.c index 4ffbb9672..9787ac5a5 100644 --- a/src/plugins/dkim_check.c +++ b/src/plugins/dkim_check.c @@ -425,7 +425,7 @@ dkim_symbol_callback (struct rspamd_task *task, void *unused) struct dkim_check_result *res = NULL, *cur; /* First check if a message has its signature */ - hlist = message_get_header (task, + hlist = rspamd_message_get_header (task, DKIM_SIGNHEADER, FALSE); if (hlist != NULL) { diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 848d5d54b..c585df7a4 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -1227,7 +1227,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent, saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint)); err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *)); - r = process_message (task); + r = rspamd_message_parse (task); if (r == -1) { msg_warn ("<%s>: cannot process message for fuzzy", task->message_id); rspamd_task_free (task, FALSE); diff --git a/src/smtp.c b/src/smtp.c index 74e2323a2..51d5753aa 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -305,7 +305,7 @@ process_smtp_data (struct smtp_session *session) sizeof (struct in_addr)); session->task->cmd = CMD_CHECK; - if (process_message (session->task) == -1) { + if (rspamd_message_parse (session->task) == -1) { msg_err ("cannot process message"); munmap (session->task->msg->str, st.st_size); goto err; |