Browse Source

* Rework thread pools locking logic to avoid global lua mutex usage.

Fixed several memory leaks with modern glib.
Fixed memory leak in dkim code.
Fixed a problem with static global variables in shared libraries.
tags/0.5.2
Vsevolod Stakhov 11 years ago
parent
commit
b90267a71c

+ 4
- 24
src/classifiers/bayes.c View File

@@ -165,16 +165,6 @@ bayes_classify_callback (gpointer key, gpointer value, gpointer data)
return FALSE;
}

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
static void
bayes_mutex_free (gpointer data)
{
GMutex *mtx = data;

g_mutex_free (mtx);
}
#endif

struct classifier_ctx*
bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
{
@@ -183,19 +173,12 @@ bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
ctx->pool = pool;
ctx->cfg = cfg;
ctx->debug = FALSE;
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
ctx->mtx = g_mutex_new ();
memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx);
#else
ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex));
g_mutex_init (ctx->mtx);
#endif

return ctx;
}

gboolean
bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task)
bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L)
{
struct bayes_callback_data data;
gchar *value;
@@ -222,16 +205,13 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
}
}

/* Critical section as here can be lua callbacks calling */
g_mutex_lock (ctx->mtx);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE, L);
if (cur) {
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
}
else {
cur = ctx->cfg->statfiles;
}
g_mutex_unlock (ctx->mtx);

data.statfiles_num = g_list_length (cur);
data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num);
@@ -402,7 +382,7 @@ bayes_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symb

gboolean
bayes_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool,
GTree *input, struct worker_task *task, gboolean is_spam, GError **err)
GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err)
{
struct bayes_callback_data data;
gchar *value;
@@ -431,7 +411,7 @@ bayes_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool,
}
}

cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE, L);
if (cur) {
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
}

+ 7
- 7
src/classifiers/classifiers.h View File

@@ -5,6 +5,7 @@
#include "mem_pool.h"
#include "statfile.h"
#include "tokenizers/tokenizers.h"
#include <lua.h>

/* Consider this value as 0 */
#define ALPHA 0.0001
@@ -17,7 +18,6 @@ struct classifier_ctx {
GHashTable *results;
gboolean debug;
struct classifier_config *cfg;
GMutex *mtx;
};

struct classify_weight {
@@ -29,12 +29,12 @@ struct classify_weight {
struct classifier {
char *name;
struct classifier_ctx* (*init_func)(memory_pool_t *pool, struct classifier_config *cf);
gboolean (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
gboolean (*classify_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L);
gboolean (*learn_func)(struct classifier_ctx* ctx, statfile_pool_t *pool,
const char *symbol, GTree *input, gboolean in_class,
double *sum, double multiplier, GError **err);
gboolean (*learn_spam_func)(struct classifier_ctx* ctx, statfile_pool_t *pool,
GTree *input, struct worker_task *task, gboolean is_spam, GError **err);
GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err);
GList* (*weights_func)(struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
};

@@ -43,20 +43,20 @@ struct classifier* get_classifier (char *name);

/* Winnow algorithm */
struct classifier_ctx* winnow_init (memory_pool_t *pool, struct classifier_config *cf);
gboolean winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
gboolean winnow_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L);
gboolean winnow_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symbol, GTree *input,
gboolean in_class, double *sum, double multiplier, GError **err);
gboolean winnow_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool,
GTree *input, struct worker_task *task, gboolean is_spam, GError **err);
GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err);
GList *winnow_weights (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);

/* Bayes algorithm */
struct classifier_ctx* bayes_init (memory_pool_t *pool, struct classifier_config *cf);
gboolean bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
gboolean bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task, lua_State *L);
gboolean bayes_learn (struct classifier_ctx* ctx, statfile_pool_t *pool, const char *symbol, GTree *input,
gboolean in_class, double *sum, double multiplier, GError **err);
gboolean bayes_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool,
GTree *input, struct worker_task *task, gboolean is_spam, GError **err);
GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err);
GList *bayes_weights (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input, struct worker_task *task);
/* Array of all defined classifiers */
extern struct classifier classifiers[];

+ 4
- 4
src/classifiers/winnow.c View File

@@ -193,7 +193,7 @@ winnow_init (memory_pool_t * pool, struct classifier_config *cfg)
}

gboolean
winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task)
winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * input, struct worker_task *task, lua_State *L)
{
struct winnow_callback_data data;
char *sumbuf, *value;
@@ -221,7 +221,7 @@ winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * inp
}
}

cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE, L);
if (cur) {
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
}
@@ -261,7 +261,7 @@ winnow_classify (struct classifier_ctx *ctx, statfile_pool_t * pool, GTree * inp

if (sel != NULL) {
#ifdef WITH_LUA
max = call_classifier_post_callbacks (ctx->cfg, task, max);
max = call_classifier_post_callbacks (ctx->cfg, task, max, L);
#endif
#ifdef HAVE_TANHL
max = tanhl (max);
@@ -593,7 +593,7 @@ end:

gboolean
winnow_learn_spam (struct classifier_ctx* ctx, statfile_pool_t *pool,
GTree *input, struct worker_task *task, gboolean is_spam, GError **err)
GTree *input, struct worker_task *task, gboolean is_spam, lua_State *L, GError **err)
{
g_set_error (err,
winnow_error_quark(), /* error domain */

+ 2
- 0
src/events.c View File

@@ -90,6 +90,8 @@ new_async_session (memory_pool_t * pool, session_finalizer_t fin,
g_mutex_init (new->mtx);
new->cond = memory_pool_alloc (pool, sizeof (GCond));
g_cond_init (new->cond);
memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_clear, new->mtx);
memory_pool_add_destructor (pool, (pool_destruct_func) g_cond_clear, new->cond);
#endif
new->threads = 0;


+ 3
- 13
src/expressions.c View File

@@ -785,7 +785,7 @@ parse_regexp (memory_pool_t * pool, gchar *line, gboolean raw_mode)
}

gboolean
call_expression_function (struct expression_function * func, struct worker_task * task)
call_expression_function (struct expression_function * func, struct worker_task * task, lua_State *L)
{
struct _fl *selected, key;

@@ -794,17 +794,7 @@ call_expression_function (struct expression_function * func, struct worker_task
selected = bsearch (&key, list_ptr, functions_number, sizeof (struct _fl), fl_cmp);
if (selected == NULL) {
/* Try to check lua function */
#if 0
if (! lua_call_expression_func (NULL, func->name, task, func->args, &res)) {
msg_warn ("call to undefined function %s", key.name);
return FALSE;
}
else {
return res;
}
#else
return FALSE;
#endif
}

return selected->func (task, func->args, selected->user_data);
@@ -830,7 +820,7 @@ get_function_arg (struct expression *expr, struct worker_task *task, gboolean wa
}
else if (expr->type == EXPR_FUNCTION && !want_string) {
res->type = EXPRESSION_ARGUMENT_BOOL;
cur = call_expression_function (expr->content.operand, task);
cur = call_expression_function (expr->content.operand, task, NULL);
res->data = GSIZE_TO_POINTER (cur);
}
else {
@@ -853,7 +843,7 @@ get_function_arg (struct expression *expr, struct worker_task *task, gboolean wa
return res;
}
else if (it->type == EXPR_FUNCTION) {
cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task);
cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task, NULL);
debug_task ("function %s returned %s", ((struct expression_function *)it->content.operand)->name, cur ? "true" : "false");
}
else if (it->type == EXPR_OPERATION) {

+ 3
- 1
src/expressions.h View File

@@ -7,6 +7,7 @@
#define RSPAMD_EXPRESSIONS_H

#include "config.h"
#include <lua.h>

struct worker_task;
struct rspamd_regexp;
@@ -72,9 +73,10 @@ struct expression* parse_expression (memory_pool_t *pool, gchar *line);
* Call specified fucntion and return boolean result
* @param func function to call
* @param task task object
* @param L lua specific state
* @return TRUE or FALSE depending on function result
*/
gboolean call_expression_function (struct expression_function *func, struct worker_task *task);
gboolean call_expression_function (struct expression_function *func, struct worker_task *task, lua_State *L);

/**
* Register specified function to rspamd internal functions list

+ 28
- 6
src/filter.c View File

@@ -585,10 +585,16 @@ make_composites (struct worker_task *task)
g_hash_table_foreach (task->results, composites_metric_callback, task);
}

struct classifiers_cbdata {
struct worker_task *task;
struct lua_locked_state *nL;
};

static void
classifiers_callback (gpointer value, void *arg)
{
struct worker_task *task = arg;
struct classifiers_cbdata *cbdata = arg;
struct worker_task *task;
struct classifier_config *cl = value;
struct classifier_ctx *ctx;
struct mime_text_part *text_part, *p1, *p2;
@@ -600,6 +606,8 @@ classifiers_callback (gpointer value, void *arg)
gint *dist = NULL, diff;
gboolean is_twopart = FALSE;
task = cbdata->task;

if ((header = g_hash_table_lookup (cl->opts, "header")) != NULL) {
cur = message_get_header (task->task_pool, task->message, header, FALSE);
if (cur) {
@@ -675,7 +683,15 @@ classifiers_callback (gpointer value, void *arg)

/* Take care of subject */
tokenize_subject (task, &tokens);
cl->classifier->classify_func (ctx, task->worker->srv->statfile_pool, tokens, task);
if (cbdata->nL != NULL) {
rspamd_mutex_lock (cbdata->nL->m);
cl->classifier->classify_func (ctx, task->worker->srv->statfile_pool, tokens, task, cbdata->nL->L);
rspamd_mutex_unlock (cbdata->nL->m);
}
else {
/* Non-threaded case */
cl->classifier->classify_func (ctx, task->worker->srv->statfile_pool, tokens, task, task->cfg->lua_state);
}

/* Autolearning */
cur = g_list_first (cl->statfiles);
@@ -695,6 +711,7 @@ classifiers_callback (gpointer value, void *arg)
void
process_statfiles (struct worker_task *task)
{
struct classifiers_cbdata cbdata;

if (task->is_skipped) {
return;
@@ -704,8 +721,9 @@ process_statfiles (struct worker_task *task)
task->tokens = g_hash_table_new (g_direct_hash, g_direct_equal);
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens);
}

g_list_foreach (task->cfg->classifiers, classifiers_callback, task);
cbdata.task = task;
cbdata.nL = NULL;
g_list_foreach (task->cfg->classifiers, classifiers_callback, &cbdata);

/* Process results */
make_composites (task);
@@ -715,6 +733,8 @@ void
process_statfiles_threaded (gpointer data, gpointer user_data)
{
struct worker_task *task = (struct worker_task *)data;
struct lua_locked_state *nL = user_data;
struct classifiers_cbdata cbdata;

if (task->is_skipped) {
return;
@@ -725,7 +745,9 @@ process_statfiles_threaded (gpointer data, gpointer user_data)
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_hash_table_unref, task->tokens);
}

g_list_foreach (task->cfg->classifiers, classifiers_callback, task);
cbdata.task = task;
cbdata.nL = nL;
g_list_foreach (task->cfg->classifiers, classifiers_callback, &cbdata);
remove_async_thread (task->s);
}

@@ -1054,7 +1076,7 @@ learn_task_spam (struct classifier_config *cl, struct worker_task *task, gboolea
/* Learn */
if (!cl->classifier->learn_spam_func (
cls_ctx, task->worker->srv->statfile_pool,
tokens, task, is_spam, err)) {
tokens, task, is_spam, task->cfg->lua_state, err)) {
if (*err) {
msg_info ("learn failed for message <%s>, learn error: %s", task->message_id, (*err)->message);
return FALSE;

+ 3
- 4
src/html.c View File

@@ -29,7 +29,7 @@
#include "html.h"
#include "url.h"

sig_atomic_t tags_sorted = 0;
static sig_atomic_t tags_sorted = 0;

static struct html_tag tag_defs[] = {
/* W3C defined elements */
@@ -156,7 +156,7 @@ static struct html_tag tag_defs[] = {
{Tag_WBR, "wbr", (CM_INLINE | CM_EMPTY)},
};

sig_atomic_t entities_sorted = 0;
static sig_atomic_t entities_sorted = 0;
struct _entity;
typedef struct _entity entity;

@@ -438,7 +438,7 @@ static entity entities_defs[] = {
{"euro", 8364, "E"},
};

static entity *entities_defs_num = NULL;
static entity entities_defs_num[ (G_N_ELEMENTS (entities_defs)) ];

static gint
tag_cmp (const void *m1, const void *m2)
@@ -881,7 +881,6 @@ add_html_node (struct worker_task *task, memory_pool_t * pool, struct mime_text_
}
if (!entities_sorted) {
qsort (entities_defs, G_N_ELEMENTS (entities_defs), sizeof (entity), entity_cmp);
entities_defs_num = g_new (entity, G_N_ELEMENTS (entities_defs));
memcpy (entities_defs_num, entities_defs, sizeof (entities_defs));
qsort (entities_defs_num, G_N_ELEMENTS (entities_defs), sizeof (entity), entity_cmp_num);
entities_sorted = 1;

+ 2
- 26
src/lua/lua_buffer.c View File

@@ -82,13 +82,9 @@ static gboolean
lua_io_read_cb (f_str_t * in, void *arg)
{
struct lua_dispatcher_cbdata *cbdata = arg;
gboolean need_unlock = FALSE, res;
gboolean res;
rspamd_io_dispatcher_t **pdispatcher;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
/* callback (dispatcher, data) */
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
@@ -103,10 +99,6 @@ lua_io_read_cb (f_str_t * in, void *arg)
res = lua_toboolean (cbdata->L, -1);
lua_pop (cbdata->L, 1);

if (need_unlock) {
g_mutex_unlock (lua_mtx);
}

return res;
}

@@ -114,14 +106,10 @@ static gboolean
lua_io_write_cb (void *arg)
{
struct lua_dispatcher_cbdata *cbdata = arg;
gboolean need_unlock = FALSE, res;
gboolean res;
rspamd_io_dispatcher_t **pdispatcher;

if (cbdata->cbref_write) {
/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
/* callback (dispatcher) */
pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
@@ -135,10 +123,6 @@ lua_io_write_cb (void *arg)

res = lua_toboolean (cbdata->L, -1);
lua_pop (cbdata->L, 1);

if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
}

return res;
@@ -148,13 +132,8 @@ static void
lua_io_err_cb (GError * err, void *arg)
{
struct lua_dispatcher_cbdata *cbdata = arg;
gboolean need_unlock = FALSE;
rspamd_io_dispatcher_t **pdispatcher;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
/* callback (dispatcher, err) */
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
@@ -166,9 +145,6 @@ lua_io_err_cb (GError * err, void *arg)
msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
}

if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
/* Unref callbacks */
luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
if (cbdata->cbref_write) {

+ 16
- 22
src/lua/lua_classifier.c View File

@@ -117,27 +117,24 @@ call_classifier_pre_callback (struct classifier_config *ccf, struct worker_task
/* Return list of statfiles that should be checked for this message */
GList *
call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task *task,
gboolean is_learn, gboolean is_spam)
gboolean is_learn, gboolean is_spam, lua_State *L)
{
GList *res = NULL, *cur;
struct classifier_callback_data *cd;
lua_State *L;


/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
cd = cur->data;
lua_getglobal (cd->L, cd->name);
lua_getglobal (L, cd->name);

res = g_list_concat (res, call_classifier_pre_callback (ccf, task, cd->L, is_learn, is_spam));
res = g_list_concat (res, call_classifier_pre_callback (ccf, task, L, is_learn, is_spam));

cur = g_list_next (cur);
}
g_mutex_lock (lua_mtx);

if (res == NULL) {
L = task->cfg->lua_state;
/* Check function from global table 'classifiers' */
lua_getglobal (L, "classifiers");
if (lua_istable (L, -1)) {
@@ -151,14 +148,13 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
}
lua_pop (L, 1);
}
g_mutex_unlock (lua_mtx);

return res;
}

/* Return result mark for statfile */
double
call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in)
call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in, lua_State *L)
{
struct classifier_callback_data *cd;
struct classifier_config **pccf;
@@ -166,36 +162,34 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas
double out = in;
GList *cur;

g_mutex_lock (lua_mtx);
/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
cd = cur->data;
lua_getglobal (cd->L, cd->name);
lua_getglobal (L, cd->name);

pccf = lua_newuserdata (cd->L, sizeof (struct classifier_config *));
lua_setclass (cd->L, "rspamd{classifier}", -1);
pccf = lua_newuserdata (L, sizeof (struct classifier_config *));
lua_setclass (L, "rspamd{classifier}", -1);
*pccf = ccf;

ptask = lua_newuserdata (cd->L, sizeof (struct worker_task *));
lua_setclass (cd->L, "rspamd{task}", -1);
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
lua_setclass (L, "rspamd{task}", -1);
*ptask = task;

lua_pushnumber (cd->L, out);
lua_pushnumber (L, out);

if (lua_pcall (cd->L, 3, 1, 0) != 0) {
msg_warn ("error running function %s: %s", cd->name, lua_tostring (cd->L, -1));
if (lua_pcall (L, 3, 1, 0) != 0) {
msg_warn ("error running function %s: %s", cd->name, lua_tostring (L, -1));
}
else {
if (lua_isnumber (cd->L, 1)) {
out = lua_tonumber (cd->L, 1);
if (lua_isnumber (L, 1)) {
out = lua_tonumber (L, 1);
}
lua_pop (cd->L, 1);
lua_pop (L, 1);
}

cur = g_list_next (cur);
}
g_mutex_unlock (lua_mtx);

return out;


+ 32
- 25
src/lua/lua_common.c View File

@@ -28,9 +28,6 @@
/* Lua module init function */
#define MODULE_INIT_FUNC "module_init"

/* Global lua mutex */
GMutex *lua_mtx = NULL;

const luaL_reg null_reg[] = {
{"__tostring", lua_class_tostring},
{NULL, NULL}
@@ -404,7 +401,7 @@ lua_add_actions_global (lua_State *L)
lua_setglobal (L, "rspamd_actions");
}

void
lua_State *
init_lua (struct config_file *cfg)
{
lua_State *L;
@@ -412,13 +409,6 @@ init_lua (struct config_file *cfg)
L = lua_open ();
luaL_openlibs (L);

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
lua_mtx = g_mutex_new ();
#else
lua_mtx = g_malloc (sizeof (GMutex));
g_mutex_init (lua_mtx);
#endif

(void)luaopen_rspamd (L);
(void)luaopen_logger (L);
(void)luaopen_util (L);
@@ -446,9 +436,38 @@ init_lua (struct config_file *cfg)
(void)luaopen_io_dispatcher (L);
(void)luaopen_dns_resolver (L);

cfg->lua_state = L;
memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)lua_close, L);
return L;
}

/**
* Initialize new locked lua_State structure
*/
struct lua_locked_state*
init_lua_locked (struct config_file *cfg)
{
struct lua_locked_state *new;

new = g_slice_alloc (sizeof (struct lua_locked_state));
new->L = init_lua (cfg);
new->m = rspamd_mutex_new ();

return new;
}


/**
* Free locked state structure
*/
void
free_lua_locked (struct lua_locked_state *st)
{
g_assert (st != NULL);

lua_close (st->L);

rspamd_mutex_free (st->m);

g_slice_free1 (sizeof (struct lua_locked_state), st);
}

gboolean
@@ -524,7 +543,6 @@ lua_call_filter (const gchar *function, struct worker_task *task)
struct worker_task **ptask;
lua_State *L = task->cfg->lua_state;

g_mutex_lock (lua_mtx);
lua_getglobal (L, function);
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
lua_setclass (L, "rspamd{task}", -1);
@@ -540,7 +558,6 @@ lua_call_filter (const gchar *function, struct worker_task *task)
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
g_mutex_unlock (lua_mtx);

return result;
}
@@ -552,7 +569,6 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
guint i;
lua_State *L = task->cfg->lua_state;

g_mutex_lock (lua_mtx);
lua_getglobal (L, function);

for (i = 0; i < number; i++) {
@@ -568,7 +584,6 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
g_mutex_unlock (lua_mtx);

return result;
}
@@ -584,7 +599,6 @@ lua_call_expression_func (const gchar *module, const gchar *function,
struct expression_argument *arg;
int nargs = 1, pop = 0;

g_mutex_lock (lua_mtx);
/* Call specified function and expect result of given expected_type */
/* First check function in config table */
lua_getglobal (L, "config");
@@ -646,7 +660,6 @@ lua_call_expression_func (const gchar *module, const gchar *function,

if (lua_pcall (L, nargs, 1, 0) != 0) {
msg_info ("call to %s failed: %s", function, lua_tostring (L, -1));
g_mutex_unlock (lua_mtx);
return FALSE;
}
pop ++;
@@ -654,13 +667,11 @@ lua_call_expression_func (const gchar *module, const gchar *function,
if (!lua_isboolean (L, -1)) {
lua_pop (L, pop);
msg_info ("function %s must return a boolean", function);
g_mutex_unlock (lua_mtx);
return FALSE;
}
*res = lua_toboolean (L, -1);
lua_pop (L, pop);

g_mutex_unlock (lua_mtx);
return TRUE;
}

@@ -682,7 +693,6 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg;
lua_State *L = data->task->cfg->lua_state;

g_mutex_lock (lua_mtx);
lua_getglobal (L, data->func);

lua_pushstring (L, (const gchar *)key);
@@ -698,7 +708,6 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
res = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
data->score += res;
g_mutex_unlock (lua_mtx);
}

double
@@ -735,7 +744,6 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
return score;
}

g_mutex_lock (lua_mtx);
lua_getglobal (L, p->data);
lua_pushnumber (L, score);

@@ -750,7 +758,6 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
res = lua_tonumber (L, -1);
lua_pop (L, 1);

g_mutex_unlock (lua_mtx);
return res;
}


+ 18
- 4
src/lua/lua_common.h View File

@@ -15,10 +15,15 @@
#define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name }

extern const luaL_reg null_reg[];
extern GMutex *lua_mtx;

#define RSPAMD_LUA_API_VERSION 12

/* Locked lua state with mutex */
struct lua_locked_state {
lua_State *L;
rspamd_mutex_t *m;
};

/* Common utility functions */

/**
@@ -54,13 +59,22 @@ gpointer lua_check_class (lua_State *L, gint index, const gchar *name);
/**
* Initialize lua and bindings
*/
void init_lua (struct config_file *cfg);
lua_State* init_lua (struct config_file *cfg);

/**
* Load and initialize lua plugins
*/
gboolean init_lua_filters (struct config_file *cfg);

/**
* Initialize new locked lua_State structure
*/
struct lua_locked_state* init_lua_locked (struct config_file *cfg);
/**
* Free locked state structure
*/
void free_lua_locked (struct lua_locked_state *st);

/**
* Open libraries functions
*/
@@ -97,8 +111,8 @@ void lua_call_pre_filters (struct worker_task *task);
void add_luabuf (const gchar *line);

/* Classify functions */
GList *call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task *task, gboolean is_learn, gboolean is_spam);
double call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in);
GList *call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task *task, gboolean is_learn, gboolean is_spam, lua_State *L);
double call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_task *task, double in, lua_State *L);

