diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.h | 13 | ||||
-rw-r--r-- | src/worker.c | 122 | ||||
-rw-r--r-- | src/worker_util.c | 114 |
3 files changed, 131 insertions, 118 deletions
diff --git a/src/main.h b/src/main.h index 3acd5a870..338c09f45 100644 --- a/src/main.h +++ b/src/main.h @@ -257,6 +257,8 @@ struct worker_task { struct rspamd_dns_resolver *resolver; /**< DNS resolver */ struct event_base *ev_base; /**< Event base */ + GThreadPool *classify_pool; /**< A pool of classify threads */ + struct { enum rspamd_metric_action action; /**< Action of pre filters */ gchar *str; /**< String describing action */ @@ -302,6 +304,17 @@ void free_task_hard (gpointer ud); void free_task_soft (gpointer ud); /** + * Called if session was restored inside fin callback + */ +void rspamd_restore_task (void *arg); + +/** + * Called if all filters are processed + * @return TRUE if session should be terminated + */ +gboolean rspamd_fin_task (void *arg); + +/** * Set counter for a symbol */ double set_counter (const gchar *name, guint32 value); diff --git a/src/worker.c b/src/worker.c index 15b8ec62f..f11a288c8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -154,122 +154,6 @@ sigusr1_handler (gint fd, short what, void *arg) } /* - * Called if all filters are processed - * @return TRUE if session should be terminated - */ -static gboolean -fin_task (void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; - gint r; - GError *err = NULL; - - ctx = task->worker->ctx; - - /* Task is already finished or skipped */ - if (task->state == WRITE_REPLY) { - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - return TRUE; - } - - /* We processed all filters and want to process statfiles */ - if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { - /* Process all statfiles */ - if (ctx->classify_pool == NULL) { - /* Non-threaded version */ - process_statfiles (task); - } - else { - /* Just process composites */ - make_composites (task); - } - if (task->cfg->post_filters) { - /* More to process */ - /* Special state */ - task->state = WAIT_POST_FILTER; - return FALSE; - } - - } - - /* We are on post-filter waiting state */ - if (task->state != WAIT_PRE_FILTER) { - /* Check if we have all events finished */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - } - else { - /* We were waiting for pre-filter */ - if (task->pre_result.action != METRIC_ACTION_NOACTION) { - /* Write result based on pre filters */ - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_protocol_write_reply (task); - } - return TRUE; - } - else { - task->state = WAIT_FILTER; - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_REPLY; - rspamd_protocol_write_reply (task); - return TRUE; - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - g_error_free (err); - } - } - if (task->is_skipped) { - rspamd_protocol_write_reply (task); - } - else { - return FALSE; - } - } - } - - return TRUE; -} - -/* - * Called if session was restored inside fin callback - */ -static void -restore_task (void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - - /* Call post filters */ - if (task->state == WAIT_POST_FILTER) { - lua_call_post_filters (task); - } - task->s->wanna_die = TRUE; -} - -/* * Reduce number of tasks proceeded */ static void @@ -478,8 +362,10 @@ accept_socket (gint fd, short what, void *arg) memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); /* Set up async session */ - new_task->s = - new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); + new_task->s = new_async_session (new_task->task_pool, rspamd_fin_task, + rspamd_restore_task, free_task_hard, new_task); + + new_task->classify_pool = ctx->classify_pool; rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base); } diff --git a/src/worker_util.c b/src/worker_util.c index 97dac2b2a..618ab6a5c 100644 --- a/src/worker_util.c +++ b/src/worker_util.c @@ -24,6 +24,7 @@ #include "config.h" #include "main.h" #include "message.h" +#include "lua/lua_common.h" extern struct rspamd_main *rspamd_main; @@ -279,3 +280,116 @@ worker_stop_accept (struct rspamd_worker *worker) g_list_free (worker->accept_events); } } + +/* + * Called if all filters are processed + * @return TRUE if session should be terminated + */ +gboolean +rspamd_fin_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + gint r; + GError *err = NULL; + + /* Task is already finished or skipped */ + if (task->state == WRITE_REPLY) { + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } + return TRUE; + } + + /* We processed all filters and want to process statfiles */ + if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { + /* Process all statfiles */ + if (task->classify_pool == NULL) { + /* Non-threaded version */ + process_statfiles (task); + } + else { + /* Just process composites */ + make_composites (task); + } + if (task->cfg->post_filters) { + /* More to process */ + /* Special state */ + task->state = WAIT_POST_FILTER; + return FALSE; + } + + } + + /* We are on post-filter waiting state */ + if (task->state != WAIT_PRE_FILTER) { + /* Check if we have all events finished */ + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } + } + else { + /* We were waiting for pre-filter */ + if (task->pre_result.action != METRIC_ACTION_NOACTION) { + /* Write result based on pre filters */ + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_protocol_write_reply (task); + } + return TRUE; + } + else { + task->state = WAIT_FILTER; + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_REPLY; + rspamd_protocol_write_reply (task); + return TRUE; + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && task->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (task->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + g_error_free (err); + } + } + if (task->is_skipped) { + rspamd_protocol_write_reply (task); + } + else { + return FALSE; + } + } + } + + return TRUE; +} + +/* + * Called if session was restored inside fin callback + */ +void +rspamd_restore_task (void *arg) +{ + struct worker_task *task = (struct worker_task *) arg; + + /* Call post filters */ + if (task->state == WAIT_POST_FILTER) { + lua_call_post_filters (task); + } + task->s->wanna_die = TRUE; +} |