aboutsummaryrefslogtreecommitdiffstats
path: root/src/controller.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/controller.c')
-rw-r--r--src/controller.c51
1 files changed, 36 insertions, 15 deletions
diff --git a/src/controller.c b/src/controller.c
index fc26b36d9..1b78b82f5 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -832,26 +832,47 @@ process_normal_command (const gchar *line)
return NULL;
}
+/*
+ * Called if all filters are processed
+ */
static void
fin_learn_task (void *arg)
{
- struct worker_task *task = (struct worker_task *)arg;
-
- /* XXX: this is bad logic in fact */
- /* Process all statfiles */
- process_statfiles (task);
- /* Call post filters */
- lua_call_post_filters (task);
- task->state = WRITE_REPLY;
-
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
+ struct worker_task *task = (struct worker_task *) arg;
+
+ if (task->state != WRITING_REPLY) {
+ task->state = WRITE_REPLY;
+ /* Process all statfiles */
+ process_statfiles (task);
+ /* Call post filters */
+ lua_call_post_filters (task);
}
- else {
- rspamd_dispatcher_restore (task->dispatcher);
+
+ /* Check if we have all events finished */
+ if (task->state != WRITING_REPLY) {
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
+ }
}
}
+/*
+ * Called if session was restored inside fin callback
+ */
+static void
+restore_learn_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Special state */
+ task->state = WRITING_REPLY;
+
+ rspamd_dispatcher_pause (task->dispatcher);
+}
+
static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
@@ -971,7 +992,7 @@ controller_read_socket (f_str_t * in, void *arg)
return FALSE;
}
/* Set up async session */
- task->s = new_async_session (task->task_pool, fin_learn_task, free_task_hard, task);
+ task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
r = process_filters (task);
if (r == -1) {
session->state = STATE_REPLY;
@@ -1187,7 +1208,7 @@ accept_socket (gint fd, short what, void *arg)
io_tv->tv_sec = ctx->timeout / 1000;
io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
- new_session->s = new_async_session (new_session->session_pool, NULL, free_session, new_session);
+ new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session);
new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
controller_write_socket, controller_err_socket, io_tv, (void *)new_session);