double lua_normalizer_func (struct config_file *cfg, long double score, void *params);


+ 0
- 8
src/lua/lua_config.c View File

@@ -336,7 +336,6 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
GList *cur;
gboolean res = FALSE;

g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -366,7 +365,6 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
}
lua_pop (cd->L, 1);
}
g_mutex_unlock (lua_mtx);

return res;
}
@@ -459,7 +457,6 @@ lua_call_post_filters (struct worker_task *task)
struct worker_task **ptask;
GList *cur;

g_mutex_lock (lua_mtx);
cur = task->cfg->post_filters;
while (cur) {
cd = cur->data;
@@ -479,7 +476,6 @@ lua_call_post_filters (struct worker_task *task)
}
cur = g_list_next (cur);
}
g_mutex_unlock (lua_mtx);
}

static gint
@@ -514,7 +510,6 @@ lua_call_pre_filters (struct worker_task *task)
struct worker_task **ptask;
GList *cur;

g_mutex_lock (lua_mtx);
cur = task->cfg->pre_filters;
while (cur) {
cd = cur->data;
@@ -534,7 +529,6 @@ lua_call_pre_filters (struct worker_task *task)
}
cur = g_list_next (cur);
}
g_mutex_unlock (lua_mtx);
}

static gint
@@ -660,7 +654,6 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
struct lua_callback_data *cd = ud;
struct worker_task **ptask;

