]> source.dussan.org Git - rspamd.git/commitdiff
* More things to be thread-safe:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 13 Feb 2012 17:51:10 +0000 (21:51 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 13 Feb 2012 17:51:10 +0000 (21:51 +0400)
 - pool allocator is now thread-safe
 - lua subsystem now holds lock to avoid lua stack corruption
 - events subsystem now using conditional variables to wait for async_threads
 - insert_result is thread-safe now

15 files changed:
src/classifiers/bayes.c
src/classifiers/classifiers.h
src/controller.c
src/events.c
src/events.h
src/filter.c
src/lua/lua_classifier.c
src/lua/lua_common.c
src/lua/lua_common.h
src/lua/lua_config.c
src/lua/lua_task.c
src/mem_pool.c
src/mem_pool.h
src/smtp_utils.c
src/worker.c

index 265957bc92ac52fcfc2303a2e2e01b5041d917e5..281cc6292c758ac3e223ea946942f8ed4df0f6bd 100644 (file)
@@ -165,6 +165,16 @@ bayes_classify_callback (gpointer key, gpointer value, gpointer data)
        return FALSE;
 }
 
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static void
+bayes_mutex_free (gpointer data)
+{
+       GMutex                                             *mtx = data;
+
+       g_mutex_free (mtx);
+}
+#endif
+
 struct classifier_ctx*
 bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
 {
@@ -173,6 +183,13 @@ bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
        ctx->pool = pool;
        ctx->cfg = cfg;
        ctx->debug = FALSE;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       ctx->mtx = g_mutex_new ();
+       memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx);
+#else
+       ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+       g_mutex_init (ctx->mtx);
+#endif
 
        return ctx;
 }
@@ -205,6 +222,8 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
                }
        }
 
+       /* Critical section as here can be lua callbacks calling */
+       g_mutex_lock (ctx->mtx);
        cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
        if (cur) {
                memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
@@ -212,6 +231,7 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
        else {
                cur = ctx->cfg->statfiles;
        }
+       g_mutex_unlock (ctx->mtx);
 
        data.statfiles_num = g_list_length (cur);
        data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num);
index b05b62e34fa13a720271e2df70f2e277e00d21a0..7170834522aefd15f9156c14a32426a98e35a990 100644 (file)
@@ -17,6 +17,7 @@ struct classifier_ctx {
        GHashTable *results;
        gboolean debug;
        struct classifier_config *cfg;
+       GMutex *mtx;
 };
 
 struct classify_weight {
index b9ec3677c5fab94a08cbfa1bb75abdfac6d8ca58..ddac279306b882f2ba3f1d04db0da14157b1cfa5 100644 (file)
@@ -846,9 +846,9 @@ process_normal_command (const gchar *line)
 }
 
 /*
- * Called if all filters are processed
+ * Called if all filters are processed, non-threaded and simple version
  */
-static void
+static gboolean
 fin_learn_task (void *arg)
 {
        struct worker_task             *task = (struct worker_task *) arg;
@@ -870,6 +870,8 @@ fin_learn_task (void *arg)
                        rspamd_dispatcher_restore (task->dispatcher);
                }
        }
+
+       return TRUE;
 }
 
 /*
index b658f48a9cede13a4160f6c589c7088e468d4296..0d831104cfcb58f6e28d43a8b19fc2b9a214131f 100644 (file)
@@ -56,10 +56,18 @@ event_mutex_free (gpointer data)
 
        g_mutex_free (mtx);
 }
+
+static void
+event_cond_free (gpointer data)
+{
+       GCond                                              *cond = data;
+
+       g_cond_free (cond);
+}
 #endif
 
 struct rspamd_async_session    *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+new_async_session (memory_pool_t * pool, session_finalizer_t fin,
                event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
 {
        struct rspamd_async_session    *new;
@@ -74,10 +82,14 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
        new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
 #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
        new->mtx = g_mutex_new ();
+       new->cond = g_cond_new ();
        memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx);
+       memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond);
 #else
        new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
        g_mutex_init (new->mtx);
+       new->cond = memory_pool_alloc (pool, sizeof (GCond));
+       g_cond_init (new->cond);
 #endif
        new->threads = 0;
 
@@ -96,6 +108,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
                return;
        }
 
+       g_mutex_lock (session->mtx);
        new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
        new->fin = fin;
        new->user_data = user_data;
@@ -106,6 +119,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
        msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
                        g_quark_to_string (subsystem));
 #endif
+       g_mutex_unlock (session->mtx);
 }
 
 void
@@ -118,6 +132,7 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
                return;
        }
 
+       g_mutex_lock (session->mtx);
        /* Search for event */
        search_ev.fin = fin;
        search_ev.user_data = ud;
