diff options
Diffstat (limited to 'src/worker_util.c')
-rw-r--r-- | src/worker_util.c | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/src/worker_util.c b/src/worker_util.c index 97dac2b2a..618ab6a5c 100644 --- a/src/worker_util.c +++ b/src/worker_util.c @@ -24,6 +24,7 @@ #include "config.h" #include "main.h" #include "message.h" +#include "lua/lua_common.h" extern struct rspamd_main *rspamd_main; @@ -279,3 +280,116 @@ worker_stop_accept (struct rspamd_worker *worker) g_list_free (worker->accept_events); } } + +/* + * Called if all filters are processed + * @return TRUE if session should be terminated + */ +gboolean +rspamd_fin_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + gint r; + GError *err = NULL; + + /* Task is already finished or skipped */ + if (task->state == WRITE_REPLY) { + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } + return TRUE; + } + + /* We processed all filters and want to process statfiles */ + if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { + /* Process all statfiles */ + if (task->classify_pool == NULL) { + /* Non-threaded version */ + process_statfiles (task); + } + else { + /* Just process composites */ + make_composites (task); + } + if (task->cfg->post_filters) { + /* More to process */ + /* Special state */ + task->state = WAIT_POST_FILTER; + return FALSE; + } + + } + + /* We are on post-filter waiting state */ + if (task->state != WAIT_PRE_FILTER) { + /* Check if we have all events finished */ + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } + } + else { + /* We were waiting for pre-filter */ + if (task->pre_result.action != METRIC_ACTION_NOACTION) { + /* Write result based on pre filters */ + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } + return TRUE; + } + else { + task->state = WAIT_FILTER; + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_REPLY; + rspamd_protocol_write_reply (task); + return TRUE; + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && task->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (task->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + g_error_free (err); + } + } + if (task->is_skipped) { + rspamd_protocol_write_reply (task); + } + else { + return FALSE; + } + } + } + + return TRUE; +} + +/* + * Called if session was restored inside fin callback + */ +void +rspamd_restore_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + + /* Call post filters */ + if (task->state == WAIT_POST_FILTER) { + lua_call_post_filters (task); + } + task->s->wanna_die = TRUE; +} |