diff options
-rw-r--r-- | src/events.c | 21 | ||||
-rw-r--r-- | src/events.h | 1 | ||||
-rw-r--r-- | src/filter.c | 20 | ||||
-rw-r--r-- | src/filter.h | 6 | ||||
-rw-r--r-- | src/plugins/regexp.c | 83 | ||||
-rw-r--r-- | src/worker.c | 39 |
6 files changed, 129 insertions, 41 deletions
diff --git a/src/events.c b/src/events.c index bf47bb1f8..0daefc95f 100644 --- a/src/events.c +++ b/src/events.c @@ -62,6 +62,13 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, new->user_data = user_data; new->wanna_die = FALSE; new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + new->mtx = g_mutex_new (); + memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_free, new->mtx); +#else + new->mtx = memory_pool_alloc (pool, sizeof (GMutex)); + g_mutex_init (new->mtx); +#endif new->threads = 0; memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events); @@ -148,7 +155,9 @@ destroy_session (struct rspamd_async_session *session) gboolean check_session_pending (struct rspamd_async_session *session) { - if (session->threads == 0 && g_hash_table_size (session->events) == 0) { + g_mutex_lock (session->mtx); + if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) { + session->wanna_die = FALSE; if (session->fin != NULL) { session->fin (session->user_data); } @@ -157,11 +166,13 @@ check_session_pending (struct rspamd_async_session *session) if (session->restore != NULL) { session->restore (session->user_data); } + g_mutex_unlock (session->mtx); return TRUE; } + g_mutex_unlock (session->mtx); return FALSE; } - + g_mutex_unlock (session->mtx); return TRUE; } @@ -174,6 +185,9 @@ void register_async_thread (struct rspamd_async_session *session) { g_atomic_int_inc (&session->threads); +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("added thread: pending %d thread", session->threads); +#endif } /** @@ -186,4 +200,7 @@ remove_async_thread (struct rspamd_async_session *session) if (g_atomic_int_dec_and_test (&session->threads)) { (void) check_session_pending (session); } +#ifdef RSPAMD_EVENTS_DEBUG + msg_info ("removed thread: pending %d thread", session->threads); +#endif } diff --git a/src/events.h b/src/events.h index 138920060..af41cc84a 100644 --- a/src/events.h +++ b/src/events.h @@ -22,6 +22,7 @@ struct rspamd_async_session { memory_pool_t *pool; gboolean wanna_die; guint threads; + GMutex *mtx; }; /** diff --git a/src/filter.c b/src/filter.c index e4ba63f0f..cf8a63d0d 100644 --- a/src/filter.c +++ b/src/filter.c @@ -239,12 +239,14 @@ process_filters (struct worker_task *task) if (!task->pass_all_filters && metric->action == METRIC_ACTION_REJECT && check_metric_is_spam (task, metric)) { + task->s->wanna_die = TRUE; check_session_pending (task->s); return 1; } cur = g_list_next (cur); } } + task->s->wanna_die = TRUE; check_session_pending (task->s); return 1; @@ -623,6 +625,24 @@ process_statfiles (struct worker_task *task) make_composites (task); } +void +process_statfiles_threaded (gpointer data, gpointer user_data) +{ + struct worker_task *task = (struct worker_task *)data; + + if (task->is_skipped) { + return; + } + + if (task->tokens == NULL) { + task->tokens = g_hash_table_new (g_direct_hash, g_direct_equal); + memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens); + } + + g_list_foreach (task->cfg->classifiers, classifiers_callback, task); + remove_async_thread (task->s); +} + static void insert_metric_header (gpointer metric_name, gpointer metric_value, gpointer data) { diff --git a/src/filter.h b/src/filter.h index d222fdd45..5cd6e4ff2 100644 --- a/src/filter.h +++ b/src/filter.h @@ -94,6 +94,12 @@ gint process_filters (struct worker_task *task); void process_statfiles (struct worker_task *task); /** + * Process message with statfiles threaded + * @param data worker's task that present message from user + */ +void process_statfiles_threaded (gpointer data, gpointer user_data); + +/** * Insert a result to task * @param task worker's task that present message from user * @param metric_name metric's name to which we need to insert result diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 9d2210c56..c9b584436 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -64,6 +64,7 @@ struct regexp_ctx { memory_pool_t *regexp_pool; memory_pool_t *dynamic_pool; gsize max_size; + gsize max_threads; GThreadPool *workers; }; @@ -507,8 +508,6 @@ regexp_module_config (struct config_file *cfg) gchar *value; gint res = TRUE; struct regexp_json_buf *jb, **pjb; - gsize thr; - GError *err = NULL; if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) { regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value); @@ -524,24 +523,15 @@ regexp_module_config (struct config_file *cfg) } if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) { if (g_thread_supported ()) { - thr = parse_limit (value, -1); - if (thr > 1) { -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - g_thread_init (NULL); - workers_mtx = g_mutex_new (); -#else - workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); - g_mutex_init (workers_mtx); -#endif - regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err); - if (err != NULL) { - msg_err ("thread pool creation failed: %s", err->message); - regexp_module_ctx->workers = NULL; - } - } + regexp_module_ctx->max_threads = parse_limit (value, -1); } + else { + regexp_module_ctx->max_threads = 0; + } + regexp_module_ctx->workers = NULL; } else { + regexp_module_ctx->max_threads = 0; regexp_module_ctx->workers = NULL; } if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) { @@ -1125,7 +1115,11 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker } else if (it->type == EXPR_STR) { /* This may be lua function, try to call it */ - cur = maybe_call_lua_function ((const gchar*)it->content.operand, task); + if (regexp_module_ctx->workers != NULL) { + g_mutex_lock (workers_mtx); + cur = maybe_call_lua_function ((const gchar*)it->content.operand, task); + g_mutex_unlock (workers_mtx); + } debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false"); if (try_optimize) { try_optimize = optimize_regexp_expression (&it, stack, cur); @@ -1208,24 +1202,13 @@ struct regexp_threaded_ud { static void process_regexp_item_threaded (gpointer data, gpointer user_data) { - struct regexp_threaded_ud *ud = user_data; - gboolean res = FALSE; + struct regexp_threaded_ud *ud = data; - if (ud->item->lua_function) { - /* Just call function */ - if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) { - g_mutex_lock (workers_mtx); - insert_result (ud->task, ud->item->symbol, 1, NULL); - g_mutex_unlock (workers_mtx); - } - } - else { - /* Process expression */ - if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { - g_mutex_lock (workers_mtx); - insert_result (ud->task, ud->item->symbol, 1, NULL); - g_mutex_unlock (workers_mtx); - } + /* Process expression */ + if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { + g_mutex_lock (workers_mtx); + insert_result (ud->task, ud->item->symbol, 1, NULL); + g_mutex_unlock (workers_mtx); } remove_async_thread (ud->task->s); } @@ -1236,13 +1219,37 @@ process_regexp_item (struct worker_task *task, void *user_data) struct regexp_module_item *item = user_data; gboolean res = FALSE; struct regexp_threaded_ud *thr_ud; + GError *err = NULL; + - if (regexp_module_ctx->workers) { + if (!item->lua_function && regexp_module_ctx->max_threads > 1) { + if (regexp_module_ctx->workers == NULL) { +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_thread_init (NULL); + workers_mtx = g_mutex_new (); +#else + workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); + g_mutex_init (workers_mtx); +#endif + regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, + regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err); + if (err != NULL) { + msg_err ("thread pool creation failed: %s", err->message); + regexp_module_ctx->max_threads = 0; + return; + } + } thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud)); thr_ud->item = item; thr_ud->task = task; - register_async_thread (task->s); - g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL); + + g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err); + if (err == NULL) { + register_async_thread (task->s); + } + else { + msg_err ("error pushing task to the regexp thread pool: %s", err->message); + } } else { /* Non-threaded version */ diff --git a/src/worker.c b/src/worker.c index 48f4d1936..28bdb7100 100644 --- a/src/worker.c +++ b/src/worker.c @@ -103,6 +103,11 @@ struct rspamd_worker_ctx { guint32 tasks; /* Limit of tasks */ guint32 max_tasks; + /* Classify threads */ + guint32 classify_threads; + /* Classify threads */ + GThreadPool *classify_pool; + /* Events base */ struct event_base *ev_base; }; @@ -499,6 +504,16 @@ read_socket (f_str_t * in, void *arg) task->state = WRITE_ERROR; return write_socket (task); } + /* Add task to classify to classify pool */ + if (ctx->classify_pool) { + g_thread_pool_push (ctx->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + } + else { + register_async_thread (task->s); + } + } } break; case WRITE_REPLY: @@ -603,10 +618,19 @@ static void fin_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; + struct rspamd_worker_ctx *ctx; + ctx = task->worker->ctx; if (task->state != WAIT_POST_FILTER) { /* Process all statfiles */ - process_statfiles (task); + if (ctx->classify_pool == NULL) { + /* Non-threaded version */ + process_statfiles (task); + } + else { + /* Just process composites */ + make_composites (task); + } /* Call post filters */ lua_call_post_filters (task); } @@ -842,6 +866,7 @@ init_worker (void) ctx->is_mime = TRUE; ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT; + ctx->classify_threads = 1; register_worker_opt (type, "mime", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime)); register_worker_opt (type, "http", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http)); @@ -849,6 +874,7 @@ init_worker (void) register_worker_opt (type, "allow_learn", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn)); register_worker_opt (type, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout)); register_worker_opt (type, "max_tasks", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks)); + register_worker_opt (type, "classify_threads", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads)); return ctx; } @@ -862,6 +888,7 @@ start_worker (struct rspamd_worker *worker) struct sigaction signals; gchar *is_custom_str; struct rspamd_worker_ctx *ctx = worker->ctx; + GError *err = NULL; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -913,6 +940,16 @@ start_worker (struct rspamd_worker *worker) ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); + /* Create classify pool */ + ctx->classify_pool = NULL; + if (ctx->classify_threads > 1) { + ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, ctx, ctx->classify_threads, TRUE, &err); + if (err != NULL) { + msg_err ("pool create failed: %s", err->message); + ctx->classify_pool = NULL; + } + } + event_base_loop (ctx->ev_base, 0); #ifndef BUILD_STATIC |