]> source.dussan.org Git - rspamd.git/commitdiff
Fixes to threading (still incomplete).
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 31 Jan 2012 18:24:51 +0000 (22:24 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 31 Jan 2012 18:24:51 +0000 (22:24 +0400)
src/events.c
src/events.h
src/filter.c
src/filter.h
src/plugins/regexp.c
src/worker.c

index bf47bb1f89a534ca3a1241fd420791b58329fd87..0daefc95f41fed122b6d6b22ecaa4f65f235eced 100644 (file)
@@ -62,6 +62,13 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
        new->user_data = user_data;
        new->wanna_die = FALSE;
        new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       new->mtx = g_mutex_new ();
+       memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_free, new->mtx);
+#else
+       new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+       g_mutex_init (new->mtx);
+#endif
        new->threads = 0;
 
        memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
@@ -148,7 +155,9 @@ destroy_session (struct rspamd_async_session *session)
 gboolean
 check_session_pending (struct rspamd_async_session *session)
 {
-       if (session->threads == 0 && g_hash_table_size (session->events) == 0) {
+       g_mutex_lock (session->mtx);
+       if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+               session->wanna_die = FALSE;
                if (session->fin != NULL) {
                        session->fin (session->user_data);
                }
@@ -157,11 +166,13 @@ check_session_pending (struct rspamd_async_session *session)
                        if (session->restore != NULL) {
                                session->restore (session->user_data);
                        }
+                       g_mutex_unlock (session->mtx);
                        return TRUE;
                }
+               g_mutex_unlock (session->mtx);
                return FALSE;
        }
-
+       g_mutex_unlock (session->mtx);
        return TRUE;
 }
 
@@ -174,6 +185,9 @@ void
 register_async_thread (struct rspamd_async_session *session)
 {
        g_atomic_int_inc (&session->threads);
+#ifdef RSPAMD_EVENTS_DEBUG
+       msg_info ("added thread: pending %d thread", session->threads);
+#endif
 }
 
 /**
@@ -186,4 +200,7 @@ remove_async_thread (struct rspamd_async_session *session)
        if (g_atomic_int_dec_and_test (&session->threads)) {
                (void) check_session_pending (session);
        }
+#ifdef RSPAMD_EVENTS_DEBUG
+       msg_info ("removed thread: pending %d thread", session->threads);
+#endif
 }
index 138920060bd76bd38aef09de36d79d5987036265..af41cc84ab525b340bd00b8e1d98e3908ee679f7 100644 (file)
@@ -22,6 +22,7 @@ struct rspamd_async_session {
        memory_pool_t *pool;
        gboolean wanna_die;
        guint threads;
+       GMutex *mtx;
 };
 
 /**
index e4ba63f0fe08eec7ab084ffeec609322f0543bcb..cf8a63d0d109f15a52926df3c27d822ee78249ae 100644 (file)
@@ -239,12 +239,14 @@ process_filters (struct worker_task *task)
                        if (!task->pass_all_filters &&
                                                metric->action == METRIC_ACTION_REJECT && 
                                                check_metric_is_spam (task, metric)) {
+                               task->s->wanna_die = TRUE;
                                check_session_pending (task->s);
                                return 1;
                        }
                        cur = g_list_next (cur);
                }
        }
+       task->s->wanna_die = TRUE;
        check_session_pending (task->s);
 
        return 1;
@@ -623,6 +625,24 @@ process_statfiles (struct worker_task *task)
        make_composites (task);
 }
 
+void
+process_statfiles_threaded (gpointer data, gpointer user_data)
+{
+       struct worker_task             *task = (struct worker_task *)data;
+
+       if (task->is_skipped) {
+               return;
+       }
+
+       if (task->tokens == NULL) {
+               task->tokens = g_hash_table_new (g_direct_hash, g_direct_equal);
+               memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens);
+       }
+
+       g_list_foreach (task->cfg->classifiers, classifiers_callback, task);
+       remove_async_thread (task->s);
+}
+
 static void
 insert_metric_header (gpointer metric_name, gpointer metric_value, gpointer data)
 {
index d222fdd4540eaba54031b2cafeba608707ecb251..5cd6e4ff2be8ce2a5576e1342dfc4694f9875bc0 100644 (file)
@@ -93,6 +93,12 @@ gint process_filters (struct worker_task *task);
  */
 void process_statfiles (struct worker_task *task);
 
