aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-05-26 11:37:48 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-05-26 11:37:48 +0100
commitb1575711e78b96cd8560d4bebcebcfa92e2d14a7 (patch)
tree9f4961a083214173d883a0825d5fccb6918372bc
parent9f4ebd97d0fbc2bffba13ec945de5e785b15b371 (diff)
downloadrspamd-b1575711e78b96cd8560d4bebcebcfa92e2d14a7.tar.gz
rspamd-b1575711e78b96cd8560d4bebcebcfa92e2d14a7.zip
Remove threading support at all.
-rw-r--r--src/controller.c6
-rw-r--r--src/libmime/filter.c18
-rw-r--r--src/libmime/filter.h6
-rw-r--r--src/libserver/task.c47
-rw-r--r--src/libserver/task.h3
-rw-r--r--src/lua/lua_util.c2
-rw-r--r--src/worker.c38
7 files changed, 11 insertions, 109 deletions
diff --git a/src/controller.c b/src/controller.c
index 52a3dd12a..0d8520058 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -1000,14 +1000,13 @@ rspamd_controller_handle_learn_common (
NULL,
rspamd_task_free_hard,
task);
- task->s->wanna_die = TRUE;
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref (conn_ent->conn);;
task->sock = conn_ent->conn->fd;
/* XXX: Handle encrypted messages */
- if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, NULL, FALSE)) {
+ if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) {
msg_warn ("filters cannot be processed for %s", task->message_id);
rspamd_controller_send_error (conn_ent, 500, task->last_error);
destroy_session (task->s);
@@ -1091,13 +1090,12 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
NULL,
rspamd_task_free_hard,
task);
- task->s->wanna_die = TRUE;
task->fin_arg = conn_ent;
task->http_conn = rspamd_http_connection_ref (conn_ent->conn);
task->sock = conn_ent->conn->fd;
/* XXX: handle encrypted messages */
- if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, NULL, FALSE)) {
+ if (!rspamd_task_process (task, msg, msg->body->str, msg->body->len, FALSE)) {
msg_warn ("filters cannot be processed for %s", task->message_id);
rspamd_controller_send_error (conn_ent, 500, task->last_error);
destroy_session (task->s);
diff --git a/src/libmime/filter.c b/src/libmime/filter.c
index 70e5c3c6b..e23bb8f87 100644
--- a/src/libmime/filter.c
+++ b/src/libmime/filter.c
@@ -746,24 +746,6 @@ rspamd_process_statistics (struct rspamd_task *task)
rspamd_make_composites (task);
}
-void
-rspamd_process_statistic_threaded (gpointer data, gpointer user_data)
-{
- struct rspamd_task *task = (struct rspamd_task *)data;
- struct lua_locked_state *nL = user_data;
-
- if (RSPAMD_TASK_IS_SKIPPED (task)) {
- remove_async_thread (task->s);
- return;
- }
-
- /* TODO: handle err here */
- rspamd_mutex_lock (nL->m);
- rspamd_stat_classify (task, nL->L, NULL);
- rspamd_mutex_unlock (nL->m);
- remove_async_thread (task->s);
-}
-
static void
insert_metric_header (gpointer metric_name, gpointer metric_value,
gpointer data)
diff --git a/src/libmime/filter.h b/src/libmime/filter.h
index f763c3b8a..67dc60010 100644
--- a/src/libmime/filter.h
+++ b/src/libmime/filter.h
@@ -109,12 +109,6 @@ gint rspamd_process_filters (struct rspamd_task *task);
void rspamd_process_statistics (struct rspamd_task *task);
/**
- * Process message with statfiles threaded
- * @param data worker's task that present message from user
- */
-void rspamd_process_statistic_threaded (gpointer data, gpointer user_data);
-
-/**
* Insert a result to task
* @param task worker's task that present message from user
* @param metric_name metric's name to which we need to insert result
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 1874847de..d4fae99db 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -122,7 +122,6 @@ rspamd_task_fin (void *arg)
{
struct rspamd_task *task = (struct rspamd_task *) arg;
gint r;
- GError *err = NULL;
/* Task is already finished or skipped */
if (task->state == WRITE_REPLY) {
@@ -133,14 +132,9 @@ rspamd_task_fin (void *arg)
/* We processed all filters and want to process statfiles */
if (task->state != WAIT_POST_FILTER && task->state != WAIT_PRE_FILTER) {
/* Process all statfiles */
- if (task->classify_pool == NULL) {
- /* Non-threaded version */
- rspamd_process_statistics (task);
- }
- else {
- /* Just process composites */
- rspamd_make_composites (task);
- }
+ /* Non-threaded version */
+ rspamd_process_statistics (task);
+
if (task->cfg->post_filters) {
/* More to process */
/* Special state */
@@ -174,16 +168,7 @@ rspamd_task_fin (void *arg)
rspamd_task_reply (task);
return TRUE;
}
- /* Add task to classify to classify pool */
- if (!RSPAMD_TASK_IS_SKIPPED (task) && task->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (task->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- g_error_free (err);
- }
- }
+
if (RSPAMD_TASK_IS_SKIPPED (task)) {
rspamd_task_reply (task);
}
@@ -209,7 +194,6 @@ rspamd_task_restore (void *arg)
!(task->flags & RSPAMD_TASK_FLAG_SKIP_EXTRA)) {
rspamd_lua_call_post_filters (task);
}
- task->s->wanna_die = TRUE;
}
/*
@@ -294,22 +278,17 @@ rspamd_task_free_soft (gpointer ud)
gboolean
rspamd_task_process (struct rspamd_task *task,
struct rspamd_http_message *msg, const gchar *start, gsize len,
- GThreadPool *classify_pool,
gboolean process_extra_filters)
{
gint r;
guint control_len;
struct ucl_parser *parser;
ucl_object_t *control_obj;
- GError *err = NULL;
task->msg.start = start;
task->msg.len = len;
debug_task ("got string of length %z", task->msg.len);
- /* We got body, set wanna_die flag */
- task->s->wanna_die = TRUE;
-
if (msg) {
rspamd_protocol_handle_headers (task, msg);
}
@@ -359,35 +338,23 @@ rspamd_task_process (struct rspamd_task *task,
}
if (!process_extra_filters || task->cfg->pre_filters == NULL) {
r = rspamd_process_filters (task);
+
if (r == -1) {
task->last_error = "filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_REPLY;
return FALSE;
}
- /* Add task to classify to classify pool */
- if (!RSPAMD_TASK_IS_SKIPPED (task) && classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- g_error_free (err);
- }
- else {
- task->classify_pool = classify_pool;
- }
- }
+
if (RSPAMD_TASK_IS_SKIPPED (task)) {
/* Call write_socket to write reply and exit */
task->state = WRITE_REPLY;
}
- task->s->wanna_die = TRUE;
+
}
else {
rspamd_lua_call_pre_filters (task);
/* We want fin_task after pre filters are processed */
- task->s->wanna_die = TRUE;
task->state = WAIT_PRE_FILTER;
}
diff --git a/src/libserver/task.h b/src/libserver/task.h
index 606e4dcea..05a50ecdc 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -151,7 +151,6 @@ struct rspamd_task {
struct rspamd_dns_resolver *resolver; /**< DNS resolver */
struct event_base *ev_base; /**< Event base */
- GThreadPool *classify_pool; /**< A pool of classify threads */
gpointer classify_data; /**< Opaque classifiers data */
struct {
@@ -188,13 +187,11 @@ gboolean rspamd_task_fin (void *arg);
* Process task from http message and write reply or call task->fin_handler
* @param task task to process
* @param msg incoming http message
- * @param classify_pool classify pool (or NULL)
* @param process_extra_filters whether to check pre and post filters
* @return task has been successfully parsed and processed
*/
gboolean rspamd_task_process (struct rspamd_task *task,
struct rspamd_http_message *msg, const gchar *start, gsize len,
- GThreadPool *classify_pool,
gboolean process_extra_filters);
/**
diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c
index e84b3748b..4442a3161 100644
--- a/src/lua/lua_util.c
+++ b/src/lua/lua_util.c
@@ -193,7 +193,7 @@ lua_util_process_message (lua_State *L)
task->s = new_async_session (task->task_pool, rspamd_task_fin,
rspamd_task_restore, rspamd_task_free_hard, task);
- if (rspamd_task_process (task, NULL, message, mlen, NULL, TRUE)) {
+ if (rspamd_task_process (task, NULL, message, mlen, TRUE)) {
event_base_loop (base, 0);
if (res != NULL) {
diff --git a/src/worker.c b/src/worker.c
index 7c72b0515..dec3d679f 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -40,10 +40,6 @@
#include "lua/lua_common.h"
-#ifdef WITH_GPERF_TOOLS
-# include <glib/gprintf.h>
-#endif
-
/* 60 seconds for worker's IO */
#define DEFAULT_WORKER_IO_TIMEOUT 60000
@@ -81,10 +77,6 @@ struct rspamd_worker_ctx {
guint32 tasks;
/* Limit of tasks */
guint32 max_tasks;
- /* Classify threads */
- guint32 classify_threads;
- /* Classify threads */
- GThreadPool *classify_pool;
/* Events base */
struct event_base *ev_base;
/* Encryption key */
@@ -133,7 +125,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
}
- if (!rspamd_task_process (task, msg, chunk, len, ctx->classify_pool, TRUE)) {
+ if (!rspamd_task_process (task, msg, chunk, len, TRUE)) {
task->state = WRITE_REPLY;
}
@@ -180,7 +172,6 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
* If all filters have finished their tasks, this function will trigger
* writing a reply.
*/
- task->s->wanna_die = TRUE;
check_session_pending (task->s);
}
@@ -254,8 +245,6 @@ accept_socket (gint fd, short what, void *arg)
new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin,
rspamd_task_restore, rspamd_task_free_hard, new_task);
- new_task->classify_pool = ctx->classify_pool;
-
if (ctx->key) {
rspamd_http_connection_set_key (new_task->http_conn, ctx->key);
}
@@ -279,7 +268,6 @@ init_worker (struct rspamd_config *cfg)
ctx->is_mime = TRUE;
ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
- ctx->classify_threads = 1;
rspamd_rcl_register_worker_option (cfg, type, "mime",
rspamd_rcl_parse_struct_boolean, ctx,
@@ -307,12 +295,6 @@ init_worker (struct rspamd_config *cfg)
G_STRUCT_OFFSET (struct rspamd_worker_ctx,
max_tasks), RSPAMD_CL_FLAG_INT_32);
- rspamd_rcl_register_worker_option (cfg, type, "classify_threads",
- rspamd_rcl_parse_struct_integer, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- classify_threads), RSPAMD_CL_FLAG_INT_32);
-
-
rspamd_rcl_register_worker_option (cfg, type, "keypair",
rspamd_rcl_parse_struct_keypair, ctx,
G_STRUCT_OFFSET (struct rspamd_worker_ctx,
@@ -328,8 +310,6 @@ void
start_worker (struct rspamd_worker *worker)
{
struct rspamd_worker_ctx *ctx = worker->ctx;
- GError *err = NULL;
- struct lua_locked_state *nL;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -344,22 +324,6 @@ start_worker (struct rspamd_worker *worker)
rspamd_upstreams_library_init (ctx->resolver->r, ctx->ev_base);
rspamd_upstreams_library_config (worker->srv->cfg);
- /* Create classify pool */
- ctx->classify_pool = NULL;
- if (ctx->classify_threads > 1) {
- nL = rspamd_init_lua_locked (worker->srv->cfg);
- ctx->classify_pool = g_thread_pool_new (rspamd_process_statistic_threaded,
- nL,
- ctx->classify_threads,
- TRUE,
- &err);
- if (err != NULL) {
- msg_err ("pool create failed: %e", err);
- g_error_free (err);
- ctx->classify_pool = NULL;
- }
- }
-
/* XXX: stupid default */
ctx->keys_cache = rspamd_keypair_cache_new (256);