]> source.dussan.org Git - rspamd.git/commitdiff
Move fin_task to a common rspamd_fin_task function.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 21 Jan 2014 12:54:46 +0000 (12:54 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 21 Jan 2014 12:54:46 +0000 (12:54 +0000)
src/main.h
src/worker.c
src/worker_util.c

index 3acd5a8705c60d6187e538b446cedbe3dbd542e7..338c09f455b2624165141c8733afe24fee010317 100644 (file)
@@ -257,6 +257,8 @@ struct worker_task {
        struct rspamd_dns_resolver *resolver;                                           /**< DNS resolver                                                                       */
        struct event_base *ev_base;                                                                     /**< Event base                                                                         */
 
+       GThreadPool *classify_pool;                                                                     /**< A pool of classify threads                                         */
+
        struct {
                enum rspamd_metric_action action;                                               /**< Action of pre filters                                                      */
                gchar *str;                                                                                             /**< String describing action                                           */
@@ -301,6 +303,17 @@ void free_task (struct worker_task *task, gboolean is_soft);
 void free_task_hard (gpointer ud);
 void free_task_soft (gpointer ud);
 
+/**
+ * Called if session was restored inside fin callback
+ */
+void rspamd_restore_task (void *arg);
+
+/**
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean rspamd_fin_task (void *arg);
+
 /**
  * Set counter for a symbol
  */
index 15b8ec62f69bacde2b7af3419ef5f23a522cf6d6..f11a288c8321c4861582526ee1bab5099bf0eeaf 100644 (file)
@@ -153,122 +153,6 @@ sigusr1_handler (gint fd, short what, void *arg)
        return;
 }
 
-/*
- * Called if all filters are processed
- * @return TRUE if session should be terminated
- */
-static gboolean
-fin_task (void *arg)
-{
-       struct worker_task              *task = (struct worker_task *) arg;
-       struct rspamd_worker_ctx        *ctx;
-       gint r;
-       GError *err = NULL;
-
-       ctx = task->worker->ctx;
-
-       /* Task is already finished or skipped */
-       if (task->state == WRITE_REPLY) {
-               if (task->fin_callback) {
-                       task->fin_callback (task->fin_arg);
-               }
-               else {
-                       rspamd_protocol_write_reply (task);
-               }
-               return TRUE;
-       }
-
-       /* We processed all filters and want to process statfiles */
-       if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
-               /* Process all statfiles */
-               if (ctx->classify_pool == NULL) {
-                       /* Non-threaded version */
-                       process_statfiles (task);
-               }
-               else {
-                       /* Just process composites */
-                       make_composites (task);
-               }
-               if (task->cfg->post_filters) {
-                       /* More to process */
-                       /* Special state */
-                       task->state = WAIT_POST_FILTER;
-                       return FALSE;
-               }
-
-       }
-
-       /* We are on post-filter waiting state */
-       if (task->state != WAIT_PRE_FILTER) {
-               /* Check if we have all events finished */
-               task->state = WRITE_REPLY;
-               if (task->fin_callback) {
-                       task->fin_callback (task->fin_arg);
-               }
-               else {
-                       rspamd_protocol_write_reply (task);
-               }
-       }
-       else {
-               /* We were waiting for pre-filter */
-               if (task->pre_result.action != METRIC_ACTION_NOACTION) {
-                       /* Write result based on pre filters */
-                       task->state = WRITE_REPLY;
-                       if (task->fin_callback) {
-                               task->fin_callback (task->fin_arg);
-                       }
-                       else {
-                               rspamd_protocol_write_reply (task);
-                       }
-                       return TRUE;
-               }
-               else {
-                       task->state = WAIT_FILTER;
-                       r = process_filters (task);
-                       if (r == -1) {
-                               task->last_error = "Filter processing error";
-                               task->error_code = RSPAMD_FILTER_ERROR;
-                               task->state = WRITE_REPLY;
-                               rspamd_protocol_write_reply (task);
-                               return TRUE;
-                       }
-                       /* Add task to classify to classify pool */
-                       if (!task->is_skipped && ctx->classify_pool) {
-                               register_async_thread (task->s);
-                               g_thread_pool_push (ctx->classify_pool, task, &err);
-                               if (err != NULL) {
-                                       msg_err ("cannot pull task to the pool: %s", err->message);
-                                       remove_async_thread (task->s);
-                                       g_error_free (err);
-                               }
-                       }
-                       if (task->is_skipped) {
-                               rspamd_protocol_write_reply (task);
-                       }
-                       else {
-                               return FALSE;
-                       }
-               }
-       }
-
-       return TRUE;
-}
-
-/*
- * Called if session was restored inside fin callback
- */
-static void
-restore_task (void *arg)
-{
-       struct worker_task             *task = (struct worker_task *) arg;
-
-       /* Call post filters */
-       if (task->state == WAIT_POST_FILTER) {
-               lua_call_post_filters (task);
-       }
-       task->s->wanna_die = TRUE;
-}
-
 /*
  * Reduce number of tasks proceeded
  */
@@ -478,8 +362,10 @@ accept_socket (gint fd, short what, void *arg)
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
 
        /* Set up async session */
-       new_task->s =
-                               new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
+       new_task->s = new_async_session (new_task->task_pool, rspamd_fin_task,
+                                               rspamd_restore_task, free_task_hard, new_task);
+
+       new_task->classify_pool = ctx->classify_pool;
 
        rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
 }
