Browse Source

Rework task processing.

tags/1.0.0
Vsevolod Stakhov 9 years ago
parent
commit
a645c70082
7 changed files with 133 additions and 83 deletions
  1. 122
    73
      src/libserver/task.c
  2. 4
    3
      src/libserver/task.h
  3. 1
    1
      src/lmtp.c
  4. 3
    3
      src/lua/lua_task.c
  5. 1
    1
      src/plugins/dkim_check.c
  6. 1
    1
      src/plugins/fuzzy_check.c
  7. 1
    1
      src/smtp.c

+ 122
- 73
src/libserver/task.c View File

@@ -329,47 +329,141 @@ rspamd_task_load_message (struct rspamd_task *task,
return TRUE;
}

static gint
rspamd_task_select_processing_stage (struct rspamd_task *task, guint stages)
{
gint st;

st = ffs (task->processed_stages);

if (st == -1) {
st = (1 << 0);
}
else {
st = (1 << (st + 1));
}

if (stages & st) {
return st;
}
else if (st < RSPAMD_TASK_STAGE_DONE) {
/* We assume that the stage that was not requested is done */
task->processed_stages |= st;
return rspamd_task_select_processing_stage (task, stages);
}

/* We are done */
return RSPAMD_TASK_STAGE_DONE;
}

static gboolean
rspamd_process_filters (struct rspamd_task *task)
{
GList *cur;
struct metric *metric;
gpointer item = NULL;

/* Insert default metric to be sure that it exists all the time */
/* TODO: make preprocessing only once */
rspamd_create_metric_result (task, DEFAULT_METRIC);
if (task->settings) {
const ucl_object_t *wl;

wl = ucl_object_find_key (task->settings, "whitelist");
if (wl != NULL) {
msg_info ("<%s> is whitelisted", task->message_id);
task->flags |= RSPAMD_TASK_FLAG_SKIP;
return TRUE;
}
}

/* Process metrics symbols */
while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
/* Check reject actions */
cur = task->cfg->metrics_list;
while (cur) {
metric = cur->data;
if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
metric->actions[METRIC_ACTION_REJECT].score > 0 &&
check_metric_is_spam (task, metric)) {
msg_info ("<%s> has already scored more than %.2f, so do not "
"plan any more checks", task->message_id,
metric->actions[METRIC_ACTION_REJECT].score);
return TRUE;
}
cur = g_list_next (cur);
}
}

return TRUE;
}