@@ -130,13 +145,12 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
                /* Remove event */
                fin (ud);
        }
-
-
+       g_mutex_unlock (session->mtx);
 
        check_session_pending (session);
 }
 
-static void
+static gboolean
 rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
 {
        struct rspamd_async_event      *ev = v;
@@ -145,6 +159,8 @@ rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
        if (ev->fin != NULL) {
                ev->fin (ev->user_data);
        }
+
+       return TRUE;
 }
 
 gboolean
@@ -155,36 +171,47 @@ destroy_session (struct rspamd_async_session *session)
                return FALSE;
        }
 
+       g_mutex_lock (session->mtx);
+       if (session->threads > 0) {
+       /* Wait for conditional variable to finish processing */
+               g_mutex_unlock (session->mtx);
+               g_cond_wait (session->cond, session->mtx);
+       }
+
        session->wanna_die = TRUE;
 
-       g_hash_table_foreach (session->events, rspamd_session_destroy, session);
+       g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session);
 
-       if (session->threads == 0) {
-               if (session->cleanup != NULL) {
-                       session->cleanup (session->user_data);
-               }
-               return TRUE;
-       }
+       /* Mutex can be destroyed here */
+       g_mutex_unlock (session->mtx);
 
-       return FALSE;
+       if (session->cleanup != NULL) {
+               session->cleanup (session->user_data);
+       }
+       return TRUE;
 }
 
 gboolean
 check_session_pending (struct rspamd_async_session *session)
 {
        g_mutex_lock (session->mtx);
-       if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+       if (session->wanna_die && g_hash_table_size (session->events) == 0) {
                session->wanna_die = FALSE;
-               if (session->fin != NULL) {
-                       session->fin (session->user_data);
+               if (session->threads > 0) {
+                       /* Wait for conditional variable to finish processing */
+                       g_cond_wait (session->cond, session->mtx);
                }
-               /* Check events count again */
-               if (g_hash_table_size (session->events) != 0) {
-                       if (session->restore != NULL) {
-                               session->restore (session->user_data);
+               if (session->fin != NULL) {
+                       if (! session->fin (session->user_data)) {
+                               g_mutex_unlock (session->mtx);
+                               /* Session finished incompletely, perform restoration */
+                               if (session->restore != NULL) {
+                                       session->restore (session->user_data);
+                                       /* Call pending once more */
+                                       return check_session_pending (session);
+                               }
+                               return TRUE;
                        }
-                       g_mutex_unlock (session->mtx);
-                       return TRUE;
                }
                g_mutex_unlock (session->mtx);
                return FALSE;
@@ -215,7 +242,10 @@ void
 remove_async_thread (struct rspamd_async_session *session)
 {
        if (g_atomic_int_dec_and_test (&session->threads)) {
-               (void) check_session_pending (session);
+               /* Signal if there are any sessions waiting */
+               g_mutex_lock (session->mtx);
+               g_cond_signal (session->cond);
+               g_mutex_unlock (session->mtx);
        }
 #ifdef RSPAMD_EVENTS_DEBUG
        msg_info ("removed thread: pending %d thread", session->threads);
index 2c4ea2c917e5fb7594cf106b298e5f09f1113cd0..1329d58379888c9dd3e308c89f47ca27b954183a 100644 (file)
@@ -6,6 +6,7 @@
 struct rspamd_async_event;
 
 typedef void (*event_finalizer_t)(void *user_data);
+typedef gboolean (*session_finalizer_t)(void *user_data);
 
 struct rspamd_async_event {
        GQuark subsystem;
@@ -15,7 +16,7 @@ struct rspamd_async_event {
 };
 
 struct rspamd_async_session {
-       event_finalizer_t fin;
+       session_finalizer_t fin;
        event_finalizer_t restore;
        event_finalizer_t cleanup;
        GHashTable *events;
@@ -24,6 +25,7 @@ struct rspamd_async_session {
        gboolean wanna_die;
        guint threads;
        GMutex *mtx;
+       GCond *cond;
 };
 
 /**
@@ -36,7 +38,7 @@ struct rspamd_async_session {
  * @return
  */
 struct rspamd_async_session *new_async_session (memory_pool_t *pool,
-               event_finalizer_t fin, event_finalizer_t restore,
+               session_finalizer_t fin, event_finalizer_t restore,
                event_finalizer_t cleanup, void *user_data);
 
 /**
index cf8a63d0d109f15a52926df3c27d822ee78249ae..bca6d17d401591b29a5c3b90ced7059b841d9878 100644 (file)
@@ -140,6 +140,12 @@ insert_metric_result (struct worker_task *task, struct metric *metric, const gch
        
 }
 
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static GStaticMutex result_mtx =  G_STATIC_MUTEX_INIT;
+#else
+G_LOCK_DEFINE (result_mtx);
+#endif
+
 static void
 insert_result_common (struct worker_task *task, const gchar *symbol, double flag, GList * opts, gboolean single)
 {
@@ -147,6 +153,12 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag
        struct cache_item              *item;
        GList                          *cur, *metric_list;
 
+       /* Avoid concurrenting inserting of results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_lock (&result_mtx);
+#else
+       G_LOCK (result_mtx);
+#endif
        metric_list = g_hash_table_lookup (task->cfg->metrics_symbols, symbol);
        if (metric_list) {
                cur = metric_list;
@@ -174,6 +186,11 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag
                /* XXX: it is not wise to destroy them here */
                g_list_free (opts);
        }
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_unlock (&result_mtx);
+#else
+       G_UNLOCK (result_mtx);
+#endif
 }
 
 /* Insert result that may be increased on next insertions */
index 0bf0daca8f91ae2d4d13e80b08861efaacd0ff6c..edaf4e7a61553accce99f3fb16713ce54b928267 100644 (file)
@@ -123,6 +123,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
        struct classifier_callback_data *cd;
        lua_State                       *L;
 
+
        /* Go throught all callbacks and call them, appending results to list */
        cur = g_list_first (ccf->pre_callbacks);
        while (cur) {
@@ -134,6 +135,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
                cur = g_list_next (cur);
        }
        
+       g_mutex_lock (lua_mtx);
        if (res == NULL) {
                L = task->cfg->lua_state;
                /* Check function from global table 'classifiers' */
@@ -149,6 +151,8 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
                }
                lua_pop (L, 1);
        }
+       g_mutex_unlock (lua_mtx);
+
        return res;
 }
 
@@ -162,6 +166,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas
        double                          out = in;
        GList                          *cur;
 
+       g_mutex_lock (lua_mtx);
        /* Go throught all callbacks and call them, appending results to list */
        cur = g_list_first (ccf->pre_callbacks);
        while (cur) {
@@ -190,6 +195,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas
 
                cur = g_list_next (cur);
        }
+       g_mutex_unlock (lua_mtx);
 
        return out;
 
index 00fcadc936205b527f9f984fb6b16f1ca50c9cd5..4d90048cac671a1572d521b391e01f5b0fe00022 100644 (file)
@@ -28,6 +28,9 @@
 /* Lua module init function */
 #define MODULE_INIT_FUNC "module_init"
 
+/* Global lua mutex */
+GMutex *lua_mtx = NULL;
+
 const luaL_reg                  null_reg[] = {
        {"__tostring", lua_class_tostring},
        {NULL, NULL}
@@ -224,6 +227,13 @@ init_lua (struct config_file *cfg)
        L = lua_open ();
        luaL_openlibs (L);
 
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       lua_mtx = g_mutex_new ();
+#else
+       lua_mtx = g_malloc (sizeof (GMutex));
+       g_mutex_init (lua_mtx);
+#endif
+
        (void)luaopen_rspamd (L);
        (void)luaopen_logger (L);
        (void)luaopen_config (L);
@@ -320,6 +330,7 @@ lua_call_filter (const gchar *function, struct worker_task *task)
        struct worker_task            **ptask;
        lua_State                      *L = task->cfg->lua_state;
 
+       g_mutex_lock (lua_mtx);
        lua_getglobal (L, function);
        ptask = lua_newuserdata (L, sizeof (struct worker_task *));
        lua_setclass (L, "rspamd{task}", -1);
@@ -335,6 +346,8 @@ lua_call_filter (const gchar *function, struct worker_task *task)
        }
        result = lua_tonumber (L, -1);
        lua_pop (L, 1);                         /* pop returned value */
+       g_mutex_unlock (lua_mtx);
+
        return result;
 }
 
@@ -345,6 +358,7 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
        guint                           i;
        lua_State                      *L = task->cfg->lua_state;
 
+       g_mutex_lock (lua_mtx);
        lua_getglobal (L, function);
 
        for (i = 0; i < number; i++) {
@@ -360,6 +374,8 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
        }
        result = lua_tonumber (L, -1);
        lua_pop (L, 1);                         /* pop returned value */
+       g_mutex_unlock (lua_mtx);
+
        return result;
 }
 
@@ -374,6 +390,7 @@ lua_call_expression_func (const gchar *module, const gchar *function,
        struct expression_argument     *arg;
        int                             nargs = 1, pop = 0;
 
+       g_mutex_lock (lua_mtx);
        /* Call specified function and expect result of given expected_type */
        /* First check function in config table */
        lua_getglobal (L, "config");
@@ -435,6 +452,7 @@ lua_call_expression_func (const gchar *module, const gchar *function,
 
        if (lua_pcall (L, nargs, 1, 0) != 0) {
                msg_info ("call to %s failed: %s", function, lua_tostring (L, -1));
+               g_mutex_unlock (lua_mtx);
                return FALSE;
        }
        pop ++;
@@ -442,11 +460,13 @@ lua_call_expression_func (const gchar *module, const gchar *function,
        if (!lua_isboolean (L, -1)) {
                lua_pop (L, pop);
                msg_info ("function %s must return a boolean", function);
+               g_mutex_unlock (lua_mtx);
                return FALSE;
        }
        *res = lua_toboolean (L, -1);
        lua_pop (L, pop);
 
+       g_mutex_unlock (lua_mtx);
        return TRUE;
 }
 
@@ -468,6 +488,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
        struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg;
        lua_State                      *L = data->task->cfg->lua_state;
 
+       g_mutex_lock (lua_mtx);
        lua_getglobal (L, data->func);
 
        lua_pushstring (L, (const gchar *)key);
@@ -483,6 +504,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
        res = lua_tonumber (L, -1);
        lua_pop (L, 1);                         /* pop returned value */
        data->score += res;
+       g_mutex_unlock (lua_mtx);
 }
 
 double
@@ -519,6 +541,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
         return score;
     }
 
+    g_mutex_lock (lua_mtx);
     lua_getglobal (L, p->data);
     lua_pushnumber (L, score);
 
@@ -533,6 +556,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
        res = lua_tonumber (L, -1);
        lua_pop (L, 1);
 
+       g_mutex_unlock (lua_mtx);
     return res;
 }
 
index c1891a6a7453c57b9e191f9f2492174d2921723b..32604cbc4158028a287c7d586f19e59dbc867f35 100644 (file)
@@ -15,6 +15,7 @@
 #define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name }
 
 extern const luaL_reg null_reg[];
+extern GMutex *lua_mtx;
 
 #define RSPAMD_LUA_API_VERSION 9
 
index cd1287a188d20dd514f65f9e278e7fffb9446f18..bdb4ce0568ce28aa9d0485316feb175ba1c0be61 100644 (file)
@@ -318,6 +318,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
        GList                          *cur;
        gboolean                        res = FALSE;
 
+       g_mutex_lock (lua_mtx);
        if (cd->cb_is_ref) {
                lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
        }
@@ -347,6 +348,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
                }
                lua_pop (cd->L, 1);
        }