g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -675,7 +668,6 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" :
cd->callback.name, lua_tostring (cd->L, -1));
}
g_mutex_unlock (lua_mtx);
}

static gint

+ 0
- 10
src/lua/lua_dns.c View File

@@ -71,12 +71,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
struct rspamd_dns_resolver **presolver;
union rspamd_reply_element *elt;
GList *cur;
gboolean need_unlock = FALSE;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}

lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref);
presolver = lua_newuserdata (cd->L, sizeof (gpointer));
@@ -140,10 +134,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)

/* Unref function */
luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref);

if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
}

static int

+ 1
- 18
src/lua/lua_http.c View File

@@ -81,12 +81,6 @@ lua_http_push_error (gint code, struct lua_http_ud *ud)
{
struct worker_task **ptask;
gint num;
gboolean need_unlock = FALSE;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}

/* Push error */
if (ud->callback) {
@@ -115,9 +109,7 @@ lua_http_push_error (gint code, struct lua_http_ud *ud)
g_list_free (ud->headers);
ud->headers = NULL;
}
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}

ud->parser_state = 3;
remove_normal_event (ud->s, lua_http_fin, ud);

@@ -130,12 +122,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud)
struct lua_http_header *header;
struct worker_task **ptask;
gint num;
gboolean need_unlock = FALSE;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}

