aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c37
1 files changed, 31 insertions, 6 deletions
diff --git a/src/worker.c b/src/worker.c
index a36a0ed72..f2c579fdc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -399,7 +399,8 @@ fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;
-
+ gint r;
+ GError *err = NULL;
ctx = task->worker->ctx;
@@ -409,7 +410,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
return TRUE;
}
@@ -442,7 +443,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
@@ -454,12 +455,31 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
- /* Check normal filters in write callback */
- rspamd_dispatcher_restore (task->dispatcher);
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_reply (task);
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ write_reply (task);
+ }
}
}
@@ -585,7 +605,12 @@ static void
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+ if (task->state == CLOSING_CONNECTION) {
+ msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
+ destroy_session (task->s);
+ }
}
/*