+       g_mutex_unlock (lua_mtx);
 
        return res;
 }
@@ -439,6 +441,7 @@ lua_call_post_filters (struct worker_task *task)
        struct worker_task            **ptask;
        GList                          *cur;
 
+       g_mutex_lock (lua_mtx);
        cur = task->cfg->post_filters;
        while (cur) {
                cd = cur->data;
@@ -458,6 +461,7 @@ lua_call_post_filters (struct worker_task *task)
                }
                cur = g_list_next (cur);
        }
+       g_mutex_unlock (lua_mtx);
 }
 
 static gint
@@ -583,6 +587,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
        struct lua_callback_data       *cd = ud;
        struct worker_task            **ptask;
 
+       g_mutex_lock (lua_mtx);
        if (cd->cb_is_ref) {
                lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
        }
@@ -597,6 +602,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
                msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" :
                                                                        cd->callback.name, lua_tostring (cd->L, -1));
        }
+       g_mutex_unlock (lua_mtx);
 }
 
 static gint
index c0b299fb750c7fb299d1e864394910838270e679..d28d897f6bc24b725072743ea9d4c54bff5e9f14 100644 (file)
@@ -468,6 +468,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
        union rspamd_reply_element     *elt;
        GList                          *cur;
 
+       g_mutex_lock (lua_mtx);
        if (cd->cb_is_ref) {
                lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
        }
