aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c122
1 files changed, 4 insertions, 118 deletions
diff --git a/src/worker.c b/src/worker.c
index 15b8ec62f..f11a288c8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -154,122 +154,6 @@ sigusr1_handler (gint fd, short what, void *arg)
}
/*
- * Called if all filters are processed
- * @return TRUE if session should be terminated
- */
-static gboolean
-fin_task (void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
- gint r;
- GError *err = NULL;
-
- ctx = task->worker->ctx;
-
- /* Task is already finished or skipped */
- if (task->state == WRITE_REPLY) {
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- return TRUE;
- }
-
- /* We processed all filters and want to process statfiles */
- if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
- /* Process all statfiles */
- if (ctx->classify_pool == NULL) {
- /* Non-threaded version */
- process_statfiles (task);
- }
- else {
- /* Just process composites */
- make_composites (task);
- }
- if (task->cfg->post_filters) {
- /* More to process */
- /* Special state */
- task->state = WAIT_POST_FILTER;
- return FALSE;
- }
-
- }
-
- /* We are on post-filter waiting state */
- if (task->state != WAIT_PRE_FILTER) {
- /* Check if we have all events finished */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- }
- else {
- /* We were waiting for pre-filter */
- if (task->pre_result.action != METRIC_ACTION_NOACTION) {
- /* Write result based on pre filters */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- return TRUE;
- }
- else {
- task->state = WAIT_FILTER;
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
- rspamd_protocol_write_reply (task);
- return TRUE;
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- g_error_free (err);
- }
- }
- if (task->is_skipped) {
- rspamd_protocol_write_reply (task);
- }
- else {
- return FALSE;
- }
- }
- }
-
- return TRUE;
-}
-
-/*
- * Called if session was restored inside fin callback
- */
-static void
-restore_task (void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
-
- /* Call post filters */
- if (task->state == WAIT_POST_FILTER) {
- lua_call_post_filters (task);
- }
- task->s->wanna_die = TRUE;
-}
-
-/*
* Reduce number of tasks proceeded
*/
static void
@@ -478,8 +362,10 @@ accept_socket (gint fd, short what, void *arg)
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
/* Set up async session */
- new_task->s =
- new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
+ new_task->s = new_async_session (new_task->task_pool, rspamd_fin_task,
+ rspamd_restore_task, free_task_hard, new_task);
+
+ new_task->classify_pool = ctx->classify_pool;
rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
}