diff options
-rw-r--r-- | src/libmime/mime_expressions.c | 6 | ||||
-rw-r--r-- | src/libserver/cfg_file.h | 1 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 3 | ||||
-rw-r--r-- | src/libserver/composites.c | 9 | ||||
-rw-r--r-- | src/libutil/expression.c | 24 | ||||
-rw-r--r-- | src/libutil/expression.h | 22 | ||||
-rw-r--r-- | src/lua/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/lua/lua_config.c | 103 | ||||
-rw-r--r-- | src/lua/lua_expression.c | 56 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.c | 128 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.h | 81 | ||||
-rw-r--r-- | src/plugins/regexp.c | 7 |
12 files changed, 363 insertions, 80 deletions
diff --git a/src/libmime/mime_expressions.c b/src/libmime/mime_expressions.c index f9bfdc1bf..518b9a390 100644 --- a/src/libmime/mime_expressions.c +++ b/src/libmime/mime_expressions.c @@ -83,7 +83,7 @@ static gboolean rspamd_is_empty_body (struct rspamd_task *task, static rspamd_expression_atom_t * rspamd_mime_expr_parse (const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); -static gdouble rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom); +static gdouble rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); static gint rspamd_mime_expr_priority (rspamd_expression_atom_t *atom); static void rspamd_mime_expr_destroy (rspamd_expression_atom_t *atom); @@ -913,9 +913,9 @@ rspamd_mime_expr_process_function (struct rspamd_function_atom * func, } static gdouble -rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom) +rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom) { - struct rspamd_task *task = input; + struct rspamd_task *task = process_data->task; struct rspamd_mime_atom *mime_atom; lua_State *L; gdouble ret = 0; diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 75b404530..c583766c4 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -380,6 +380,7 @@ struct rspamd_config { gchar * checksum; /**< real checksum of config file */ gchar * dump_checksum; /**< dump checksum of config file */ gpointer lua_state; /**< pointer to lua state */ + gpointer lua_thread_pool; /**< pointer to lua thread (coroutine) pool */ gchar * rrd_file; /**< rrd file to store statistics */ gchar * history_file; /**< file to save rolling history */ diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index b7b9dfdee..016556912 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -20,6 +20,7 @@ #include "uthash_strcase.h" #include "filter.h" #include "lua/lua_common.h" +#include "lua/lua_thread_pool.h" #include "map.h" #include "map_helpers.h" #include "map_private.h" @@ -175,6 +176,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags) if (!(flags & RSPAMD_CONFIG_INIT_SKIP_LUA)) { cfg->lua_state = rspamd_lua_init (); cfg->own_lua_state = TRUE; + cfg->lua_thread_pool = lua_thread_pool_new (cfg->lua_state); } cfg->cache = rspamd_symbols_cache_new (cfg); @@ -259,6 +261,7 @@ rspamd_config_free (struct rspamd_config *cfg) g_ptr_array_free (cfg->c_modules, TRUE); if (cfg->lua_state && cfg->own_lua_state) { + lua_thread_pool_free (cfg->lua_thread_pool); lua_close (cfg->lua_state); } REF_RELEASE (cfg->libs_ctx); diff --git a/src/libserver/composites.c b/src/libserver/composites.c index 8f3cb179d..125e97e6d 100644 --- a/src/libserver/composites.c +++ b/src/libserver/composites.c @@ -344,8 +344,13 @@ composites_foreach_callback (gpointer key, gpointer value, void *data) return; } - rc = rspamd_process_expression (comp->expr, - RSPAMD_EXPRESSION_FLAG_NOOPT, cd); + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + + process_data.flags = RSPAMD_EXPRESSION_FLAG_NOOPT; + process_data.cd = cd; + + rc = rspamd_process_expression (comp->expr, &process_data); /* Checked bit */ setbit (cd->checked, comp->id * 2); diff --git a/src/libutil/expression.c b/src/libutil/expression.c index 21a137f43..2469d0415 100644 --- a/src/libutil/expression.c +++ b/src/libutil/expression.c @@ -984,8 +984,8 @@ rspamd_ast_do_op (struct rspamd_expression_elt *elt, gdouble val, } static gdouble -rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node, - gpointer data, GPtrArray *track) +rspamd_ast_process_node (struct rspamd_expression *expr, GNode *node, + struct rspamd_expr_process_data *process_data) { struct rspamd_expression_elt *elt, *celt, *parelt = NULL; GNode *cld; @@ -1010,13 +1010,13 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node t1 = rspamd_get_ticks (TRUE); } - elt->value = expr->subr->process (data, elt->p.atom); + elt->value = expr->subr->process (process_data, elt->p.atom); if (fabs (elt->value) > 1e-9) { elt->p.atom->hits ++; - if (track) { - g_ptr_array_add (track, elt->p.atom); + if (process_data->trace) { + g_ptr_array_add (process_data->trace, elt->p.atom); } } @@ -1057,7 +1057,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node continue; } - val = rspamd_ast_process_node (expr, flags, cld, data, track); + val = rspamd_ast_process_node (expr, cld, process_data); if (isnan (acc)) { acc = rspamd_ast_do_op (elt, val, 0, lim, TRUE); @@ -1066,7 +1066,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node acc = rspamd_ast_do_op (elt, val, acc, lim, FALSE); } - if (!(flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) { + if (!(process_data->flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) { if (rspamd_ast_node_done (elt, parelt, acc, lim)) { return acc; } @@ -1090,8 +1090,7 @@ rspamd_ast_cleanup_traverse (GNode *n, gpointer d) } gdouble -rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, - gpointer data, GPtrArray *track) +rspamd_process_expression_track (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data) { gdouble ret = 0; @@ -1099,7 +1098,7 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, /* Ensure that stack is empty at this point */ g_assert (expr->expression_stack->len == 0); - ret = rspamd_ast_process_node (expr, flags, expr->ast, data, track); + ret = rspamd_ast_process_node (expr, expr->ast, process_data); /* Cleanup */ g_node_traverse (expr->ast, G_IN_ORDER, G_TRAVERSE_ALL, -1, @@ -1124,10 +1123,9 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, } gdouble -rspamd_process_expression (struct rspamd_expression *expr, gint flags, - gpointer data) +rspamd_process_expression (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data) { - return rspamd_process_expression_track (expr, flags, data, NULL); + return rspamd_process_expression_track (expr, process_data); } static gboolean diff --git a/src/libutil/expression.h b/src/libutil/expression.h index fefde2974..7f7cb2dda 100644 --- a/src/libutil/expression.h +++ b/src/libutil/expression.h @@ -56,12 +56,24 @@ typedef struct rspamd_expression_atom_s { gint priority; } rspamd_expression_atom_t; +struct rspamd_expr_process_data { + /* Current Lua state to run atom processing */ + struct lua_State *L; + /* Parameter of lua-function processing atom*/ + gint stack_item; + gint flags; + /* != NULL if trace is collected */ + GPtrArray *trace; + struct composites_data *cd; + struct rspamd_task *task; +}; + struct rspamd_atom_subr { /* Parses atom from string and returns atom structure */ rspamd_expression_atom_t * (*parse)(const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); /* Process atom via the opaque pointer (e.g. struct rspamd_task *) */ - gdouble (*process) (gpointer input, rspamd_expression_atom_t *atom); + gdouble (*process) (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); /* Calculates the relative priority of the expression */ gint (*priority) (rspamd_expression_atom_t *atom); void (*destroy) (rspamd_expression_atom_t *atom); @@ -92,8 +104,8 @@ gboolean rspamd_parse_expression (const gchar *line, gsize len, * @param data opaque data pointer for all the atoms * @return the value of expression */ -gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags, - gpointer data); +gdouble rspamd_process_expression (struct rspamd_expression *expr, + struct rspamd_expr_process_data *process_data); /** * Process the expression and return its value using atom 'process' functions with the specified data pointer. @@ -103,8 +115,8 @@ gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags, * @param track pointer array to atoms tracking * @return the value of expression */ -gdouble rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, - gpointer data, GPtrArray *track); +gdouble rspamd_process_expression_track (struct rspamd_expression *expr, + struct rspamd_expr_process_data *process_data); /** * Shows string representation of an expression diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt index 9c561c0e4..ffc4b27ca 100644 --- a/src/lua/CMakeLists.txt +++ b/src/lua/CMakeLists.txt @@ -25,6 +25,7 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c - ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c) + ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c + ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.c) SET(RSPAMD_LUA ${LUASRC} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index d7af3956f..3a38d437b 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -19,6 +19,7 @@ #include "libserver/composites.h" #include "libmime/lang_detection.h" #include "lua/lua_map.h" +#include "lua/lua_thread_pool.h" #include "utlist.h" #include <math.h> @@ -1042,6 +1043,8 @@ struct lua_callback_data { gint ref; } callback; gboolean cb_is_ref; + gpointer thread_entry; + gint stack_level; gint order; }; @@ -1191,42 +1194,71 @@ lua_watcher_callback (gpointer session_data, gpointer ud) } static void +lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint ret); + +static void lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) { struct lua_callback_data *cd = ud; struct rspamd_task **ptask; - gint level = lua_gettop (cd->L), nresults, err_idx, ret; - lua_State *L = cd->L; - GString *tb; - struct rspamd_symbol_result *s; + gint ret; - lua_pushcfunction (L, &rspamd_lua_traceback); - err_idx = lua_gettop (L); + struct thread_entry *thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool); + cd->thread_entry = thread_entry; - level ++; + lua_State *thread = thread_entry->lua_state; + cd->stack_level = lua_gettop (cd->L); if (cd->cb_is_ref) { - lua_rawgeti (L, LUA_REGISTRYINDEX, cd->callback.ref); + lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref); } else { - lua_getglobal (L, cd->callback.name); + lua_getglobal (thread, cd->callback.name); } - ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (L, "rspamd{task}", -1); + ptask = lua_newuserdata (thread, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (thread, "rspamd{task}", -1); *ptask = task; - if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) { - tb = lua_touserdata (L, -1); + ret = lua_resume (thread, 1); + + if (ret != LUA_YIELD) { + /* + LUA_YIELD state should not be handled here. + It should only happen when the thread initiated a asynchronous event and it will be restored as soon + the event is finished + */ + lua_metric_symbol_callback_return (task, ud, ret); + } +} + +static void +lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint ret) +{ + GString *tb; + struct lua_callback_data *cd = ud; + int nresults; + struct rspamd_symbol_result *s; + struct thread_entry *thread_entry = cd->thread_entry; + lua_State *thread = thread_entry->lua_state; + + if (ret != 0) { + lua_pushcfunction (thread, rspamd_lua_traceback); + lua_call (thread, 0, LUA_MULTRET); + + tb = lua_touserdata (thread, -1); msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb); if (tb) { g_string_free (tb, TRUE); - lua_pop (L, 1); + lua_pop (thread, 1); } + g_assert (lua_gettop (thread) >= cd->stack_level); + // maybe there is a way to recover here. For now, just remove faulty thread + lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, cd->thread_entry); } else { - nresults = lua_gettop (L) - level; + nresults = lua_gettop (thread) - cd->stack_level; if (nresults >= 1) { /* Function returned boolean, so maybe we need to insert result? */ @@ -1236,16 +1268,16 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) gint type; struct lua_watcher_data *wd; - type = lua_type (cd->L, level + 1); + type = lua_type (thread, cd->stack_level + 1); if (type == LUA_TBOOLEAN) { - res = lua_toboolean (L, level + 1); + res = lua_toboolean (thread, cd->stack_level + 1); } else if (type == LUA_TFUNCTION) { /* Function returned a closure that should be watched for */ wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); - lua_pushvalue (cd->L, level + 1); - wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX); + lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1); + wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX); wd->cbd = cd; rspamd_session_watcher_push_callback (task->s, rspamd_session_get_watcher (task->s), @@ -1258,14 +1290,14 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) rspamd_session_get_watcher (task->s)); } else { - res = lua_tonumber (L, level + 1); + res = lua_tonumber (thread, cd->stack_level + 1); } if (res) { gint first_opt = 2; - if (lua_type (L, level + 2) == LUA_TNUMBER) { - flag = lua_tonumber (L, level + 2); + if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) { + flag = lua_tonumber (thread, cd->stack_level + 2); /* Shift opt index */ first_opt = 3; } @@ -1276,35 +1308,40 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) s = rspamd_task_insert_result (task, cd->symbol, flag, NULL); if (s) { - guint last_pos = lua_gettop (L); + guint last_pos = lua_gettop (thread); - for (i = level + first_opt; i <= last_pos; i++) { - if (lua_type (L, i) == LUA_TSTRING) { - const char *opt = lua_tostring (L, i); + for (i = cd->stack_level + first_opt; i <= last_pos; i++) { + if (lua_type (thread, i) == LUA_TSTRING) { + const char *opt = lua_tostring (thread, i); rspamd_task_add_result_option (task, s, opt); } - else if (lua_type (L, i) == LUA_TTABLE) { - lua_pushvalue (L, i); + else if (lua_type (thread, i) == LUA_TTABLE) { + lua_pushvalue (thread, i); - for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) { - const char *opt = lua_tostring (L, -1); + for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) { + const char *opt = lua_tostring (thread, -1); rspamd_task_add_result_option (task, s, opt); } - lua_pop (L, 1); + lua_pop (thread, 1); } } } } - lua_pop (L, nresults); + lua_pop (thread, nresults); } + + g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */ + + lua_thread_pool_return(task->cfg->lua_thread_pool, cd->thread_entry); } - lua_pop (L, 1); /* Error function */ + cd->thread_entry = NULL; + cd->stack_level = 0; } static gint diff --git a/src/lua/lua_expression.c b/src/lua/lua_expression.c index 03a667b8d..0d57a3bd8 100644 --- a/src/lua/lua_expression.c +++ b/src/lua/lua_expression.c @@ -98,7 +98,7 @@ static const struct luaL_reg exprlib_f[] = { static rspamd_expression_atom_t * lua_atom_parse (const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); -static gdouble lua_atom_process (gpointer input, rspamd_expression_atom_t *atom); +static gdouble lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); static const struct rspamd_atom_subr lua_atom_subr = { .parse = lua_atom_parse, @@ -166,22 +166,22 @@ lua_atom_parse (const gchar *line, gsize len, } static gdouble -lua_atom_process (gpointer input, rspamd_expression_atom_t *atom) +lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom) { struct lua_expression *e = (struct lua_expression *)atom->data; gdouble ret = 0; - lua_rawgeti (e->L, LUA_REGISTRYINDEX, e->process_idx); - lua_pushlstring (e->L, atom->str, atom->len); - lua_pushvalue (e->L, GPOINTER_TO_INT (input)); + lua_rawgeti (process_data->L, LUA_REGISTRYINDEX, e->process_idx); + lua_pushlstring (process_data->L, atom->str, atom->len); + lua_pushvalue (process_data->L, process_data->stack_item); - if (lua_pcall (e->L, 2, 1, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (e->L, -1)); - lua_pop (e->L, 1); + if (lua_pcall (process_data->L, 2, 1, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (process_data->L, -1)); + lua_pop (process_data->L, 1); } else { - ret = lua_tonumber (e->L, -1); - lua_pop (e->L, 1); + ret = lua_tonumber (process_data->L, -1); + lua_pop (process_data->L, 1); } return ret; @@ -195,11 +195,16 @@ lua_expr_process (lua_State *L) gdouble res; gint flags = 0; + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + process_data.L = L; + process_data.stack_item = 2; + if (lua_gettop (L) >= 3) { - flags = lua_tonumber (L, 3); + process_data.flags = flags; } - res = rspamd_process_expression (e->expr, flags, GINT_TO_POINTER (2)); + res = rspamd_process_expression (e->expr, &process_data); lua_pushnumber (L, res); @@ -214,29 +219,36 @@ lua_expr_process_traced (lua_State *L) rspamd_expression_atom_t *atom; gint res; guint i; - gint flags = 0; - GPtrArray *trace; + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + + process_data.L = L; + /* + * stack:1 - self + * stack:2 - data, see process_traced() definition for details + */ + process_data.stack_item = 2; if (lua_gettop (L) >= 3) { - flags = lua_tonumber (L, 3); + process_data.flags = lua_tonumber (L, 3); } - trace = g_ptr_array_sized_new (32); - res = rspamd_process_expression_track (e->expr, flags, GINT_TO_POINTER (2), - trace); + process_data.trace = g_ptr_array_sized_new (32); + + res = rspamd_process_expression_track (e->expr, &process_data); lua_pushnumber (L, res); - lua_createtable (L, trace->len, 0); + lua_createtable (L, process_data.trace->len, 0); - for (i = 0; i < trace->len; i ++) { - atom = g_ptr_array_index (trace, i); + for (i = 0; i < process_data.trace->len; i ++) { + atom = g_ptr_array_index (process_data.trace, i); lua_pushlstring (L, atom->str, atom->len); lua_rawseti (L, -2, i + 1); } - g_ptr_array_free (trace, TRUE); + g_ptr_array_free (process_data.trace, TRUE); return 2; } diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c new file mode 100644 index 000000000..07364f270 --- /dev/null +++ b/src/lua/lua_thread_pool.c @@ -0,0 +1,128 @@ +#include "config.h" + +#include "lua_common.h" +#include "lua_thread_pool.h" + +struct lua_thread_pool { + GQueue *available_items; + lua_State *L; + gint max_items; + struct thread_entry *running_entry; +}; + +static struct thread_entry * +thread_entry_new (lua_State * L) +{ + struct thread_entry *ent; + ent = g_malloc (sizeof *ent); + ent->lua_state = lua_newthread (L); + ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX); + + return ent; +} + +static void +thread_entry_free (lua_State * L, struct thread_entry *ent) +{ + luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index); + g_free (ent); +} + +struct lua_thread_pool * +lua_thread_pool_new (lua_State * L) +{ + struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1); + + pool->L = L; + pool->max_items = 100; + + pool->available_items = g_queue_new (); + int i; + + struct thread_entry *ent; + for (i = 0; i < MAX(2, pool->max_items / 10); i ++) { + ent = thread_entry_new (pool->L); + g_queue_push_head (pool->available_items, ent); + } + + return pool; +} + +void +lua_thread_pool_free (struct lua_thread_pool *pool) +{ + struct thread_entry *ent = NULL; + while (!g_queue_is_empty (pool->available_items)) { + ent = g_queue_pop_head (pool->available_items); + thread_entry_free (pool->L, ent); + } + g_queue_free (pool->available_items); + g_free (pool); +} + +struct thread_entry * +lua_thread_pool_get(struct lua_thread_pool *pool) +{ + gpointer cur; + struct thread_entry *ent = NULL; + + cur = g_queue_pop_head (pool->available_items); + + if (cur) { + ent = cur; + } + else { + ent = thread_entry_new (pool->L); + } + + return ent; +} + +void +lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry) +{ + g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't return a running/yielded thread into the pool */ + + if (pool->running_entry == thread_entry) { + pool->running_entry = NULL; + } + + if (g_queue_get_length (pool->available_items) <= pool->max_items) { + g_queue_push_head (pool->available_items, thread_entry); + } + else { + thread_entry_free (pool->L, thread_entry); + } +} + +void +lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry) +{ + struct thread_entry *ent = NULL; + + /* we should only terminate failed threads */ + g_assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD); + + if (pool->running_entry == thread_entry) { + pool->running_entry = NULL; + } + + thread_entry_free (pool->L, thread_entry); + + if (g_queue_get_length (pool->available_items) <= pool->max_items) { + ent = thread_entry_new (pool->L); + g_queue_push_head (pool->available_items, ent); + } +} + +struct thread_entry * +lua_thread_pool_get_running_entry(struct lua_thread_pool *pool) +{ + return pool->running_entry; +} + +void +lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry) +{ + pool->running_entry = thread_entry; +} diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h new file mode 100644 index 000000000..33a0da879 --- /dev/null +++ b/src/lua/lua_thread_pool.h @@ -0,0 +1,81 @@ +#ifndef LUA_THREAD_POOL_H_ +#define LUA_THREAD_POOL_H_ + +#include <lua.h> + +struct thread_entry { + lua_State *lua_state; + gint thread_index; +}; + +struct thread_pool; + +/** + * Allocates new thread pool on state L. Pre-creates number of lua-threads to use later on + * + * @param L + * @return + */ +struct lua_thread_pool * +lua_thread_pool_new (lua_State * L); + +/** + * Destroys the pool + * @param pool + */ +void +lua_thread_pool_free (struct lua_thread_pool *pool); + +/** + * Extracts a thread from the list of available ones. + * It immediately becames running one and should be used to run a Lua script/function straight away. + * as soon as the code is finished, it should be either returned into list of available threads by + * calling lua_thread_pool_return() or terminated by calling lua_thread_pool_terminate_entry() + * if the code finished with error. + * + * If the code performed YIELD, the thread is still running and it's live should be controlled by the callee + * + * @param pool + * @return + */ +struct thread_entry * +lua_thread_pool_get(struct lua_thread_pool *pool); + +/** + * Return thread into the list of available ones. It can't be done with yielded or dead threads. + * + * @param pool + * @param thread_entry + */ +void +lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry); + +/** + * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only + * + * @param pool + * @param thread_entry + */ +void +lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); + +/** + * Currently running thread. Typically needed in yielding point - to fill-up continuation. + * + * @param pool + * @return + */ +struct thread_entry * +lua_thread_pool_get_running_entry(struct lua_thread_pool *pool); + +/** + * Updates currently running thread + * + * @param pool + * @param thread_entry + */ +void +lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); + +#endif /* LUA_THREAD_POOL_H_ */ + diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 915305aa3..92cccc338 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -433,7 +433,12 @@ process_regexp_item (struct rspamd_task *task, void *user_data) else { /* Process expression */ if (item->expr) { - res = rspamd_process_expression (item->expr, 0, task); + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + + process_data.task = task; + + res = rspamd_process_expression (item->expr, &process_data); } else { msg_warn_task ("FIXME: %s symbol is broken with new expressions", |