if (ud->callback) {
/* Push error */
@@ -175,9 +161,6 @@ lua_http_push_reply (f_str_t *in, struct lua_http_ud *ud)
ud->headers = NULL;
}

if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
remove_normal_event (ud->s, lua_http_fin, ud);

}

+ 0
- 8
src/lua/lua_mempool.c View File

@@ -93,20 +93,12 @@ static void
lua_mempool_destructor_func (gpointer p)
{
struct lua_mempool_udata *ud = p;
gboolean need_unlock = FALSE;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
if (lua_pcall (ud->L, 0, 0, 0) != 0) {
msg_info ("call to destructor failed: %s", lua_tostring (ud->L, -1));
}
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
}

static int

+ 0
- 17
src/lua/lua_redis.c View File

@@ -83,9 +83,7 @@ lua_redis_fin (void *arg)

if (ud->ctx) {
redisAsyncFree (ud->ctx);
g_mutex_lock (lua_mtx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
g_mutex_unlock (lua_mtx);
}
}

@@ -98,12 +96,7 @@ static void
lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean connected)
{
struct worker_task **ptask;
gboolean need_unlock = FALSE;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *));
@@ -117,9 +110,6 @@ lua_redis_push_error (const gchar *err, struct lua_redis_userdata *ud, gboolean
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
}
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}

