aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-30 22:26:25 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-30 22:26:25 +0400
commita874d5eb9fe0a2e1ddf1a0f48e6df41845be087f (patch)
tree9a5e58ad412428bae591fbd5fea9ba7719af6d39
parent661ead901b2ce0d58b3cb374e26c9abc02d96384 (diff)
downloadrspamd-a874d5eb9fe0a2e1ddf1a0f48e6df41845be087f.tar.gz
rspamd-a874d5eb9fe0a2e1ddf1a0f48e6df41845be087f.zip
* Add support to process regexp in multiply threads by using thread pool.
-rw-r--r--src/plugins/regexp.c75
1 files changed, 66 insertions, 9 deletions
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index ea1841969..863136d26 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -64,6 +64,7 @@ struct regexp_ctx {
memory_pool_t *regexp_pool;
memory_pool_t *dynamic_pool;
gsize max_size;
+ GThreadPool *workers;
};
struct regexp_json_buf {
@@ -85,6 +86,7 @@ static const struct luaL_reg regexplib_m[] = {
static struct regexp_ctx *regexp_module_ctx = NULL;
static gint regexp_common_filter (struct worker_task *task);
+static void process_regexp_item_threaded (gpointer data, gpointer user_data);
static gboolean rspamd_regexp_match_number (struct worker_task *task, GList * args, void *unused);
static gboolean rspamd_raw_header_exists (struct worker_task *task, GList * args, void *unused);
static gboolean rspamd_check_smtp_data (struct worker_task *task, GList * args, void *unused);
@@ -446,6 +448,7 @@ regexp_module_init (struct config_file *cfg, struct module_ctx **ctx)
regexp_module_ctx->regexp_pool = memory_pool_new (memory_pool_get_size ());
regexp_module_ctx->dynamic_pool = NULL;
regexp_module_ctx->autolearn_symbols = g_hash_table_new (g_str_hash, g_str_equal);
+ regexp_module_ctx->workers = NULL;
*ctx = (struct module_ctx *)regexp_module_ctx;
register_expression_function ("regexp_match_number", rspamd_regexp_match_number, NULL);
@@ -456,6 +459,7 @@ regexp_module_init (struct config_file *cfg, struct module_ctx **ctx)
(void)luaopen_regexp (cfg->lua_state);
register_module_opt ("regexp", "dynamic_rules", MODULE_OPT_TYPE_STRING);
register_module_opt ("regexp", "max_size", MODULE_OPT_TYPE_SIZE);
+ register_module_opt ("regexp", "max_threads", MODULE_OPT_TYPE_SIZE);
register_module_opt ("regexp", "/^\\S+$/", MODULE_OPT_TYPE_STRING);
return 0;
@@ -499,9 +503,11 @@ regexp_module_config (struct config_file *cfg)
GList *cur_opt = NULL;
struct module_opt *cur;
struct regexp_module_item *cur_item;
- gchar *value;
+ gchar *value;
gint res = TRUE;
struct regexp_json_buf *jb, **pjb;
+ gsize thr;
+ GError *err = NULL;
if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) {
regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
@@ -515,6 +521,22 @@ regexp_module_config (struct config_file *cfg)
else {
regexp_module_ctx->max_size = 0;
}
+ if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) {
+ if (g_thread_supported ()) {
+ thr = parse_limit (value, -1);
+ if (thr > 1) {
+ g_thread_init (NULL);
+ regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
+ if (err != NULL) {
+ msg_err ("thread pool creation failed: %s", err->message);
+ regexp_module_ctx->workers = NULL;
+ }
+ }
+ }
+ }
+ else {
+ regexp_module_ctx->workers = NULL;
+ }
if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) {
jb = g_malloc (sizeof (struct regexp_json_buf));
pjb = g_malloc (sizeof (struct regexp_json_buf *));
@@ -1169,22 +1191,57 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
return FALSE;
}
+struct regexp_threaded_ud {
+ struct regexp_module_item *item;
+ struct worker_task *task;
+};
+
static void
-process_regexp_item (struct worker_task *task, void *user_data)
+process_regexp_item_threaded (gpointer data, gpointer user_data)
{
- struct regexp_module_item *item = user_data;
+ struct regexp_threaded_ud *ud = user_data;
gboolean res = FALSE;
-
- if (item->lua_function) {
+
+ if (ud->item->lua_function) {
/* Just call function */
- if (lua_call_expression_func ("regexp", item->lua_function, task, NULL, &res) && res) {
- insert_result (task, item->symbol, 1, NULL);
+ if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
+ insert_result (ud->task, ud->item->symbol, 1, NULL);
}
}
else {
/* Process expression */
- if (process_regexp_expression (item->expr, item->symbol, task, NULL)) {
- insert_result (task, item->symbol, 1, NULL);
+ if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+ insert_result (ud->task, ud->item->symbol, 1, NULL);
+ }
+ }
+}
+
+static void
+process_regexp_item (struct worker_task *task, void *user_data)
+{
+ struct regexp_module_item *item = user_data;
+ gboolean res = FALSE;
+ struct regexp_threaded_ud *thr_ud;
+
+ if (regexp_module_ctx->workers) {
+ thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
+ thr_ud->item = item;
+ thr_ud->task = task;
+ g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
+ }
+ else {
+ /* Non-threaded version */
+ if (item->lua_function) {
+ /* Just call function */
+ if (lua_call_expression_func ("regexp", item->lua_function, task, NULL, &res) && res) {
+ insert_result (task, item->symbol, 1, NULL);
+ }
+ }
+ else {
+ /* Process expression */
+ if (process_regexp_expression (item->expr, item->symbol, task, NULL)) {
+ insert_result (task, item->symbol, 1, NULL);
+ }
}
}
}