aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/regexp.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 22:24:51 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 22:24:51 +0400
commit5c0f36dfe734e46d72e4afdfd71c05353e4df86d (patch)
tree8ed04921ae2a4ee18106b0ca8096641e12ed1c01 /src/plugins/regexp.c
parent2d708971163dc99f9c29cc47e7d4f56a3af882c5 (diff)
downloadrspamd-5c0f36dfe734e46d72e4afdfd71c05353e4df86d.tar.gz
rspamd-5c0f36dfe734e46d72e4afdfd71c05353e4df86d.zip
Fixes to threading (still incomplete).
Diffstat (limited to 'src/plugins/regexp.c')
-rw-r--r--src/plugins/regexp.c83
1 files changed, 45 insertions, 38 deletions
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 9d2210c56..c9b584436 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -64,6 +64,7 @@ struct regexp_ctx {
memory_pool_t *regexp_pool;
memory_pool_t *dynamic_pool;
gsize max_size;
+ gsize max_threads;
GThreadPool *workers;
};
@@ -507,8 +508,6 @@ regexp_module_config (struct config_file *cfg)
gchar *value;
gint res = TRUE;
struct regexp_json_buf *jb, **pjb;
- gsize thr;
- GError *err = NULL;
if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) {
regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
@@ -524,24 +523,15 @@ regexp_module_config (struct config_file *cfg)
}
if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) {
if (g_thread_supported ()) {
- thr = parse_limit (value, -1);
- if (thr > 1) {
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
- g_thread_init (NULL);
- workers_mtx = g_mutex_new ();
-#else
- workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
- g_mutex_init (workers_mtx);
-#endif
- regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
- if (err != NULL) {
- msg_err ("thread pool creation failed: %s", err->message);
- regexp_module_ctx->workers = NULL;
- }
- }
+ regexp_module_ctx->max_threads = parse_limit (value, -1);
}
+ else {
+ regexp_module_ctx->max_threads = 0;
+ }
+ regexp_module_ctx->workers = NULL;
}
else {
+ regexp_module_ctx->max_threads = 0;
regexp_module_ctx->workers = NULL;
}
if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) {
@@ -1125,7 +1115,11 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
}
else if (it->type == EXPR_STR) {
/* This may be lua function, try to call it */
- cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+ if (regexp_module_ctx->workers != NULL) {
+ g_mutex_lock (workers_mtx);
+ cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+ g_mutex_unlock (workers_mtx);
+ }
debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false");
if (try_optimize) {
try_optimize = optimize_regexp_expression (&it, stack, cur);
@@ -1208,24 +1202,13 @@ struct regexp_threaded_ud {
static void
process_regexp_item_threaded (gpointer data, gpointer user_data)
{
- struct regexp_threaded_ud *ud = user_data;
- gboolean res = FALSE;
+ struct regexp_threaded_ud *ud = data;
- if (ud->item->lua_function) {
- /* Just call function */
- if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
- g_mutex_lock (workers_mtx);
- insert_result (ud->task, ud->item->symbol, 1, NULL);
- g_mutex_unlock (workers_mtx);
- }
- }
- else {
- /* Process expression */
- if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
- g_mutex_lock (workers_mtx);
- insert_result (ud->task, ud->item->symbol, 1, NULL);
- g_mutex_unlock (workers_mtx);
- }
+ /* Process expression */
+ if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+ g_mutex_lock (workers_mtx);
+ insert_result (ud->task, ud->item->symbol, 1, NULL);
+ g_mutex_unlock (workers_mtx);
}
remove_async_thread (ud->task->s);
}
@@ -1236,13 +1219,37 @@ process_regexp_item (struct worker_task *task, void *user_data)
struct regexp_module_item *item = user_data;
gboolean res = FALSE;
struct regexp_threaded_ud *thr_ud;
+ GError *err = NULL;
+
- if (regexp_module_ctx->workers) {
+ if (!item->lua_function && regexp_module_ctx->max_threads > 1) {
+ if (regexp_module_ctx->workers == NULL) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_thread_init (NULL);
+ workers_mtx = g_mutex_new ();
+#else
+ workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
+ g_mutex_init (workers_mtx);
+#endif
+ regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded,
+ regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err);
+ if (err != NULL) {
+ msg_err ("thread pool creation failed: %s", err->message);
+ regexp_module_ctx->max_threads = 0;
+ return;
+ }
+ }
thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
thr_ud->item = item;
thr_ud->task = task;
- register_async_thread (task->s);
- g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
+
+ g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
+ if (err == NULL) {
+ register_async_thread (task->s);
+ }
+ else {
+ msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+ }
}
else {
/* Non-threaded version */