aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-03-30 20:54:18 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-03-30 20:54:18 +0400
commite5d0c7f8f6cda246eddfcab82b056650be753fe7 (patch)
tree6a2370ad9e0bd6dbf363ef0cb6927196c832c758 /src/worker.c
parent8e09451a57dda5becb741e289a5dbb9890747f8d (diff)
downloadrspamd-e5d0c7f8f6cda246eddfcab82b056650be753fe7.tar.gz
rspamd-e5d0c7f8f6cda246eddfcab82b056650be753fe7.zip
* Implement pre-filters that realizes concepts to check mail by some absolute values like:
- greylisting - DNS BL/WL - ratelimits
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c95
1 files changed, 73 insertions, 22 deletions
diff --git a/src/worker.c b/src/worker.c
index d56c2d924..dad842ce3 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -247,6 +247,7 @@ construct_task (struct rspamd_worker *worker)
new_task->urls);
new_task->sock = -1;
new_task->is_mime = TRUE;
+ new_task->pre_result.action = METRIC_ACTION_NOACTION;
return new_task;
}
@@ -497,22 +498,31 @@ read_socket (f_str_t * in, void *arg)
return write_socket (task);
}
else {
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- /* Add task to classify to classify pool */
- if (ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
+ if (task->cfg->pre_filters == NULL) {
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ return write_socket (task);
+ }
+ /* Add task to classify to classify pool */
+ if (ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ }
}
}
+ else {
+ lua_call_pre_filters (task);
+ /* We want fin_task after pre filters are processed */
+ task->s->wanna_die = TRUE;
+ task->state = WAIT_PRE_FILTER;
+ check_session_pending (task->s);
+ }
}
break;
case WRITE_REPLY:
@@ -521,6 +531,7 @@ read_socket (f_str_t * in, void *arg)
break;
case WAIT_FILTER:
case WAIT_POST_FILTER:
+ case WAIT_PRE_FILTER:
msg_info ("ignoring trailing garbadge of size %z", in->len);
break;
default:
@@ -539,6 +550,8 @@ write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;
+ GError *err = NULL;
+ gint r;
ctx = task->worker->ctx;
@@ -578,6 +591,25 @@ write_socket (void *arg)
case WAIT_POST_FILTER:
/* Do nothing here */
break;
+ case WAIT_PRE_FILTER:
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ return write_socket (task);
+ }
+ /* Add task to classify to classify pool */
+ if (ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ }
+ }
+ break;
default:
msg_info ("abnormally closing connection at state: %d", task->state);
if (ctx->is_custom) {
@@ -616,11 +648,12 @@ err_socket (GError * err, void *arg)
static gboolean
fin_task (void *arg)
{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
+ struct worker_task *task = (struct worker_task *) arg;
+ struct rspamd_worker_ctx *ctx;
+
ctx = task->worker->ctx;
- if (task->state != WAIT_POST_FILTER) {
+ if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
/* Process all statfiles */
if (ctx->classify_pool == NULL) {
/* Non-threaded version */
@@ -639,13 +672,31 @@ fin_task (void *arg)
}
- /* Check if we have all events finished */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
+ if (task->state != WAIT_PRE_FILTER) {
+ /* Check if we have all events finished */
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ if (task->pre_result.action != METRIC_ACTION_NOACTION) {
+ /* Write result based on pre filters */
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
+ }
+ else {
+ /* Check normal filters in write callback */
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
return TRUE;