]> source.dussan.org Git - rspamd.git/commitdiff
More fixes to thread-safe processing.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 14 Feb 2012 14:32:19 +0000 (18:32 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 14 Feb 2012 14:32:19 +0000 (18:32 +0400)
src/expressions.c
src/filter.c
src/lua/lua_redis.c
src/plugins/regexp.c

index 99277dfd25c6e7a94627c5a2a8270edeef474a29..488ea5600e6f4dcadff441f73a6170ae79ad8894 100644 (file)
@@ -134,33 +134,6 @@ re_cache_del (const gchar *line, memory_pool_t *pool)
 
 }
 
-/* Task cache functions */
-void
-task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result)
-{
-       if (result == 0) {
-               result = -1;
-       }
-
-       g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result));
-}
-
-gint32
-task_cache_check (struct worker_task *task, struct rspamd_regexp *re)
-{
-       gpointer                        res;
-       gint32                          r;
-
-       if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) {
-               r = GPOINTER_TO_INT (res);
-               if (r == -1) {
-                       return 0;
-               }
-               return 1;
-       }
-       return -1;
-}
-
 /*
  * Functions for parsing expressions
  */
index bca6d17d401591b29a5c3b90ced7059b841d9878..1d6f19e21b99a6117cd5a7d52ab20fced55212e7 100644 (file)
@@ -214,14 +214,31 @@ check_metric_is_spam (struct worker_task *task, struct metric *metric)
        struct metric_result           *res;
        double                          ms, rs;
 
+       /* Avoid concurrenting while checking results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_lock (&result_mtx);
+#else
+       G_LOCK (result_mtx);
+#endif
        res = g_hash_table_lookup (task->results, metric->name);
        if (res) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+               g_static_mutex_unlock (&result_mtx);
+#else
+               G_UNLOCK (result_mtx);
+#endif
                if (!check_metric_settings (res, &ms, &rs)) {
                        ms = metric->required_score;
                }
                return res->score >= ms;
        }
 
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_unlock (&result_mtx);
+#else
+       G_UNLOCK (result_mtx);
+#endif
+
        return FALSE;
 }
 
index a37396b7bb3a3a3b23be1b429b4516cd9797fc14..0358a69604556ce23dabea7e778d0ba50a78d995 100644 (file)
@@ -83,7 +83,9 @@ lua_redis_fin (void *arg)
 
        if (ud->ctx) {
                redisAsyncFree (ud->ctx);
+               g_mutex_lock (lua_mtx);
                luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
+               g_mutex_unlock (lua_mtx);
        }
 }
 
@@ -97,6 +99,7 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
 {
        struct worker_task                                      **ptask;
 
+       g_mutex_lock (lua_mtx);
        /* Push error */
        lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
        ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *));
@@ -110,9 +113,12 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
        if (lua_pcall (ud->L, 3, 0, 0) != 0) {
                msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
        }
+       g_mutex_unlock (lua_mtx);
+
        if (connected) {
                remove_normal_event (ud->task->s, lua_redis_fin, ud);
        }
+
 }
 
 /**
@@ -125,6 +131,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
 {
        struct worker_task                                      **ptask;
 
+       g_mutex_lock (lua_mtx);
        /* Push error */
        lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
        ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *));
@@ -154,6 +161,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
        if (lua_pcall (ud->L, 3, 0, 0) != 0) {
                msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
        }
+       g_mutex_unlock (lua_mtx);
 
        remove_normal_event (ud->task->s, lua_redis_fin, ud);
 }
index c9b58443634a027d96f5d08ed4794f6dd9570825..5112678bc053c9fe05b3511c7109e3084e9f6d0b 100644 (file)
@@ -27,7 +27,6 @@
  */
 
 
-
 #include "config.h"
 #include "main.h"
 #include "message.h"
@@ -107,6 +106,65 @@ module_t regexp_module = {
        regexp_module_reconfig
 };
 
+/* Task cache functions */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static GStaticMutex task_cache_mtx =  G_STATIC_MUTEX_INIT;
+#else
+G_LOCK_DEFINE (task_cache_mtx);
+#endif
+
+void
+task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result)
+{
+       if (result == 0) {
+               result = -1;
+       }
+       /* Avoid concurrenting inserting of results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_lock (&task_cache_mtx);
+#else
+       G_LOCK (task_cache_mtx);
+#endif
+       g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result));
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_unlock (&task_cache_mtx);
+#else
+       G_UNLOCK (task_cache_mtx);
+#endif
+}
+
+gint32
+task_cache_check (struct worker_task *task, struct rspamd_regexp *re)
+{
+       gpointer                        res;
+       gint32                          r;
+
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_lock (&task_cache_mtx);
+#else
+       G_LOCK (task_cache_mtx);
+#endif
+       if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) {
+               r = GPOINTER_TO_INT (res);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+               g_static_mutex_unlock (&task_cache_mtx);
+#else
+               G_UNLOCK (task_cache_mtx);
+#endif
+               if (r == -1) {
+                       return 0;
+               }
+               return 1;
+       }
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       g_static_mutex_unlock (&task_cache_mtx);
+#else
+       G_UNLOCK (task_cache_mtx);
+#endif
+       return -1;
+}
+
+
 static gint
 luaopen_regexp (lua_State * L)
 {
@@ -1004,6 +1062,7 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
        struct worker_task            **ptask;
        gboolean                        res;
 
+       g_mutex_lock (lua_mtx);
        lua_getglobal (L, name);
        if (lua_isfunction (L, -1)) {
                ptask = lua_newuserdata (L, sizeof (struct worker_task *));
@@ -1012,15 +1071,18 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
                /* Call function */
                if (lua_pcall (L, 1, 1, 0) != 0) {
                        msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1));
+                       g_mutex_unlock (lua_mtx);
                        return FALSE;
                }
                res = lua_toboolean (L, -1);
                lua_pop (L, 1);
+               g_mutex_unlock (lua_mtx);
                return res;
        }
        else {
                lua_pop (L, 1);
        }
+       g_mutex_unlock (lua_mtx);
        return FALSE;
 }
 
@@ -1243,12 +1305,11 @@ process_regexp_item (struct worker_task *task, void *user_data)
                thr_ud->item = item;
                thr_ud->task = task;
 
+               register_async_thread (task->s);
                g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
-               if (err == NULL) {
-                       register_async_thread (task->s);
-               }
-               else {
+               if (err != NULL) {
                        msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+                       remove_async_thread (task->s);
                }
        }
        else {