aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-06-02 14:19:55 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-06-02 15:39:37 +0100
commita645c70082c32d2a60454b8bce30152b4760636b (patch)
tree39e5bfe8a13469a354f87ad01f6e7a008b9ef29d
parent029721955b19dd15209814bbe029d9ae749f4ebb (diff)
downloadrspamd-a645c70082c32d2a60454b8bce30152b4760636b.tar.gz
rspamd-a645c70082c32d2a60454b8bce30152b4760636b.zip
Rework task processing.
-rw-r--r--src/libserver/task.c195
-rw-r--r--src/libserver/task.h7
-rw-r--r--src/lmtp.c2
-rw-r--r--src/lua/lua_task.c6
-rw-r--r--src/plugins/dkim_check.c2
-rw-r--r--src/plugins/fuzzy_check.c2
-rw-r--r--src/smtp.c2
7 files changed, 133 insertions, 83 deletions
diff --git a/src/libserver/task.c b/src/libserver/task.c
index c63224415..33a03049f 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -329,47 +329,141 @@ rspamd_task_load_message (struct rspamd_task *task,
return TRUE;
}
+static gint
+rspamd_task_select_processing_stage (struct rspamd_task *task, guint stages)
+{
+ gint st;
+
+ st = ffs (task->processed_stages);
+
+ if (st == -1) {
+ st = (1 << 0);
+ }
+ else {
+ st = (1 << (st + 1));
+ }
+
+ if (stages & st) {
+ return st;
+ }
+ else if (st < RSPAMD_TASK_STAGE_DONE) {
+ /* We assume that the stage that was not requested is done */
+ task->processed_stages |= st;
+ return rspamd_task_select_processing_stage (task, stages);
+ }
+
+ /* We are done */
+ return RSPAMD_TASK_STAGE_DONE;
+}
+
+static gboolean
+rspamd_process_filters (struct rspamd_task *task)
+{
+ GList *cur;
+ struct metric *metric;
+ gpointer item = NULL;
+
+ /* Insert default metric to be sure that it exists all the time */
+ /* TODO: make preprocessing only once */
+ rspamd_create_metric_result (task, DEFAULT_METRIC);
+ if (task->settings) {
+ const ucl_object_t *wl;
+
+ wl = ucl_object_find_key (task->settings, "whitelist");
+ if (wl != NULL) {
+ msg_info ("<%s> is whitelisted", task->message_id);
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ return TRUE;
+ }
+ }
+
+ /* Process metrics symbols */
+ while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
+ /* Check reject actions */
+ cur = task->cfg->metrics_list;
+ while (cur) {
+ metric = cur->data;
+ if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
+ metric->actions[METRIC_ACTION_REJECT].score > 0 &&
+ check_metric_is_spam (task, metric)) {
+ msg_info ("<%s> has already scored more than %.2f, so do not "
+ "plan any more checks", task->message_id,
+ metric->actions[METRIC_ACTION_REJECT].score);
+ return TRUE;
+ }
+ cur = g_list_next (cur);
+ }
+ }
+
+ return TRUE;
+}
+
gboolean
-rspamd_task_process (struct rspamd_task *task,
- struct rspamd_http_message *msg, const gchar *start, gsize len,
- guint stages)
+rspamd_task_process (struct rspamd_task *task, guint stages)
{
- gint r;
+ gint st;
- if (stages & RSPAMD_TASK_STAGE_READ_MESSAGE) {
- /* Process message itself */
- r = process_message (task);
- if (r == -1) {
- msg_warn ("processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
+ if (RSPAMD_TASK_IS_PROCESSED (task)) {
+ return TRUE;
+ }
+
+ st = rspamd_task_select_processing_stage (task, stages);
+
+ switch (st) {
+ case RSPAMD_TASK_STAGE_READ_MESSAGE:
+ if (!rspamd_message_parse (task)) {
return FALSE;
}
- if (!process_extra_filters) {
- task->flags |= RSPAMD_TASK_FLAG_SKIP_EXTRA;
- }
- if (!process_extra_filters || task->cfg->pre_filters == NULL) {
- r = rspamd_process_filters (task);
+ break;
- if (r == -1) {
- task->last_error = "filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ rspamd_lua_call_pre_filters (task);
+ break;
+
+ case RSPAMD_TASK_STAGE_FILTERS:
+ if (!rspamd_process_filters (task)) {
return FALSE;
}
+ break;
- if (RSPAMD_TASK_IS_SKIPPED (task)) {
- /* Call write_socket to write reply and exit */
- task->state = WRITE_REPLY;
+ case RSPAMD_TASK_STAGE_CLASSIFIERS:
+ if (!rspamd_stat_classify (task, task->cfg->lua_state, &task->err)) {
+ return FALSE;
}
+ break;
+
+ case RSPAMD_TASK_STAGE_COMPOSITES:
+ rspamd_make_composites (task);
+ break;
+
+ case RSPAMD_TASK_STAGE_POST_FILTERS:
+ rspamd_lua_call_post_filters (task);
+ break;
+
+ case RSPAMD_TASK_STAGE_DONE:
+ return TRUE;
+
+ default:
+ /* TODO: not implemented stage */
+ break;
+ }
+
+ if (RSPAMD_TASK_IS_SKIPPED (task)) {
+ task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
+ return TRUE;
+ }
+ if (rspamd_session_events_pending (task->s) != 0) {
+ /* We have events pending, so we consider this stage as incomplete */
+ msg_debug ("need more work on stage %d", st);
}
else {
- rspamd_lua_call_pre_filters (task);
- /* We want fin_task after pre filters are processed */
- if (rspamd_session_events_pending (task->s) != 0) {
- task->state = WAIT_PRE_FILTER;
- }
+ /* Mark the current stage as done and go to the next stage */
+ msg_debug ("completed stage %d", st);
+ task->processed_stages |= st;
+
+ /* Tail recursion */
+ return rspamd_task_process (task, stages);
}
return TRUE;
@@ -548,48 +642,3 @@ check_metric_is_spam (struct rspamd_task *task, struct metric *metric)
return FALSE;
}
-
-gint
-rspamd_process_filters (struct rspamd_task *task)
-{
- GList *cur;
- struct metric *metric;
- gpointer item = NULL;
-
- /* Insert default metric to be sure that it exists all the time */
- rspamd_create_metric_result (task, DEFAULT_METRIC);
- if (task->settings) {
- const ucl_object_t *wl;
-
- wl = ucl_object_find_key (task->settings, "whitelist");
- if (wl != NULL) {
- msg_info ("<%s> is whitelisted", task->message_id);
- task->flags |= RSPAMD_TASK_FLAG_SKIP;
- return 0;
- }
- }
-
- /* Process metrics symbols */
- while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
- /* Check reject actions */
- cur = task->cfg->metrics_list;
- while (cur) {
- metric = cur->data;
- if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
- metric->actions[METRIC_ACTION_REJECT].score > 0 &&
- check_metric_is_spam (task, metric)) {
- msg_info ("<%s> has already scored more than %.2f, so do not "
- "plan any more checks", task->message_id,
- metric->actions[METRIC_ACTION_REJECT].score);
- return 1;
- }
- cur = g_list_next (cur);
- }
- }
-
- if (rspamd_session_events_pending (task->s) != 0) {
- task->state = WAIT_FILTER;
- }
-
- return 1;
-}
diff --git a/src/libserver/task.h b/src/libserver/task.h
index 01a59b98c..d7698a130 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -58,8 +58,9 @@ enum rspamd_task_stage {
RSPAMD_TASK_STAGE_PRE_FILTERS = (1 << 3),
RSPAMD_TASK_STAGE_FILTERS = (1 << 4),
RSPAMD_TASK_STAGE_CLASSIFIERS = (1 << 5),
- RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 6),
- RSPAMD_TASK_STAGE_WRITE_REPLY = (1 << 7)
+ RSPAMD_TASK_STAGE_COMPOSITES = (1 << 6),
+ RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 7),
+ RSPAMD_TASK_STAGE_DONE = (1 << 8)
};
#define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \
@@ -89,6 +90,7 @@ enum rspamd_task_stage {
#define RSPAMD_TASK_IS_SKIPPED(task) (((task)->flags & RSPAMD_TASK_FLAG_SKIP))
#define RSPAMD_TASK_IS_JSON(task) (((task)->flags & RSPAMD_TASK_FLAG_JSON))
#define RSPAMD_TASK_IS_SPAMC(task) (((task)->flags & RSPAMD_TASK_FLAG_SPAMC))
+#define RSPAMD_TASK_IS_PROCESSED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_DONE))
typedef gint (*protocol_reply_func)(struct rspamd_task *task);
@@ -157,7 +159,6 @@ struct rspamd_task {
double time_virtual;
struct timeval tv;
guint32 scan_milliseconds; /**< how much milliseconds passed */
- guint32 parser_recursion; /**< for avoiding recursion stack overflow */
gboolean (*fin_callback)(struct rspamd_task *task, void *arg); /**< calback for filters finalizing */
void *fin_arg; /**< argument for fin callback */
diff --git a/src/lmtp.c b/src/lmtp.c
index 91fbff17e..8adb46677 100644
--- a/src/lmtp.c
+++ b/src/lmtp.c
@@ -166,7 +166,7 @@ lmtp_read_socket (rspamd_fstring_t * in, void *arg)
}
break;
case READ_MESSAGE:
- r = process_message (lmtp->task);
+ r = rspamd_message_parse (lmtp->task);
r = rspamd_process_filters (lmtp->task);
if (r == -1) {
return FALSE;
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index 8a8f6c640..36f100449 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -71,7 +71,7 @@ LUA_FUNCTION_DEF (task, create_empty);
LUA_FUNCTION_DEF (task, create_from_buffer);
/* Task methods */
LUA_FUNCTION_DEF (task, get_message);
-LUA_FUNCTION_DEF (task, process_message);
+LUA_FUNCTION_DEF (task, rspamd_message_parse);
/***
* @method task:get_cfg()
* Get configuration object for a task.
@@ -474,7 +474,7 @@ static const struct luaL_reg tasklib_f[] = {
static const struct luaL_reg tasklib_m[] = {
LUA_INTERFACE_DEF (task, get_message),
LUA_INTERFACE_DEF (task, destroy),
- LUA_INTERFACE_DEF (task, process_message),
+ LUA_INTERFACE_DEF (task, rspamd_message_parse),
LUA_INTERFACE_DEF (task, set_cfg),
LUA_INTERFACE_DEF (task, get_cfg),
LUA_INTERFACE_DEF (task, get_mempool),
@@ -623,7 +623,7 @@ lua_task_process_message (lua_State *L)
struct rspamd_task *task = lua_check_task (L, 1);
if (task != NULL && task->msg.len > 0) {
- if (process_message (task) == 0) {
+ if (rspamd_message_parse (task) == 0) {
lua_pushboolean (L, TRUE);
}
else {
diff --git a/src/plugins/dkim_check.c b/src/plugins/dkim_check.c
index 4ffbb9672..9787ac5a5 100644
--- a/src/plugins/dkim_check.c
+++ b/src/plugins/dkim_check.c
@@ -425,7 +425,7 @@ dkim_symbol_callback (struct rspamd_task *task, void *unused)
struct dkim_check_result *res = NULL, *cur;
/* First check if a message has its signature */
- hlist = message_get_header (task,
+ hlist = rspamd_message_get_header (task,
DKIM_SIGNHEADER,
FALSE);
if (hlist != NULL) {
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 848d5d54b..c585df7a4 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -1227,7 +1227,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint));
err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *));
- r = process_message (task);
+ r = rspamd_message_parse (task);
if (r == -1) {
msg_warn ("<%s>: cannot process message for fuzzy", task->message_id);
rspamd_task_free (task, FALSE);
diff --git a/src/smtp.c b/src/smtp.c
index 74e2323a2..51d5753aa 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -305,7 +305,7 @@ process_smtp_data (struct smtp_session *session)
sizeof (struct in_addr));
session->task->cmd = CMD_CHECK;
- if (process_message (session->task) == -1) {
+ if (rspamd_message_parse (session->task) == -1) {
msg_err ("cannot process message");
munmap (session->task->msg->str, st.st_size);
goto err;