@@ -553,6 +554,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
        if (cd->cb_is_ref) {
                luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
        }
+       g_mutex_unlock (lua_mtx);
 }
 
 static gint
index d5b3ddf6e6738af92de11fb66f99b40b15e03632..c68ae4b132b6e405c4df83a9e48201fb10da0cf1 100644 (file)
@@ -41,6 +41,14 @@ pthread_mutex_t                 stat_mtx = PTHREAD_MUTEX_INITIALIZER;
 #   define STAT_UNLOCK() do {} while (0)
 #endif
 
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+#      define POOL_MTX_LOCK()  do { g_static_mutex_lock (&pool->mtx); } while (0)
+#      define POOL_MTX_UNLOCK()        do { g_static_mutex_unlock (&pool->mtx); } while (0)
+#else
+#      define POOL_MTX_LOCK()  do { g_mutex_lock (&pool->mtx); } while (0)
+#      define POOL_MTX_UNLOCK()        do { g_mutex_unlock (&pool->mtx); } while (0)
+#endif
+
 /* 
  * This define specify whether we should check all pools for free space for new object
  * or just begin scan from current (recently attached) pool
@@ -101,6 +109,7 @@ pool_chain_new_shared (gsize size)
        struct _pool_chain_shared      *chain;
        gpointer                        map;
 
+
 #if defined(HAVE_MMAP_ANON)
        map = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
        if (map == MAP_FAILED) {
@@ -189,6 +198,11 @@ memory_pool_new (gsize size)
        new->destructors = NULL;
        /* Set it upon first call of set variable */
        new->variables = NULL;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       new->mtx = G_STATIC_MUTEX_INIT;
