aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libmime/mime_expressions.c6
-rw-r--r--src/libserver/cfg_file.h1
-rw-r--r--src/libserver/cfg_utils.c3
-rw-r--r--src/libserver/composites.c9
-rw-r--r--src/libutil/expression.c24
-rw-r--r--src/libutil/expression.h22
-rw-r--r--src/lua/CMakeLists.txt3
-rw-r--r--src/lua/lua_config.c103
-rw-r--r--src/lua/lua_expression.c56
-rw-r--r--src/lua/lua_thread_pool.c128
-rw-r--r--src/lua/lua_thread_pool.h81
-rw-r--r--src/plugins/regexp.c7
12 files changed, 363 insertions, 80 deletions
diff --git a/src/libmime/mime_expressions.c b/src/libmime/mime_expressions.c
index f9bfdc1bf..518b9a390 100644
--- a/src/libmime/mime_expressions.c
+++ b/src/libmime/mime_expressions.c
@@ -83,7 +83,7 @@ static gboolean rspamd_is_empty_body (struct rspamd_task *task,
static rspamd_expression_atom_t * rspamd_mime_expr_parse (const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
-static gdouble rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom);
+static gdouble rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
static gint rspamd_mime_expr_priority (rspamd_expression_atom_t *atom);
static void rspamd_mime_expr_destroy (rspamd_expression_atom_t *atom);
@@ -913,9 +913,9 @@ rspamd_mime_expr_process_function (struct rspamd_function_atom * func,
}
static gdouble
-rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom)
+rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom)
{
- struct rspamd_task *task = input;
+ struct rspamd_task *task = process_data->task;
struct rspamd_mime_atom *mime_atom;
lua_State *L;
gdouble ret = 0;
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 75b404530..c583766c4 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -380,6 +380,7 @@ struct rspamd_config {
gchar * checksum; /**< real checksum of config file */
gchar * dump_checksum; /**< dump checksum of config file */
gpointer lua_state; /**< pointer to lua state */
+ gpointer lua_thread_pool; /**< pointer to lua thread (coroutine) pool */
gchar * rrd_file; /**< rrd file to store statistics */
gchar * history_file; /**< file to save rolling history */
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index b7b9dfdee..016556912 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -20,6 +20,7 @@
#include "uthash_strcase.h"
#include "filter.h"
#include "lua/lua_common.h"
+#include "lua/lua_thread_pool.h"
#include "map.h"
#include "map_helpers.h"
#include "map_private.h"
@@ -175,6 +176,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags)
if (!(flags & RSPAMD_CONFIG_INIT_SKIP_LUA)) {
cfg->lua_state = rspamd_lua_init ();
cfg->own_lua_state = TRUE;
+ cfg->lua_thread_pool = lua_thread_pool_new (cfg->lua_state);
}
cfg->cache = rspamd_symbols_cache_new (cfg);
@@ -259,6 +261,7 @@ rspamd_config_free (struct rspamd_config *cfg)
g_ptr_array_free (cfg->c_modules, TRUE);
if (cfg->lua_state && cfg->own_lua_state) {
+ lua_thread_pool_free (cfg->lua_thread_pool);
lua_close (cfg->lua_state);
}
REF_RELEASE (cfg->libs_ctx);
diff --git a/src/libserver/composites.c b/src/libserver/composites.c
index 8f3cb179d..125e97e6d 100644
--- a/src/libserver/composites.c
+++ b/src/libserver/composites.c
@@ -344,8 +344,13 @@ composites_foreach_callback (gpointer key, gpointer value, void *data)
return;
}
- rc = rspamd_process_expression (comp->expr,
- RSPAMD_EXPRESSION_FLAG_NOOPT, cd);
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+
+ process_data.flags = RSPAMD_EXPRESSION_FLAG_NOOPT;
+ process_data.cd = cd;
+
+ rc = rspamd_process_expression (comp->expr, &process_data);
/* Checked bit */
setbit (cd->checked, comp->id * 2);
diff --git a/src/libutil/expression.c b/src/libutil/expression.c
index 21a137f43..2469d0415 100644
--- a/src/libutil/expression.c
+++ b/src/libutil/expression.c
@@ -984,8 +984,8 @@ rspamd_ast_do_op (struct rspamd_expression_elt *elt, gdouble val,
}
static gdouble
-rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node,
- gpointer data, GPtrArray *track)
+rspamd_ast_process_node (struct rspamd_expression *expr, GNode *node,
+ struct rspamd_expr_process_data *process_data)
{
struct rspamd_expression_elt *elt, *celt, *parelt = NULL;
GNode *cld;
@@ -1010,13 +1010,13 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node
t1 = rspamd_get_ticks (TRUE);
}
- elt->value = expr->subr->process (data, elt->p.atom);
+ elt->value = expr->subr->process (process_data, elt->p.atom);
if (fabs (elt->value) > 1e-9) {
elt->p.atom->hits ++;
- if (track) {
- g_ptr_array_add (track, elt->p.atom);
+ if (process_data->trace) {
+ g_ptr_array_add (process_data->trace, elt->p.atom);
}
}
@@ -1057,7 +1057,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node
continue;
}
- val = rspamd_ast_process_node (expr, flags, cld, data, track);
+ val = rspamd_ast_process_node (expr, cld, process_data);
if (isnan (acc)) {
acc = rspamd_ast_do_op (elt, val, 0, lim, TRUE);
@@ -1066,7 +1066,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node
acc = rspamd_ast_do_op (elt, val, acc, lim, FALSE);
}
- if (!(flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) {
+ if (!(process_data->flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) {
if (rspamd_ast_node_done (elt, parelt, acc, lim)) {
return acc;
}
@@ -1090,8 +1090,7 @@ rspamd_ast_cleanup_traverse (GNode *n, gpointer d)
}
gdouble
-rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
- gpointer data, GPtrArray *track)
+rspamd_process_expression_track (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data)
{
gdouble ret = 0;
@@ -1099,7 +1098,7 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
/* Ensure that stack is empty at this point */
g_assert (expr->expression_stack->len == 0);
- ret = rspamd_ast_process_node (expr, flags, expr->ast, data, track);
+ ret = rspamd_ast_process_node (expr, expr->ast, process_data);
/* Cleanup */
g_node_traverse (expr->ast, G_IN_ORDER, G_TRAVERSE_ALL, -1,
@@ -1124,10 +1123,9 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
}
gdouble
-rspamd_process_expression (struct rspamd_expression *expr, gint flags,
- gpointer data)
+rspamd_process_expression (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data)
{
- return rspamd_process_expression_track (expr, flags, data, NULL);
+ return rspamd_process_expression_track (expr, process_data);
}
static gboolean
diff --git a/src/libutil/expression.h b/src/libutil/expression.h
index fefde2974..7f7cb2dda 100644
--- a/src/libutil/expression.h
+++ b/src/libutil/expression.h
@@ -56,12 +56,24 @@ typedef struct rspamd_expression_atom_s {
gint priority;
} rspamd_expression_atom_t;
+struct rspamd_expr_process_data {
+ /* Current Lua state to run atom processing */
+ struct lua_State *L;
+ /* Parameter of lua-function processing atom*/
+ gint stack_item;
+ gint flags;
+ /* != NULL if trace is collected */
+ GPtrArray *trace;
+ struct composites_data *cd;
+ struct rspamd_task *task;
+};
+
struct rspamd_atom_subr {
/* Parses atom from string and returns atom structure */
rspamd_expression_atom_t * (*parse)(const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
/* Process atom via the opaque pointer (e.g. struct rspamd_task *) */
- gdouble (*process) (gpointer input, rspamd_expression_atom_t *atom);
+ gdouble (*process) (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
/* Calculates the relative priority of the expression */
gint (*priority) (rspamd_expression_atom_t *atom);
void (*destroy) (rspamd_expression_atom_t *atom);
@@ -92,8 +104,8 @@ gboolean rspamd_parse_expression (const gchar *line, gsize len,
* @param data opaque data pointer for all the atoms
* @return the value of expression
*/
-gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags,
- gpointer data);
+gdouble rspamd_process_expression (struct rspamd_expression *expr,
+ struct rspamd_expr_process_data *process_data);
/**
* Process the expression and return its value using atom 'process' functions with the specified data pointer.
@@ -103,8 +115,8 @@ gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags,
* @param track pointer array to atoms tracking
* @return the value of expression
*/
-gdouble rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
- gpointer data, GPtrArray *track);
+gdouble rspamd_process_expression_track (struct rspamd_expression *expr,
+ struct rspamd_expr_process_data *process_data);
/**
* Shows string representation of an expression
diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt
index 9c561c0e4..ffc4b27ca 100644
--- a/src/lua/CMakeLists.txt
+++ b/src/lua/CMakeLists.txt
@@ -25,6 +25,7 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c)
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.c)
SET(RSPAMD_LUA ${LUASRC} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index d7af3956f..3a38d437b 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -19,6 +19,7 @@
#include "libserver/composites.h"
#include "libmime/lang_detection.h"
#include "lua/lua_map.h"
+#include "lua/lua_thread_pool.h"
#include "utlist.h"
#include <math.h>
@@ -1042,6 +1043,8 @@ struct lua_callback_data {
gint ref;
} callback;
gboolean cb_is_ref;
+ gpointer thread_entry;
+ gint stack_level;
gint order;
};
@@ -1191,42 +1194,71 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
}
static void
+lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint ret);
+
+static void
lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
{
struct lua_callback_data *cd = ud;
struct rspamd_task **ptask;
- gint level = lua_gettop (cd->L), nresults, err_idx, ret;
- lua_State *L = cd->L;
- GString *tb;
- struct rspamd_symbol_result *s;
+ gint ret;
- lua_pushcfunction (L, &rspamd_lua_traceback);
- err_idx = lua_gettop (L);
+ struct thread_entry *thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool);
+ cd->thread_entry = thread_entry;
- level ++;
+ lua_State *thread = thread_entry->lua_state;
+ cd->stack_level = lua_gettop (cd->L);
if (cd->cb_is_ref) {
- lua_rawgeti (L, LUA_REGISTRYINDEX, cd->callback.ref);
+ lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
}
else {
- lua_getglobal (L, cd->callback.name);
+ lua_getglobal (thread, cd->callback.name);
}
- ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (L, "rspamd{task}", -1);
+ ptask = lua_newuserdata (thread, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (thread, "rspamd{task}", -1);
*ptask = task;
- if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) {
- tb = lua_touserdata (L, -1);
+ ret = lua_resume (thread, 1);
+
+ if (ret != LUA_YIELD) {
+ /*
+ LUA_YIELD state should not be handled here.
+ It should only happen when the thread initiated a asynchronous event and it will be restored as soon
+ the event is finished
+ */
+ lua_metric_symbol_callback_return (task, ud, ret);
+ }
+}
+
+static void
+lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint ret)
+{
+ GString *tb;
+ struct lua_callback_data *cd = ud;
+ int nresults;
+ struct rspamd_symbol_result *s;
+ struct thread_entry *thread_entry = cd->thread_entry;
+ lua_State *thread = thread_entry->lua_state;
+
+ if (ret != 0) {
+ lua_pushcfunction (thread, rspamd_lua_traceback);
+ lua_call (thread, 0, LUA_MULTRET);
+
+ tb = lua_touserdata (thread, -1);
msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb);
if (tb) {
g_string_free (tb, TRUE);
- lua_pop (L, 1);
+ lua_pop (thread, 1);
}
+ g_assert (lua_gettop (thread) >= cd->stack_level);
+ // maybe there is a way to recover here. For now, just remove faulty thread
+ lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, cd->thread_entry);
}
else {
- nresults = lua_gettop (L) - level;
+ nresults = lua_gettop (thread) - cd->stack_level;
if (nresults >= 1) {
/* Function returned boolean, so maybe we need to insert result? */
@@ -1236,16 +1268,16 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
gint type;
struct lua_watcher_data *wd;
- type = lua_type (cd->L, level + 1);
+ type = lua_type (thread, cd->stack_level + 1);
if (type == LUA_TBOOLEAN) {
- res = lua_toboolean (L, level + 1);
+ res = lua_toboolean (thread, cd->stack_level + 1);
}
else if (type == LUA_TFUNCTION) {
/* Function returned a closure that should be watched for */
wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
- lua_pushvalue (cd->L, level + 1);
- wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1);
+ wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX);
wd->cbd = cd;
rspamd_session_watcher_push_callback (task->s,
rspamd_session_get_watcher (task->s),
@@ -1258,14 +1290,14 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
rspamd_session_get_watcher (task->s));
}
else {
- res = lua_tonumber (L, level + 1);
+ res = lua_tonumber (thread, cd->stack_level + 1);
}
if (res) {
gint first_opt = 2;
- if (lua_type (L, level + 2) == LUA_TNUMBER) {
- flag = lua_tonumber (L, level + 2);
+ if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) {
+ flag = lua_tonumber (thread, cd->stack_level + 2);
/* Shift opt index */
first_opt = 3;
}
@@ -1276,35 +1308,40 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
if (s) {
- guint last_pos = lua_gettop (L);
+ guint last_pos = lua_gettop (thread);
- for (i = level + first_opt; i <= last_pos; i++) {
- if (lua_type (L, i) == LUA_TSTRING) {
- const char *opt = lua_tostring (L, i);
+ for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
+ if (lua_type (thread, i) == LUA_TSTRING) {
+ const char *opt = lua_tostring (thread, i);
rspamd_task_add_result_option (task, s, opt);
}
- else if (lua_type (L, i) == LUA_TTABLE) {
- lua_pushvalue (L, i);
+ else if (lua_type (thread, i) == LUA_TTABLE) {
+ lua_pushvalue (thread, i);
- for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
- const char *opt = lua_tostring (L, -1);
+ for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) {
+ const char *opt = lua_tostring (thread, -1);
rspamd_task_add_result_option (task, s, opt);
}
- lua_pop (L, 1);
+ lua_pop (thread, 1);
}
}
}
}
- lua_pop (L, nresults);
+ lua_pop (thread, nresults);
}
+
+ g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */
+
+ lua_thread_pool_return(task->cfg->lua_thread_pool, cd->thread_entry);
}
- lua_pop (L, 1); /* Error function */
+ cd->thread_entry = NULL;
+ cd->stack_level = 0;
}
static gint
diff --git a/src/lua/lua_expression.c b/src/lua/lua_expression.c
index 03a667b8d..0d57a3bd8 100644
--- a/src/lua/lua_expression.c
+++ b/src/lua/lua_expression.c
@@ -98,7 +98,7 @@ static const struct luaL_reg exprlib_f[] = {
static rspamd_expression_atom_t * lua_atom_parse (const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
-static gdouble lua_atom_process (gpointer input, rspamd_expression_atom_t *atom);
+static gdouble lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
static const struct rspamd_atom_subr lua_atom_subr = {
.parse = lua_atom_parse,
@@ -166,22 +166,22 @@ lua_atom_parse (const gchar *line, gsize len,
}
static gdouble
-lua_atom_process (gpointer input, rspamd_expression_atom_t *atom)
+lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom)
{
struct lua_expression *e = (struct lua_expression *)atom->data;
gdouble ret = 0;
- lua_rawgeti (e->L, LUA_REGISTRYINDEX, e->process_idx);
- lua_pushlstring (e->L, atom->str, atom->len);
- lua_pushvalue (e->L, GPOINTER_TO_INT (input));
+ lua_rawgeti (process_data->L, LUA_REGISTRYINDEX, e->process_idx);
+ lua_pushlstring (process_data->L, atom->str, atom->len);
+ lua_pushvalue (process_data->L, process_data->stack_item);
- if (lua_pcall (e->L, 2, 1, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (e->L, -1));
- lua_pop (e->L, 1);
+ if (lua_pcall (process_data->L, 2, 1, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (process_data->L, -1));
+ lua_pop (process_data->L, 1);
}
else {
- ret = lua_tonumber (e->L, -1);
- lua_pop (e->L, 1);
+ ret = lua_tonumber (process_data->L, -1);
+ lua_pop (process_data->L, 1);
}
return ret;
@@ -195,11 +195,16 @@ lua_expr_process (lua_State *L)
gdouble res;
gint flags = 0;
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+ process_data.L = L;
+ process_data.stack_item = 2;
+
if (lua_gettop (L) >= 3) {
- flags = lua_tonumber (L, 3);
+ process_data.flags = flags;
}
- res = rspamd_process_expression (e->expr, flags, GINT_TO_POINTER (2));
+ res = rspamd_process_expression (e->expr, &process_data);
lua_pushnumber (L, res);
@@ -214,29 +219,36 @@ lua_expr_process_traced (lua_State *L)
rspamd_expression_atom_t *atom;
gint res;
guint i;
- gint flags = 0;
- GPtrArray *trace;
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+
+ process_data.L = L;
+ /*
+ * stack:1 - self
+ * stack:2 - data, see process_traced() definition for details
+ */
+ process_data.stack_item = 2;
if (lua_gettop (L) >= 3) {
- flags = lua_tonumber (L, 3);
+ process_data.flags = lua_tonumber (L, 3);
}
- trace = g_ptr_array_sized_new (32);
- res = rspamd_process_expression_track (e->expr, flags, GINT_TO_POINTER (2),
- trace);
+ process_data.trace = g_ptr_array_sized_new (32);
+
+ res = rspamd_process_expression_track (e->expr, &process_data);
lua_pushnumber (L, res);
- lua_createtable (L, trace->len, 0);
+ lua_createtable (L, process_data.trace->len, 0);
- for (i = 0; i < trace->len; i ++) {
- atom = g_ptr_array_index (trace, i);
+ for (i = 0; i < process_data.trace->len; i ++) {
+ atom = g_ptr_array_index (process_data.trace, i);
lua_pushlstring (L, atom->str, atom->len);
lua_rawseti (L, -2, i + 1);
}
- g_ptr_array_free (trace, TRUE);
+ g_ptr_array_free (process_data.trace, TRUE);
return 2;
}
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
new file mode 100644
index 000000000..07364f270
--- /dev/null
+++ b/src/lua/lua_thread_pool.c
@@ -0,0 +1,128 @@
+#include "config.h"
+
+#include "lua_common.h"
+#include "lua_thread_pool.h"
+
+struct lua_thread_pool {
+ GQueue *available_items;
+ lua_State *L;
+ gint max_items;
+ struct thread_entry *running_entry;
+};
+
+static struct thread_entry *
+thread_entry_new (lua_State * L)
+{
+ struct thread_entry *ent;
+ ent = g_malloc (sizeof *ent);
+ ent->lua_state = lua_newthread (L);
+ ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX);
+
+ return ent;
+}
+
+static void
+thread_entry_free (lua_State * L, struct thread_entry *ent)
+{
+ luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index);
+ g_free (ent);
+}
+
+struct lua_thread_pool *
+lua_thread_pool_new (lua_State * L)
+{
+ struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1);
+
+ pool->L = L;
+ pool->max_items = 100;
+
+ pool->available_items = g_queue_new ();
+ int i;
+
+ struct thread_entry *ent;
+ for (i = 0; i < MAX(2, pool->max_items / 10); i ++) {
+ ent = thread_entry_new (pool->L);
+ g_queue_push_head (pool->available_items, ent);
+ }
+
+ return pool;
+}
+
+void
+lua_thread_pool_free (struct lua_thread_pool *pool)
+{
+ struct thread_entry *ent = NULL;
+ while (!g_queue_is_empty (pool->available_items)) {
+ ent = g_queue_pop_head (pool->available_items);
+ thread_entry_free (pool->L, ent);
+ }
+ g_queue_free (pool->available_items);
+ g_free (pool);
+}
+
+struct thread_entry *
+lua_thread_pool_get(struct lua_thread_pool *pool)
+{
+ gpointer cur;
+ struct thread_entry *ent = NULL;
+
+ cur = g_queue_pop_head (pool->available_items);
+
+ if (cur) {
+ ent = cur;
+ }
+ else {
+ ent = thread_entry_new (pool->L);
+ }
+
+ return ent;
+}
+
+void
+lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't return a running/yielded thread into the pool */
+
+ if (pool->running_entry == thread_entry) {
+ pool->running_entry = NULL;
+ }
+
+ if (g_queue_get_length (pool->available_items) <= pool->max_items) {
+ g_queue_push_head (pool->available_items, thread_entry);
+ }
+ else {
+ thread_entry_free (pool->L, thread_entry);
+ }
+}
+
+void
+lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ struct thread_entry *ent = NULL;
+
+ /* we should only terminate failed threads */
+ g_assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD);
+
+ if (pool->running_entry == thread_entry) {
+ pool->running_entry = NULL;
+ }
+
+ thread_entry_free (pool->L, thread_entry);
+
+ if (g_queue_get_length (pool->available_items) <= pool->max_items) {
+ ent = thread_entry_new (pool->L);
+ g_queue_push_head (pool->available_items, ent);
+ }
+}
+
+struct thread_entry *
+lua_thread_pool_get_running_entry(struct lua_thread_pool *pool)
+{
+ return pool->running_entry;
+}
+
+void
+lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ pool->running_entry = thread_entry;
+}
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
new file mode 100644
index 000000000..33a0da879
--- /dev/null
+++ b/src/lua/lua_thread_pool.h
@@ -0,0 +1,81 @@
+#ifndef LUA_THREAD_POOL_H_
+#define LUA_THREAD_POOL_H_
+
+#include <lua.h>
+
+struct thread_entry {
+ lua_State *lua_state;
+ gint thread_index;
+};
+
+struct thread_pool;
+
+/**
+ * Allocates new thread pool on state L. Pre-creates number of lua-threads to use later on
+ *
+ * @param L
+ * @return
+ */
+struct lua_thread_pool *
+lua_thread_pool_new (lua_State * L);
+
+/**
+ * Destroys the pool
+ * @param pool
+ */
+void
+lua_thread_pool_free (struct lua_thread_pool *pool);
+
+/**
+ * Extracts a thread from the list of available ones.
+ * It immediately becames running one and should be used to run a Lua script/function straight away.
+ * as soon as the code is finished, it should be either returned into list of available threads by
+ * calling lua_thread_pool_return() or terminated by calling lua_thread_pool_terminate_entry()
+ * if the code finished with error.
+ *
+ * If the code performed YIELD, the thread is still running and it's live should be controlled by the callee
+ *
+ * @param pool
+ * @return
+ */
+struct thread_entry *
+lua_thread_pool_get(struct lua_thread_pool *pool);
+
+/**
+ * Return thread into the list of available ones. It can't be done with yielded or dead threads.
+ *
+ * @param pool
+ * @param thread_entry
+ */
+void
+lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+/**
+ * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only
+ *
+ * @param pool
+ * @param thread_entry
+ */
+void
+lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+/**
+ * Currently running thread. Typically needed in yielding point - to fill-up continuation.
+ *
+ * @param pool
+ * @return
+ */
+struct thread_entry *
+lua_thread_pool_get_running_entry(struct lua_thread_pool *pool);
+
+/**
+ * Updates currently running thread
+ *
+ * @param pool
+ * @param thread_entry
+ */
+void
+lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+#endif /* LUA_THREAD_POOL_H_ */
+
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 915305aa3..92cccc338 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -433,7 +433,12 @@ process_regexp_item (struct rspamd_task *task, void *user_data)
else {
/* Process expression */
if (item->expr) {
- res = rspamd_process_expression (item->expr, 0, task);
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+
+ process_data.task = task;
+
+ res = rspamd_process_expression (item->expr, &process_data);
}
else {
msg_warn_task ("FIXME: %s symbol is broken with new expressions",