]> source.dussan.org Git - rspamd.git/commitdiff
* Add support to process regexp in multiply threads by using thread pool.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 30 Jan 2012 18:26:25 +0000 (22:26 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 30 Jan 2012 18:26:25 +0000 (22:26 +0400)
src/plugins/regexp.c

index ea1841969160f11a49685c565d6b116b51e3beea..863136d2678cf064d04e0773769c711e4564b5ce 100644 (file)
@@ -64,6 +64,7 @@ struct regexp_ctx {
        memory_pool_t                  *regexp_pool;
        memory_pool_t                  *dynamic_pool;
        gsize                           max_size;
+       GThreadPool                                        *workers;
 };
 
 struct regexp_json_buf {
@@ -85,6 +86,7 @@ static const struct luaL_reg    regexplib_m[] = {
 static struct regexp_ctx       *regexp_module_ctx = NULL;
 
 static gint                     regexp_common_filter (struct worker_task *task);
+static void                                            process_regexp_item_threaded (gpointer data, gpointer user_data);
 static gboolean                 rspamd_regexp_match_number (struct worker_task *task, GList * args, void *unused);
 static gboolean                 rspamd_raw_header_exists (struct worker_task *task, GList * args, void *unused);
 static gboolean                 rspamd_check_smtp_data (struct worker_task *task, GList * args, void *unused);
@@ -446,6 +448,7 @@ regexp_module_init (struct config_file *cfg, struct module_ctx **ctx)
        regexp_module_ctx->regexp_pool = memory_pool_new (memory_pool_get_size ());
        regexp_module_ctx->dynamic_pool = NULL;
        regexp_module_ctx->autolearn_symbols = g_hash_table_new (g_str_hash, g_str_equal);
+       regexp_module_ctx->workers = NULL;
 
        *ctx = (struct module_ctx *)regexp_module_ctx;
        register_expression_function ("regexp_match_number", rspamd_regexp_match_number, NULL);
@@ -456,6 +459,7 @@ regexp_module_init (struct config_file *cfg, struct module_ctx **ctx)
        (void)luaopen_regexp (cfg->lua_state);
        register_module_opt ("regexp", "dynamic_rules", MODULE_OPT_TYPE_STRING);
        register_module_opt ("regexp", "max_size", MODULE_OPT_TYPE_SIZE);
+       register_module_opt ("regexp", "max_threads", MODULE_OPT_TYPE_SIZE);
        register_module_opt ("regexp", "/^\\S+$/", MODULE_OPT_TYPE_STRING);
 
        return 0;
@@ -499,9 +503,11 @@ regexp_module_config (struct config_file *cfg)
        GList                          *cur_opt = NULL;
        struct module_opt              *cur;
        struct regexp_module_item      *cur_item;
-       gchar                           *value;
+       gchar                          *value;
        gint                            res = TRUE;
        struct regexp_json_buf         *jb, **pjb;
+       gsize                                                   thr;
+       GError                                             *err = NULL;
 
        if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) {
                regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
@@ -515,6 +521,22 @@ regexp_module_config (struct config_file *cfg)
        else {
                regexp_module_ctx->max_size = 0;
        }
+       if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) {
+               if (g_thread_supported ()) {
+                       thr = parse_limit (value, -1);
+                       if (thr > 1) {
+                               g_thread_init (NULL);
+                               regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
+                               if (err != NULL) {
+                                       msg_err ("thread pool creation failed: %s", err->message);
+                                       regexp_module_ctx->workers = NULL;
+                               }
+                       }
+               }
+       }
+       else {
+               regexp_module_ctx->workers = NULL;
+       }
        if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) {
                jb = g_malloc (sizeof (struct regexp_json_buf));
                pjb = g_malloc (sizeof (struct regexp_json_buf *));
@@ -1169,22 +1191,57 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
        return FALSE;
 }
 
+struct regexp_threaded_ud {
+       struct regexp_module_item *item;
+       struct worker_task *task;
+};
+
 static void
-process_regexp_item (struct worker_task *task, void *user_data)
+process_regexp_item_threaded (gpointer data, gpointer user_data)
 {
-       struct regexp_module_item      *item = user_data;
+       struct regexp_threaded_ud          *ud = user_data;
        gboolean                        res = FALSE;
-       
-       if (item->lua_function) {
+
+       if (ud->item->lua_function) {
                /* Just call function */
-               if (lua_call_expression_func ("regexp", item->lua_function, task, NULL, &res) && res) {
-                       insert_result (task, item->symbol, 1, NULL);
+               if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
+                       insert_result (ud->task, ud->item->symbol, 1, NULL);
                }
        }
        else {
                /* Process expression */
-               if (process_regexp_expression (item->expr, item->symbol, task, NULL)) {
-                       insert_result (task, item->symbol, 1, NULL);
+               if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+                       insert_result (ud->task, ud->item->symbol, 1, NULL);
+               }
+       }
+}
+
+static void
+process_regexp_item (struct worker_task *task, void *user_data)
+{
+       struct regexp_module_item      *item = user_data;
+       gboolean                        res = FALSE;
+       struct regexp_threaded_ud          *thr_ud;
+
+       if (regexp_module_ctx->workers) {
+               thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
+               thr_ud->item = item;
+               thr_ud->task = task;
+               g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
+       }
+       else {
+               /* Non-threaded version */
+               if (item->lua_function) {
+                       /* Just call function */
+                       if (lua_call_expression_func ("regexp", item->lua_function, task, NULL, &res) && res) {
+                               insert_result (task, item->symbol, 1, NULL);
+                       }
+               }
+               else {
+                       /* Process expression */
+                       if (process_regexp_expression (item->expr, item->symbol, task, NULL)) {
+                               insert_result (task, item->symbol, 1, NULL);
+                       }
                }
        }
 }