+#else
+       g_mutex_init (&new->mtx);
+#endif
 
        mem_pool_stat->pools_allocated++;
 
@@ -203,6 +217,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
        gint                            free;
 
        if (pool) {
+               POOL_MTX_LOCK ();
 #ifdef MEMORY_GREEDY
                cur = pool->first_pool;
 #else
@@ -214,6 +229,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
                }
                if (free < (gint)size && cur->next == NULL) {
                        /* Allocate new pool */
+
                        if (cur->len >= size + MEM_ALIGNMENT) {
                                new = pool_chain_new (cur->len);
                        }
@@ -227,10 +243,12 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
                        /* No need to align again */
                        tmp = new->pos;
                        new->pos = tmp + size;
+                       POOL_MTX_UNLOCK ();
                        return tmp;
                }
                tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
                cur->pos = tmp + size;
+               POOL_MTX_UNLOCK ();
                return tmp;
        }
        return NULL;
@@ -317,6 +335,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
        if (pool) {
                g_return_val_if_fail (size > 0, NULL);
 
+               POOL_MTX_LOCK ();
                cur = pool->shared_pool;
                if (!cur) {
                        cur = pool_chain_new_shared (pool->first_pool->len);
@@ -329,6 +348,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
                }
                if (free < (gint)size && cur->next == NULL) {
                        /* Allocate new pool */
+
                        if (cur->len >= size + MEM_ALIGNMENT) {
                                new = pool_chain_new_shared (cur->len);
                        }
@@ -342,10 +362,12 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
                        STAT_LOCK ();
                        mem_pool_stat->bytes_allocated += size;
                        STAT_UNLOCK ();
+                       POOL_MTX_UNLOCK ();
                        return new->begin;
                }
                tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
                cur->pos = tmp + size;
+               POOL_MTX_UNLOCK ();
                return tmp;
        }
        return NULL;
