diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-02-13 21:51:10 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-02-13 21:51:10 +0400 |
commit | a5b48a05a94d178c342bbad69a330addb518d148 (patch) | |
tree | ee7ab452cd4a98fdb7503e78cc52a3d4f66dd27e | |
parent | 0d64c808b7310b6e233ec570649fbb281a3f2b13 (diff) | |
download | rspamd-a5b48a05a94d178c342bbad69a330addb518d148.tar.gz rspamd-a5b48a05a94d178c342bbad69a330addb518d148.zip |
* More things to be thread-safe:
- pool allocator is now thread-safe
- lua subsystem now holds lock to avoid lua stack corruption
- events subsystem now using conditional variables to wait for async_threads
- insert_result is thread-safe now
-rw-r--r-- | src/classifiers/bayes.c | 20 | ||||
-rw-r--r-- | src/classifiers/classifiers.h | 1 | ||||
-rw-r--r-- | src/controller.c | 6 | ||||
-rw-r--r-- | src/events.c | 74 | ||||
-rw-r--r-- | src/events.h | 6 | ||||
-rw-r--r-- | src/filter.c | 17 | ||||
-rw-r--r-- | src/lua/lua_classifier.c | 6 | ||||
-rw-r--r-- | src/lua/lua_common.c | 24 | ||||
-rw-r--r-- | src/lua/lua_common.h | 1 | ||||
-rw-r--r-- | src/lua/lua_config.c | 6 | ||||
-rw-r--r-- | src/lua/lua_task.c | 2 | ||||
-rw-r--r-- | src/mem_pool.c | 26 | ||||
-rw-r--r-- | src/mem_pool.h | 5 | ||||
-rw-r--r-- | src/smtp_utils.c | 4 | ||||
-rw-r--r-- | src/worker.c | 31 |
15 files changed, 189 insertions, 40 deletions
diff --git a/src/classifiers/bayes.c b/src/classifiers/bayes.c index 265957bc9..281cc6292 100644 --- a/src/classifiers/bayes.c +++ b/src/classifiers/bayes.c @@ -165,6 +165,16 @@ bayes_classify_callback (gpointer key, gpointer value, gpointer data) return FALSE; } +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) +static void +bayes_mutex_free (gpointer data) +{ + GMutex *mtx = data; + + g_mutex_free (mtx); +} +#endif + struct classifier_ctx* bayes_init (memory_pool_t *pool, struct classifier_config *cfg) { @@ -173,6 +183,13 @@ bayes_init (memory_pool_t *pool, struct classifier_config *cfg) ctx->pool = pool; ctx->cfg = cfg; ctx->debug = FALSE; +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + ctx->mtx = g_mutex_new (); + memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx); +#else + ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex)); + g_mutex_init (ctx->mtx); +#endif return ctx; } @@ -205,6 +222,8 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, } } + /* Critical section as here can be lua callbacks calling */ + g_mutex_lock (ctx->mtx); cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE); if (cur) { memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur); @@ -212,6 +231,7 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, else { cur = ctx->cfg->statfiles; } + g_mutex_unlock (ctx->mtx); data.statfiles_num = g_list_length (cur); data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num); diff --git a/src/classifiers/classifiers.h b/src/classifiers/classifiers.h index b05b62e34..717083452 100644 --- a/src/classifiers/classifiers.h +++ b/src/classifiers/classifiers.h @@ -17,6 +17,7 @@ struct classifier_ctx { GHashTable *results; gboolean debug; struct classifier_config *cfg; + GMutex *mtx; }; struct classify_weight { diff --git a/src/controller.c b/src/controller.c index b9ec3677c..ddac27930 100644 --- a/src/controller.c +++ b/src/controller.c @@ -846,9 +846,9 @@ process_normal_command (const gchar *line) } /* - * Called if all filters are processed + * Called if all filters are processed, non-threaded and simple version */ -static void +static gboolean fin_learn_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; @@ -870,6 +870,8 @@ fin_learn_task (void *arg) rspamd_dispatcher_restore (task->dispatcher); } } + + return TRUE; } /* diff --git a/src/events.c b/src/events.c index b658f48a9..0d831104c 100644 --- a/src/events.c +++ b/src/events.c @@ -56,10 +56,18 @@ event_mutex_free (gpointer data) g_mutex_free (mtx); } + +static void +event_cond_free (gpointer data) +{ + GCond *cond = data; + + g_cond_free (cond); +} #endif struct rspamd_async_session * -new_async_session (memory_pool_t * pool, event_finalizer_t fin, +new_async_session (memory_pool_t * pool, session_finalizer_t fin, event_finalizer_t restore, event_finalizer_t cleanup, void *user_data) { struct rspamd_async_session *new; @@ -74,10 +82,14 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin, new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal); #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) new->mtx = g_mutex_new (); + new->cond = g_cond_new (); memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx); + memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond); #else new->mtx = memory_pool_alloc (pool, sizeof (GMutex)); g_mutex_init (new->mtx); + new->cond = memory_pool_alloc (pool, sizeof (GCond)); + g_cond_init (new->cond); #endif new->threads = 0; @@ -96,6 +108,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi return; } + g_mutex_lock (session->mtx); new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event)); new->fin = fin; new->user_data = user_data; @@ -106,6 +119,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events), g_quark_to_string (subsystem)); #endif + g_mutex_unlock (session->mtx); } void @@ -118,6 +132,7 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin return; } + g_mutex_lock (session->mtx); /* Search for event */ search_ev.fin = fin; search_ev.user_data = ud; @@ -130,13 +145,12 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin /* Remove event */ fin (ud); } - - + g_mutex_unlock (session->mtx); check_session_pending (session); } -static void +static gboolean rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) { struct rspamd_async_event *ev = v; @@ -145,6 +159,8 @@ rspamd_session_destroy (gpointer k, gpointer v, gpointer unused) if (ev->fin != NULL) { ev->fin (ev->user_data); } + + return TRUE; } gboolean @@ -155,36 +171,47 @@ destroy_session (struct rspamd_async_session *session) return FALSE; } + g_mutex_lock (session->mtx); + if (session->threads > 0) { + /* Wait for conditional variable to finish processing */ + g_mutex_unlock (session->mtx); + g_cond_wait (session->cond, session->mtx); + } + session->wanna_die = TRUE; - g_hash_table_foreach (session->events, rspamd_session_destroy, session); + g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session); - if (session->threads == 0) { - if (session->cleanup != NULL) { - session->cleanup (session->user_data); - } - return TRUE; - } + /* Mutex can be destroyed here */ + g_mutex_unlock (session->mtx); - return FALSE; + if (session->cleanup != NULL) { + session->cleanup (session->user_data); + } + return TRUE; } gboolean check_session_pending (struct rspamd_async_session *session) { g_mutex_lock (session->mtx); - if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) { + if (session->wanna_die && g_hash_table_size (session->events) == 0) { session->wanna_die = FALSE; - if (session->fin != NULL) { - session->fin (session->user_data); + if (session->threads > 0) { + /* Wait for conditional variable to finish processing */ + g_cond_wait (session->cond, session->mtx); } - /* Check events count again */ - if (g_hash_table_size (session->events) != 0) { - if (session->restore != NULL) { - session->restore (session->user_data); + if (session->fin != NULL) { + if (! session->fin (session->user_data)) { + g_mutex_unlock (session->mtx); + /* Session finished incompletely, perform restoration */ + if (session->restore != NULL) { + session->restore (session->user_data); + /* Call pending once more */ + return check_session_pending (session); + } + return TRUE; } - g_mutex_unlock (session->mtx); - return TRUE; } g_mutex_unlock (session->mtx); return FALSE; @@ -215,7 +242,10 @@ void remove_async_thread (struct rspamd_async_session *session) { if (g_atomic_int_dec_and_test (&session->threads)) { - (void) check_session_pending (session); + /* Signal if there are any sessions waiting */ + g_mutex_lock (session->mtx); + g_cond_signal (session->cond); + g_mutex_unlock (session->mtx); } #ifdef RSPAMD_EVENTS_DEBUG msg_info ("removed thread: pending %d thread", session->threads); diff --git a/src/events.h b/src/events.h index 2c4ea2c91..1329d5837 100644 --- a/src/events.h +++ b/src/events.h @@ -6,6 +6,7 @@ struct rspamd_async_event; typedef void (*event_finalizer_t)(void *user_data); +typedef gboolean (*session_finalizer_t)(void *user_data); struct rspamd_async_event { GQuark subsystem; @@ -15,7 +16,7 @@ struct rspamd_async_event { }; struct rspamd_async_session { - event_finalizer_t fin; + session_finalizer_t fin; event_finalizer_t restore; event_finalizer_t cleanup; GHashTable *events; @@ -24,6 +25,7 @@ struct rspamd_async_session { gboolean wanna_die; guint threads; GMutex *mtx; + GCond *cond; }; /** @@ -36,7 +38,7 @@ struct rspamd_async_session { * @return */ struct rspamd_async_session *new_async_session (memory_pool_t *pool, - event_finalizer_t fin, event_finalizer_t restore, + session_finalizer_t fin, event_finalizer_t restore, event_finalizer_t cleanup, void *user_data); /** diff --git a/src/filter.c b/src/filter.c index cf8a63d0d..bca6d17d4 100644 --- a/src/filter.c +++ b/src/filter.c @@ -140,6 +140,12 @@ insert_metric_result (struct worker_task *task, struct metric *metric, const gch } +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) +static GStaticMutex result_mtx = G_STATIC_MUTEX_INIT; +#else +G_LOCK_DEFINE (result_mtx); +#endif + static void insert_result_common (struct worker_task *task, const gchar *symbol, double flag, GList * opts, gboolean single) { @@ -147,6 +153,12 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag struct cache_item *item; GList *cur, *metric_list; + /* Avoid concurrenting inserting of results */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_lock (&result_mtx); +#else + G_LOCK (result_mtx); +#endif metric_list = g_hash_table_lookup (task->cfg->metrics_symbols, symbol); if (metric_list) { cur = metric_list; @@ -174,6 +186,11 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag /* XXX: it is not wise to destroy them here */ g_list_free (opts); } +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + g_static_mutex_unlock (&result_mtx); +#else + G_UNLOCK (result_mtx); +#endif } /* Insert result that may be increased on next insertions */ diff --git a/src/lua/lua_classifier.c b/src/lua/lua_classifier.c index 0bf0daca8..edaf4e7a6 100644 --- a/src/lua/lua_classifier.c +++ b/src/lua/lua_classifier.c @@ -123,6 +123,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task struct classifier_callback_data *cd; lua_State *L; + /* Go throught all callbacks and call them, appending results to list */ cur = g_list_first (ccf->pre_callbacks); while (cur) { @@ -134,6 +135,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task cur = g_list_next (cur); } + g_mutex_lock (lua_mtx); if (res == NULL) { L = task->cfg->lua_state; /* Check function from global table 'classifiers' */ @@ -149,6 +151,8 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task } lua_pop (L, 1); } + g_mutex_unlock (lua_mtx); + return res; } @@ -162,6 +166,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas double out = in; GList *cur; + g_mutex_lock (lua_mtx); /* Go throught all callbacks and call them, appending results to list */ cur = g_list_first (ccf->pre_callbacks); while (cur) { @@ -190,6 +195,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas cur = g_list_next (cur); } + g_mutex_unlock (lua_mtx); return out; diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index 00fcadc93..4d90048ca 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -28,6 +28,9 @@ /* Lua module init function */ #define MODULE_INIT_FUNC "module_init" +/* Global lua mutex */ +GMutex *lua_mtx = NULL; + const luaL_reg null_reg[] = { {"__tostring", lua_class_tostring}, {NULL, NULL} @@ -224,6 +227,13 @@ init_lua (struct config_file *cfg) L = lua_open (); luaL_openlibs (L); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + lua_mtx = g_mutex_new (); +#else + lua_mtx = g_malloc (sizeof (GMutex)); + g_mutex_init (lua_mtx); +#endif + (void)luaopen_rspamd (L); (void)luaopen_logger (L); (void)luaopen_config (L); @@ -320,6 +330,7 @@ lua_call_filter (const gchar *function, struct worker_task *task) struct worker_task **ptask; lua_State *L = task->cfg->lua_state; + g_mutex_lock (lua_mtx); lua_getglobal (L, function); ptask = lua_newuserdata (L, sizeof (struct worker_task *)); lua_setclass (L, "rspamd{task}", -1); @@ -335,6 +346,8 @@ lua_call_filter (const gchar *function, struct worker_task *task) } result = lua_tonumber (L, -1); lua_pop (L, 1); /* pop returned value */ + g_mutex_unlock (lua_mtx); + return result; } @@ -345,6 +358,7 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma guint i; lua_State *L = task->cfg->lua_state; + g_mutex_lock (lua_mtx); lua_getglobal (L, function); for (i = 0; i < number; i++) { @@ -360,6 +374,8 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma } result = lua_tonumber (L, -1); lua_pop (L, 1); /* pop returned value */ + g_mutex_unlock (lua_mtx); + return result; } @@ -374,6 +390,7 @@ lua_call_expression_func (const gchar *module, const gchar *function, struct expression_argument *arg; int nargs = 1, pop = 0; + g_mutex_lock (lua_mtx); /* Call specified function and expect result of given expected_type */ /* First check function in config table */ lua_getglobal (L, "config"); @@ -435,6 +452,7 @@ lua_call_expression_func (const gchar *module, const gchar *function, if (lua_pcall (L, nargs, 1, 0) != 0) { msg_info ("call to %s failed: %s", function, lua_tostring (L, -1)); + g_mutex_unlock (lua_mtx); return FALSE; } pop ++; @@ -442,11 +460,13 @@ lua_call_expression_func (const gchar *module, const gchar *function, if (!lua_isboolean (L, -1)) { lua_pop (L, pop); msg_info ("function %s must return a boolean", function); + g_mutex_unlock (lua_mtx); return FALSE; } *res = lua_toboolean (L, -1); lua_pop (L, pop); + g_mutex_unlock (lua_mtx); return TRUE; } @@ -468,6 +488,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg) struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg; lua_State *L = data->task->cfg->lua_state; + g_mutex_lock (lua_mtx); lua_getglobal (L, data->func); lua_pushstring (L, (const gchar *)key); @@ -483,6 +504,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg) res = lua_tonumber (L, -1); lua_pop (L, 1); /* pop returned value */ data->score += res; + g_mutex_unlock (lua_mtx); } double @@ -519,6 +541,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params) return score; } + g_mutex_lock (lua_mtx); lua_getglobal (L, p->data); lua_pushnumber (L, score); @@ -533,6 +556,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params) res = lua_tonumber (L, -1); lua_pop (L, 1); + g_mutex_unlock (lua_mtx); return res; } diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index c1891a6a7..32604cbc4 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -15,6 +15,7 @@ #define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name } extern const luaL_reg null_reg[]; +extern GMutex *lua_mtx; #define RSPAMD_LUA_API_VERSION 9 diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index cd1287a18..bdb4ce056 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -318,6 +318,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_ GList *cur; gboolean res = FALSE; + g_mutex_lock (lua_mtx); if (cd->cb_is_ref) { lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } @@ -347,6 +348,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_ } lua_pop (cd->L, 1); } + g_mutex_unlock (lua_mtx); return res; } @@ -439,6 +441,7 @@ lua_call_post_filters (struct worker_task *task) struct worker_task **ptask; GList *cur; + g_mutex_lock (lua_mtx); cur = task->cfg->post_filters; while (cur) { cd = cur->data; @@ -458,6 +461,7 @@ lua_call_post_filters (struct worker_task *task) } cur = g_list_next (cur); } + g_mutex_unlock (lua_mtx); } static gint @@ -583,6 +587,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud) struct lua_callback_data *cd = ud; struct worker_task **ptask; + g_mutex_lock (lua_mtx); if (cd->cb_is_ref) { lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } @@ -597,6 +602,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud) msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" : cd->callback.name, lua_tostring (cd->L, -1)); } + g_mutex_unlock (lua_mtx); } static gint diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index c0b299fb7..d28d897f6 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -468,6 +468,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) union rspamd_reply_element *elt; GList *cur; + g_mutex_lock (lua_mtx); if (cd->cb_is_ref) { lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } @@ -553,6 +554,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) if (cd->cb_is_ref) { luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } + g_mutex_unlock (lua_mtx); } static gint diff --git a/src/mem_pool.c b/src/mem_pool.c index d5b3ddf6e..c68ae4b13 100644 --- a/src/mem_pool.c +++ b/src/mem_pool.c @@ -41,6 +41,14 @@ pthread_mutex_t stat_mtx = PTHREAD_MUTEX_INITIALIZER; # define STAT_UNLOCK() do {} while (0) #endif +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) +# define POOL_MTX_LOCK() do { g_static_mutex_lock (&pool->mtx); } while (0) +# define POOL_MTX_UNLOCK() do { g_static_mutex_unlock (&pool->mtx); } while (0) +#else +# define POOL_MTX_LOCK() do { g_mutex_lock (&pool->mtx); } while (0) +# define POOL_MTX_UNLOCK() do { g_mutex_unlock (&pool->mtx); } while (0) +#endif + /* * This define specify whether we should check all pools for free space for new object * or just begin scan from current (recently attached) pool @@ -101,6 +109,7 @@ pool_chain_new_shared (gsize size) struct _pool_chain_shared *chain; gpointer map; + #if defined(HAVE_MMAP_ANON) map = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0); if (map == MAP_FAILED) { @@ -189,6 +198,11 @@ memory_pool_new (gsize size) new->destructors = NULL; /* Set it upon first call of set variable */ new->variables = NULL; +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + new->mtx = G_STATIC_MUTEX_INIT; +#else + g_mutex_init (&new->mtx); +#endif mem_pool_stat->pools_allocated++; @@ -203,6 +217,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size) gint free; if (pool) { + POOL_MTX_LOCK (); #ifdef MEMORY_GREEDY cur = pool->first_pool; #else @@ -214,6 +229,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size) } if (free < (gint)size && cur->next == NULL) { /* Allocate new pool */ + if (cur->len >= size + MEM_ALIGNMENT) { new = pool_chain_new (cur->len); } @@ -227,10 +243,12 @@ memory_pool_alloc (memory_pool_t * pool, gsize size) /* No need to align again */ tmp = new->pos; new->pos = tmp + size; + POOL_MTX_UNLOCK (); return tmp; } tmp = align_ptr (cur->pos, MEM_ALIGNMENT); cur->pos = tmp + size; + POOL_MTX_UNLOCK (); return tmp; } return NULL; @@ -317,6 +335,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size) if (pool) { g_return_val_if_fail (size > 0, NULL); + POOL_MTX_LOCK (); cur = pool->shared_pool; if (!cur) { cur = pool_chain_new_shared (pool->first_pool->len); @@ -329,6 +348,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size) } if (free < (gint)size && cur->next == NULL) { /* Allocate new pool */ + if (cur->len >= size + MEM_ALIGNMENT) { new = pool_chain_new_shared (cur->len); } @@ -342,10 +362,12 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size) STAT_LOCK (); mem_pool_stat->bytes_allocated += size; STAT_UNLOCK (); + POOL_MTX_UNLOCK (); return new->begin; } tmp = align_ptr (cur->pos, MEM_ALIGNMENT); cur->pos = tmp + size; + POOL_MTX_UNLOCK (); return tmp; } return NULL; @@ -455,12 +477,14 @@ memory_pool_add_destructor_full (memory_pool_t * pool, pool_destruct_func func, cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors)); if (cur) { + POOL_MTX_LOCK (); cur->func = func; cur->data = data; cur->function = function; cur->loc = line; cur->prev = pool->destructors; pool->destructors = cur; + POOL_MTX_UNLOCK (); } } @@ -488,6 +512,7 @@ memory_pool_delete (memory_pool_t * pool) struct _pool_chain_shared *cur_shared = pool->shared_pool, *tmp_shared; struct _pool_destructors *destructor = pool->destructors; + POOL_MTX_LOCK (); /* Call all pool destructors */ while (destructor) { /* Avoid calling destructors for NULL pointers */ @@ -522,6 +547,7 @@ memory_pool_delete (memory_pool_t * pool) } mem_pool_stat->pools_freed++; + POOL_MTX_UNLOCK (); g_slice_free (memory_pool_t, pool); } diff --git a/src/mem_pool.h b/src/mem_pool.h index 62f6dcb9a..475a00629 100644 --- a/src/mem_pool.h +++ b/src/mem_pool.h @@ -76,6 +76,11 @@ typedef struct memory_pool_s { struct _pool_chain_shared *shared_pool; /**< shared chain */ struct _pool_destructors *destructors; /**< destructors chain */ GHashTable *variables; /**< private memory pool variables */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + GStaticMutex mtx; /**< threads lock */ +#else + GMutex mtx; /**< threads lock */ +#endif } memory_pool_t; /** diff --git a/src/smtp_utils.c b/src/smtp_utils.c index 3a9d62cbb..8ceaef5a6 100644 --- a/src/smtp_utils.c +++ b/src/smtp_utils.c @@ -28,7 +28,7 @@ #include "smtp.h" #include "smtp_proto.h" -void +gboolean free_smtp_session (gpointer arg) { struct smtp_session *session = arg; @@ -56,6 +56,8 @@ free_smtp_session (gpointer arg) memory_pool_delete (session->pool); g_free (session); } + + return TRUE; } gboolean diff --git a/src/worker.c b/src/worker.c index f113e04d9..d56c2d924 100644 --- a/src/worker.c +++ b/src/worker.c @@ -613,7 +613,7 @@ err_socket (GError * err, void *arg) /* * Called if all filters are processed */ -static void +static gboolean fin_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; @@ -630,20 +630,25 @@ fin_task (void *arg) /* Just process composites */ make_composites (task); } - /* Call post filters */ - lua_call_post_filters (task); + if (task->cfg->post_filters) { + /* More to process */ + /* Special state */ + task->state = WAIT_POST_FILTER; + return FALSE; + } + } /* Check if we have all events finished */ - if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) { - task->state = WRITE_REPLY; - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_dispatcher_restore (task->dispatcher); - } + task->state = WRITE_REPLY; + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + rspamd_dispatcher_restore (task->dispatcher); } + + return TRUE; } /* @@ -654,8 +659,8 @@ restore_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; - /* Special state */ - task->state = WAIT_POST_FILTER; + /* Call post filters */ + lua_call_post_filters (task); task->s->wanna_die = TRUE; } |