gboolean
rspamd_task_process (struct rspamd_task *task,
struct rspamd_http_message *msg, const gchar *start, gsize len,
guint stages)
rspamd_task_process (struct rspamd_task *task, guint stages)
{
gint r;
gint st;

if (stages & RSPAMD_TASK_STAGE_READ_MESSAGE) {
/* Process message itself */
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
if (RSPAMD_TASK_IS_PROCESSED (task)) {
return TRUE;
}

st = rspamd_task_select_processing_stage (task, stages);

switch (st) {
case RSPAMD_TASK_STAGE_READ_MESSAGE:
if (!rspamd_message_parse (task)) {
return FALSE;
}
if (!process_extra_filters) {
task->flags |= RSPAMD_TASK_FLAG_SKIP_EXTRA;
}
if (!process_extra_filters || task->cfg->pre_filters == NULL) {
r = rspamd_process_filters (task);
break;

if (r == -1) {
task->last_error = "filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_REPLY;
case RSPAMD_TASK_STAGE_PRE_FILTERS:
rspamd_lua_call_pre_filters (task);
break;

case RSPAMD_TASK_STAGE_FILTERS:
if (!rspamd_process_filters (task)) {
return FALSE;
}
break;

if (RSPAMD_TASK_IS_SKIPPED (task)) {
/* Call write_socket to write reply and exit */
task->state = WRITE_REPLY;
case RSPAMD_TASK_STAGE_CLASSIFIERS:
if (!rspamd_stat_classify (task, task->cfg->lua_state, &task->err)) {
return FALSE;
}
break;

case RSPAMD_TASK_STAGE_COMPOSITES:
rspamd_make_composites (task);
break;

case RSPAMD_TASK_STAGE_POST_FILTERS:
rspamd_lua_call_post_filters (task);
break;

case RSPAMD_TASK_STAGE_DONE:
return TRUE;

default:
/* TODO: not implemented stage */
break;
}

if (RSPAMD_TASK_IS_SKIPPED (task)) {
task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
return TRUE;
}

if (rspamd_session_events_pending (task->s) != 0) {
/* We have events pending, so we consider this stage as incomplete */
msg_debug ("need more work on stage %d", st);
}
else {
rspamd_lua_call_pre_filters (task);
/* We want fin_task after pre filters are processed */
if (rspamd_session_events_pending (task->s) != 0) {
task->state = WAIT_PRE_FILTER;
}
/* Mark the current stage as done and go to the next stage */
msg_debug ("completed stage %d", st);
task->processed_stages |= st;

/* Tail recursion */
return rspamd_task_process (task, stages);
}

return TRUE;
@@ -548,48 +642,3 @@ check_metric_is_spam (struct rspamd_task *task, struct metric *metric)

return FALSE;
}

gint
rspamd_process_filters (struct rspamd_task *task)
{
GList *cur;
struct metric *metric;
gpointer item = NULL;

/* Insert default metric to be sure that it exists all the time */
rspamd_create_metric_result (task, DEFAULT_METRIC);
if (task->settings) {
const ucl_object_t *wl;

wl = ucl_object_find_key (task->settings, "whitelist");
if (wl != NULL) {
msg_info ("<%s> is whitelisted", task->message_id);
task->flags |= RSPAMD_TASK_FLAG_SKIP;
return 0;
}
}

/* Process metrics symbols */
while (rspamd_symbols_cache_process_symbol (task, task->cfg->cache, &item)) {
/* Check reject actions */
cur = task->cfg->metrics_list;
while (cur) {
metric = cur->data;
if (!(task->flags & RSPAMD_TASK_FLAG_PASS_ALL) &&
metric->actions[METRIC_ACTION_REJECT].score > 0 &&
check_metric_is_spam (task, metric)) {
msg_info ("<%s> has already scored more than %.2f, so do not "
"plan any more checks", task->message_id,
metric->actions[METRIC_ACTION_REJECT].score);
return 1;
}
cur = g_list_next (cur);
}
}

if (rspamd_session_events_pending (task->s) != 0) {
task->state = WAIT_FILTER;
}

return 1;
}

+ 4
- 3
src/libserver/task.h View File

@@ -58,8 +58,9 @@ enum rspamd_task_stage {
RSPAMD_TASK_STAGE_PRE_FILTERS = (1 << 3),
RSPAMD_TASK_STAGE_FILTERS = (1 << 4),
RSPAMD_TASK_STAGE_CLASSIFIERS = (1 << 5),
RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 6),
RSPAMD_TASK_STAGE_WRITE_REPLY = (1 << 7)
RSPAMD_TASK_STAGE_COMPOSITES = (1 << 6),
RSPAMD_TASK_STAGE_POST_FILTERS = (1 << 7),
RSPAMD_TASK_STAGE_DONE = (1 << 8)
};

#define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \
@@ -89,6 +90,7 @@ enum rspamd_task_stage {
#define RSPAMD_TASK_IS_SKIPPED(task) (((task)->flags & RSPAMD_TASK_FLAG_SKIP))
#define RSPAMD_TASK_IS_JSON(task) (((task)->flags & RSPAMD_TASK_FLAG_JSON))
#define RSPAMD_TASK_IS_SPAMC(task) (((task)->flags & RSPAMD_TASK_FLAG_SPAMC))
#define RSPAMD_TASK_IS_PROCESSED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_DONE))

typedef gint (*protocol_reply_func)(struct rspamd_task *task);

@@ -157,7 +159,6 @@ struct rspamd_task {
double time_virtual;
struct timeval tv;
guint32 scan_milliseconds; /**< how much milliseconds passed */
guint32 parser_recursion; /**< for avoiding recursion stack overflow */
gboolean (*fin_callback)(struct rspamd_task *task, void *arg); /**< calback for filters finalizing */
void *fin_arg; /**< argument for fin callback */


+ 1
- 1
src/lmtp.c View File

@@ -166,7 +166,7 @@ lmtp_read_socket (rspamd_fstring_t * in, void *arg)
}
break;
case READ_MESSAGE:
r = process_message (lmtp->task);
r = rspamd_message_parse (lmtp->task);
r = rspamd_process_filters (lmtp->task);
if (r == -1) {
return FALSE;

+ 3
- 3
src/lua/lua_task.c View File

@@ -71,7 +71,7 @@ LUA_FUNCTION_DEF (task, create_empty);
LUA_FUNCTION_DEF (task, create_from_buffer);
/* Task methods */
LUA_FUNCTION_DEF (task, get_message);
LUA_FUNCTION_DEF (task, process_message);
LUA_FUNCTION_DEF (task, rspamd_message_parse);
/***
* @method task:get_cfg()
* Get configuration object for a task.
@@ -474,7 +474,7 @@ static const struct luaL_reg tasklib_f[] = {
static const struct luaL_reg tasklib_m[] = {
LUA_INTERFACE_DEF (task, get_message),
LUA_INTERFACE_DEF (task, destroy),
LUA_INTERFACE_DEF (task, process_message),
LUA_INTERFACE_DEF (task, rspamd_message_parse),
LUA_INTERFACE_DEF (task, set_cfg),
LUA_INTERFACE_DEF (task, get_cfg),
LUA_INTERFACE_DEF (task, get_mempool),
@@ -623,7 +623,7 @@ lua_task_process_message (lua_State *L)
struct rspamd_task *task = lua_check_task (L, 1);

if (task != NULL && task->msg.len > 0) {
if (process_message (task) == 0) {
if (rspamd_message_parse (task) == 0) {
lua_pushboolean (L, TRUE);
}
else {

+ 1
- 1
src/plugins/dkim_check.c View File

@@ -425,7 +425,7 @@ dkim_symbol_callback (struct rspamd_task *task, void *unused)
struct dkim_check_result *res = NULL, *cur;
/* First check if a message has its signature */

hlist = message_get_header (task,
hlist = rspamd_message_get_header (task,
DKIM_SIGNHEADER,
FALSE);
if (hlist != NULL) {

+ 1
- 1
src/plugins/fuzzy_check.c View File

@@ -1227,7 +1227,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,

saved = rspamd_mempool_alloc0 (task->task_pool, sizeof (gint));
err = rspamd_mempool_alloc0 (task->task_pool, sizeof (GError *));
r = process_message (task);
r = rspamd_message_parse (task);
if (r == -1) {
msg_warn ("<%s>: cannot process message for fuzzy", task->message_id);
rspamd_task_free (task, FALSE);

+ 1
- 1
src/smtp.c View File

@@ -305,7 +305,7 @@ process_smtp_data (struct smtp_session *session)
sizeof (struct in_addr));
session->task->cmd = CMD_CHECK;

if (process_message (session->task) == -1) {
if (rspamd_message_parse (session->task) == -1) {
msg_err ("cannot process message");
munmap (session->task->msg->str, st.st_size);
goto err;

Loading…
Cancel
Save