+/**
+ * Process message with statfiles threaded
+ * @param data worker's task that present message from user
+ */
+void process_statfiles_threaded (gpointer data, gpointer user_data);
+
 /**
  * Insert a result to task
  * @param task worker's task that present message from user
index 9d2210c5643b83548349d2ed4e4b41f09302845d..c9b58443634a027d96f5d08ed4794f6dd9570825 100644 (file)
@@ -64,6 +64,7 @@ struct regexp_ctx {
        memory_pool_t                  *regexp_pool;
        memory_pool_t                  *dynamic_pool;
        gsize                           max_size;
+       gsize                                                   max_threads;
        GThreadPool                                        *workers;
 };
 
@@ -507,8 +508,6 @@ regexp_module_config (struct config_file *cfg)
        gchar                          *value;
        gint                            res = TRUE;
        struct regexp_json_buf         *jb, **pjb;
-       gsize                                                   thr;
-       GError                                             *err = NULL;
 
        if ((value = get_module_opt (cfg, "regexp", "statfile_prefix")) != NULL) {
                regexp_module_ctx->statfile_prefix = memory_pool_strdup (regexp_module_ctx->regexp_pool, value);
@@ -524,24 +523,15 @@ regexp_module_config (struct config_file *cfg)
        }
        if ((value = get_module_opt (cfg, "regexp", "max_threads")) != NULL) {
                if (g_thread_supported ()) {
-                       thr = parse_limit (value, -1);
-                       if (thr > 1) {
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
-                               g_thread_init (NULL);
-                               workers_mtx = g_mutex_new ();
-#else
-                               workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
-                               g_mutex_init (workers_mtx);
-#endif
-                               regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
-                               if (err != NULL) {
-                                       msg_err ("thread pool creation failed: %s", err->message);
-                                       regexp_module_ctx->workers = NULL;
-                               }
-                       }
+                       regexp_module_ctx->max_threads = parse_limit (value, -1);
                }
+               else {
+                       regexp_module_ctx->max_threads = 0;
+               }
+               regexp_module_ctx->workers = NULL;
        }
        else {
+               regexp_module_ctx->max_threads = 0;
                regexp_module_ctx->workers = NULL;
        }
        if ((value = get_module_opt (cfg, "regexp", "dynamic_rules")) != NULL) {
@@ -1125,7 +1115,11 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
                }
                else if (it->type == EXPR_STR) {
                        /* This may be lua function, try to call it */
-                       cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+                       if (regexp_module_ctx->workers != NULL) {
+                               g_mutex_lock (workers_mtx);
+                               cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
+                               g_mutex_unlock (workers_mtx);
+                       }
                        debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false");
                        if (try_optimize) {
                                try_optimize = optimize_regexp_expression (&it, stack, cur);
@@ -1208,24 +1202,13 @@ struct regexp_threaded_ud {
 static void
 process_regexp_item_threaded (gpointer data, gpointer user_data)
 {
-       struct regexp_threaded_ud          *ud = user_data;
-       gboolean                        res = FALSE;
+       struct regexp_threaded_ud          *ud = data;
 
-       if (ud->item->lua_function) {
-               /* Just call function */
-               if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
-                       g_mutex_lock (workers_mtx);
-                       insert_result (ud->task, ud->item->symbol, 1, NULL);
-                       g_mutex_unlock (workers_mtx);
-               }
-       }
-       else {
-               /* Process expression */
-               if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
-                       g_mutex_lock (workers_mtx);
-                       insert_result (ud->task, ud->item->symbol, 1, NULL);
-                       g_mutex_unlock (workers_mtx);
-               }
+       /* Process expression */
+       if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+               g_mutex_lock (workers_mtx);
+               insert_result (ud->task, ud->item->symbol, 1, NULL);
+               g_mutex_unlock (workers_mtx);
        }
        remove_async_thread (ud->task->s);
 }
@@ -1236,13 +1219,37 @@ process_regexp_item (struct worker_task *task, void *user_data)
        struct regexp_module_item      *item = user_data;
        gboolean                        res = FALSE;
        struct regexp_threaded_ud          *thr_ud;
