aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-13 17:40:15 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-13 17:40:15 +0000
commitf16357c9ac57de02a6f3a2c67774aa4976329d71 (patch)
treedefa261880fb773943752dbfa63f94cf6e5816c4 /src/worker.c
parent6e61846cdac04d690a1ebaf0ca7fc2862efb2b0c (diff)
downloadrspamd-f16357c9ac57de02a6f3a2c67774aa4976329d71.tar.gz
rspamd-f16357c9ac57de02a6f3a2c67774aa4976329d71.zip
Implement HTTP session for normal worker.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c37
1 files changed, 31 insertions, 6 deletions
diff --git a/src/worker.c b/src/worker.c
index a36a0ed72..f2c579fdc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -399,7 +399,8 @@ fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;
-
+ gint r;
+ GError *err = NULL;
ctx = task->worker->ctx;
@@ -409,7 +410,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
return TRUE;
}
@@ -442,7 +443,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
@@ -454,12 +455,31 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- rspamd_dispatcher_restore (task->dispatcher);
+ write_reply (task);
}
}
else {
- /* Check normal filters in write callback */
- rspamd_dispatcher_restore (task->dispatcher);
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_reply (task);
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && ctx->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ write_reply (task);
+ }
}
}
@@ -585,7 +605,12 @@ static void
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
+ struct worker_task *task = (struct worker_task *) conn->ud;
+ if (task->state == CLOSING_CONNECTION) {
+ msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
+ destroy_session (task->s);
+ }
}
/*