aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.h13
-rw-r--r--src/worker.c122
-rw-r--r--src/worker_util.c114
3 files changed, 131 insertions, 118 deletions
diff --git a/src/main.h b/src/main.h
index 3acd5a870..338c09f45 100644
--- a/src/main.h
+++ b/src/main.h
@@ -257,6 +257,8 @@ struct worker_task {
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
struct event_base *ev_base; /**< Event base */
+ GThreadPool *classify_pool; /**< A pool of classify threads */
+
struct {
enum rspamd_metric_action action; /**< Action of pre filters */
gchar *str; /**< String describing action */
@@ -302,6 +304,17 @@ void free_task_hard (gpointer ud);
void free_task_soft (gpointer ud);
/**
+ * Called if session was restored inside fin callback
+ */
+void rspamd_restore_task (void *arg);
+
+/**
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean rspamd_fin_task (void *arg);
+
+/**
* Set counter for a symbol
*/
double set_counter (const gchar *name, guint32 value);
diff --git a/src/worker.c b/src/worker.c
index 15b8ec62f..f11a288c8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -154,122 +154,6 @@ sigusr1_handler (gint fd, short what, void *arg)
}
/*
- * Called if all filters are processed
- * @return TRUE if session should be terminated
- */
-static gboolean
-fin_task (void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
- gint r;
- GError *err = NULL;
-
- ctx = task->worker->ctx;
-
- /* Task is already finished or skipped */
- if (task->state == WRITE_REPLY) {
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- return TRUE;
- }
-
- /* We processed all filters and want to process statfiles */
- if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
- /* Process all statfiles */
- if (ctx->classify_pool == NULL) {
- /* Non-threaded version */
- process_statfiles (task);
- }
- else {
- /* Just process composites */
- make_composites (task);
- }
- if (task->cfg->post_filters) {
- /* More to process */
- /* Special state */
- task->state = WAIT_POST_FILTER;
- return FALSE;
- }
-
- }
-
- /* We are on post-filter waiting state */
- if (task->state != WAIT_PRE_FILTER) {
- /* Check if we have all events finished */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- }
- else {
- /* We were waiting for pre-filter */
- if (task->pre_result.action != METRIC_ACTION_NOACTION) {
- /* Write result based on pre filters */
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_protocol_write_reply (task);
- }
- return TRUE;
- }
- else {
- task->state = WAIT_FILTER;
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_REPLY;
- rspamd_protocol_write_reply (task);
- return TRUE;
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- g_error_free (err);
- }
- }
- if (task->is_skipped) {
- rspamd_protocol_write_reply (task);
- }
- else {
- return FALSE;
- }
- }
- }
-
- return TRUE;
-}
-
-/*
- * Called if session was restored inside fin callback
- */
-static void
-restore_task (void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
-
- /* Call post filters */
- if (task->state == WAIT_POST_FILTER) {
- lua_call_post_filters (task);
- }
- task->s->wanna_die = TRUE;
-}
-
-/*
* Reduce number of tasks proceeded
*/
static void
@@ -478,8 +362,10 @@ accept_socket (gint fd, short what, void *arg)
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks);
/* Set up async session */
- new_task->s =
- new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
+ new_task->s = new_async_session (new_task->task_pool, rspamd_fin_task,
+ rspamd_restore_task, free_task_hard, new_task);
+
+ new_task->classify_pool = ctx->classify_pool;
rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
}
diff --git a/src/worker_util.c b/src/worker_util.c
index 97dac2b2a..618ab6a5c 100644
--- a/src/worker_util.c
+++ b/src/worker_util.c
@@ -24,6 +24,7 @@
#include "config.h"
#include "main.h"
#include "message.h"
+#include "lua/lua_common.h"
extern struct rspamd_main *rspamd_main;
@@ -279,3 +280,116 @@ worker_stop_accept (struct rspamd_worker *worker)
g_list_free (worker->accept_events);
}
}
+
+/*
+ * Called if all filters are processed
+ * @return TRUE if session should be terminated
+ */
+gboolean
+rspamd_fin_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+ gint r;
+ GError *err = NULL;
+
+ /* Task is already finished or skipped */
+ if (task->state == WRITE_REPLY) {
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+ return TRUE;
+ }
+
+ /* We processed all filters and want to process statfiles */
+ if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
+ /* Process all statfiles */
+ if (task->classify_pool == NULL) {
+ /* Non-threaded version */
+ process_statfiles (task);
+ }
+ else {
+ /* Just process composites */
+ make_composites (task);
+ }
+ if (task->cfg->post_filters) {
+ /* More to process */
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+ return FALSE;
+ }
+
+ }
+
+ /* We are on post-filter waiting state */
+ if (task->state != WAIT_PRE_FILTER) {
+ /* Check if we have all events finished */
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+ }
+ else {
+ /* We were waiting for pre-filter */
+ if (task->pre_result.action != METRIC_ACTION_NOACTION) {
+ /* Write result based on pre filters */
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_protocol_write_reply (task);
+ }
+ return TRUE;
+ }
+ else {
+ task->state = WAIT_FILTER;
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_REPLY;
+ rspamd_protocol_write_reply (task);
+ return TRUE;
+ }
+ /* Add task to classify to classify pool */
+ if (!task->is_skipped && task->classify_pool) {
+ register_async_thread (task->s);
+ g_thread_pool_push (task->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ remove_async_thread (task->s);
+ g_error_free (err);
+ }
+ }
+ if (task->is_skipped) {
+ rspamd_protocol_write_reply (task);
+ }
+ else {
+ return FALSE;
+ }
+ }
+ }
+
+ return TRUE;
+}
+
+/*
+ * Called if session was restored inside fin callback
+ */
+void
+rspamd_restore_task (void *arg)
+{
+ struct worker_task *task = (struct worker_task *) arg;
+
+ /* Call post filters */
+ if (task->state == WAIT_POST_FILTER) {
+ lua_call_post_filters (task);
+ }
+ task->s->wanna_die = TRUE;
+}