aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-13 21:51:10 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-02-13 21:51:10 +0400
commita5b48a05a94d178c342bbad69a330addb518d148 (patch)
treeee7ab452cd4a98fdb7503e78cc52a3d4f66dd27e
parent0d64c808b7310b6e233ec570649fbb281a3f2b13 (diff)
downloadrspamd-a5b48a05a94d178c342bbad69a330addb518d148.tar.gz
rspamd-a5b48a05a94d178c342bbad69a330addb518d148.zip
* More things to be thread-safe:
- pool allocator is now thread-safe - lua subsystem now holds lock to avoid lua stack corruption - events subsystem now using conditional variables to wait for async_threads - insert_result is thread-safe now
-rw-r--r--src/classifiers/bayes.c20
-rw-r--r--src/classifiers/classifiers.h1
-rw-r--r--src/controller.c6
-rw-r--r--src/events.c74
-rw-r--r--src/events.h6
-rw-r--r--src/filter.c17
-rw-r--r--src/lua/lua_classifier.c6
-rw-r--r--src/lua/lua_common.c24
-rw-r--r--src/lua/lua_common.h1
-rw-r--r--src/lua/lua_config.c6
-rw-r--r--src/lua/lua_task.c2
-rw-r--r--src/mem_pool.c26
-rw-r--r--src/mem_pool.h5
-rw-r--r--src/smtp_utils.c4
-rw-r--r--src/worker.c31
15 files changed, 189 insertions, 40 deletions
diff --git a/src/classifiers/bayes.c b/src/classifiers/bayes.c
index 265957bc9..281cc6292 100644
--- a/src/classifiers/bayes.c
+++ b/src/classifiers/bayes.c
@@ -165,6 +165,16 @@ bayes_classify_callback (gpointer key, gpointer value, gpointer data)
return FALSE;
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static void
+bayes_mutex_free (gpointer data)
+{
+ GMutex *mtx = data;
+
+ g_mutex_free (mtx);
+}
+#endif
+
struct classifier_ctx*
bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
{
@@ -173,6 +183,13 @@ bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
ctx->pool = pool;
ctx->cfg = cfg;
ctx->debug = FALSE;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ ctx->mtx = g_mutex_new ();
+ memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx);
+#else
+ ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+ g_mutex_init (ctx->mtx);
+#endif
return ctx;
}
@@ -205,6 +222,8 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
}
}
+ /* Critical section as here can be lua callbacks calling */
+ g_mutex_lock (ctx->mtx);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
if (cur) {
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
@@ -212,6 +231,7 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
else {
cur = ctx->cfg->statfiles;
}
+ g_mutex_unlock (ctx->mtx);
data.statfiles_num = g_list_length (cur);
data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num);
diff --git a/src/classifiers/classifiers.h b/src/classifiers/classifiers.h
index b05b62e34..717083452 100644
--- a/src/classifiers/classifiers.h
+++ b/src/classifiers/classifiers.h
@@ -17,6 +17,7 @@ struct classifier_ctx {
GHashTable *results;
gboolean debug;
struct classifier_config *cfg;
+ GMutex *mtx;
};
struct classify_weight {
diff --git a/src/controller.c b/src/controller.c
index b9ec3677c..ddac27930 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -846,9 +846,9 @@ process_normal_command (const gchar *line)
}
/*
- * Called if all filters are processed
+ * Called if all filters are processed, non-threaded and simple version
*/
-static void
+static gboolean
fin_learn_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
@@ -870,6 +870,8 @@ fin_learn_task (void *arg)
rspamd_dispatcher_restore (task->dispatcher);
}
}
+
+ return TRUE;
}
/*
diff --git a/src/events.c b/src/events.c
index b658f48a9..0d831104c 100644
--- a/src/events.c
+++ b/src/events.c
@@ -56,10 +56,18 @@ event_mutex_free (gpointer data)
g_mutex_free (mtx);
}
+
+static void
+event_cond_free (gpointer data)
+{
+ GCond *cond = data;
+
+ g_cond_free (cond);
+}
#endif
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+new_async_session (memory_pool_t * pool, session_finalizer_t fin,
event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
@@ -74,10 +82,14 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->mtx = g_mutex_new ();
+ new->cond = g_cond_new ();
memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx);
+ memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond);
#else
new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
g_mutex_init (new->mtx);
+ new->cond = memory_pool_alloc (pool, sizeof (GCond));
+ g_cond_init (new->cond);
#endif
new->threads = 0;
@@ -96,6 +108,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
return;
}
+ g_mutex_lock (session->mtx);
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
@@ -106,6 +119,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
g_quark_to_string (subsystem));
#endif
+ g_mutex_unlock (session->mtx);
}
void
@@ -118,6 +132,7 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
return;
}
+ g_mutex_lock (session->mtx);
/* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
@@ -130,13 +145,12 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
/* Remove event */
fin (ud);
}
-
-
+ g_mutex_unlock (session->mtx);
check_session_pending (session);
}
-static void
+static gboolean
rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
{
struct rspamd_async_event *ev = v;
@@ -145,6 +159,8 @@ rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
if (ev->fin != NULL) {
ev->fin (ev->user_data);
}
+
+ return TRUE;
}
gboolean
@@ -155,36 +171,47 @@ destroy_session (struct rspamd_async_session *session)
return FALSE;
}
+ g_mutex_lock (session->mtx);
+ if (session->threads > 0) {
+ /* Wait for conditional variable to finish processing */
+ g_mutex_unlock (session->mtx);
+ g_cond_wait (session->cond, session->mtx);
+ }
+
session->wanna_die = TRUE;
- g_hash_table_foreach (session->events, rspamd_session_destroy, session);
+ g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session);
- if (session->threads == 0) {
- if (session->cleanup != NULL) {
- session->cleanup (session->user_data);
- }
- return TRUE;
- }
+ /* Mutex can be destroyed here */
+ g_mutex_unlock (session->mtx);
- return FALSE;
+ if (session->cleanup != NULL) {
+ session->cleanup (session->user_data);
+ }
+ return TRUE;
}
gboolean
check_session_pending (struct rspamd_async_session *session)
{
g_mutex_lock (session->mtx);
- if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ if (session->wanna_die && g_hash_table_size (session->events) == 0) {
session->wanna_die = FALSE;
- if (session->fin != NULL) {
- session->fin (session->user_data);
+ if (session->threads > 0) {
+ /* Wait for conditional variable to finish processing */
+ g_cond_wait (session->cond, session->mtx);
}
- /* Check events count again */
- if (g_hash_table_size (session->events) != 0) {
- if (session->restore != NULL) {
- session->restore (session->user_data);
+ if (session->fin != NULL) {
+ if (! session->fin (session->user_data)) {
+ g_mutex_unlock (session->mtx);
+ /* Session finished incompletely, perform restoration */
+ if (session->restore != NULL) {
+ session->restore (session->user_data);
+ /* Call pending once more */
+ return check_session_pending (session);
+ }
+ return TRUE;
}
- g_mutex_unlock (session->mtx);
- return TRUE;
}
g_mutex_unlock (session->mtx);
return FALSE;
@@ -215,7 +242,10 @@ void
remove_async_thread (struct rspamd_async_session *session)
{
if (g_atomic_int_dec_and_test (&session->threads)) {
- (void) check_session_pending (session);
+ /* Signal if there are any sessions waiting */
+ g_mutex_lock (session->mtx);
+ g_cond_signal (session->cond);
+ g_mutex_unlock (session->mtx);
}
#ifdef RSPAMD_EVENTS_DEBUG
msg_info ("removed thread: pending %d thread", session->threads);
diff --git a/src/events.h b/src/events.h
index 2c4ea2c91..1329d5837 100644
--- a/src/events.h
+++ b/src/events.h
@@ -6,6 +6,7 @@
struct rspamd_async_event;
typedef void (*event_finalizer_t)(void *user_data);
+typedef gboolean (*session_finalizer_t)(void *user_data);
struct rspamd_async_event {
GQuark subsystem;
@@ -15,7 +16,7 @@ struct rspamd_async_event {
};
struct rspamd_async_session {
- event_finalizer_t fin;
+ session_finalizer_t fin;
event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
@@ -24,6 +25,7 @@ struct rspamd_async_session {
gboolean wanna_die;
guint threads;
GMutex *mtx;
+ GCond *cond;
};
/**
@@ -36,7 +38,7 @@ struct rspamd_async_session {
* @return
*/
struct rspamd_async_session *new_async_session (memory_pool_t *pool,
- event_finalizer_t fin, event_finalizer_t restore,
+ session_finalizer_t fin, event_finalizer_t restore,
event_finalizer_t cleanup, void *user_data);
/**
diff --git a/src/filter.c b/src/filter.c
index cf8a63d0d..bca6d17d4 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -140,6 +140,12 @@ insert_metric_result (struct worker_task *task, struct metric *metric, const gch
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static GStaticMutex result_mtx = G_STATIC_MUTEX_INIT;
+#else
+G_LOCK_DEFINE (result_mtx);
+#endif
+
static void
insert_result_common (struct worker_task *task, const gchar *symbol, double flag, GList * opts, gboolean single)
{
@@ -147,6 +153,12 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag
struct cache_item *item;
GList *cur, *metric_list;
+ /* Avoid concurrenting inserting of results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&result_mtx);
+#else
+ G_LOCK (result_mtx);
+#endif
metric_list = g_hash_table_lookup (task->cfg->metrics_symbols, symbol);
if (metric_list) {
cur = metric_list;
@@ -174,6 +186,11 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag
/* XXX: it is not wise to destroy them here */
g_list_free (opts);
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&result_mtx);
+#else
+ G_UNLOCK (result_mtx);
+#endif
}
/* Insert result that may be increased on next insertions */
diff --git a/src/lua/lua_classifier.c b/src/lua/lua_classifier.c
index 0bf0daca8..edaf4e7a6 100644
--- a/src/lua/lua_classifier.c
+++ b/src/lua/lua_classifier.c
@@ -123,6 +123,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
struct classifier_callback_data *cd;
lua_State *L;
+
/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
@@ -134,6 +135,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
cur = g_list_next (cur);
}
+ g_mutex_lock (lua_mtx);
if (res == NULL) {
L = task->cfg->lua_state;
/* Check function from global table 'classifiers' */
@@ -149,6 +151,8 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
}
lua_pop (L, 1);
}
+ g_mutex_unlock (lua_mtx);
+
return res;
}
@@ -162,6 +166,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas
double out = in;
GList *cur;
+ g_mutex_lock (lua_mtx);
/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
@@ -190,6 +195,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas
cur = g_list_next (cur);
}
+ g_mutex_unlock (lua_mtx);
return out;
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index 00fcadc93..4d90048ca 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -28,6 +28,9 @@
/* Lua module init function */
#define MODULE_INIT_FUNC "module_init"
+/* Global lua mutex */
+GMutex *lua_mtx = NULL;
+
const luaL_reg null_reg[] = {
{"__tostring", lua_class_tostring},
{NULL, NULL}
@@ -224,6 +227,13 @@ init_lua (struct config_file *cfg)
L = lua_open ();
luaL_openlibs (L);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ lua_mtx = g_mutex_new ();
+#else
+ lua_mtx = g_malloc (sizeof (GMutex));
+ g_mutex_init (lua_mtx);
+#endif
+
(void)luaopen_rspamd (L);
(void)luaopen_logger (L);
(void)luaopen_config (L);
@@ -320,6 +330,7 @@ lua_call_filter (const gchar *function, struct worker_task *task)
struct worker_task **ptask;
lua_State *L = task->cfg->lua_state;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, function);
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
lua_setclass (L, "rspamd{task}", -1);
@@ -335,6 +346,8 @@ lua_call_filter (const gchar *function, struct worker_task *task)
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
+ g_mutex_unlock (lua_mtx);
+
return result;
}
@@ -345,6 +358,7 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
guint i;
lua_State *L = task->cfg->lua_state;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, function);
for (i = 0; i < number; i++) {
@@ -360,6 +374,8 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
+ g_mutex_unlock (lua_mtx);
+
return result;
}
@@ -374,6 +390,7 @@ lua_call_expression_func (const gchar *module, const gchar *function,
struct expression_argument *arg;
int nargs = 1, pop = 0;
+ g_mutex_lock (lua_mtx);
/* Call specified function and expect result of given expected_type */
/* First check function in config table */
lua_getglobal (L, "config");
@@ -435,6 +452,7 @@ lua_call_expression_func (const gchar *module, const gchar *function,
if (lua_pcall (L, nargs, 1, 0) != 0) {
msg_info ("call to %s failed: %s", function, lua_tostring (L, -1));
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
pop ++;
@@ -442,11 +460,13 @@ lua_call_expression_func (const gchar *module, const gchar *function,
if (!lua_isboolean (L, -1)) {
lua_pop (L, pop);
msg_info ("function %s must return a boolean", function);
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
*res = lua_toboolean (L, -1);
lua_pop (L, pop);
+ g_mutex_unlock (lua_mtx);
return TRUE;
}
@@ -468,6 +488,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg;
lua_State *L = data->task->cfg->lua_state;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, data->func);
lua_pushstring (L, (const gchar *)key);
@@ -483,6 +504,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
res = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
data->score += res;
+ g_mutex_unlock (lua_mtx);
}
double
@@ -519,6 +541,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
return score;
}
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, p->data);
lua_pushnumber (L, score);
@@ -533,6 +556,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
res = lua_tonumber (L, -1);
lua_pop (L, 1);
+ g_mutex_unlock (lua_mtx);
return res;
}
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index c1891a6a7..32604cbc4 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -15,6 +15,7 @@
#define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name }
extern const luaL_reg null_reg[];
+extern GMutex *lua_mtx;
#define RSPAMD_LUA_API_VERSION 9
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index cd1287a18..bdb4ce056 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -318,6 +318,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
GList *cur;
gboolean res = FALSE;
+ g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -347,6 +348,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
}
lua_pop (cd->L, 1);
}
+ g_mutex_unlock (lua_mtx);
return res;
}
@@ -439,6 +441,7 @@ lua_call_post_filters (struct worker_task *task)
struct worker_task **ptask;
GList *cur;
+ g_mutex_lock (lua_mtx);
cur = task->cfg->post_filters;
while (cur) {
cd = cur->data;
@@ -458,6 +461,7 @@ lua_call_post_filters (struct worker_task *task)
}
cur = g_list_next (cur);
}
+ g_mutex_unlock (lua_mtx);
}
static gint
@@ -583,6 +587,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
struct lua_callback_data *cd = ud;
struct worker_task **ptask;
+ g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -597,6 +602,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" :
cd->callback.name, lua_tostring (cd->L, -1));
}
+ g_mutex_unlock (lua_mtx);
}
static gint
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index c0b299fb7..d28d897f6 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -468,6 +468,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
union rspamd_reply_element *elt;
GList *cur;
+ g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -553,6 +554,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
if (cd->cb_is_ref) {
luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
+ g_mutex_unlock (lua_mtx);
}
static gint
diff --git a/src/mem_pool.c b/src/mem_pool.c
index d5b3ddf6e..c68ae4b13 100644
--- a/src/mem_pool.c
+++ b/src/mem_pool.c
@@ -41,6 +41,14 @@ pthread_mutex_t stat_mtx = PTHREAD_MUTEX_INITIALIZER;
# define STAT_UNLOCK() do {} while (0)
#endif
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+# define POOL_MTX_LOCK() do { g_static_mutex_lock (&pool->mtx); } while (0)
+# define POOL_MTX_UNLOCK() do { g_static_mutex_unlock (&pool->mtx); } while (0)
+#else
+# define POOL_MTX_LOCK() do { g_mutex_lock (&pool->mtx); } while (0)
+# define POOL_MTX_UNLOCK() do { g_mutex_unlock (&pool->mtx); } while (0)
+#endif
+
/*
* This define specify whether we should check all pools for free space for new object
* or just begin scan from current (recently attached) pool
@@ -101,6 +109,7 @@ pool_chain_new_shared (gsize size)
struct _pool_chain_shared *chain;
gpointer map;
+
#if defined(HAVE_MMAP_ANON)
map = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
if (map == MAP_FAILED) {
@@ -189,6 +198,11 @@ memory_pool_new (gsize size)
new->destructors = NULL;
/* Set it upon first call of set variable */
new->variables = NULL;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ new->mtx = G_STATIC_MUTEX_INIT;
+#else
+ g_mutex_init (&new->mtx);
+#endif
mem_pool_stat->pools_allocated++;
@@ -203,6 +217,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
gint free;
if (pool) {
+ POOL_MTX_LOCK ();
#ifdef MEMORY_GREEDY
cur = pool->first_pool;
#else
@@ -214,6 +229,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
}
if (free < (gint)size && cur->next == NULL) {
/* Allocate new pool */
+
if (cur->len >= size + MEM_ALIGNMENT) {
new = pool_chain_new (cur->len);
}
@@ -227,10 +243,12 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
/* No need to align again */
tmp = new->pos;
new->pos = tmp + size;
+ POOL_MTX_UNLOCK ();
return tmp;
}
tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
cur->pos = tmp + size;
+ POOL_MTX_UNLOCK ();
return tmp;
}
return NULL;
@@ -317,6 +335,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
if (pool) {
g_return_val_if_fail (size > 0, NULL);
+ POOL_MTX_LOCK ();
cur = pool->shared_pool;
if (!cur) {
cur = pool_chain_new_shared (pool->first_pool->len);
@@ -329,6 +348,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
}
if (free < (gint)size && cur->next == NULL) {
/* Allocate new pool */
+
if (cur->len >= size + MEM_ALIGNMENT) {
new = pool_chain_new_shared (cur->len);
}
@@ -342,10 +362,12 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
STAT_LOCK ();
mem_pool_stat->bytes_allocated += size;
STAT_UNLOCK ();
+ POOL_MTX_UNLOCK ();
return new->begin;
}
tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
cur->pos = tmp + size;
+ POOL_MTX_UNLOCK ();
return tmp;
}
return NULL;
@@ -455,12 +477,14 @@ memory_pool_add_destructor_full (memory_pool_t * pool, pool_destruct_func func,
cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors));
if (cur) {
+ POOL_MTX_LOCK ();
cur->func = func;
cur->data = data;
cur->function = function;
cur->loc = line;
cur->prev = pool->destructors;
pool->destructors = cur;
+ POOL_MTX_UNLOCK ();
}
}
@@ -488,6 +512,7 @@ memory_pool_delete (memory_pool_t * pool)
struct _pool_chain_shared *cur_shared = pool->shared_pool, *tmp_shared;
struct _pool_destructors *destructor = pool->destructors;
+ POOL_MTX_LOCK ();
/* Call all pool destructors */
while (destructor) {
/* Avoid calling destructors for NULL pointers */
@@ -522,6 +547,7 @@ memory_pool_delete (memory_pool_t * pool)
}
mem_pool_stat->pools_freed++;
+ POOL_MTX_UNLOCK ();
g_slice_free (memory_pool_t, pool);
}
diff --git a/src/mem_pool.h b/src/mem_pool.h
index 62f6dcb9a..475a00629 100644
--- a/src/mem_pool.h
+++ b/src/mem_pool.h
@@ -76,6 +76,11 @@ typedef struct memory_pool_s {
struct _pool_chain_shared *shared_pool; /**< shared chain */
struct _pool_destructors *destructors; /**< destructors chain */
GHashTable *variables; /**< private memory pool variables */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ GStaticMutex mtx; /**< threads lock */
+#else
+ GMutex mtx; /**< threads lock */
+#endif
} memory_pool_t;
/**
diff --git a/src/smtp_utils.c b/src/smtp_utils.c
index 3a9d62cbb..8ceaef5a6 100644
--- a/src/smtp_utils.c
+++ b/src/smtp_utils.c
@@ -28,7 +28,7 @@
#include "smtp.h"
#include "smtp_proto.h"
-void
+gboolean
free_smtp_session (gpointer arg)
{
struct smtp_session *session = arg;
@@ -56,6 +56,8 @@ free_smtp_session (gpointer arg)
memory_pool_delete (session->pool);
g_free (session);
}
+
+ return TRUE;
}
gboolean
diff --git a/src/worker.c b/src/worker.c
index f113e04d9..d56c2d924 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -613,7 +613,7 @@ err_socket (GError * err, void *arg)
/*
* Called if all filters are processed
*/
-static void
+static gboolean
fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
@@ -630,20 +630,25 @@ fin_task (void *arg)
/* Just process composites */
make_composites (task);
}
- /* Call post filters */
- lua_call_post_filters (task);
+ if (task->cfg->post_filters) {
+ /* More to process */
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+ return FALSE;
+ }
+
}
/* Check if we have all events finished */
- if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
- }
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
}
+
+ return TRUE;
}
/*
@@ -654,8 +659,8 @@ restore_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
- /* Special state */
- task->state = WAIT_POST_FILTER;
+ /* Call post filters */
+ lua_call_post_filters (task);
task->s->wanna_die = TRUE;
}