From 236aabd74eee8c0475e332782c4bc5e010a3a7d8 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 14 Feb 2012 18:32:19 +0400 Subject: [PATCH] More fixes to thread-safe processing. --- src/expressions.c | 27 ----------------- src/filter.c | 17 +++++++++++ src/lua/lua_redis.c | 8 +++++ src/plugins/regexp.c | 71 ++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 91 insertions(+), 32 deletions(-) diff --git a/src/expressions.c b/src/expressions.c index 99277dfd2..488ea5600 100644 --- a/src/expressions.c +++ b/src/expressions.c @@ -134,33 +134,6 @@ re_cache_del (const gchar *line, memory_pool_t *pool) } -/* Task cache functions */ -void -task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result) -{ - if (result == 0) { - result = -1; - } - - g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result)); -} - -gint32 -task_cache_check (struct worker_task *task, struct rspamd_regexp *re) -{ - gpointer res; - gint32 r; - - if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) { - r = GPOINTER_TO_INT (res); - if (r == -1) { - return 0; - } - return 1; - } - return -1; -} - /* * Functions for parsing expressions */ diff --git a/src/filter.c b/src/filter.c index bca6d17d4..1d6f19e21 100644 --- a/src/filter.c +++ b/src/filter.c @@ -214,14 +214,31 @@ check_metric_is_spam (struct worker_task *task, struct metric *metric) struct metric_result *res; double ms, rs; + /* Avoid concurrenting while checking results */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_lock (&result_mtx); +#else + G_LOCK (result_mtx); +#endif res = g_hash_table_lookup (task->results, metric->name); if (res) { +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&result_mtx); +#else + G_UNLOCK (result_mtx); +#endif if (!check_metric_settings (res, &ms, &rs)) { ms = metric->required_score; } return res->score >= ms; } +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&result_mtx); +#else + G_UNLOCK (result_mtx); +#endif + return FALSE; } diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index a37396b7b..0358a6960 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -83,7 +83,9 @@ lua_redis_fin (void *arg) if (ud->ctx) { redisAsyncFree (ud->ctx); + g_mutex_lock (lua_mtx); luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); + g_mutex_unlock (lua_mtx); } } @@ -97,6 +99,7 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean { struct worker_task **ptask; + g_mutex_lock (lua_mtx); /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *)); @@ -110,9 +113,12 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean if (lua_pcall (ud->L, 3, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); } + g_mutex_unlock (lua_mtx); + if (connected) { remove_normal_event (ud->task->s, lua_redis_fin, ud); } + } /** @@ -125,6 +131,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud) { struct worker_task **ptask; + g_mutex_lock (lua_mtx); /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *)); @@ -154,6 +161,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud) if (lua_pcall (ud->L, 3, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); } + g_mutex_unlock (lua_mtx); remove_normal_event (ud->task->s, lua_redis_fin, ud); } diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index c9b584436..5112678bc 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -27,7 +27,6 @@ */ - #include "config.h" #include "main.h" #include "message.h" @@ -107,6 +106,65 @@ module_t regexp_module = { regexp_module_reconfig }; +/* Task cache functions */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) +static GStaticMutex task_cache_mtx = G_STATIC_MUTEX_INIT; +#else +G_LOCK_DEFINE (task_cache_mtx); +#endif + +void +task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result) +{ + if (result == 0) { + result = -1; + } + /* Avoid concurrenting inserting of results */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_lock (&task_cache_mtx); +#else + G_LOCK (task_cache_mtx); +#endif + g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result)); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&task_cache_mtx); +#else + G_UNLOCK (task_cache_mtx); +#endif +} + +gint32 +task_cache_check (struct worker_task *task, struct rspamd_regexp *re) +{ + gpointer res; + gint32 r; + +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_lock (&task_cache_mtx); +#else + G_LOCK (task_cache_mtx); +#endif + if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) { + r = GPOINTER_TO_INT (res); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&task_cache_mtx); +#else + G_UNLOCK (task_cache_mtx); +#endif + if (r == -1) { + return 0; + } + return 1; + } +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&task_cache_mtx); +#else + G_UNLOCK (task_cache_mtx); +#endif + return -1; +} + + static gint luaopen_regexp (lua_State * L) { @@ -1004,6 +1062,7 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task) struct worker_task **ptask; gboolean res; + g_mutex_lock (lua_mtx); lua_getglobal (L, name); if (lua_isfunction (L, -1)) { ptask = lua_newuserdata (L, sizeof (struct worker_task *)); @@ -1012,15 +1071,18 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task) /* Call function */ if (lua_pcall (L, 1, 1, 0) != 0) { msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1)); + g_mutex_unlock (lua_mtx); return FALSE; } res = lua_toboolean (L, -1); lua_pop (L, 1); + g_mutex_unlock (lua_mtx); return res; } else { lua_pop (L, 1); } + g_mutex_unlock (lua_mtx); return FALSE; } @@ -1243,12 +1305,11 @@ process_regexp_item (struct worker_task *task, void *user_data) thr_ud->item = item; thr_ud->task = task; + register_async_thread (task->s); g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err); - if (err == NULL) { - register_async_thread (task->s); - } - else { + if (err != NULL) { msg_err ("error pushing task to the regexp thread pool: %s", err->message); + remove_async_thread (task->s); } } else { -- 2.39.5