From b90267a71cc8cdc8b38675322ef9fa7a9cb5468c Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 22 Aug 2012 21:41:48 +0400 Subject: [PATCH] * Rework thread pools locking logic to avoid global lua mutex usage. Fixed several memory leaks with modern glib. Fixed memory leak in dkim code. Fixed a problem with static global variables in shared libraries. --- src/classifiers/bayes.c | 28 +++-------------- src/classifiers/classifiers.h | 14 ++++----- src/classifiers/winnow.c | 8 ++--- src/events.c | 2 ++ src/expressions.c | 16 ++-------- src/expressions.h | 4 ++- src/filter.c | 34 +++++++++++++++++---- src/html.c | 7 ++--- src/lua/lua_buffer.c | 28 ++--------------- src/lua/lua_classifier.c | 38 ++++++++++------------- src/lua/lua_common.c | 57 ++++++++++++++++++++--------------- src/lua/lua_common.h | 22 +++++++++++--- src/lua/lua_config.c | 8 ----- src/lua/lua_dns.c | 10 ------ src/lua/lua_http.c | 19 +----------- src/lua/lua_mempool.c | 8 ----- src/lua/lua_redis.c | 17 ----------- src/lua/lua_session.c | 36 ++-------------------- src/lua/lua_task.c | 4 +-- src/lua/lua_xmlrpc.c | 12 +++++--- src/main.c | 6 ++-- src/mem_pool.c | 3 ++ src/plugins/dkim_check.c | 12 +++++++- src/plugins/regexp.c | 46 +++++++++++++++++----------- src/symbols_cache.c | 1 + src/util.c | 19 +++++++++++- src/util.h | 12 ++++++++ src/worker.c | 4 ++- 28 files changed, 215 insertions(+), 260 deletions(-) diff --git a/src/classifiers/bayes.c b/src/classifiers/bayes.c index 281cc6292..64e543bc6 100644 --- a/src/classifiers/bayes.c +++ b/src/classifiers/bayes.c @@ -165,16 +165,6 @@ bayes_classify_callback (gpointer key, gpointer value, gpointer data) return FALSE; } -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) -static void -bayes_mutex_free (gpointer data) -{ - GMutex *mtx = data; - - g_mutex_free (mtx); -} -#endif - struct classifier_ctx* bayes_init (memory_pool_t *pool, struct classifier_config *cfg) { @@ -183,19 +173,12 @@ bayes_init (memory_pool_t *pool, struct classifier_config *cfg) ctx->pool = pool; ctx->cfg = cfg; ctx->debug = FALSE; -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - ctx->mtx = g_mutex_new (); - memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx); -#else - ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex)); - g_mutex_init (ctx->mtx); -#endif return ctx; } gboolean -bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task) +bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L) { struct bayes_callback_data data; gchar *value; @@ -222,16 +205,13 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, } } - /* Critical section as here can be lua callbacks calling */ - g_mutex_lock (ctx->mtx); - cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE); + cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE, L); if (cur) { memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur); } else { cur = ctx->cfg->statfiles; } - g_mutex_unlock (ctx->mtx); data.statfiles_num = g_list_length (cur); data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num); @@ -402,7 +382,7 @@ bayes_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symb gboolean bayes_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool, - GTree *input, struct worker_task *task, gboolean is_spam, GError **err) + GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err) { struct bayes_callback_data data; gchar *value; @@ -431,7 +411,7 @@ bayes_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool, } } - cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE); + cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE, L); if (cur) { memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur); } diff --git a/src/classifiers/classifiers.h b/src/classifiers/classifiers.h index 717083452..c70a71cc6 100644 --- a/src/classifiers/classifiers.h +++ b/src/classifiers/classifiers.h @@ -5,6 +5,7 @@ #include "mem_pool.h" #include "statfile.h" #include "tokenizers/tokenizers.h" +#include /* Consider this value as 0 */ #define ALPHA 0.0001 @@ -17,7 +18,6 @@ struct classifier_ctx { GHashTable *results; gboolean debug; struct classifier_config *cfg; - GMutex *mtx; }; struct classify_weight { @@ -29,12 +29,12 @@ struct classify_weight { struct classifier { char *name; struct classifier_ctx* (*init_func)(memory_pool_t *pool, struct classifier_config *cf); - gboolean (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); + gboolean (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L); gboolean (*learn_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symbol, GTree *input, gboolean in_class, double *sum, double multiplier, GError **err); gboolean (*learn_spam_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, - GTree *input, struct worker_task *task, gboolean is_spam, GError **err); + GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err); GList* (*weights_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); }; @@ -43,20 +43,20 @@ struct classifier* get_classifier (char *name); /* Winnow algorithm */ struct classifier_ctx* winnow_init (memory_pool_t *pool, struct classifier_config *cf); -gboolean winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); +gboolean winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L); gboolean winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symbol, GTree *input, gboolean in_class, double *sum, double multiplier, GError **err); gboolean winnow_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool, - GTree *input, struct worker_task *task, gboolean is_spam, GError **err); + GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err); GList *winnow_weights (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); /* Bayes algorithm */ struct classifier_ctx* bayes_init (memory_pool_t *pool, struct classifier_config *cf); -gboolean bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); +gboolean bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L); gboolean bayes_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symbol, GTree *input, gboolean in_class, double *sum, double multiplier, GError **err); gboolean bayes_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool, - GTree *input, struct worker_task *task, gboolean is_spam, GError **err); + GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err); GList *bayes_weights (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task); /* Array of all defined classifiers */ extern struct classifier classifiers[]; diff --git a/src/classifiers/winnow.c b/src/classifiers/winnow.c index 32c17e8bc..498ea6373 100644 --- a/src/classifiers/winnow.c +++ b/src/classifiers/winnow.c @@ -193,7 +193,7 @@ winnow_init (memory_pool_t * pool, struct classifier_config *cfg) } gboolean -winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task) +winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task, lua_State *L) { struct winnow_callback_data data; char *sumbuf, *value; @@ -221,7 +221,7 @@ winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * inp } } - cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE); + cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE, L); if (cur) { memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur); } @@ -261,7 +261,7 @@ winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * inp if (sel != NULL) { #ifdef WITH_LUA - max = call_classifier_post_callbacks (ctx->cfg, task, max); + max = call_classifier_post_callbacks (ctx->cfg, task, max, L); #endif #ifdef HAVE_TANHL max = tanhl (max); @@ -593,7 +593,7 @@ end: gboolean winnow_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool, - GTree *input, struct worker_task *task, gboolean is_spam, GError **err) + GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err) { g_set_error (err, winnow_error_quark(), /* error domain */ diff --git a/src/events.c b/src/events.c index 0d831104c..364bf6d14 100644 --- a/src/events.c +++ b/src/events.c @@ -90,6 +90,8 @@ new_async_session (memory_pool_t * pool, session_finalizer_t fin, g_mutex_init (new->mtx); new->cond = memory_pool_alloc (pool, sizeof (GCond)); g_cond_init (new->cond); + memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_clear, new->mtx); + memory_pool_add_destructor (pool, (pool_destruct_func) g_cond_clear, new->cond); #endif new->threads = 0; diff --git a/src/expressions.c b/src/expressions.c index f552a5a77..fd94ed895 100644 --- a/src/expressions.c +++ b/src/expressions.c @@ -785,7 +785,7 @@ parse_regexp (memory_pool_t * pool, gchar *line, gboolean raw_mode) } gboolean -call_expression_function (struct expression_function * func, struct worker_task * task) +call_expression_function (struct expression_function * func, struct worker_task * task, lua_State *L) { struct _fl *selected, key; @@ -794,17 +794,7 @@ call_expression_function (struct expression_function * func, struct worker_task selected = bsearch (&key, list_ptr, functions_number, sizeof (struct _fl), fl_cmp); if (selected == NULL) { /* Try to check lua function */ -#if 0 - if (! lua_call_expression_func (NULL, func->name, task, func->args, &res)) { - msg_warn ("call to undefined function %s", key.name); - return FALSE; - } - else { - return res; - } -#else return FALSE; -#endif } return selected->func (task, func->args, selected->user_data); @@ -830,7 +820,7 @@ get_function_arg (struct expression *expr, struct worker_task *task, gboolean wa } else if (expr->type == EXPR_FUNCTION && !want_string) { res->type = EXPRESSION_ARGUMENT_BOOL; - cur = call_expression_function (expr->content.operand, task); + cur = call_expression_function (expr->content.operand, task, NULL); res->data = GSIZE_TO_POINTER (cur); } else { @@ -853,7 +843,7 @@ get_function_arg (struct expression *expr, struct worker_task *task, gboolean wa return res; } else if (it->type == EXPR_FUNCTION) { - cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task); + cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task, NULL); debug_task ("function %s returned %s", ((struct expression_function *)it->content.operand)->name, cur ? "true" : "false"); } else if (it->type == EXPR_OPERATION) { diff --git a/src/expressions.h b/src/expressions.h index 0c1576b36..6f42fa3e4 100644 --- a/src/expressions.h +++ b/src/expressions.h @@ -7,6 +7,7 @@ #define RSPAMD_EXPRESSIONS_H #include "config.h" +#include struct worker_task; struct rspamd_regexp; @@ -72,9 +73,10 @@ struct expression* parse_expression (memory_pool_t *pool, gchar *line); * Call specified fucntion and return boolean result * @param func function to call * @param task task object + * @param L lua specific state * @return TRUE or FALSE depending on function result */ -gboolean call_expression_function (struct expression_function *func, struct worker_task *task); +gboolean call_expression_function (struct expression_function *func, struct worker_task *task, lua_State *L); /** * Register specified function to rspamd internal functions list diff --git a/src/filter.c b/src/filter.c index d7d2a0c15..7593046ca 100644 --- a/src/filter.c +++ b/src/filter.c @@ -585,10 +585,16 @@ make_composites (struct worker_task *task) g_hash_table_foreach (task->results, composites_metric_callback, task); } +struct classifiers_cbdata { + struct worker_task *task; + struct lua_locked_state *nL; +}; + static void classifiers_callback (gpointer value, void *arg) { - struct worker_task *task = arg; + struct classifiers_cbdata *cbdata = arg; + struct worker_task *task; struct classifier_config *cl = value; struct classifier_ctx *ctx; struct mime_text_part *text_part, *p1, *p2; @@ -600,6 +606,8 @@ classifiers_callback (gpointer value, void *arg) gint *dist = NULL, diff; gboolean is_twopart = FALSE; + task = cbdata->task; + if ((header = g_hash_table_lookup (cl->opts, "header")) != NULL) { cur = message_get_header (task->task_pool, task->message, header, FALSE); if (cur) { @@ -675,7 +683,15 @@ classifiers_callback (gpointer value, void *arg) /* Take care of subject */ tokenize_subject (task, &tokens); - cl->classifier->classify_func (ctx, task->worker->srv->statfile_pool, tokens, task); + if (cbdata->nL != NULL) { + rspamd_mutex_lock (cbdata->nL->m); + cl->classifier->classify_func (ctx, task->worker->srv->statfile_pool, tokens, task, cbdata->nL->L); + rspamd_mutex_unlock (cbdata->nL->m); + } + else { + /* Non-threaded case */ + cl->classifier->classify_func (ctx, task->worker->srv->statfile_pool, tokens, task, task->cfg->lua_state); + } /* Autolearning */ cur = g_list_first (cl->statfiles); @@ -695,6 +711,7 @@ classifiers_callback (gpointer value, void *arg) void process_statfiles (struct worker_task *task) { + struct classifiers_cbdata cbdata; if (task->is_skipped) { return; @@ -704,8 +721,9 @@ process_statfiles (struct worker_task *task) task->tokens = g_hash_table_new (g_direct_hash, g_direct_equal); memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens); } - - g_list_foreach (task->cfg->classifiers, classifiers_callback, task); + cbdata.task = task; + cbdata.nL = NULL; + g_list_foreach (task->cfg->classifiers, classifiers_callback, &cbdata); /* Process results */ make_composites (task); @@ -715,6 +733,8 @@ void process_statfiles_threaded (gpointer data, gpointer user_data) { struct worker_task *task = (struct worker_task *)data; + struct lua_locked_state *nL = user_data; + struct classifiers_cbdata cbdata; if (task->is_skipped) { return; @@ -725,7 +745,9 @@ process_statfiles_threaded (gpointer data, gpointer user_data) memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens); } - g_list_foreach (task->cfg->classifiers, classifiers_callback, task); + cbdata.task = task; + cbdata.nL = nL; + g_list_foreach (task->cfg->classifiers, classifiers_callback, &cbdata); remove_async_thread (task->s); } @@ -1054,7 +1076,7 @@ learn_task_spam (struct classifier_config *cl, struct worker_task *task, gboolea /* Learn */ if (!cl->classifier->learn_spam_func ( cls_ctx, task->worker->srv->statfile_pool, - tokens, task, is_spam, err)) { + tokens, task, is_spam, task->cfg->lua_state, err)) { if (*err) { msg_info ("learn failed for message <%s>, learn error: %s", task->message_id, (*err)->message); return FALSE; diff --git a/src/html.c b/src/html.c index 51d7ccb16..73c4193c6 100644 --- a/src/html.c +++ b/src/html.c @@ -29,7 +29,7 @@ #include "html.h" #include "url.h" -sig_atomic_t tags_sorted = 0; +static sig_atomic_t tags_sorted = 0; static struct html_tag tag_defs[] = { /* W3C defined elements */ @@ -156,7 +156,7 @@ static struct html_tag tag_defs[] = { {Tag_WBR, "wbr", (CM_INLINE | CM_EMPTY)}, }; -sig_atomic_t entities_sorted = 0; +static sig_atomic_t entities_sorted = 0; struct _entity; typedef struct _entity entity; @@ -438,7 +438,7 @@ static entity entities_defs[] = { {"euro", 8364, "E"}, }; -static entity *entities_defs_num = NULL; +static entity entities_defs_num[ (G_N_ELEMENTS (entities_defs)) ]; static gint tag_cmp (const void *m1, const void *m2) @@ -881,7 +881,6 @@ add_html_node (struct worker_task *task, memory_pool_t * pool, struct mime_text_ } if (!entities_sorted) { qsort (entities_defs, G_N_ELEMENTS (entities_defs), sizeof (entity), entity_cmp); - entities_defs_num = g_new (entity, G_N_ELEMENTS (entities_defs)); memcpy (entities_defs_num, entities_defs, sizeof (entities_defs)); qsort (entities_defs_num, G_N_ELEMENTS (entities_defs), sizeof (entity), entity_cmp_num); entities_sorted = 1; diff --git a/src/lua/lua_buffer.c b/src/lua/lua_buffer.c index 43df6a6d8..b7a460b52 100644 --- a/src/lua/lua_buffer.c +++ b/src/lua/lua_buffer.c @@ -82,13 +82,9 @@ static gboolean lua_io_read_cb (f_str_t * in, void *arg) { struct lua_dispatcher_cbdata *cbdata = arg; - gboolean need_unlock = FALSE, res; + gboolean res; rspamd_io_dispatcher_t **pdispatcher; - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* callback (dispatcher, data) */ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); @@ -103,10 +99,6 @@ lua_io_read_cb (f_str_t * in, void *arg) res = lua_toboolean (cbdata->L, -1); lua_pop (cbdata->L, 1); - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } - return res; } @@ -114,14 +106,10 @@ static gboolean lua_io_write_cb (void *arg) { struct lua_dispatcher_cbdata *cbdata = arg; - gboolean need_unlock = FALSE, res; + gboolean res; rspamd_io_dispatcher_t **pdispatcher; if (cbdata->cbref_write) { - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); /* callback (dispatcher) */ pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); @@ -135,10 +123,6 @@ lua_io_write_cb (void *arg) res = lua_toboolean (cbdata->L, -1); lua_pop (cbdata->L, 1); - - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } } return res; @@ -148,13 +132,8 @@ static void lua_io_err_cb (GError * err, void *arg) { struct lua_dispatcher_cbdata *cbdata = arg; - gboolean need_unlock = FALSE; rspamd_io_dispatcher_t **pdispatcher; - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* callback (dispatcher, err) */ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); @@ -166,9 +145,6 @@ lua_io_err_cb (GError * err, void *arg) msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); } - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } /* Unref callbacks */ luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); if (cbdata->cbref_write) { diff --git a/src/lua/lua_classifier.c b/src/lua/lua_classifier.c index e929c6b50..4962a0e2b 100644 --- a/src/lua/lua_classifier.c +++ b/src/lua/lua_classifier.c @@ -117,27 +117,24 @@ call_classifier_pre_callback (struct classifier_config *ccf, struct worker_task /* Return list of statfiles that should be checked for this message */ GList * call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task *task, - gboolean is_learn, gboolean is_spam) + gboolean is_learn, gboolean is_spam, lua_State *L) { GList *res = NULL, *cur; struct classifier_callback_data *cd; - lua_State *L; /* Go throught all callbacks and call them, appending results to list */ cur = g_list_first (ccf->pre_callbacks); while (cur) { cd = cur->data; - lua_getglobal (cd->L, cd->name); + lua_getglobal (L, cd->name); - res = g_list_concat (res, call_classifier_pre_callback (ccf, task, cd->L, is_learn, is_spam)); + res = g_list_concat (res, call_classifier_pre_callback (ccf, task, L, is_learn, is_spam)); cur = g_list_next (cur); } - - g_mutex_lock (lua_mtx); + if (res == NULL) { - L = task->cfg->lua_state; /* Check function from global table 'classifiers' */ lua_getglobal (L, "classifiers"); if (lua_istable (L, -1)) { @@ -151,14 +148,13 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task } lua_pop (L, 1); } - g_mutex_unlock (lua_mtx); return res; } /* Return result mark for statfile */ double -call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in) +call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in, lua_State *L) { struct classifier_callback_data *cd; struct classifier_config **pccf; @@ -166,36 +162,34 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas double out = in; GList *cur; - g_mutex_lock (lua_mtx); /* Go throught all callbacks and call them, appending results to list */ cur = g_list_first (ccf->pre_callbacks); while (cur) { cd = cur->data; - lua_getglobal (cd->L, cd->name); + lua_getglobal (L, cd->name); - pccf = lua_newuserdata (cd->L, sizeof (struct classifier_config *)); - lua_setclass (cd->L, "rspamd{classifier}", -1); + pccf = lua_newuserdata (L, sizeof (struct classifier_config *)); + lua_setclass (L, "rspamd{classifier}", -1); *pccf = ccf; - ptask = lua_newuserdata (cd->L, sizeof (struct worker_task *)); - lua_setclass (cd->L, "rspamd{task}", -1); + ptask = lua_newuserdata (L, sizeof (struct worker_task *)); + lua_setclass (L, "rspamd{task}", -1); *ptask = task; - lua_pushnumber (cd->L, out); + lua_pushnumber (L, out); - if (lua_pcall (cd->L, 3, 1, 0) != 0) { - msg_warn ("error running function %s: %s", cd->name, lua_tostring (cd->L, -1)); + if (lua_pcall (L, 3, 1, 0) != 0) { + msg_warn ("error running function %s: %s", cd->name, lua_tostring (L, -1)); } else { - if (lua_isnumber (cd->L, 1)) { - out = lua_tonumber (cd->L, 1); + if (lua_isnumber (L, 1)) { + out = lua_tonumber (L, 1); } - lua_pop (cd->L, 1); + lua_pop (L, 1); } cur = g_list_next (cur); } - g_mutex_unlock (lua_mtx); return out; diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index cb114bb34..f3a33a7d2 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -28,9 +28,6 @@ /* Lua module init function */ #define MODULE_INIT_FUNC "module_init" -/* Global lua mutex */ -GMutex *lua_mtx = NULL; - const luaL_reg null_reg[] = { {"__tostring", lua_class_tostring}, {NULL, NULL} @@ -404,7 +401,7 @@ lua_add_actions_global (lua_State *L) lua_setglobal (L, "rspamd_actions"); } -void +lua_State * init_lua (struct config_file *cfg) { lua_State *L; @@ -412,13 +409,6 @@ init_lua (struct config_file *cfg) L = lua_open (); luaL_openlibs (L); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - lua_mtx = g_mutex_new (); -#else - lua_mtx = g_malloc (sizeof (GMutex)); - g_mutex_init (lua_mtx); -#endif - (void)luaopen_rspamd (L); (void)luaopen_logger (L); (void)luaopen_util (L); @@ -446,9 +436,38 @@ init_lua (struct config_file *cfg) (void)luaopen_io_dispatcher (L); (void)luaopen_dns_resolver (L); - cfg->lua_state = L; - memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)lua_close, L); + return L; +} + +/** + * Initialize new locked lua_State structure + */ +struct lua_locked_state* +init_lua_locked (struct config_file *cfg) +{ + struct lua_locked_state *new; + + new = g_slice_alloc (sizeof (struct lua_locked_state)); + new->L = init_lua (cfg); + new->m = rspamd_mutex_new (); + + return new; +} + + +/** + * Free locked state structure + */ +void +free_lua_locked (struct lua_locked_state *st) +{ + g_assert (st != NULL); + + lua_close (st->L); + + rspamd_mutex_free (st->m); + g_slice_free1 (sizeof (struct lua_locked_state), st); } gboolean @@ -524,7 +543,6 @@ lua_call_filter (const gchar *function, struct worker_task *task) struct worker_task **ptask; lua_State *L = task->cfg->lua_state; - g_mutex_lock (lua_mtx); lua_getglobal (L, function); ptask = lua_newuserdata (L, sizeof (struct worker_task *)); lua_setclass (L, "rspamd{task}", -1); @@ -540,7 +558,6 @@ lua_call_filter (const gchar *function, struct worker_task *task) } result = lua_tonumber (L, -1); lua_pop (L, 1); /* pop returned value */ - g_mutex_unlock (lua_mtx); return result; } @@ -552,7 +569,6 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma guint i; lua_State *L = task->cfg->lua_state; - g_mutex_lock (lua_mtx); lua_getglobal (L, function); for (i = 0; i < number; i++) { @@ -568,7 +584,6 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma } result = lua_tonumber (L, -1); lua_pop (L, 1); /* pop returned value */ - g_mutex_unlock (lua_mtx); return result; } @@ -584,7 +599,6 @@ lua_call_expression_func (const gchar *module, const gchar *function, struct expression_argument *arg; int nargs = 1, pop = 0; - g_mutex_lock (lua_mtx); /* Call specified function and expect result of given expected_type */ /* First check function in config table */ lua_getglobal (L, "config"); @@ -646,7 +660,6 @@ lua_call_expression_func (const gchar *module, const gchar *function, if (lua_pcall (L, nargs, 1, 0) != 0) { msg_info ("call to %s failed: %s", function, lua_tostring (L, -1)); - g_mutex_unlock (lua_mtx); return FALSE; } pop ++; @@ -654,13 +667,11 @@ lua_call_expression_func (const gchar *module, const gchar *function, if (!lua_isboolean (L, -1)) { lua_pop (L, pop); msg_info ("function %s must return a boolean", function); - g_mutex_unlock (lua_mtx); return FALSE; } *res = lua_toboolean (L, -1); lua_pop (L, pop); - g_mutex_unlock (lua_mtx); return TRUE; } @@ -682,7 +693,6 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg) struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg; lua_State *L = data->task->cfg->lua_state; - g_mutex_lock (lua_mtx); lua_getglobal (L, data->func); lua_pushstring (L, (const gchar *)key); @@ -698,7 +708,6 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg) res = lua_tonumber (L, -1); lua_pop (L, 1); /* pop returned value */ data->score += res; - g_mutex_unlock (lua_mtx); } double @@ -735,7 +744,6 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params) return score; } - g_mutex_lock (lua_mtx); lua_getglobal (L, p->data); lua_pushnumber (L, score); @@ -750,7 +758,6 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params) res = lua_tonumber (L, -1); lua_pop (L, 1); - g_mutex_unlock (lua_mtx); return res; } diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index 1a309e0f0..712a51062 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -15,10 +15,15 @@ #define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name } extern const luaL_reg null_reg[]; -extern GMutex *lua_mtx; #define RSPAMD_LUA_API_VERSION 12 +/* Locked lua state with mutex */ +struct lua_locked_state { + lua_State *L; + rspamd_mutex_t *m; +}; + /* Common utility functions */ /** @@ -54,13 +59,22 @@ gpointer lua_check_class (lua_State *L, gint index, const gchar *name); /** * Initialize lua and bindings */ -void init_lua (struct config_file *cfg); +lua_State* init_lua (struct config_file *cfg); /** * Load and initialize lua plugins */ gboolean init_lua_filters (struct config_file *cfg); +/** + * Initialize new locked lua_State structure + */ +struct lua_locked_state* init_lua_locked (struct config_file *cfg); +/** + * Free locked state structure + */ +void free_lua_locked (struct lua_locked_state *st); + /** * Open libraries functions */ @@ -97,8 +111,8 @@ void lua_call_pre_filters (struct worker_task *task); void add_luabuf (const gchar *line); /* Classify functions */ -GList *call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task *task, gboolean is_learn, gboolean is_spam); -double call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in); +GList *call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task *task, gboolean is_learn, gboolean is_spam, lua_State *L); +double call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in, lua_State *L); double lua_normalizer_func (struct config_file *cfg, long double score, void *params); diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 217bfc761..e2edf6a44 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -336,7 +336,6 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_ GList *cur; gboolean res = FALSE; - g_mutex_lock (lua_mtx); if (cd->cb_is_ref) { lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } @@ -366,7 +365,6 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_ } lua_pop (cd->L, 1); } - g_mutex_unlock (lua_mtx); return res; } @@ -459,7 +457,6 @@ lua_call_post_filters (struct worker_task *task) struct worker_task **ptask; GList *cur; - g_mutex_lock (lua_mtx); cur = task->cfg->post_filters; while (cur) { cd = cur->data; @@ -479,7 +476,6 @@ lua_call_post_filters (struct worker_task *task) } cur = g_list_next (cur); } - g_mutex_unlock (lua_mtx); } static gint @@ -514,7 +510,6 @@ lua_call_pre_filters (struct worker_task *task) struct worker_task **ptask; GList *cur; - g_mutex_lock (lua_mtx); cur = task->cfg->pre_filters; while (cur) { cd = cur->data; @@ -534,7 +529,6 @@ lua_call_pre_filters (struct worker_task *task) } cur = g_list_next (cur); } - g_mutex_unlock (lua_mtx); } static gint @@ -660,7 +654,6 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud) struct lua_callback_data *cd = ud; struct worker_task **ptask; - g_mutex_lock (lua_mtx); if (cd->cb_is_ref) { lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } @@ -675,7 +668,6 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud) msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" : cd->callback.name, lua_tostring (cd->L, -1)); } - g_mutex_unlock (lua_mtx); } static gint diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c index b31cac60a..48b0b218f 100644 --- a/src/lua/lua_dns.c +++ b/src/lua/lua_dns.c @@ -71,12 +71,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) struct rspamd_dns_resolver **presolver; union rspamd_reply_element *elt; GList *cur; - gboolean need_unlock = FALSE; - - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref); presolver = lua_newuserdata (cd->L, sizeof (gpointer)); @@ -140,10 +134,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) /* Unref function */ luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref); - - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } } static int diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 3fa5da2ab..433529940 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -81,12 +81,6 @@ lua_http_push_error (gint code, struct lua_http_ud *ud) { struct worker_task **ptask; gint num; - gboolean need_unlock = FALSE; - - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* Push error */ if (ud->callback) { @@ -115,9 +109,7 @@ lua_http_push_error (gint code, struct lua_http_ud *ud) g_list_free (ud->headers); ud->headers = NULL; } - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } + ud->parser_state = 3; remove_normal_event (ud->s, lua_http_fin, ud); @@ -130,12 +122,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud) struct lua_http_header *header; struct worker_task **ptask; gint num; - gboolean need_unlock = FALSE; - - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } if (ud->callback) { /* Push error */ @@ -175,9 +161,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud) ud->headers = NULL; } - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } remove_normal_event (ud->s, lua_http_fin, ud); } diff --git a/src/lua/lua_mempool.c b/src/lua/lua_mempool.c index 40094abc4..9d6168fa3 100644 --- a/src/lua/lua_mempool.c +++ b/src/lua/lua_mempool.c @@ -93,20 +93,12 @@ static void lua_mempool_destructor_func (gpointer p) { struct lua_mempool_udata *ud = p; - gboolean need_unlock = FALSE; - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); if (lua_pcall (ud->L, 0, 0, 0) != 0) { msg_info ("call to destructor failed: %s", lua_tostring (ud->L, -1)); } luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } } static int diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 2e6211078..e312092cb 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -83,9 +83,7 @@ lua_redis_fin (void *arg) if (ud->ctx) { redisAsyncFree (ud->ctx); - g_mutex_lock (lua_mtx); luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); - g_mutex_unlock (lua_mtx); } } @@ -98,12 +96,7 @@ static void lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean connected) { struct worker_task **ptask; - gboolean need_unlock = FALSE; - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *)); @@ -117,9 +110,6 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean if (lua_pcall (ud->L, 3, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); } - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } if (connected) { remove_normal_event (ud->task->s, lua_redis_fin, ud); @@ -136,11 +126,7 @@ static void lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud) { struct worker_task **ptask; - gboolean need_unlock = FALSE; - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *)); @@ -170,9 +156,6 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud) if (lua_pcall (ud->L, 3, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); } - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } remove_normal_event (ud->task->s, lua_redis_fin, ud); } diff --git a/src/lua/lua_session.c b/src/lua/lua_session.c index a25363430..db8d0ef23 100644 --- a/src/lua/lua_session.c +++ b/src/lua/lua_session.c @@ -89,12 +89,7 @@ static gboolean lua_session_finalizer (gpointer ud) { struct lua_session_udata *cbdata = ud; - gboolean need_unlock = FALSE, res; - - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } + gboolean res; /* Call finalizer function */ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_fin); @@ -104,9 +99,7 @@ lua_session_finalizer (gpointer ud) res = lua_toboolean (cbdata->L, -1); lua_pop (cbdata->L, 1); luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_fin); - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } + return res; } @@ -115,13 +108,8 @@ static void lua_session_restore (gpointer ud) { struct lua_session_udata *cbdata = ud; - gboolean need_unlock = FALSE; if (cbdata->cbref_restore) { - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* Call restorer function */ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_restore); @@ -129,9 +117,6 @@ lua_session_restore (gpointer ud) msg_info ("call to session restorer failed: %s", lua_tostring (cbdata->L, -1)); } luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_restore); - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } } } @@ -139,13 +124,8 @@ static void lua_session_cleanup (gpointer ud) { struct lua_session_udata *cbdata = ud; - gboolean need_unlock = FALSE; if (cbdata->cbref_cleanup) { - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } /* Call restorer function */ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_cleanup); @@ -153,9 +133,6 @@ lua_session_cleanup (gpointer ud) msg_info ("call to session cleanup failed: %s", lua_tostring (cbdata->L, -1)); } luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_cleanup); - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } } } @@ -238,23 +215,14 @@ static void lua_event_fin (gpointer ud) { struct lua_event_udata *cbdata = ud; - gboolean need_unlock = FALSE; if (cbdata->cbref) { - /* Avoid LOR here as mutex can be acquired before in lua_call */ - if (g_mutex_trylock (lua_mtx)) { - need_unlock = TRUE; - } - /* Call restorer function */ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref); if (lua_pcall (cbdata->L, 0, 0, 0) != 0) { msg_info ("call to event finalizer failed: %s", lua_tostring (cbdata->L, -1)); } luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref); - if (need_unlock) { - g_mutex_unlock (lua_mtx); - } } } diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index bc9c3ed3b..652607d49 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -689,7 +689,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) union rspamd_reply_element *elt; GList *cur; - g_mutex_lock (lua_mtx); if (cd->cb_is_ref) { lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } @@ -775,7 +774,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg) if (cd->cb_is_ref) { luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref); } - g_mutex_unlock (lua_mtx); } static gint @@ -955,7 +953,7 @@ lua_task_call_rspamd_function (lua_State * L) f.args = g_list_prepend (f.args, arg); } } - res = call_expression_function (&f, task); + res = call_expression_function (&f, task, L); lua_pushboolean (L, res); if (f.args) { g_list_free (f.args); diff --git a/src/lua/lua_xmlrpc.c b/src/lua/lua_xmlrpc.c index 6ef9be7e4..e9b317517 100644 --- a/src/lua/lua_xmlrpc.c +++ b/src/lua/lua_xmlrpc.c @@ -353,11 +353,11 @@ xmlrpc_text (GMarkupParseContext *context, const gchar *text, gsize text_len, gp gdouble dnum; /* Strip line */ - while (g_ascii_isspace (*text) && text_len > 0) { + while (text_len > 0 && g_ascii_isspace (*text)) { text ++; text_len --; } - while (g_ascii_isspace (text[text_len - 1]) && text_len > 0) { + while (text_len > 0 && g_ascii_isspace (text[text_len - 1])) { text_len --; } @@ -417,12 +417,16 @@ lua_xmlrpc_parse_reply (lua_State *L) G_MARKUP_TREAT_CDATA_AS_TEXT, &ud, NULL); res = g_markup_parse_context_parse (ctx, data, s, &err); + g_markup_parse_context_free (ctx); if (! res) { - lua_pushnil (L); + lua_pushboolean (L, FALSE); + } + else { + lua_pushboolean (L, TRUE); } } else { - lua_pushnil (L); + lua_pushboolean (L, FALSE); } return 1; diff --git a/src/main.c b/src/main.c index a1999edc2..532f212d3 100644 --- a/src/main.c +++ b/src/main.c @@ -317,7 +317,8 @@ reread_config (struct rspamd_main *rspamd) cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name); /* Save some variables */ tmp_cfg->cfg_name = cfg_file; - init_lua (tmp_cfg); + tmp_cfg->lua_state = init_lua (tmp_cfg); + memory_pool_add_destructor (tmp_cfg->cfg_pool, (pool_destruct_func)lua_close, tmp_cfg->lua_state); if (! load_rspamd_config (tmp_cfg, FALSE)) { msg_err ("cannot parse new config file, revert to old one"); @@ -909,7 +910,8 @@ main (gint argc, gchar **argv, gchar **env) g_log_set_default_handler (rspamd_glib_log_function, rspamd_main->logger); detect_priv (rspamd_main); - init_lua (rspamd_main->cfg); + rspamd_main->cfg->lua_state = init_lua (rspamd_main->cfg); + memory_pool_add_destructor (rspamd_main->cfg->cfg_pool, (pool_destruct_func)lua_close, rspamd_main->cfg->lua_state); pworker = &workers[0]; while (*pworker) { diff --git a/src/mem_pool.c b/src/mem_pool.c index 2c111681f..9cc4a537d 100644 --- a/src/mem_pool.c +++ b/src/mem_pool.c @@ -620,6 +620,9 @@ memory_pool_delete (memory_pool_t * pool) mem_pool_stat->pools_freed++; POOL_MTX_UNLOCK (); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + g_mutex_clear (&pool->mtx); +#endif g_slice_free (memory_pool_t, pool); } diff --git a/src/plugins/dkim_check.c b/src/plugins/dkim_check.c index 99dccf80b..ef494a736 100644 --- a/src/plugins/dkim_check.c +++ b/src/plugins/dkim_check.c @@ -294,7 +294,17 @@ dkim_module_key_handler (rspamd_dkim_key_t *key, gsize keylen, rspamd_dkim_conte else { /* Insert tempfail symbol */ msg_info ("cannot get key for domain %s", ctx->dns_key); - insert_result (task, dkim_module_ctx->symbol_tempfail, 1, NULL); + if (err != NULL) { + insert_result (task, dkim_module_ctx->symbol_tempfail, 1, g_list_prepend (NULL, memory_pool_strdup (task->task_pool, err->message))); + + } + else { + insert_result (task, dkim_module_ctx->symbol_tempfail, 1, NULL); + } + } + + if (err) { + g_error_free (err); } } diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 5b1eefe42..e0d90532c 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -1069,13 +1069,11 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task, const gchar } static gboolean -maybe_call_lua_function (const gchar *name, struct worker_task *task) +maybe_call_lua_function (const gchar *name, struct worker_task *task, lua_State *L) { - lua_State *L = task->cfg->lua_state; struct worker_task **ptask; gboolean res; - g_mutex_lock (lua_mtx); lua_getglobal (L, name); if (lua_isfunction (L, -1)) { ptask = lua_newuserdata (L, sizeof (struct worker_task *)); @@ -1084,18 +1082,15 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task) /* Call function */ if (lua_pcall (L, 1, 1, 0) != 0) { msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1)); - g_mutex_unlock (lua_mtx); return FALSE; } res = lua_toboolean (L, -1); lua_pop (L, 1); - g_mutex_unlock (lua_mtx); return res; } else { lua_pop (L, 1); } - g_mutex_unlock (lua_mtx); return FALSE; } @@ -1156,7 +1151,7 @@ optimize_regexp_expression (struct expression **e, GQueue * stack, gboolean res) } static gboolean -process_regexp_expression (struct expression *expr, gchar *symbol, struct worker_task *task, const gchar *additional) +process_regexp_expression (struct expression *expr, gchar *symbol, struct worker_task *task, const gchar *additional, struct lua_locked_state *nL) { GQueue *stack; gsize cur, op1, op2; @@ -1179,7 +1174,14 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker } } else if (it->type == EXPR_FUNCTION) { - cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task); + if (nL) { + rspamd_mutex_lock (nL->m); + cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task, nL->L); + rspamd_mutex_unlock (nL->m); + } + else { + cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task, task->cfg->lua_state); + } debug_task ("function %s returned %s", ((struct expression_function *)it->content.operand)->name, cur ? "true" : "false"); if (try_optimize) { try_optimize = optimize_regexp_expression (&it, stack, cur); @@ -1191,9 +1193,14 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker else if (it->type == EXPR_STR) { /* This may be lua function, try to call it */ if (regexp_module_ctx->workers != NULL) { - g_mutex_lock (workers_mtx); - cur = maybe_call_lua_function ((const gchar*)it->content.operand, task); - g_mutex_unlock (workers_mtx); + if (nL) { + rspamd_mutex_lock (nL->m); + cur = maybe_call_lua_function ((const gchar*)it->content.operand, task, nL->L); + rspamd_mutex_unlock (nL->m); + } + else { + cur = maybe_call_lua_function ((const gchar*)it->content.operand, task, task->cfg->lua_state); + } } debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false"); if (try_optimize) { @@ -1278,9 +1285,10 @@ static void process_regexp_item_threaded (gpointer data, gpointer user_data) { struct regexp_threaded_ud *ud = data; + struct lua_locked_state *nL = user_data; /* Process expression */ - if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { + if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL, nL)) { g_mutex_lock (workers_mtx); insert_result (ud->task, ud->item->symbol, 1, NULL); g_mutex_unlock (workers_mtx); @@ -1295,6 +1303,7 @@ process_regexp_item (struct worker_task *task, void *user_data) gboolean res = FALSE; struct regexp_threaded_ud *thr_ud; GError *err = NULL; + struct lua_locked_state *nL; if (!item->lua_function && regexp_module_ctx->max_threads > 1) { @@ -1308,8 +1317,10 @@ process_regexp_item (struct worker_task *task, void *user_data) workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); g_mutex_init (workers_mtx); #endif + nL = init_lua_locked (task->cfg); + luaopen_regexp (nL->L); regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, - regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err); + nL, regexp_module_ctx->max_threads, TRUE, &err); if (err != NULL) { msg_err ("thread pool creation failed: %s", err->message); regexp_module_ctx->max_threads = 0; @@ -1320,6 +1331,7 @@ process_regexp_item (struct worker_task *task, void *user_data) thr_ud->item = item; thr_ud->task = task; + register_async_thread (task->s); g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err); if (err != NULL) { @@ -1337,7 +1349,7 @@ process_regexp_item (struct worker_task *task, void *user_data) } else { /* Process expression */ - if (process_regexp_expression (item->expr, item->symbol, task, NULL)) { + if (process_regexp_expression (item->expr, item->symbol, task, NULL, NULL)) { insert_result (task, item->symbol, 1, NULL); } } @@ -1375,7 +1387,7 @@ rspamd_regexp_match_number (struct worker_task *task, GList * args, void *unused } } else { - if (process_regexp_expression (cur->data, "regexp_match_number", task, NULL)) { + if (process_regexp_expression (cur->data, "regexp_match_number", task, NULL, NULL)) { res++; } if (res >= param_count) { @@ -1608,13 +1620,13 @@ rspamd_check_smtp_data (struct worker_task *task, GList * args, void *unused) } else if (arg != NULL) { if (what != NULL) { - if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, what)) { + if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, what, NULL)) { return TRUE; } } else { while (rcpt_list) { - if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, rcpt_list->data)) { + if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, rcpt_list->data, NULL)) { return TRUE; } rcpt_list = g_list_next (rcpt_list); diff --git a/src/symbols_cache.c b/src/symbols_cache.c index ed8fa73f6..09d3d843c 100644 --- a/src/symbols_cache.c +++ b/src/symbols_cache.c @@ -776,6 +776,7 @@ validate_cache (struct symbols_cache *cache, struct config_file *cfg, gboolean s } cur = g_list_next (cur); } + g_list_free (metric_symbols); #endif /* GLIB_COMPAT */ return TRUE; diff --git a/src/util.c b/src/util.c index b721e838b..75b0b0cfa 100644 --- a/src/util.c +++ b/src/util.c @@ -1363,7 +1363,7 @@ rspamd_mutex_new (void) { rspamd_mutex_t *new; - new = g_malloc (sizeof (rspamd_mutex_t)); + new = g_slice_alloc (sizeof (rspamd_mutex_t)); #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) g_mutex_init (&new->mtx); #else @@ -1401,6 +1401,15 @@ rspamd_mutex_unlock (rspamd_mutex_t *mtx) #endif } +void +rspamd_mutex_free (rspamd_mutex_t *mtx) +{ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + g_mutex_clear (&mtx->mtx); +#endif + g_slice_free1 (sizeof (rspamd_mutex_t), mtx); +} + /** * Create new rwlock * @return @@ -1476,6 +1485,14 @@ rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx) #endif } +void +rspamd_rwlock_free (rspamd_rwlock_t *mtx) +{ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + g_rw_lock_clear (&mtx->rwlock); +#endif + g_slice_free1 (sizeof (rspamd_rwlock_t), mtx); +} struct rspamd_thread_data { gchar *name; diff --git a/src/util.h b/src/util.h index 1db0166d9..961e75645 100644 --- a/src/util.h +++ b/src/util.h @@ -265,6 +265,12 @@ void rspamd_mutex_lock (rspamd_mutex_t *mtx); */ void rspamd_mutex_unlock (rspamd_mutex_t *mtx); +/** + * Clear rspamd mutex + * @param mtx + */ +void rspamd_mutex_free (rspamd_mutex_t *mtx); + /** * Create new rwloc * @return @@ -295,6 +301,12 @@ void rspamd_rwlock_writer_unlock (rspamd_rwlock_t *mtx); */ void rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx); +/** + * Free rwlock + * @param mtx + */ +void rspamd_rwlock_free (rspamd_rwlock_t *mtx); + /** * Create new named thread * @param name name pattern diff --git a/src/worker.c b/src/worker.c index 465104b6e..08fbdda11 100644 --- a/src/worker.c +++ b/src/worker.c @@ -782,6 +782,7 @@ start_worker (struct rspamd_worker *worker) gchar *is_custom_str; struct rspamd_worker_ctx *ctx = worker->ctx; GError *err = NULL; + struct lua_locked_state *nL; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -836,7 +837,8 @@ start_worker (struct rspamd_worker *worker) /* Create classify pool */ ctx->classify_pool = NULL; if (ctx->classify_threads > 1) { - ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, ctx, ctx->classify_threads, TRUE, &err); + nL = init_lua_locked (worker->srv->cfg); + ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, nL, ctx->classify_threads, TRUE, &err); if (err != NULL) { msg_err ("pool create failed: %s", err->message); ctx->classify_pool = NULL; -- 2.39.5