aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker_util.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker_util.c')
-rw-r--r--src/worker_util.c114
1 files changed, 114 insertions, 0 deletions
diff --git a/src/worker_util.c b/src/worker_util.c
index 97dac2b2a..618ab6a5c 100644
--- a/src/worker_util.c
+++ b/src/worker_util.c
@@ -24,6 +24,7 @@
#include "config.h"
#include "main.h"
#include "message.h"
+#include "lua/lua_common.h"
extern struct rspamd_main *rspamd_main;
@@ -279,3 +280,116 @@ worker_stop_accept (struct rspamd_worker *worker)
g_list_free (worker->accept_events);
}
}
+
+/*
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean
+rspamd_fin_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+ gint r;
+ GError *err = NULL;
+
+ /* Task is already finished or skipped */
+ if (task->state == WRITE_REPLY) {
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+ return TRUE;
+ }
+
+ /* We processed all filters and want to process statfiles */
+ if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
+ /* Process all statfiles */
+ if (task->classify_pool == NULL) {
+ /* Non-threaded version */
+ process_statfiles (task);
+ }
+ else {
+ /* Just process composites */
+ make_composites (task);
+ }
+ if (task->cfg->post_filters) {
+ /* More to process */
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+ return FALSE;
+ }
+
+ }
+
+ /* We are on post-filter waiting state */
+ if (task->state != WAIT_PRE_FILTER) {
+ /* Check if we have all events finished */
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+ }
+ else {
+ /* We were waiting for pre-filter */
+ if (task->pre_result.action != METRIC_ACTION_NOACTION) {
+ /* Write result based on pre filters */
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+ return TRUE;
+ }
+ else {
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_REPLY;
+ rspamd_protocol_write_reply (task);
+ return TRUE;
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && task->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (task->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ rspamd_protocol_write_reply (task);
+ }
+ else {
+ return FALSE;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
+/*
+ * Called if session was restored inside fin callback
+ */
+void
+rspamd_restore_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Call post filters */
+ if (task->state == WAIT_POST_FILTER) {
+ lua_call_post_filters (task);
+ }
+ task->s->wanna_die = TRUE;
+}