aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/events.c21
-rw-r--r--src/events.h1
-rw-r--r--src/filter.c20
-rw-r--r--src/filter.h6
-rw-r--r--src/plugins/regexp.c83
-rw-r--r--src/worker.c39
6 files changed, 129 insertions, 41 deletions
diff --git a/src/events.c b/src/events.c
index bf47bb1f8..0daefc95f 100644
--- a/src/events.c
+++ b/src/events.c
@@ -62,6 +62,13 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new->user_data = user_data;
new->wanna_die = FALSE;
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ new->mtx = g_mutex_new ();
+ memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_free, new->mtx);
+#else
+ new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+ g_mutex_init (new->mtx);
+#endif
new->threads = 0;
memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
@@ -148,7 +155,9 @@ destroy_session (struct rspamd_async_session *session)
gboolean
check_session_pending (struct rspamd_async_session *session)
{
- if (session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ g_mutex_lock (session->mtx);
+ if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ session->wanna_die = FALSE;
if (session->fin != NULL) {
session->fin (session->user_data);
}
@@ -157,11 +166,13 @@ check_session_pending (struct rspamd_async_session *session)
if (session->restore != NULL) {
session->restore (session->user_data);
}
+ g_mutex_unlock (session->mtx);
return TRUE;
}
+ g_mutex_unlock (session->mtx);
return FALSE;
}
-
+ g_mutex_unlock (session->mtx);
return TRUE;
}
@@ -174,6 +185,9 @@ void
register_async_thread (struct rspamd_async_session *session)
{
g_atomic_int_inc (&session->threads);
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("added thread: pending %d thread", session->threads);
+#endif
}
/**
@@ -186,4 +200,7 @@ remove_async_thread (struct rspamd_async_session *session)
if (g_atomic_int_dec_and_test (&session->threads)) {
(void) check_session_pending (session);
}
+#ifdef RSPAMD_EVENTS_DEBUG
+ msg_info ("removed thread: pending %d thread", session->threads);
+#endif
}
diff --git a/src/events.h b/src/events.h
index 138920060..af41cc84a 100644
--- a/src/events.h
+++ b/src/events.h
@@ -22,6 +22,7 @@ struct rspamd_async_session {
memory_pool_t *pool;
gboolean wanna_die;
guint threads;
+ GMutex *mtx;
};
/**
diff --git a/src/filter.c b/src/filter.c
index e4ba63f0f..cf8a63d0d 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -239,12 +239,14 @@ process_filters (struct worker_task *task)
if (!task->pass_all_filters &&
metric->action == METRIC_ACTION_REJECT &&
check_metric_is_spam (task, metric)) {
+ task->s->wanna_die = TRUE;
check_session_pending (task->s);
return 1;
}
cur = g_list_next (cur);
}
}
+ task->s->wanna_die = TRUE;
check_session_pending (task->s);
return 1;
@@ -623,6 +625,24 @@ process_statfiles (struct worker_task *task)
make_composites (task);
}
+void
+process_statfiles_threaded (gpointer data, gpointer user_data)
+{
+ struct worker_task *task = (struct worker_task *)data;
+
+ if (task->is_skipped) {
+ return;
+ }
+
+ if (task->tokens == NULL) {
+ task->tokens = g_hash_table_new (g_direct_hash, g_direct_equal);
+ memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens);
+ }
+
+ g_list_foreach (task->cfg->classifiers, classifiers_callback, task);
+ remove_async_thread (task->s);
+}
+
static void
insert_metric_header (gpointer metric_name, gpointer metric_value, gpointer data)
{
diff --git a/src/filter.h b/src/filter.h
index d222fdd45..5cd6e4ff2 100644
--- a/src/filter.h
+++ b/src/filter.h
@@ -94,6 +94,12 @@ gint process_filters (struct worker_task *task);
void process_statfiles (struct worker_task *task);
/**
+ * Process message with statfiles threaded
+ * @param data worker's task that present message from user
+ */
+void process_statfiles_threaded (gpointer data, gpointer user_data);
+
+/**
* Insert a result to task
* @param task worker's task that present message from user
* @param metric_name metric's name to which we need to insert result
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 9d2210c56..c9b584436 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -64,6 +64,7 @@ struct regexp_ctx {
memory_pool_t *regexp_pool;
memory_pool_t *dynamic_pool;
gsize max_size;
+ gsize max_threads;
GThreadPool *workers;
};
@@ -507,8 +508,6 @@ regexp_module_config (struct config_file *cfg)
gchar *value;
gint res = TRUE;
struct regexp_json_buf *jb, **pjb;
- gsize thr;
- GError *err = NULL;
if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) {
regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
@@ -524,24 +523,15 @@ regexp_module_config (struct config_file *cfg)
}
if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) {
if (g_thread_supported ()) {
- thr = parse_limit (value, -1);
- if (thr > 1) {
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
- g_thread_init (NULL);
- workers_mtx = g_mutex_new ();
-#else
- workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
- g_mutex_init (workers_mtx);
-#endif
- regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
- if (err != NULL) {
- msg_err ("thread pool creation failed: %s", err->message);
- regexp_module_ctx->workers = NULL;
- }
- }
+ regexp_module_ctx->max_threads = parse_limit (value, -1);
}
+ else {
+ regexp_module_ctx->max_threads = 0;
+ }
+ regexp_module_ctx->workers = NULL;
}
else {
+ regexp_module_ctx->max_threads = 0;
regexp_module_ctx->workers = NULL;
}
if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) {
@@ -1125,7 +1115,11 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
}
else if (it->type == EXPR_STR) {
/* This may be lua function, try to call it */
- cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+ if (regexp_module_ctx->workers != NULL) {
+ g_mutex_lock (workers_mtx);
+ cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+ g_mutex_unlock (workers_mtx);
+ }
debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false");
if (try_optimize) {
try_optimize = optimize_regexp_expression (&it, stack, cur);
@@ -1208,24 +1202,13 @@ struct regexp_threaded_ud {
static void
process_regexp_item_threaded (gpointer data, gpointer user_data)
{
- struct regexp_threaded_ud *ud = user_data;
- gboolean res = FALSE;
+ struct regexp_threaded_ud *ud = data;
- if (ud->item->lua_function) {
- /* Just call function */
- if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
- g_mutex_lock (workers_mtx);
- insert_result (ud->task, ud->item->symbol, 1, NULL);
- g_mutex_unlock (workers_mtx);
- }
- }
- else {
- /* Process expression */
- if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
- g_mutex_lock (workers_mtx);
- insert_result (ud->task, ud->item->symbol, 1, NULL);
- g_mutex_unlock (workers_mtx);
- }
+ /* Process expression */
+ if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+ g_mutex_lock (workers_mtx);
+ insert_result (ud->task, ud->item->symbol, 1, NULL);
+ g_mutex_unlock (workers_mtx);
}
remove_async_thread (ud->task->s);
}
@@ -1236,13 +1219,37 @@ process_regexp_item (struct worker_task *task, void *user_data)
struct regexp_module_item *item = user_data;
gboolean res = FALSE;
struct regexp_threaded_ud *thr_ud;
+ GError *err = NULL;
+
- if (regexp_module_ctx->workers) {
+ if (!item->lua_function && regexp_module_ctx->max_threads > 1) {
+ if (regexp_module_ctx->workers == NULL) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_thread_init (NULL);
+ workers_mtx = g_mutex_new ();
+#else
+ workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
+ g_mutex_init (workers_mtx);
+#endif
+ regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded,
+ regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err);
+ if (err != NULL) {
+ msg_err ("thread pool creation failed: %s", err->message);
+ regexp_module_ctx->max_threads = 0;
+ return;
+ }
+ }
thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
thr_ud->item = item;
thr_ud->task = task;
- register_async_thread (task->s);
- g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
+
+ g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
+ if (err == NULL) {
+ register_async_thread (task->s);
+ }
+ else {
+ msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+ }
}
else {
/* Non-threaded version */
diff --git a/src/worker.c b/src/worker.c
index 48f4d1936..28bdb7100 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -103,6 +103,11 @@ struct rspamd_worker_ctx {
guint32 tasks;
/* Limit of tasks */
guint32 max_tasks;
+ /* Classify threads */
+ guint32 classify_threads;
+ /* Classify threads */
+ GThreadPool *classify_pool;
+ /* Events base */
struct event_base *ev_base;
};
@@ -499,6 +504,16 @@ read_socket (f_str_t * in, void *arg)
task->state = WRITE_ERROR;
return write_socket (task);
}
+ /* Add task to classify to classify pool */
+ if (ctx->classify_pool) {
+ g_thread_pool_push (ctx->classify_pool, task, &err);
+ if (err != NULL) {
+ msg_err ("cannot pull task to the pool: %s", err->message);
+ }
+ else {
+ register_async_thread (task->s);
+ }
+ }
}
break;
case WRITE_REPLY:
@@ -603,10 +618,19 @@ static void
fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
+ struct rspamd_worker_ctx *ctx;
+ ctx = task->worker->ctx;
if (task->state != WAIT_POST_FILTER) {
/* Process all statfiles */
- process_statfiles (task);
+ if (ctx->classify_pool == NULL) {
+ /* Non-threaded version */
+ process_statfiles (task);
+ }
+ else {
+ /* Just process composites */
+ make_composites (task);
+ }
/* Call post filters */
lua_call_post_filters (task);
}
@@ -842,6 +866,7 @@ init_worker (void)
ctx->is_mime = TRUE;
ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
+ ctx->classify_threads = 1;
register_worker_opt (type, "mime", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime));
register_worker_opt (type, "http", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http));
@@ -849,6 +874,7 @@ init_worker (void)
register_worker_opt (type, "allow_learn", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn));
register_worker_opt (type, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout));
register_worker_opt (type, "max_tasks", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks));
+ register_worker_opt (type, "classify_threads", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads));
return ctx;
}
@@ -862,6 +888,7 @@ start_worker (struct rspamd_worker *worker)
struct sigaction signals;
gchar *is_custom_str;
struct rspamd_worker_ctx *ctx = worker->ctx;
+ GError *err = NULL;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -913,6 +940,16 @@ start_worker (struct rspamd_worker *worker)
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
+ /* Create classify pool */
+ ctx->classify_pool = NULL;
+ if (ctx->classify_threads > 1) {
+ ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, ctx, ctx->classify_threads, TRUE, &err);
+ if (err != NULL) {
+ msg_err ("pool create failed: %s", err->message);
+ ctx->classify_pool = NULL;
+ }
+ }
+
event_base_loop (ctx->ev_base, 0);
#ifndef BUILD_STATIC