diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-04-22 14:43:00 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-04-22 14:43:00 +0100 |
commit | 7de497b731406b0dceba4a86075d4086f5bd936a (patch) | |
tree | 4921a21cf035a999bc07785aa7c7a960e1c2787e | |
parent | e6d42dffd5367ca97a09167194f027727313af59 (diff) | |
download | rspamd-7de497b731406b0dceba4a86075d4086f5bd936a.tar.gz rspamd-7de497b731406b0dceba4a86075d4086f5bd936a.zip |
Unify task scan functions.
-rw-r--r-- | src/libmime/worker_util.c | 113 | ||||
-rw-r--r-- | src/libserver/task.c | 194 | ||||
-rw-r--r-- | src/libserver/task.h | 14 | ||||
-rw-r--r-- | src/worker.c | 55 |
4 files changed, 200 insertions, 176 deletions
diff --git a/src/libmime/worker_util.c b/src/libmime/worker_util.c index d029f5dc4..aa5719f2c 100644 --- a/src/libmime/worker_util.c +++ b/src/libmime/worker_util.c @@ -140,116 +140,3 @@ worker_stop_accept (struct rspamd_worker *worker) g_list_free (worker->accept_events); } } - -/* - * Called if all filters are processed - * @return TRUE if session should be terminated - */ -gboolean -rspamd_task_fin (void *arg) -{ - struct rspamd_task *task = (struct rspamd_task *) arg; - gint r; - GError *err = NULL; - - /* Task is already finished or skipped */ - if (task->state == WRITE_REPLY) { - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - return TRUE; - } - - /* We processed all filters and want to process statfiles */ - if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { - /* Process all statfiles */ - if (task->classify_pool == NULL) { - /* Non-threaded version */ - process_statfiles (task); - } - else { - /* Just process composites */ - make_composites (task); - } - if (task->cfg->post_filters) { - /* More to process */ - /* Special state */ - task->state = WAIT_POST_FILTER; - return FALSE; - } - - } - - /* We are on post-filter waiting state */ - if (task->state != WAIT_PRE_FILTER) { - /* Check if we have all events finished */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - } - else { - /* We were waiting for pre-filter */ - if (task->pre_result.action != METRIC_ACTION_NOACTION) { - /* Write result based on pre filters */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - return TRUE; - } - else { - task->state = WAIT_FILTER; - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_REPLY; - rspamd_protocol_write_reply (task); - return TRUE; - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && task->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (task->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - g_error_free (err); - } - } - if (task->is_skipped) { - rspamd_protocol_write_reply (task); - } - else { - return FALSE; - } - } - } - - return TRUE; -} - -/* - * Called if session was restored inside fin callback - */ -void -rspamd_task_restore (void *arg) -{ - struct rspamd_task *task = (struct rspamd_task *) arg; - - /* Call post filters */ - if (task->state == WAIT_POST_FILTER) { - lua_call_post_filters (task); - } - task->s->wanna_die = TRUE; -} diff --git a/src/libserver/task.c b/src/libserver/task.c index f389793dd..04e385479 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -24,7 +24,9 @@ #include "task.h" #include "main.h" #include "filter.h" +#include "protocol.h" #include "message.h" +#include "lua/lua_common.h" /* * Destructor for recipients list in a task @@ -102,6 +104,115 @@ rspamd_task_new (struct rspamd_worker *worker) } +static void +rspamd_task_reply (struct rspamd_task *task) +{ + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } +} + +/* + * Called if all filters are processed + * @return TRUE if session should be terminated + */ +gboolean +rspamd_task_fin (void *arg) +{ + struct rspamd_task *task = (struct rspamd_task *) arg; + gint r; + GError *err = NULL; + + /* Task is already finished or skipped */ + if (task->state == WRITE_REPLY) { + rspamd_task_reply (task); + return TRUE; + } + + /* We processed all filters and want to process statfiles */ + if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { + /* Process all statfiles */ + if (task->classify_pool == NULL) { + /* Non-threaded version */ + process_statfiles (task); + } + else { + /* Just process composites */ + make_composites (task); + } + if (task->cfg->post_filters) { + /* More to process */ + /* Special state */ + task->state = WAIT_POST_FILTER; + return FALSE; + } + + } + + /* We are on post-filter waiting state */ + if (task->state != WAIT_PRE_FILTER) { + /* Check if we have all events finished */ + task->state = WRITE_REPLY; + rspamd_task_reply (task); + } + else { + /* We were waiting for pre-filter */ + if (task->pre_result.action != METRIC_ACTION_NOACTION) { + /* Write result based on pre filters */ + task->state = WRITE_REPLY; + rspamd_task_reply (task); + return TRUE; + } + else { + task->state = WAIT_FILTER; + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_REPLY; + rspamd_task_reply (task); + return TRUE; + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && task->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (task->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + g_error_free (err); + } + } + if (task->is_skipped) { + rspamd_task_reply (task); + } + else { + return FALSE; + } + } + } + + return TRUE; +} + +/* + * Called if session was restored inside fin callback + */ +void +rspamd_task_restore (void *arg) +{ + struct rspamd_task *task = (struct rspamd_task *) arg; + + /* Call post filters */ + if (task->state == WAIT_POST_FILTER && !task->skip_extra_filters) { + lua_call_post_filters (task); + } + task->s->wanna_die = TRUE; +} + /* * Free all structures of worker_task */ @@ -142,18 +253,85 @@ rspamd_task_free (struct rspamd_task *task, gboolean is_soft) } } -void -rspamd_task_free_hard (gpointer ud) +void rspamd_task_free_hard (gpointer ud) { - struct rspamd_task *task = ud; + struct rspamd_task *task = ud; - rspamd_task_free (task, FALSE); + rspamd_task_free (task, FALSE); } -void -rspamd_task_free_soft (gpointer ud) +void rspamd_task_free_soft (gpointer ud) +{ + struct rspamd_task *task = ud; + + rspamd_task_free (task, FALSE); +} + + +gboolean +rspamd_task_process (struct rspamd_task *task, + struct rspamd_http_message *msg, GThreadPool *classify_pool, + gboolean process_extra_filters) { - struct rspamd_task *task = ud; + gint r; + GError *err = NULL; + + if (msg->body->len == 0) { + msg_err ("got zero length body"); + task->last_error = "message's body is empty"; + return FALSE; + } + + task->msg = msg->body; + + debug_task ("got string of length %z", task->msg->len); + + /* We got body, set wanna_die flag */ + task->s->wanna_die = TRUE; + + r = process_message (task); + if (r == -1) { + msg_warn ("processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_REPLY; + return FALSE; + } + task->skip_extra_filters = !process_extra_filters; + if (!process_extra_filters || task->cfg->pre_filters == NULL) { + r = process_filters (task); + if (r == -1) { + task->last_error = "filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_REPLY; + return FALSE; + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + g_error_free (err); + } + else { + task->classify_pool = classify_pool; + } + } + if (task->is_skipped) { + /* Call write_socket to write reply and exit */ + task->state = WRITE_REPLY; + return TRUE; + } + } + else { + lua_call_pre_filters (task); + /* We want fin_task after pre filters are processed */ + task->s->wanna_die = TRUE; + task->state = WAIT_PRE_FILTER; + check_session_pending (task->s); + } - rspamd_task_free (task, FALSE); + return TRUE; } diff --git a/src/libserver/task.h b/src/libserver/task.h index f8f7c89e3..0891dc6e2 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -76,7 +76,7 @@ struct rspamd_task { gint sock; /**< socket descriptor */ gboolean is_mime; /**< if this task is mime task */ gboolean is_json; /**< output is JSON */ - gboolean allow_learn; /**< allow learning */ + gboolean skip_extra_filters; /**< skip pre and post filters */ gboolean is_skipped; /**< whether message was skipped by configuration */ gchar *helo; /**< helo header value */ @@ -162,4 +162,16 @@ void rspamd_task_restore (void *arg); */ gboolean rspamd_task_fin (void *arg); +/** + * Process task from http message and write reply or call task->fin_handler + * @param task task to process + * @param msg incoming http message + * @param classify_pool classify pool (or NULL) + * @param process_extra_filters whether to check pre and post filters + * @return task has been successfully parsed and processed + */ +gboolean rspamd_task_process (struct rspamd_task *task, + struct rspamd_http_message *msg, GThreadPool *classify_pool, + gboolean process_extra_filters); + #endif /* TASK_H_ */ diff --git a/src/worker.c b/src/worker.c index 94ec74451..9f6e269bb 100644 --- a/src/worker.c +++ b/src/worker.c @@ -170,8 +170,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, { struct rspamd_task *task = (struct rspamd_task *) conn->ud; struct rspamd_worker_ctx *ctx; - ssize_t r; - GError *err = NULL; ctx = task->worker->ctx; @@ -193,58 +191,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, return 0; } - task->msg = msg->body; + rspamd_task_process (task, msg, ctx->classify_pool, TRUE); - debug_task ("got string of length %z", task->msg->len); - - /* We got body, set wanna_die flag */ - task->s->wanna_die = TRUE; - - r = process_message (task); - if (r == -1) { - msg_warn ("processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_REPLY; - return 0; - } - if (task->cmd == CMD_OTHER) { - /* Skip filters */ - task->state = WRITE_REPLY; - return 0; - } - else { - if (task->cfg->pre_filters == NULL) { - r = process_filters (task); - if (r == -1) { - task->last_error = "filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_REPLY; - return 0; - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - } - } - if (task->is_skipped) { - /* Call write_socket to write reply and exit */ - task->state = WRITE_REPLY; - return 0; - } - } - else { - lua_call_pre_filters (task); - /* We want fin_task after pre filters are processed */ - task->s->wanna_die = TRUE; - task->state = WAIT_PRE_FILTER; - check_session_pending (task->s); - } - } return 0; } @@ -329,7 +277,6 @@ accept_socket (gint fd, short what, void *arg) /* Copy some variables */ new_task->sock = nfd; new_task->is_mime = ctx->is_mime; - new_task->allow_learn = ctx->allow_learn; memcpy (&new_task->client_addr, &addr, sizeof (addr)); worker->srv->stat->connections_count++; |