if (connected) {
remove_normal_event (ud->task->s, lua_redis_fin, ud);
@@ -136,11 +126,7 @@ static void
lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
{
struct worker_task **ptask;
gboolean need_unlock = FALSE;

if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
ptask = lua_newuserdata (ud->L, sizeof (struct worker_task *));
@@ -170,9 +156,6 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_userdata *ud)
if (lua_pcall (ud->L, 3, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
}
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}

remove_normal_event (ud->task->s, lua_redis_fin, ud);
}

+ 2
- 34
src/lua/lua_session.c View File

@@ -89,12 +89,7 @@ static gboolean
lua_session_finalizer (gpointer ud)
{
struct lua_session_udata *cbdata = ud;
gboolean need_unlock = FALSE, res;

/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
gboolean res;

/* Call finalizer function */
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_fin);
@@ -104,9 +99,7 @@ lua_session_finalizer (gpointer ud)
res = lua_toboolean (cbdata->L, -1);
lua_pop (cbdata->L, 1);
luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_fin);
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}


return res;
}
@@ -115,13 +108,8 @@ static void
lua_session_restore (gpointer ud)
{
struct lua_session_udata *cbdata = ud;
gboolean need_unlock = FALSE;

if (cbdata->cbref_restore) {
/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}

/* Call restorer function */
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_restore);
@@ -129,9 +117,6 @@ lua_session_restore (gpointer ud)
msg_info ("call to session restorer failed: %s", lua_tostring (cbdata->L, -1));
}
luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_restore);
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
}
}

@@ -139,13 +124,8 @@ static void
lua_session_cleanup (gpointer ud)
{
struct lua_session_udata *cbdata = ud;
gboolean need_unlock = FALSE;

if (cbdata->cbref_cleanup) {
/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}

/* Call restorer function */
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_cleanup);
@@ -153,9 +133,6 @@ lua_session_cleanup (gpointer ud)
msg_info ("call to session cleanup failed: %s", lua_tostring (cbdata->L, -1));
}
luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_cleanup);
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
}
}

@@ -238,23 +215,14 @@ static void
lua_event_fin (gpointer ud)
{
struct lua_event_udata *cbdata = ud;
gboolean need_unlock = FALSE;

if (cbdata->cbref) {
/* Avoid LOR here as mutex can be acquired before in lua_call */
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}

/* Call restorer function */
lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref);
if (lua_pcall (cbdata->L, 0, 0, 0) != 0) {
msg_info ("call to event finalizer failed: %s", lua_tostring (cbdata->L, -1));
}
luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref);
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
}
}


+ 1
- 3
src/lua/lua_task.c View File

@@ -689,7 +689,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
union rspamd_reply_element *elt;
GList *cur;

g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -775,7 +774,6 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
if (cd->cb_is_ref) {
luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
g_mutex_unlock (lua_mtx);
}

