diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-01-31 22:24:51 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-01-31 22:24:51 +0400 |
commit | 5c0f36dfe734e46d72e4afdfd71c05353e4df86d (patch) | |
tree | 8ed04921ae2a4ee18106b0ca8096641e12ed1c01 /src/plugins/regexp.c | |
parent | 2d708971163dc99f9c29cc47e7d4f56a3af882c5 (diff) | |
download | rspamd-5c0f36dfe734e46d72e4afdfd71c05353e4df86d.tar.gz rspamd-5c0f36dfe734e46d72e4afdfd71c05353e4df86d.zip |
Fixes to threading (still incomplete).
Diffstat (limited to 'src/plugins/regexp.c')
-rw-r--r-- | src/plugins/regexp.c | 83 |
1 files changed, 45 insertions, 38 deletions
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 9d2210c56..c9b584436 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -64,6 +64,7 @@ struct regexp_ctx { memory_pool_t *regexp_pool; memory_pool_t *dynamic_pool; gsize max_size; + gsize max_threads; GThreadPool *workers; }; @@ -507,8 +508,6 @@ regexp_module_config (struct config_file *cfg) gchar *value; gint res = TRUE; struct regexp_json_buf *jb, **pjb; - gsize thr; - GError *err = NULL; if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) { regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value); @@ -524,24 +523,15 @@ regexp_module_config (struct config_file *cfg) } if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) { if (g_thread_supported ()) { - thr = parse_limit (value, -1); - if (thr > 1) { -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - g_thread_init (NULL); - workers_mtx = g_mutex_new (); -#else - workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); - g_mutex_init (workers_mtx); -#endif - regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err); - if (err != NULL) { - msg_err ("thread pool creation failed: %s", err->message); - regexp_module_ctx->workers = NULL; - } - } + regexp_module_ctx->max_threads = parse_limit (value, -1); } + else { + regexp_module_ctx->max_threads = 0; + } + regexp_module_ctx->workers = NULL; } else { + regexp_module_ctx->max_threads = 0; regexp_module_ctx->workers = NULL; } if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) { @@ -1125,7 +1115,11 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker } else if (it->type == EXPR_STR) { /* This may be lua function, try to call it */ - cur = maybe_call_lua_function ((const gchar*)it->content.operand, task); + if (regexp_module_ctx->workers != NULL) { + g_mutex_lock (workers_mtx); + cur = maybe_call_lua_function ((const gchar*)it->content.operand, task); + g_mutex_unlock (workers_mtx); + } debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false"); if (try_optimize) { try_optimize = optimize_regexp_expression (&it, stack, cur); @@ -1208,24 +1202,13 @@ struct regexp_threaded_ud { static void process_regexp_item_threaded (gpointer data, gpointer user_data) { - struct regexp_threaded_ud *ud = user_data; - gboolean res = FALSE; + struct regexp_threaded_ud *ud = data; - if (ud->item->lua_function) { - /* Just call function */ - if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) { - g_mutex_lock (workers_mtx); - insert_result (ud->task, ud->item->symbol, 1, NULL); - g_mutex_unlock (workers_mtx); - } - } - else { - /* Process expression */ - if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { - g_mutex_lock (workers_mtx); - insert_result (ud->task, ud->item->symbol, 1, NULL); - g_mutex_unlock (workers_mtx); - } + /* Process expression */ + if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { + g_mutex_lock (workers_mtx); + insert_result (ud->task, ud->item->symbol, 1, NULL); + g_mutex_unlock (workers_mtx); } remove_async_thread (ud->task->s); } @@ -1236,13 +1219,37 @@ process_regexp_item (struct worker_task *task, void *user_data) struct regexp_module_item *item = user_data; gboolean res = FALSE; struct regexp_threaded_ud *thr_ud; + GError *err = NULL; + - if (regexp_module_ctx->workers) { + if (!item->lua_function && regexp_module_ctx->max_threads > 1) { + if (regexp_module_ctx->workers == NULL) { +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_thread_init (NULL); + workers_mtx = g_mutex_new (); +#else + workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); + g_mutex_init (workers_mtx); +#endif + regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, + regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err); + if (err != NULL) { + msg_err ("thread pool creation failed: %s", err->message); + regexp_module_ctx->max_threads = 0; + return; + } + } thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud)); thr_ud->item = item; thr_ud->task = task; - register_async_thread (task->s); - g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL); + + g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err); + if (err == NULL) { + register_async_thread (task->s); + } + else { + msg_err ("error pushing task to the regexp thread pool: %s", err->message); + } } else { /* Non-threaded version */ |