aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-04-22 14:43:00 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-04-22 14:43:00 +0100
commit7de497b731406b0dceba4a86075d4086f5bd936a (patch)
tree4921a21cf035a999bc07785aa7c7a960e1c2787e
parente6d42dffd5367ca97a09167194f027727313af59 (diff)
downloadrspamd-7de497b731406b0dceba4a86075d4086f5bd936a.tar.gz
rspamd-7de497b731406b0dceba4a86075d4086f5bd936a.zip
Unify task scan functions.
-rw-r--r--src/libmime/worker_util.c113
-rw-r--r--src/libserver/task.c194
-rw-r--r--src/libserver/task.h14
-rw-r--r--src/worker.c55
4 files changed, 200 insertions, 176 deletions
diff --git a/src/libmime/worker_util.c b/src/libmime/worker_util.c
index d029f5dc4..aa5719f2c 100644
--- a/src/libmime/worker_util.c
+++ b/src/libmime/worker_util.c
@@ -140,116 +140,3 @@ worker_stop_accept (struct rspamd_worker *worker)
g_list_free (worker->accept_events);
}
}
-
-/*
- * Called if all filters are processed
- * @return TRUE if session should be terminated
- */
-gboolean
-rspamd_task_fin (void *arg)
-{
- struct rspamd_task *task = (struct rspamd_task *) arg;
- gint r;
- GError *err = NULL;
-
- /* Task is already finished or skipped */
- if (task->state == WRITE_REPLY) {
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- return TRUE;
- }
-
- /* We processed all filters and want to process statfiles */
- if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
- /* Process all statfiles */
- if (task->classify_pool == NULL) {
- /* Non-threaded version */
- process_statfiles (task);
- }
- else {
- /* Just process composites */
- make_composites (task);
- }
- if (task->cfg->post_filters) {
- /* More to process */
- /* Special state */
- task->state = WAIT_POST_FILTER;
- return FALSE;
- }
-
- }
-
- /* We are on post-filter waiting state */
- if (task->state != WAIT_PRE_FILTER) {
- /* Check if we have all events finished */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- }
- else {
- /* We were waiting for pre-filter */
- if (task->pre_result.action != METRIC_ACTION_NOACTION) {
- /* Write result based on pre filters */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- return TRUE;
- }
- else {
- task->state = WAIT_FILTER;
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
- rspamd_protocol_write_reply (task);
- return TRUE;
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && task->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (task->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- g_error_free (err);
- }
- }
- if (task->is_skipped) {
- rspamd_protocol_write_reply (task);
- }
- else {
- return FALSE;
- }
- }
- }
-
- return TRUE;
-}
-
-/*
- * Called if session was restored inside fin callback
- */
-void
-rspamd_task_restore (void *arg)
-{
- struct rspamd_task *task = (struct rspamd_task *) arg;
-
- /* Call post filters */
- if (task->state == WAIT_POST_FILTER) {
- lua_call_post_filters (task);
- }
- task->s->wanna_die = TRUE;
-}
diff --git a/src/libserver/task.c b/src/libserver/task.c
index f389793dd..04e385479 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -24,7 +24,9 @@
#include "task.h"
#include "main.h"
#include "filter.h"
+#include "protocol.h"
#include "message.h"
+#include "lua/lua_common.h"
/*
* Destructor for recipients list in a task
@@ -102,6 +104,115 @@ rspamd_task_new (struct rspamd_worker *worker)
}
+static void
+rspamd_task_reply (struct rspamd_task *task)
+{
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+}
+
+/*
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean
+rspamd_task_fin (void *arg)
+{
+ struct rspamd_task *task = (struct rspamd_task *) arg;
+ gint r;
+ GError *err = NULL;
+
+ /* Task is already finished or skipped */
+ if (task->state == WRITE_REPLY) {
+ rspamd_task_reply (task);
+ return TRUE;
+ }
+
+ /* We processed all filters and want to process statfiles */
+ if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
+ /* Process all statfiles */
+ if (task->classify_pool == NULL) {
+ /* Non-threaded version */
+ process_statfiles (task);
+ }
+ else {
+ /* Just process composites */
+ make_composites (task);
+ }
+ if (task->cfg->post_filters) {
+ /* More to process */
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+ return FALSE;
+ }
+
+ }
+
+ /* We are on post-filter waiting state */
+ if (task->state != WAIT_PRE_FILTER) {
+ /* Check if we have all events finished */
+ task->state = WRITE_REPLY;
+ rspamd_task_reply (task);
+ }
+ else {
+ /* We were waiting for pre-filter */
+ if (task->pre_result.action != METRIC_ACTION_NOACTION) {
+ /* Write result based on pre filters */
+ task->state = WRITE_REPLY;
+ rspamd_task_reply (task);
+ return TRUE;
+ }
+ else {
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_REPLY;
+ rspamd_task_reply (task);
+ return TRUE;
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && task->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (task->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ rspamd_task_reply (task);
+ }
+ else {
+ return FALSE;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
+/*
+ * Called if session was restored inside fin callback
+ */
+void
+rspamd_task_restore (void *arg)
+{
+ struct rspamd_task *task = (struct rspamd_task *) arg;
+
+ /* Call post filters */
+ if (task->state == WAIT_POST_FILTER && !task->skip_extra_filters) {
+ lua_call_post_filters (task);
+ }
+ task->s->wanna_die = TRUE;
+}
+
/*
* Free all structures of worker_task
*/
@@ -142,18 +253,85 @@ rspamd_task_free (struct rspamd_task *task, gboolean is_soft)
}
}
-void
-rspamd_task_free_hard (gpointer ud)
+void rspamd_task_free_hard (gpointer ud)
{
- struct rspamd_task *task = ud;
+ struct rspamd_task *task = ud;
- rspamd_task_free (task, FALSE);
+ rspamd_task_free (task, FALSE);
}
-void
-rspamd_task_free_soft (gpointer ud)
+void rspamd_task_free_soft (gpointer ud)
+{
+ struct rspamd_task *task = ud;
+
+ rspamd_task_free (task, FALSE);
+}
+
+
+gboolean
+rspamd_task_process (struct rspamd_task *task,
+ struct rspamd_http_message *msg, GThreadPool *classify_pool,
+ gboolean process_extra_filters)
{
- struct rspamd_task *task = ud;
+ gint r;
+ GError *err = NULL;
+
+ if (msg->body->len == 0) {
+ msg_err ("got zero length body");
+ task->last_error = "message's body is empty";
+ return FALSE;
+ }
+
+ task->msg = msg->body;
+
+ debug_task ("got string of length %z", task->msg->len);
+
+ /* We got body, set wanna_die flag */
+ task->s->wanna_die = TRUE;
+
+ r = process_message (task);
+ if (r == -1) {
+ msg_warn ("processing of message failed");
+ task->last_error = "MIME processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_REPLY;
+ return FALSE;
+ }
+ task->skip_extra_filters = !process_extra_filters;
+ if (!process_extra_filters || task->cfg->pre_filters == NULL) {
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_REPLY;
+ return FALSE;
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ else {
+ task->classify_pool = classify_pool;
+ }
+ }
+ if (task->is_skipped) {
+ /* Call write_socket to write reply and exit */
+ task->state = WRITE_REPLY;
+ return TRUE;
+ }
+ }
+ else {
+ lua_call_pre_filters (task);
+ /* We want fin_task after pre filters are processed */
+ task->s->wanna_die = TRUE;
+ task->state = WAIT_PRE_FILTER;
+ check_session_pending (task->s);
+ }
- rspamd_task_free (task, FALSE);
+ return TRUE;
}
diff --git a/src/libserver/task.h b/src/libserver/task.h
index f8f7c89e3..0891dc6e2 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -76,7 +76,7 @@ struct rspamd_task {
gint sock; /**< socket descriptor */
gboolean is_mime; /**< if this task is mime task */
gboolean is_json; /**< output is JSON */
- gboolean allow_learn; /**< allow learning */
+ gboolean skip_extra_filters; /**< skip pre and post filters */
gboolean is_skipped; /**< whether message was skipped by configuration */
gchar *helo; /**< helo header value */
@@ -162,4 +162,16 @@ void rspamd_task_restore (void *arg);
*/
gboolean rspamd_task_fin (void *arg);
+/**
+ * Process task from http message and write reply or call task->fin_handler
+ * @param task task to process
+ * @param msg incoming http message
+ * @param classify_pool classify pool (or NULL)
+ * @param process_extra_filters whether to check pre and post filters
+ * @return task has been successfully parsed and processed
+ */
+gboolean rspamd_task_process (struct rspamd_task *task,
+ struct rspamd_http_message *msg, GThreadPool *classify_pool,
+ gboolean process_extra_filters);
+
#endif /* TASK_H_ */
diff --git a/src/worker.c b/src/worker.c
index 94ec74451..9f6e269bb 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -170,8 +170,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
{
struct rspamd_task *task = (struct rspamd_task *) conn->ud;
struct rspamd_worker_ctx *ctx;
- ssize_t r;
- GError *err = NULL;
ctx = task->worker->ctx;
@@ -193,58 +191,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
return 0;
}
- task->msg = msg->body;
+ rspamd_task_process (task, msg, ctx->classify_pool, TRUE);
- debug_task ("got string of length %z", task->msg->len);
-
- /* We got body, set wanna_die flag */
- task->s->wanna_die = TRUE;
-
- r = process_message (task);
- if (r == -1) {
- msg_warn ("processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
- return 0;
- }
- if (task->cmd == CMD_OTHER) {
- /* Skip filters */
- task->state = WRITE_REPLY;
- return 0;
- }
- else {
- if (task->cfg->pre_filters == NULL) {
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
- return 0;
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- }
- }
- if (task->is_skipped) {
- /* Call write_socket to write reply and exit */
- task->state = WRITE_REPLY;
- return 0;
- }
- }
- else {
- lua_call_pre_filters (task);
- /* We want fin_task after pre filters are processed */
- task->s->wanna_die = TRUE;
- task->state = WAIT_PRE_FILTER;
- check_session_pending (task->s);
- }
- }
return 0;
}
@@ -329,7 +277,6 @@ accept_socket (gint fd, short what, void *arg)
/* Copy some variables */
new_task->sock = nfd;
new_task->is_mime = ctx->is_mime;
- new_task->allow_learn = ctx->allow_learn;
memcpy (&new_task->client_addr, &addr, sizeof (addr));
worker->srv->stat->connections_count++;