static gint
@@ -955,7 +953,7 @@ lua_task_call_rspamd_function (lua_State * L)
f.args = g_list_prepend (f.args, arg);
}
}
res = call_expression_function (&f, task);
res = call_expression_function (&f, task, L);
lua_pushboolean (L, res);
if (f.args) {
g_list_free (f.args);

+ 8
- 4
src/lua/lua_xmlrpc.c View File

@@ -353,11 +353,11 @@ xmlrpc_text (GMarkupParseContext *context, const gchar *text, gsize text_len, gp
gdouble dnum;

/* Strip line */
while (g_ascii_isspace (*text) && text_len > 0) {
while (text_len > 0 && g_ascii_isspace (*text)) {
text ++;
text_len --;
}
while (g_ascii_isspace (text[text_len - 1]) && text_len > 0) {
while (text_len > 0 && g_ascii_isspace (text[text_len - 1])) {
text_len --;
}

@@ -417,12 +417,16 @@ lua_xmlrpc_parse_reply (lua_State *L)
G_MARKUP_TREAT_CDATA_AS_TEXT, &ud, NULL);
res = g_markup_parse_context_parse (ctx, data, s, &err);

g_markup_parse_context_free (ctx);
if (! res) {
lua_pushnil (L);
lua_pushboolean (L, FALSE);
}
else {
lua_pushboolean (L, TRUE);
}
}
else {
lua_pushnil (L);
lua_pushboolean (L, FALSE);
}

return 1;

+ 4
- 2
src/main.c View File

@@ -317,7 +317,8 @@ reread_config (struct rspamd_main *rspamd)
cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
/* Save some variables */
tmp_cfg->cfg_name = cfg_file;
init_lua (tmp_cfg);
tmp_cfg->lua_state = init_lua (tmp_cfg);
memory_pool_add_destructor (tmp_cfg->cfg_pool, (pool_destruct_func)lua_close, tmp_cfg->lua_state);

if (! load_rspamd_config (tmp_cfg, FALSE)) {
msg_err ("cannot parse new config file, revert to old one");
@@ -909,7 +910,8 @@ main (gint argc, gchar **argv, gchar **env)
g_log_set_default_handler (rspamd_glib_log_function, rspamd_main->logger);

detect_priv (rspamd_main);
init_lua (rspamd_main->cfg);
rspamd_main->cfg->lua_state = init_lua (rspamd_main->cfg);
memory_pool_add_destructor (rspamd_main->cfg->cfg_pool, (pool_destruct_func)lua_close, rspamd_main->cfg->lua_state);

pworker = &workers[0];
while (*pworker) {

+ 3
- 0
src/mem_pool.c View File

@@ -620,6 +620,9 @@ memory_pool_delete (memory_pool_t * pool)

mem_pool_stat->pools_freed++;
POOL_MTX_UNLOCK ();
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
g_mutex_clear (&pool->mtx);
#endif
g_slice_free (memory_pool_t, pool);
}


+ 11
- 1
src/plugins/dkim_check.c View File

@@ -294,7 +294,17 @@ dkim_module_key_handler (rspamd_dkim_key_t *key, gsize keylen, rspamd_dkim_conte
else {
/* Insert tempfail symbol */
msg_info ("cannot get key for domain %s", ctx->dns_key);
insert_result (task, dkim_module_ctx->symbol_tempfail, 1, NULL);
if (err != NULL) {
insert_result (task, dkim_module_ctx->symbol_tempfail, 1, g_list_prepend (NULL, memory_pool_strdup (task->task_pool, err->message)));

}
else {
insert_result (task, dkim_module_ctx->symbol_tempfail, 1, NULL);
}
}

if (err) {
g_error_free (err);
}
}


+ 29
- 17
src/plugins/regexp.c View File

@@ -1069,13 +1069,11 @@ process_regexp (struct rspamd_regexp *re, struct worker_task *task, const gchar
}

static gboolean
maybe_call_lua_function (const gchar *name, struct worker_task *task)
maybe_call_lua_function (const gchar *name, struct worker_task *task, lua_State *L)
{
lua_State *L = task->cfg->lua_state;
struct worker_task **ptask;
gboolean res;

g_mutex_lock (lua_mtx);
lua_getglobal (L, name);
if (lua_isfunction (L, -1)) {
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
@@ -1084,18 +1082,15 @@ maybe_call_lua_function (const gchar *name, struct worker_task *task)
/* Call function */
if (lua_pcall (L, 1, 1, 0) != 0) {
msg_info ("call to %s failed: %s", (gchar *)name, lua_tostring (L, -1));
g_mutex_unlock (lua_mtx);
return FALSE;
}
res = lua_toboolean (L, -1);
lua_pop (L, 1);
g_mutex_unlock (lua_mtx);
return res;
}
else {
lua_pop (L, 1);
}
g_mutex_unlock (lua_mtx);
return FALSE;
}

@@ -1156,7 +1151,7 @@ optimize_regexp_expression (struct expression **e, GQueue * stack, gboolean res)
}

static gboolean
process_regexp_expression (struct expression *expr, gchar *symbol, struct worker_task *task, const gchar *additional)
process_regexp_expression (struct expression *expr, gchar *symbol, struct worker_task *task, const gchar *additional, struct lua_locked_state *nL)
{
GQueue *stack;
gsize cur, op1, op2;
@@ -1179,7 +1174,14 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
}
}
else if (it->type == EXPR_FUNCTION) {
cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task);
if (nL) {
rspamd_mutex_lock (nL->m);
cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task, nL->L);
rspamd_mutex_unlock (nL->m);
}
else {
cur = (gsize) call_expression_function ((struct expression_function *)it->content.operand, task, task->cfg->lua_state);
}
debug_task ("function %s returned %s", ((struct expression_function *)it->content.operand)->name, cur ? "true" : "false");
if (try_optimize) {
try_optimize = optimize_regexp_expression (&it, stack, cur);
@@ -1191,9 +1193,14 @@ process_regexp_expression (struct expression *expr, gchar *symbol, struct worker
else if (it->type == EXPR_STR) {
/* This may be lua function, try to call it */
if (regexp_module_ctx->workers != NULL) {
g_mutex_lock (workers_mtx);
cur = maybe_call_lua_function ((const gchar*)it->content.operand, task);
g_mutex_unlock (workers_mtx);
if (nL) {
rspamd_mutex_lock (nL->m);
cur = maybe_call_lua_function ((const gchar*)it->content.operand, task, nL->L);
rspamd_mutex_unlock (nL->m);
}
else {
cur = maybe_call_lua_function ((const gchar*)it->content.operand, task, task->cfg->lua_state);
}
}
debug_task ("function %s returned %s", (const gchar *)it->content.operand, cur ? "true" : "false");
if (try_optimize) {
@@ -1278,9 +1285,10 @@ static void
process_regexp_item_threaded (gpointer data, gpointer user_data)
{
struct regexp_threaded_ud *ud = data;
struct lua_locked_state *nL = user_data;

/* Process expression */
if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL, nL)) {
g_mutex_lock (workers_mtx);
insert_result (ud->task, ud->item->symbol, 1, NULL);
g_mutex_unlock (workers_mtx);
@@ -1295,6 +1303,7 @@ process_regexp_item (struct worker_task *task, void *user_data)
gboolean res = FALSE;
struct regexp_threaded_ud *thr_ud;
GError *err = NULL;
struct lua_locked_state *nL;


if (!item->lua_function && regexp_module_ctx->max_threads > 1) {
@@ -1308,8 +1317,10 @@ process_regexp_item (struct worker_task *task, void *user_data)
workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
g_mutex_init (workers_mtx);
#endif
nL = init_lua_locked (task->cfg);
luaopen_regexp (nL->L);
regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded,
regexp_module_ctx, regexp_module_ctx->max_threads, TRUE, &err);
nL, regexp_module_ctx->max_threads, TRUE, &err);
if (err != NULL) {
msg_err ("thread pool creation failed: %s", err->message);
regexp_module_ctx->max_threads = 0;
@@ -1320,6 +1331,7 @@ process_regexp_item (struct worker_task *task, void *user_data)
thr_ud->item = item;
thr_ud->task = task;


register_async_thread (task->s);
g_thread_pool_push (regexp_module_ctx->workers, thr_ud, &err);
if (err != NULL) {
@@ -1337,7 +1349,7 @@ process_regexp_item (struct worker_task *task, void *user_data)
}
else {
/* Process expression */
if (process_regexp_expression (item->expr, item->symbol, task, NULL)) {
if (process_regexp_expression (item->expr, item->symbol, task, NULL, NULL)) {
insert_result (task, item->symbol, 1, NULL);
}
}
@@ -1375,7 +1387,7 @@ rspamd_regexp_match_number (struct worker_task *task, GList * args, void *unused
}
}
else {
if (process_regexp_expression (cur->data, "regexp_match_number", task, NULL)) {
if (process_regexp_expression (cur->data, "regexp_match_number", task, NULL, NULL)) {
res++;
}
if (res >= param_count) {
@@ -1608,13 +1620,13 @@ rspamd_check_smtp_data (struct worker_task *task, GList * args, void *unused)
}
else if (arg != NULL) {
if (what != NULL) {
if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, what)) {
if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, what, NULL)) {
return TRUE;
}
}
else {
while (rcpt_list) {
if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, rcpt_list->data)) {
if (process_regexp_expression (arg->data, "regexp_check_smtp_data", task, rcpt_list->data, NULL)) {
return TRUE;
}
rcpt_list = g_list_next (rcpt_list);

+ 1
- 0
src/symbols_cache.c View File

@@ -776,6 +776,7 @@ validate_cache (struct symbols_cache *cache, struct config_file *cfg, gboolean s
}
cur = g_list_next (cur);
}
g_list_free (metric_symbols);
#endif /* GLIB_COMPAT */

return TRUE;

+ 18
- 1
src/util.c View File

@@ -1363,7 +1363,7 @@ rspamd_mutex_new (void)
{
rspamd_mutex_t *new;

new = g_malloc (sizeof (rspamd_mutex_t));
new = g_slice_alloc (sizeof (rspamd_mutex_t));
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
g_mutex_init (&new->mtx);
#else
@@ -1401,6 +1401,15 @@ rspamd_mutex_unlock (rspamd_mutex_t *mtx)
#endif
}

void
rspamd_mutex_free (rspamd_mutex_t *mtx)
{
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
g_mutex_clear (&mtx->mtx);
#endif
g_slice_free1 (sizeof (rspamd_mutex_t), mtx);
}

/**
* Create new rwlock
* @return
@@ -1476,6 +1485,14 @@ rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx)
#endif
}

void
rspamd_rwlock_free (rspamd_rwlock_t *mtx)
{
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
g_rw_lock_clear (&mtx->rwlock);
#endif
g_slice_free1 (sizeof (rspamd_rwlock_t), mtx);
}

struct rspamd_thread_data {
gchar *name;

+ 12
- 0
src/util.h View File

@@ -265,6 +265,12 @@ void rspamd_mutex_lock (rspamd_mutex_t *mtx);
*/
void rspamd_mutex_unlock (rspamd_mutex_t *mtx);

/**
* Clear rspamd mutex
* @param mtx
*/
void rspamd_mutex_free (rspamd_mutex_t *mtx);

/**
* Create new rwloc
* @return
@@ -295,6 +301,12 @@ void rspamd_rwlock_writer_unlock (rspamd_rwlock_t *mtx);
*/
void rspamd_rwlock_reader_unlock (rspamd_rwlock_t *mtx);

/**
* Free rwlock
* @param mtx
*/
void rspamd_rwlock_free (rspamd_rwlock_t *mtx);

/**
* Create new named thread
* @param name name pattern

+ 3
- 1
src/worker.c View File

@@ -782,6 +782,7 @@ start_worker (struct rspamd_worker *worker)
gchar *is_custom_str;
struct rspamd_worker_ctx *ctx = worker->ctx;
GError *err = NULL;
struct lua_locked_state *nL;

#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -836,7 +837,8 @@ start_worker (struct rspamd_worker *worker)
/* Create classify pool */
ctx->classify_pool = NULL;
if (ctx->classify_threads > 1) {
ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, ctx, ctx->classify_threads, TRUE, &err);
nL = init_lua_locked (worker->srv->cfg);
ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, nL, ctx->classify_threads, TRUE, &err);
if (err != NULL) {
msg_err ("pool create failed: %s", err->message);
ctx->classify_pool = NULL;

Loading…
Cancel
Save