aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-14 18:32:19 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-14 18:32:19 +0400
commit236aabd74eee8c0475e332782c4bc5e010a3a7d8 (patch)
treeef818b8fb40e0bfe6b7d2609c521aaf3b1f718fe /src/plugins
parenta5b48a05a94d178c342bbad69a330addb518d148 (diff)
downloadrspamd-236aabd74eee8c0475e332782c4bc5e010a3a7d8.tar.gz
rspamd-236aabd74eee8c0475e332782c4bc5e010a3a7d8.zip
More fixes to thread-safe processing.
Diffstat (limited to 'src/plugins')
-rw-r--r--src/plugins/regexp.c71
1 files changed, 66 insertions, 5 deletions
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index c9b584436..5112678bc 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -27,7 +27,6 @@
*/
-
#include "config.h"
#include "main.h"
#include "message.h"
@@ -107,6 +106,65 @@ module_t regexp_module = {
regexp_module_reconfig
};
+/* Task cache functions */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static GStaticMutex task_cache_mtx = G_STATIC_MUTEX_INIT;
+#else
+G_LOCK_DEFINE (task_cache_mtx);
+#endif
+
+void
+task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result)
+{
+ if (result == 0) {
+ result = -1;
+ }
+ /* Avoid concurrenting inserting of results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&task_cache_mtx);
+#else
+ G_LOCK (task_cache_mtx);
+#endif
+ g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result));
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&task_cache_mtx);
+#else
+ G_UNLOCK (task_cache_mtx);
+#endif
+}
+
+gint32
+task_cache_check (struct worker_task *task, struct rspamd_regexp *re)
+{
+ gpointer res;
+ gint32 r;
+
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&task_cache_mtx);
+#else
+ G_LOCK (task_cache_mtx);
+#endif
+ if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) {
+ r = GPOINTER_TO_INT (res);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&task_cache_mtx);
+#else
+ G_UNLOCK (task_cache_mtx);
+#endif
+ if (r == -1) {
+ return 0;
+ }
+ return 1;
+ }
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&task_cache_mtx);
+#else
+ G_UNLOCK (task_cache_mtx);
+#endif
+ return -1;
+}
+
+
static gint
luaopen_regexp (lua_State * L)
{
@@ -1004,6 +1062,7 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
struct worker_task **ptask;
gboolean res;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, name);
if (lua_isfunction (L, -1)) {
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
@@ -1012,15 +1071,18 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
/* Call function */
if (lua_pcall (L, 1, 1, 0) != 0) {
msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1));
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
res = lua_toboolean (L, -1);
lua_pop (L, 1);
+ g_mutex_unlock (lua_mtx);
return res;
}
else {
lua_pop (L, 1);
}
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
@@ -1243,12 +1305,11 @@ process_regexp_item (struct worker_task *task, void *user_data)
thr_ud->item = item;
thr_ud->task = task;
+ register_async_thread (task->s);
g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
- if (err == NULL) {
- register_async_thread (task->s);
- }
- else {
+ if (err != NULL) {
msg_err ("error pushing task to the regexp thread pool: %s", err->message);
+ remove_async_thread (task->s);
}
}
else {