aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-14 18:32:19 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-14 18:32:19 +0400
commit236aabd74eee8c0475e332782c4bc5e010a3a7d8 (patch)
treeef818b8fb40e0bfe6b7d2609c521aaf3b1f718fe /src
parenta5b48a05a94d178c342bbad69a330addb518d148 (diff)
downloadrspamd-236aabd74eee8c0475e332782c4bc5e010a3a7d8.tar.gz
rspamd-236aabd74eee8c0475e332782c4bc5e010a3a7d8.zip
More fixes to thread-safe processing.
Diffstat (limited to 'src')
-rw-r--r--src/expressions.c27
-rw-r--r--src/filter.c17
-rw-r--r--src/lua/lua_redis.c8
-rw-r--r--src/plugins/regexp.c71
4 files changed, 91 insertions, 32 deletions
diff --git a/src/expressions.c b/src/expressions.c
index 99277dfd2..488ea5600 100644
--- a/src/expressions.c
+++ b/src/expressions.c
@@ -134,33 +134,6 @@ re_cache_del (const gchar *line, memory_pool_t *pool)
}
-/* Task cache functions */
-void
-task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result)
-{
- if (result == 0) {
- result = -1;
- }
-
- g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result));
-}
-
-gint32
-task_cache_check (struct worker_task *task, struct rspamd_regexp *re)
-{
- gpointer res;
- gint32 r;
-
- if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) {
- r = GPOINTER_TO_INT (res);
- if (r == -1) {
- return 0;
- }
- return 1;
- }
- return -1;
-}
-
/*
* Functions for parsing expressions
*/
diff --git a/src/filter.c b/src/filter.c
index bca6d17d4..1d6f19e21 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -214,14 +214,31 @@ check_metric_is_spam (struct worker_task *task, struct metric *metric)
struct metric_result *res;
double ms, rs;
+ /* Avoid concurrenting while checking results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&result_mtx);
+#else
+ G_LOCK (result_mtx);
+#endif
res = g_hash_table_lookup (task->results, metric->name);
if (res) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&result_mtx);
+#else
+ G_UNLOCK (result_mtx);
+#endif
if (!check_metric_settings (res, &ms, &rs)) {
ms = metric->required_score;
}
return res->score >= ms;
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&result_mtx);
+#else
+ G_UNLOCK (result_mtx);
+#endif
+
return FALSE;
}
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index a37396b7b..0358a6960 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -83,7 +83,9 @@ lua_redis_fin (void *arg)
if (ud->ctx) {
redisAsyncFree (ud->ctx);
+ g_mutex_lock (lua_mtx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
+ g_mutex_unlock (lua_mtx);
}
}
@@ -97,6 +99,7 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
{
struct worker_task **ptask;
+ g_mutex_lock (lua_mtx);
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *));
@@ -110,9 +113,12 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
}
+ g_mutex_unlock (lua_mtx);
+
if (connected) {
remove_normal_event (ud->task->s, lua_redis_fin, ud);
}
+
}
/**
@@ -125,6 +131,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
{
struct worker_task **ptask;
+ g_mutex_lock (lua_mtx);
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *));
@@ -154,6 +161,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
}
+ g_mutex_unlock (lua_mtx);
remove_normal_event (ud->task->s, lua_redis_fin, ud);
}
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index c9b584436..5112678bc 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -27,7 +27,6 @@
*/
-
#include "config.h"
#include "main.h"
#include "message.h"
@@ -107,6 +106,65 @@ module_t regexp_module = {
regexp_module_reconfig
};
+/* Task cache functions */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static GStaticMutex task_cache_mtx = G_STATIC_MUTEX_INIT;
+#else
+G_LOCK_DEFINE (task_cache_mtx);
+#endif
+
+void
+task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result)
+{
+ if (result == 0) {
+ result = -1;
+ }
+ /* Avoid concurrenting inserting of results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&task_cache_mtx);
+#else
+ G_LOCK (task_cache_mtx);
+#endif
+ g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result));
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&task_cache_mtx);
+#else
+ G_UNLOCK (task_cache_mtx);
+#endif
+}
+
+gint32
+task_cache_check (struct worker_task *task, struct rspamd_regexp *re)
+{
+ gpointer res;
+ gint32 r;
+
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&task_cache_mtx);
+#else
+ G_LOCK (task_cache_mtx);
+#endif
+ if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) {
+ r = GPOINTER_TO_INT (res);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&task_cache_mtx);
+#else
+ G_UNLOCK (task_cache_mtx);
+#endif
+ if (r == -1) {
+ return 0;
+ }
+ return 1;
+ }
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&task_cache_mtx);
+#else
+ G_UNLOCK (task_cache_mtx);
+#endif
+ return -1;
+}
+
+
static gint
luaopen_regexp (lua_State * L)
{
@@ -1004,6 +1062,7 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
struct worker_task **ptask;
gboolean res;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, name);
if (lua_isfunction (L, -1)) {
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
@@ -1012,15 +1071,18 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
/* Call function */
if (lua_pcall (L, 1, 1, 0) != 0) {
msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1));
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
res = lua_toboolean (L, -1);
lua_pop (L, 1);
+ g_mutex_unlock (lua_mtx);
return res;
}
else {
lua_pop (L, 1);
}
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
@@ -1243,12 +1305,11 @@ process_regexp_item (struct worker_task *task, void *user_data)
thr_ud->item = item;
thr_ud->task = task;
+ register_async_thread (task->s);
g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
- if (err == NULL) {
- register_async_thread (task->s);
- }
- else {
+ if (err != NULL) {
msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+ remove_async_thread (task->s);
}
}
else {