+       GError                                             *err = NULL;
+
 
-       if (regexp_module_ctx->workers) {
+       if (!item->lua_function && regexp_module_ctx->max_threads > 1) {
+               if (regexp_module_ctx->workers == NULL) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+                       g_thread_init (NULL);
+                       workers_mtx = g_mutex_new ();
+#else
+                       workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
+                       g_mutex_init (workers_mtx);
+#endif
+                       regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded,
+                                       regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err);
+                       if (err != NULL) {
+                               msg_err ("thread pool creation failed: %s", err->message);
+                               regexp_module_ctx->max_threads = 0;
+                               return;
+                       }
+               }
                thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
                thr_ud->item = item;
                thr_ud->task = task;
-               register_async_thread (task->s);
-               g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
+
+               g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
+               if (err == NULL) {
+                       register_async_thread (task->s);
+               }
+               else {
+                       msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+               }
        }
        else {
                /* Non-threaded version */
index 48f4d19367094c7fd08513bfb0323862c39fe9c3..28bdb7100e13c5f9f0ffec6cf35ed2332b1a6abd 100644 (file)
@@ -103,6 +103,11 @@ struct rspamd_worker_ctx {
        guint32                         tasks;
        /* Limit of tasks */
        guint32                         max_tasks;
+       /* Classify threads */
+       guint32                                                 classify_threads;
+       /* Classify threads */
+       GThreadPool                                        *classify_pool;
+       /* Events base */
        struct event_base              *ev_base;
 };
 
@@ -499,6 +504,16 @@ read_socket (f_str_t * in, void *arg)
                                task->state = WRITE_ERROR;
                                return write_socket (task);
                        }
+                       /* Add task to classify to classify pool */
+                       if (ctx->classify_pool) {
+                               g_thread_pool_push (ctx->classify_pool, task, &err);
+                               if (err != NULL) {
+                                       msg_err ("cannot pull task to the pool: %s", err->message);
+                               }
+                               else {
+                                       register_async_thread (task->s);
+                               }
+                       }
                }
                break;
        case WRITE_REPLY:
@@ -603,10 +618,19 @@ static void
 fin_task (void *arg)
 {
        struct worker_task             *task = (struct worker_task *) arg;
+       struct rspamd_worker_ctx       *ctx;
 
+       ctx = task->worker->ctx;
        if (task->state != WAIT_POST_FILTER) {
                /* Process all statfiles */
-               process_statfiles (task);
+               if (ctx->classify_pool == NULL) {
+                       /* Non-threaded version */
+                       process_statfiles (task);
+               }
+               else {
+                       /* Just process composites */
+                       make_composites (task);
+               }
                /* Call post filters */
                lua_call_post_filters (task);
        }
@@ -842,6 +866,7 @@ init_worker (void)
 
        ctx->is_mime = TRUE;
        ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
+       ctx->classify_threads = 1;
 
        register_worker_opt (type, "mime", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime));
        register_worker_opt (type, "http", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http));
@@ -849,6 +874,7 @@ init_worker (void)
        register_worker_opt (type, "allow_learn", xml_handle_boolean, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn));
        register_worker_opt (type, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout));
        register_worker_opt (type, "max_tasks", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks));
+       register_worker_opt (type, "classify_threads", xml_handle_uint32, ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads));
 
        return ctx;
 }
@@ -862,6 +888,7 @@ start_worker (struct rspamd_worker *worker)
        struct sigaction                signals;
        gchar                          *is_custom_str;
        struct rspamd_worker_ctx       *ctx = worker->ctx;
+       GError                                             *err = NULL;
 
 #ifdef WITH_PROFILER
        extern void                     _start (void), etext (void);
@@ -913,6 +940,16 @@ start_worker (struct rspamd_worker *worker)
 
        ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
 
+       /* Create classify pool */
+       ctx->classify_pool = NULL;
+       if (ctx->classify_threads > 1) {
+               ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, ctx, ctx->classify_threads, TRUE, &err);
+               if (err != NULL) {
+                       msg_err ("pool create failed: %s", err->message);
+                       ctx->classify_pool = NULL;
+               }
+       }
+
        event_base_loop (ctx->ev_base, 0);
 
 #ifndef BUILD_STATIC