index 97dac2b2aa1beeb29a0757b5436ca357f04db4ed..618ab6a5c92d458ded978760d552b994ef9eea0d 100644 (file)
@@ -24,6 +24,7 @@
 #include "config.h"
 #include "main.h"
 #include "message.h"
+#include "lua/lua_common.h"
 
 extern struct rspamd_main                      *rspamd_main;
 
@@ -279,3 +280,116 @@ worker_stop_accept (struct rspamd_worker *worker)
                g_list_free (worker->accept_events);
        }
 }
+
+/*
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean
+rspamd_fin_task (void *arg)
+{
+       struct worker_task              *task = (struct worker_task *) arg;
+       gint r;
+       GError *err = NULL;
+
+       /* Task is already finished or skipped */
+       if (task->state == WRITE_REPLY) {
+               if (task->fin_callback) {
+                       task->fin_callback (task->fin_arg);
+               }
+               else {
+                       rspamd_protocol_write_reply (task);
+               }
+               return TRUE;
+       }
+
+       /* We processed all filters and want to process statfiles */
+       if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
+               /* Process all statfiles */
+               if (task->classify_pool == NULL) {
+                       /* Non-threaded version */
+                       process_statfiles (task);
+               }
+               else {
+                       /* Just process composites */
+                       make_composites (task);
+               }
+               if (task->cfg->post_filters) {
+                       /* More to process */
+                       /* Special state */
+                       task->state = WAIT_POST_FILTER;
+                       return FALSE;
+               }
+
+       }
+
+       /* We are on post-filter waiting state */
+       if (task->state != WAIT_PRE_FILTER) {
+               /* Check if we have all events finished */
+               task->state = WRITE_REPLY;
+               if (task->fin_callback) {
+                       task->fin_callback (task->fin_arg);
+               }
+               else {
+                       rspamd_protocol_write_reply (task);
+               }
+       }
+       else {
+               /* We were waiting for pre-filter */
+               if (task->pre_result.action != METRIC_ACTION_NOACTION) {
+                       /* Write result based on pre filters */
+                       task->state = WRITE_REPLY;
+                       if (task->fin_callback) {
+                               task->fin_callback (task->fin_arg);
+                       }
+                       else {
+                               rspamd_protocol_write_reply (task);
+                       }
+                       return TRUE;
+               }
+               else {
+                       task->state = WAIT_FILTER;
+                       r = process_filters (task);
+                       if (r == -1) {
+                               task->last_error = "Filter processing error";
+                               task->error_code = RSPAMD_FILTER_ERROR;
+                               task->state = WRITE_REPLY;
+                               rspamd_protocol_write_reply (task);
+                               return TRUE;
+                       }
+                       /* Add task to classify to classify pool */
+                       if (!task->is_skipped && task->classify_pool) {
+                               register_async_thread (task->s);
+                               g_thread_pool_push (task->classify_pool, task, &err);
+                               if (err != NULL) {
+                                       msg_err ("cannot pull task to the pool: %s", err->message);
+                                       remove_async_thread (task->s);
+                                       g_error_free (err);
+                               }
+                       }
+                       if (task->is_skipped) {
+                               rspamd_protocol_write_reply (task);
+                       }
+                       else {
+                               return FALSE;
+                       }
+               }
+       }
+
+       return TRUE;
+}
+
+/*
+ * Called if session was restored inside fin callback
+ */
+void
+rspamd_restore_task (void *arg)
+{
+       struct worker_task             *task = (struct worker_task *) arg;
+
+       /* Call post filters */
+       if (task->state == WAIT_POST_FILTER) {
+               lua_call_post_filters (task);
+       }
+       task->s->wanna_die = TRUE;
+}