diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-02-14 18:32:19 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-02-14 18:32:19 +0400 |
commit | 236aabd74eee8c0475e332782c4bc5e010a3a7d8 (patch) | |
tree | ef818b8fb40e0bfe6b7d2609c521aaf3b1f718fe /src/plugins | |
parent | a5b48a05a94d178c342bbad69a330addb518d148 (diff) | |
download | rspamd-236aabd74eee8c0475e332782c4bc5e010a3a7d8.tar.gz rspamd-236aabd74eee8c0475e332782c4bc5e010a3a7d8.zip |
More fixes to thread-safe processing.
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/regexp.c | 71 |
1 files changed, 66 insertions, 5 deletions
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index c9b584436..5112678bc 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -27,7 +27,6 @@ */ - #include "config.h" #include "main.h" #include "message.h" @@ -107,6 +106,65 @@ module_t regexp_module = { regexp_module_reconfig }; +/* Task cache functions */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) +static GStaticMutex task_cache_mtx = G_STATIC_MUTEX_INIT; +#else +G_LOCK_DEFINE (task_cache_mtx); +#endif + +void +task_cache_add (struct worker_task *task, struct rspamd_regexp *re, gint32 result) +{ + if (result == 0) { + result = -1; + } + /* Avoid concurrenting inserting of results */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_lock (&task_cache_mtx); +#else + G_LOCK (task_cache_mtx); +#endif + g_hash_table_insert (task->re_cache, re->regexp_text, GINT_TO_POINTER (result)); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&task_cache_mtx); +#else + G_UNLOCK (task_cache_mtx); +#endif +} + +gint32 +task_cache_check (struct worker_task *task, struct rspamd_regexp *re) +{ + gpointer res; + gint32 r; + +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_lock (&task_cache_mtx); +#else + G_LOCK (task_cache_mtx); +#endif + if ((res = g_hash_table_lookup (task->re_cache, re->regexp_text)) != NULL) { + r = GPOINTER_TO_INT (res); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&task_cache_mtx); +#else + G_UNLOCK (task_cache_mtx); +#endif + if (r == -1) { + return 0; + } + return 1; + } +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&task_cache_mtx); +#else + G_UNLOCK (task_cache_mtx); +#endif + return -1; +} + + static gint luaopen_regexp (lua_State * L) { @@ -1004,6 +1062,7 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task) struct worker_task **ptask; gboolean res; + g_mutex_lock (lua_mtx); lua_getglobal (L, name); if (lua_isfunction (L, -1)) { ptask = lua_newuserdata (L, sizeof (struct worker_task *)); @@ -1012,15 +1071,18 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task) /* Call function */ if (lua_pcall (L, 1, 1, 0) != 0) { msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1)); + g_mutex_unlock (lua_mtx); return FALSE; } res = lua_toboolean (L, -1); lua_pop (L, 1); + g_mutex_unlock (lua_mtx); return res; } else { lua_pop (L, 1); } + g_mutex_unlock (lua_mtx); return FALSE; } @@ -1243,12 +1305,11 @@ process_regexp_item (struct worker_task *task, void *user_data) thr_ud->item = item; thr_ud->task = task; + register_async_thread (task->s); g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err); - if (err == NULL) { - register_async_thread (task->s); - } - else { + if (err != NULL) { msg_err ("error pushing task to the regexp thread pool: %s", err->message); + remove_async_thread (task->s); } } else { |