diff options
-rw-r--r-- | src/plugins/regexp.c | 75 |
1 files changed, 66 insertions, 9 deletions
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index ea1841969..863136d26 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -64,6 +64,7 @@ struct regexp_ctx { memory_pool_t *regexp_pool; memory_pool_t *dynamic_pool; gsize max_size; + GThreadPool *workers; }; struct regexp_json_buf { @@ -85,6 +86,7 @@ static const struct luaL_reg regexplib_m[] = { static struct regexp_ctx *regexp_module_ctx = NULL; static gint regexp_common_filter (struct worker_task *task); +static void process_regexp_item_threaded (gpointer data, gpointer user_data); static gboolean rspamd_regexp_match_number (struct worker_task *task, GList * args, void *unused); static gboolean rspamd_raw_header_exists (struct worker_task *task, GList * args, void *unused); static gboolean rspamd_check_smtp_data (struct worker_task *task, GList * args, void *unused); @@ -446,6 +448,7 @@ regexp_module_init (struct config_file *cfg, struct module_ctx **ctx) regexp_module_ctx->regexp_pool = memory_pool_new (memory_pool_get_size ()); regexp_module_ctx->dynamic_pool = NULL; regexp_module_ctx->autolearn_symbols = g_hash_table_new (g_str_hash, g_str_equal); + regexp_module_ctx->workers = NULL; *ctx = (struct module_ctx *)regexp_module_ctx; register_expression_function ("regexp_match_number", rspamd_regexp_match_number, NULL); @@ -456,6 +459,7 @@ regexp_module_init (struct config_file *cfg, struct module_ctx **ctx) (void)luaopen_regexp (cfg->lua_state); register_module_opt ("regexp", "dynamic_rules", MODULE_OPT_TYPE_STRING); register_module_opt ("regexp", "max_size", MODULE_OPT_TYPE_SIZE); + register_module_opt ("regexp", "max_threads", MODULE_OPT_TYPE_SIZE); register_module_opt ("regexp", "/^\\S+$/", MODULE_OPT_TYPE_STRING); return 0; @@ -499,9 +503,11 @@ regexp_module_config (struct config_file *cfg) GList *cur_opt = NULL; struct module_opt *cur; struct regexp_module_item *cur_item; - gchar *value; + gchar *value; gint res = TRUE; struct regexp_json_buf *jb, **pjb; + gsize thr; + GError *err = NULL; if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) { regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value); @@ -515,6 +521,22 @@ regexp_module_config (struct config_file *cfg) else { regexp_module_ctx->max_size = 0; } + if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) { + if (g_thread_supported ()) { + thr = parse_limit (value, -1); + if (thr > 1) { + g_thread_init (NULL); + regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err); + if (err != NULL) { + msg_err ("thread pool creation failed: %s", err->message); + regexp_module_ctx->workers = NULL; + } + } + } + } + else { + regexp_module_ctx->workers = NULL; + } if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) { jb = g_malloc (sizeof (struct regexp_json_buf)); pjb = g_malloc (sizeof (struct regexp_json_buf *)); @@ -1169,22 +1191,57 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker return FALSE; } +struct regexp_threaded_ud { + struct regexp_module_item *item; + struct worker_task *task; +}; + static void -process_regexp_item (struct worker_task *task, void *user_data) +process_regexp_item_threaded (gpointer data, gpointer user_data) { - struct regexp_module_item *item = user_data; + struct regexp_threaded_ud *ud = user_data; gboolean res = FALSE; - - if (item->lua_function) { + + if (ud->item->lua_function) { /* Just call function */ - if (lua_call_expression_func ("regexp", item->lua_function, task, NULL, &res) && res) { - insert_result (task, item->symbol, 1, NULL); + if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) { + insert_result (ud->task, ud->item->symbol, 1, NULL); } } else { /* Process expression */ - if (process_regexp_expression (item->expr, item->symbol, task, NULL)) { - insert_result (task, item->symbol, 1, NULL); + if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { + insert_result (ud->task, ud->item->symbol, 1, NULL); + } + } +} + +static void +process_regexp_item (struct worker_task *task, void *user_data) +{ + struct regexp_module_item *item = user_data; + gboolean res = FALSE; + struct regexp_threaded_ud *thr_ud; + + if (regexp_module_ctx->workers) { + thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud)); + thr_ud->item = item; + thr_ud->task = task; + g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL); + } + else { + /* Non-threaded version */ + if (item->lua_function) { + /* Just call function */ + if (lua_call_expression_func ("regexp", item->lua_function, task, NULL, &res) && res) { + insert_result (task, item->symbol, 1, NULL); + } + } + else { + /* Process expression */ + if (process_regexp_expression (item->expr, item->symbol, task, NULL)) { + insert_result (task, item->symbol, 1, NULL); + } } } } |