@@ -455,12 +477,14 @@ memory_pool_add_destructor_full (memory_pool_t * pool, pool_destruct_func func,
 
        cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors));
        if (cur) {
+               POOL_MTX_LOCK ();
                cur->func = func;
                cur->data = data;
                cur->function = function;
                cur->loc = line;
                cur->prev = pool->destructors;
                pool->destructors = cur;
+               POOL_MTX_UNLOCK ();
        }
 }
 
@@ -488,6 +512,7 @@ memory_pool_delete (memory_pool_t * pool)
        struct _pool_chain_shared      *cur_shared = pool->shared_pool, *tmp_shared;
        struct _pool_destructors       *destructor = pool->destructors;
 
+       POOL_MTX_LOCK ();
        /* Call all pool destructors */
        while (destructor) {
                /* Avoid calling destructors for NULL pointers */
@@ -522,6 +547,7 @@ memory_pool_delete (memory_pool_t * pool)
        }
 
        mem_pool_stat->pools_freed++;
+       POOL_MTX_UNLOCK ();
        g_slice_free (memory_pool_t, pool);
 }
 
index 62f6dcb9aa89c6782e6273fa16ace57679ffa3e7..475a00629149b13392c659853c7c2dcbb312c04c 100644 (file)
@@ -76,6 +76,11 @@ typedef struct memory_pool_s {
        struct _pool_chain_shared *shared_pool; /**< shared chain                                                       */
        struct _pool_destructors *destructors;  /**< destructors chain                                          */
        GHashTable *variables;                                  /**< private memory pool variables                      */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       GStaticMutex mtx;                                               /**< threads lock                                                       */
+#else
+       GMutex mtx;                                                             /**< threads lock                                                       */
+#endif
 } memory_pool_t;
 
 /**
index 3a9d62cbbf4e405dd4d665f97c7a5cd933e80c9e..8ceaef5a673d3214fa2bc6ee99ba683b2ba08020 100644 (file)
@@ -28,7 +28,7 @@
 #include "smtp.h"
 #include "smtp_proto.h"
 
-void
+gboolean
 free_smtp_session (gpointer arg)
 {
        struct smtp_session            *session = arg;
@@ -56,6 +56,8 @@ free_smtp_session (gpointer arg)
                memory_pool_delete (session->pool);
                g_free (session);
        }
+
+       return TRUE;
 }
 
 gboolean
index f113e04d980702329a30283967e0729fd035d0ee..d56c2d9247c621f29ecf02d53ca2a1f9d1804b5b 100644 (file)
@@ -613,7 +613,7 @@ err_socket (GError * err, void *arg)
 /*
  * Called if all filters are processed
  */
-static void
+static gboolean
 fin_task (void *arg)
 {
        struct worker_task             *task = (struct worker_task *) arg;
@@ -630,20 +630,25 @@ fin_task (void *arg)
                        /* Just process composites */
                        make_composites (task);
                }
-               /* Call post filters */
-               lua_call_post_filters (task);
+               if (task->cfg->post_filters) {
+                       /* More to process */
+                       /* Special state */
+                       task->state = WAIT_POST_FILTER;
+                       return FALSE;
+               }
+
        }
 
        /* Check if we have all events finished */
-       if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
-               task->state = WRITE_REPLY;
-               if (task->fin_callback) {
-                       task->fin_callback (task->fin_arg);
-               }
-               else {
-                       rspamd_dispatcher_restore (task->dispatcher);
-               }
+       task->state = WRITE_REPLY;
+       if (task->fin_callback) {
+               task->fin_callback (task->fin_arg);
+       }
+       else {
+               rspamd_dispatcher_restore (task->dispatcher);
        }
+
+       return TRUE;
 }
 
 /*
@@ -654,8 +659,8 @@ restore_task (void *arg)
 {
        struct worker_task             *task = (struct worker_task *) arg;
 
-       /* Special state */
-       task->state = WAIT_POST_FILTER;
+       /* Call post filters */
+       lua_call_post_filters (task);
        task->s->wanna_die = TRUE;
 }