diff options
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 122 |
1 files changed, 4 insertions, 118 deletions
diff --git a/src/worker.c b/src/worker.c index 15b8ec62f..f11a288c8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -154,122 +154,6 @@ sigusr1_handler (gint fd, short what, void *arg) } /* - * Called if all filters are processed - * @return TRUE if session should be terminated - */ -static gboolean -fin_task (void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; - gint r; - GError *err = NULL; - - ctx = task->worker->ctx; - - /* Task is already finished or skipped */ - if (task->state == WRITE_REPLY) { - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - return TRUE; - } - - /* We processed all filters and want to process statfiles */ - if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { - /* Process all statfiles */ - if (ctx->classify_pool == NULL) { - /* Non-threaded version */ - process_statfiles (task); - } - else { - /* Just process composites */ - make_composites (task); - } - if (task->cfg->post_filters) { - /* More to process */ - /* Special state */ - task->state = WAIT_POST_FILTER; - return FALSE; - } - - } - - /* We are on post-filter waiting state */ - if (task->state != WAIT_PRE_FILTER) { - /* Check if we have all events finished */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - } - else { - /* We were waiting for pre-filter */ - if (task->pre_result.action != METRIC_ACTION_NOACTION) { - /* Write result based on pre filters */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - return TRUE; - } - else { - task->state = WAIT_FILTER; - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_REPLY; - rspamd_protocol_write_reply (task); - return TRUE; - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - g_error_free (err); - } - } - if (task->is_skipped) { - rspamd_protocol_write_reply (task); - } - else { - return FALSE; - } - } - } - - return TRUE; -} - -/* - * Called if session was restored inside fin callback - */ -static void -restore_task (void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - - /* Call post filters */ - if (task->state == WAIT_POST_FILTER) { - lua_call_post_filters (task); - } - task->s->wanna_die = TRUE; -} - -/* * Reduce number of tasks proceeded */ static void @@ -478,8 +362,10 @@ accept_socket (gint fd, short what, void *arg) memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); /* Set up async session */ - new_task->s = - new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); + new_task->s = new_async_session (new_task->task_pool, rspamd_fin_task, + rspamd_restore_task, free_task_hard, new_task); + + new_task->classify_pool = ctx->classify_pool; rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base); } |