diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-05-26 11:37:48 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-05-26 11:37:48 +0100 |
commit | b1575711e78b96cd8560d4bebcebcfa92e2d14a7 (patch) | |
tree | 9f4961a083214173d883a0825d5fccb6918372bc /src | |
parent | 9f4ebd97d0fbc2bffba13ec945de5e785b15b371 (diff) | |
download | rspamd-b1575711e78b96cd8560d4bebcebcfa92e2d14a7.tar.gz rspamd-b1575711e78b96cd8560d4bebcebcfa92e2d14a7.zip |
Remove threading support at all.
Diffstat (limited to 'src')
-rw-r--r-- | src/controller.c | 6 | ||||
-rw-r--r-- | src/libmime/filter.c | 18 | ||||
-rw-r--r-- | src/libmime/filter.h | 6 | ||||
-rw-r--r-- | src/libserver/task.c | 47 | ||||
-rw-r--r-- | src/libserver/task.h | 3 | ||||
-rw-r--r-- | src/lua/lua_util.c | 2 | ||||
-rw-r--r-- | src/worker.c | 38 |
7 files changed, 11 insertions, 109 deletions
diff --git a/src/controller.c b/src/controller.c index 52a3dd12a..0d8520058 100644 --- a/src/controller.c +++ b/src/controller.c @@ -1000,14 +1000,13 @@ rspamd_controller_handle_learn_common ( NULL, rspamd_task_free_hard, task); - task->s->wanna_die = TRUE; task->fin_arg = conn_ent; task->http_conn = rspamd_http_connection_ref (conn_ent->conn);; task->sock = conn_ent->conn->fd; /* XXX: Handle encrypted messages */ - if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, NULL, FALSE)) { + if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) { msg_warn ("filters cannot be processed for %s", task->message_id); rspamd_controller_send_error (conn_ent, 500, task->last_error); destroy_session (task->s); @@ -1091,13 +1090,12 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent, NULL, rspamd_task_free_hard, task); - task->s->wanna_die = TRUE; task->fin_arg = conn_ent; task->http_conn = rspamd_http_connection_ref (conn_ent->conn); task->sock = conn_ent->conn->fd; /* XXX: handle encrypted messages */ - if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, NULL, FALSE)) { + if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) { msg_warn ("filters cannot be processed for %s", task->message_id); rspamd_controller_send_error (conn_ent, 500, task->last_error); destroy_session (task->s); diff --git a/src/libmime/filter.c b/src/libmime/filter.c index 70e5c3c6b..e23bb8f87 100644 --- a/src/libmime/filter.c +++ b/src/libmime/filter.c @@ -746,24 +746,6 @@ rspamd_process_statistics (struct rspamd_task *task) rspamd_make_composites (task); } -void -rspamd_process_statistic_threaded (gpointer data, gpointer user_data) -{ - struct rspamd_task *task = (struct rspamd_task *)data; - struct lua_locked_state *nL = user_data; - - if (RSPAMD_TASK_IS_SKIPPED (task)) { - remove_async_thread (task->s); - return; - } - - /* TODO: handle err here */ - rspamd_mutex_lock (nL->m); - rspamd_stat_classify (task, nL->L, NULL); - rspamd_mutex_unlock (nL->m); - remove_async_thread (task->s); -} - static void insert_metric_header (gpointer metric_name, gpointer metric_value, gpointer data) diff --git a/src/libmime/filter.h b/src/libmime/filter.h index f763c3b8a..67dc60010 100644 --- a/src/libmime/filter.h +++ b/src/libmime/filter.h @@ -109,12 +109,6 @@ gint rspamd_process_filters (struct rspamd_task *task); void rspamd_process_statistics (struct rspamd_task *task); /** - * Process message with statfiles threaded - * @param data worker's task that present message from user - */ -void rspamd_process_statistic_threaded (gpointer data, gpointer user_data); - -/** * Insert a result to task * @param task worker's task that present message from user * @param metric_name metric's name to which we need to insert result diff --git a/src/libserver/task.c b/src/libserver/task.c index 1874847de..d4fae99db 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -122,7 +122,6 @@ rspamd_task_fin (void *arg) { struct rspamd_task *task = (struct rspamd_task *) arg; gint r; - GError *err = NULL; /* Task is already finished or skipped */ if (task->state == WRITE_REPLY) { @@ -133,14 +132,9 @@ rspamd_task_fin (void *arg) /* We processed all filters and want to process statfiles */ if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) { /* Process all statfiles */ - if (task->classify_pool == NULL) { - /* Non-threaded version */ - rspamd_process_statistics (task); - } - else { - /* Just process composites */ - rspamd_make_composites (task); - } + /* Non-threaded version */ + rspamd_process_statistics (task); + if (task->cfg->post_filters) { /* More to process */ /* Special state */ @@ -174,16 +168,7 @@ rspamd_task_fin (void *arg) rspamd_task_reply (task); return TRUE; } - /* Add task to classify to classify pool */ - if (!RSPAMD_TASK_IS_SKIPPED (task) && task->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (task->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - g_error_free (err); - } - } + if (RSPAMD_TASK_IS_SKIPPED (task)) { rspamd_task_reply (task); } @@ -209,7 +194,6 @@ rspamd_task_restore (void *arg) !(task->flags & RSPAMD_TASK_FLAG_SKIP_EXTRA)) { rspamd_lua_call_post_filters (task); } - task->s->wanna_die = TRUE; } /* @@ -294,22 +278,17 @@ rspamd_task_free_soft (gpointer ud) gboolean rspamd_task_process (struct rspamd_task *task, struct rspamd_http_message *msg, const gchar *start, gsize len, - GThreadPool *classify_pool, gboolean process_extra_filters) { gint r; guint control_len; struct ucl_parser *parser; ucl_object_t *control_obj; - GError *err = NULL; task->msg.start = start; task->msg.len = len; debug_task ("got string of length %z", task->msg.len); - /* We got body, set wanna_die flag */ - task->s->wanna_die = TRUE; - if (msg) { rspamd_protocol_handle_headers (task, msg); } @@ -359,35 +338,23 @@ rspamd_task_process (struct rspamd_task *task, } if (!process_extra_filters || task->cfg->pre_filters == NULL) { r = rspamd_process_filters (task); + if (r == -1) { task->last_error = "filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_REPLY; return FALSE; } - /* Add task to classify to classify pool */ - if (!RSPAMD_TASK_IS_SKIPPED (task) && classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - g_error_free (err); - } - else { - task->classify_pool = classify_pool; - } - } + if (RSPAMD_TASK_IS_SKIPPED (task)) { /* Call write_socket to write reply and exit */ task->state = WRITE_REPLY; } - task->s->wanna_die = TRUE; + } else { rspamd_lua_call_pre_filters (task); /* We want fin_task after pre filters are processed */ - task->s->wanna_die = TRUE; task->state = WAIT_PRE_FILTER; } diff --git a/src/libserver/task.h b/src/libserver/task.h index 606e4dcea..05a50ecdc 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -151,7 +151,6 @@ struct rspamd_task { struct rspamd_dns_resolver *resolver; /**< DNS resolver */ struct event_base *ev_base; /**< Event base */ - GThreadPool *classify_pool; /**< A pool of classify threads */ gpointer classify_data; /**< Opaque classifiers data */ struct { @@ -188,13 +187,11 @@ gboolean rspamd_task_fin (void *arg); * Process task from http message and write reply or call task->fin_handler * @param task task to process * @param msg incoming http message - * @param classify_pool classify pool (or NULL) * @param process_extra_filters whether to check pre and post filters * @return task has been successfully parsed and processed */ gboolean rspamd_task_process (struct rspamd_task *task, struct rspamd_http_message *msg, const gchar *start, gsize len, - GThreadPool *classify_pool, gboolean process_extra_filters); /** diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c index e84b3748b..4442a3161 100644 --- a/src/lua/lua_util.c +++ b/src/lua/lua_util.c @@ -193,7 +193,7 @@ lua_util_process_message (lua_State *L) task->s = new_async_session (task->task_pool, rspamd_task_fin, rspamd_task_restore, rspamd_task_free_hard, task); - if (rspamd_task_process (task, NULL, message, mlen, NULL, TRUE)) { + if (rspamd_task_process (task, NULL, message, mlen, TRUE)) { event_base_loop (base, 0); if (res != NULL) { diff --git a/src/worker.c b/src/worker.c index 7c72b0515..dec3d679f 100644 --- a/src/worker.c +++ b/src/worker.c @@ -40,10 +40,6 @@ #include "lua/lua_common.h" -#ifdef WITH_GPERF_TOOLS -# include <glib/gprintf.h> -#endif - /* 60 seconds for worker's IO */ #define DEFAULT_WORKER_IO_TIMEOUT 60000 @@ -81,10 +77,6 @@ struct rspamd_worker_ctx { guint32 tasks; /* Limit of tasks */ guint32 max_tasks; - /* Classify threads */ - guint32 classify_threads; - /* Classify threads */ - GThreadPool *classify_pool; /* Events base */ struct event_base *ev_base; /* Encryption key */ @@ -133,7 +125,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, } - if (!rspamd_task_process (task, msg, chunk, len, ctx->classify_pool, TRUE)) { + if (!rspamd_task_process (task, msg, chunk, len, TRUE)) { task->state = WRITE_REPLY; } @@ -180,7 +172,6 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, * If all filters have finished their tasks, this function will trigger * writing a reply. */ - task->s->wanna_die = TRUE; check_session_pending (task->s); } @@ -254,8 +245,6 @@ accept_socket (gint fd, short what, void *arg) new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin, rspamd_task_restore, rspamd_task_free_hard, new_task); - new_task->classify_pool = ctx->classify_pool; - if (ctx->key) { rspamd_http_connection_set_key (new_task->http_conn, ctx->key); } @@ -279,7 +268,6 @@ init_worker (struct rspamd_config *cfg) ctx->is_mime = TRUE; ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT; - ctx->classify_threads = 1; rspamd_rcl_register_worker_option (cfg, type, "mime", rspamd_rcl_parse_struct_boolean, ctx, @@ -307,12 +295,6 @@ init_worker (struct rspamd_config *cfg) G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks), RSPAMD_CL_FLAG_INT_32); - rspamd_rcl_register_worker_option (cfg, type, "classify_threads", - rspamd_rcl_parse_struct_integer, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - classify_threads), RSPAMD_CL_FLAG_INT_32); - - rspamd_rcl_register_worker_option (cfg, type, "keypair", rspamd_rcl_parse_struct_keypair, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, @@ -328,8 +310,6 @@ void start_worker (struct rspamd_worker *worker) { struct rspamd_worker_ctx *ctx = worker->ctx; - GError *err = NULL; - struct lua_locked_state *nL; ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket); msec_to_tv (ctx->timeout, &ctx->io_tv); @@ -344,22 +324,6 @@ start_worker (struct rspamd_worker *worker) rspamd_upstreams_library_init (ctx->resolver->r, ctx->ev_base); rspamd_upstreams_library_config (worker->srv->cfg); - /* Create classify pool */ - ctx->classify_pool = NULL; - if (ctx->classify_threads > 1) { - nL = rspamd_init_lua_locked (worker->srv->cfg); - ctx->classify_pool = g_thread_pool_new (rspamd_process_statistic_threaded, - nL, - ctx->classify_threads, - TRUE, - &err); - if (err != NULL) { - msg_err ("pool create failed: %e", err); - g_error_free (err); - ctx->classify_pool = NULL; - } - } - /* XXX: stupid default */ ctx->keys_cache = rspamd_keypair_cache_new (256); |