diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-03-30 20:54:18 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-03-30 20:54:18 +0400 |
commit | e5d0c7f8f6cda246eddfcab82b056650be753fe7 (patch) | |
tree | 6a2370ad9e0bd6dbf363ef0cb6927196c832c758 /src/worker.c | |
parent | 8e09451a57dda5becb741e289a5dbb9890747f8d (diff) | |
download | rspamd-e5d0c7f8f6cda246eddfcab82b056650be753fe7.tar.gz rspamd-e5d0c7f8f6cda246eddfcab82b056650be753fe7.zip |
* Implement pre-filters that realizes concepts to check mail by some absolute values like:
- greylisting
- DNS BL/WL
- ratelimits
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 95 |
1 files changed, 73 insertions, 22 deletions
diff --git a/src/worker.c b/src/worker.c index d56c2d924..dad842ce3 100644 --- a/src/worker.c +++ b/src/worker.c @@ -247,6 +247,7 @@ construct_task (struct rspamd_worker *worker) new_task->urls); new_task->sock = -1; new_task->is_mime = TRUE; + new_task->pre_result.action = METRIC_ACTION_NOACTION; return new_task; } @@ -497,22 +498,31 @@ read_socket (f_str_t * in, void *arg) return write_socket (task); } else { - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - /* Add task to classify to classify pool */ - if (ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); + if (task->cfg->pre_filters == NULL) { + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return write_socket (task); + } + /* Add task to classify to classify pool */ + if (ctx->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (ctx->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + } } } + else { + lua_call_pre_filters (task); + /* We want fin_task after pre filters are processed */ + task->s->wanna_die = TRUE; + task->state = WAIT_PRE_FILTER; + check_session_pending (task->s); + } } break; case WRITE_REPLY: @@ -521,6 +531,7 @@ read_socket (f_str_t * in, void *arg) break; case WAIT_FILTER: case WAIT_POST_FILTER: + case WAIT_PRE_FILTER: msg_info ("ignoring trailing garbadge of size %z", in->len); break; default: @@ -539,6 +550,8 @@ write_socket (void *arg) { struct worker_task *task = (struct worker_task *) arg; struct rspamd_worker_ctx *ctx; + GError *err = NULL; + gint r; ctx = task->worker->ctx; @@ -578,6 +591,25 @@ write_socket (void *arg) case WAIT_POST_FILTER: /* Do nothing here */ break; + case WAIT_PRE_FILTER: + task->state = WAIT_FILTER; + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return write_socket (task); + } + /* Add task to classify to classify pool */ + if (ctx->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (ctx->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + } + } + break; default: msg_info ("abnormally closing connection at state: %d", task->state); if (ctx->is_custom) { @@ -616,11 +648,12 @@ err_socket (GError * err, void *arg) static gboolean fin_task (void *arg) { - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; + struct worker_task *task = (struct worker_task *) arg; + struct rspamd_worker_ctx *ctx; + ctx = task->worker->ctx; - if (task->state != WAIT_POST_FILTER) { + if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { /* Process all statfiles */ if (ctx->classify_pool == NULL) { /* Non-threaded version */ @@ -639,13 +672,31 @@ fin_task (void *arg) } - /* Check if we have all events finished */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); + if (task->state != WAIT_PRE_FILTER) { + /* Check if we have all events finished */ + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); + } } else { - rspamd_dispatcher_restore (task->dispatcher); + if (task->pre_result.action != METRIC_ACTION_NOACTION) { + /* Write result based on pre filters */ + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); + } + } + else { + /* Check normal filters in write callback */ + rspamd_dispatcher_restore (task->dispatcher); + } } return TRUE; |