aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-20 11:57:04 +0100
committerGitHub <noreply@github.com>2018-08-20 11:57:04 +0100
commit816987a0b25fd8e9915a0dc1f2f20c968c2177d4 (patch)
tree2729860f7cd12a4d8b86c917b5b215be83914c79 /src
parent2bc77d7a877940589c2ae78b7521a39a5e5be97f (diff)
parentf9606fe254533f1cb493230b5ab4ee075cd550f6 (diff)
downloadrspamd-816987a0b25fd8e9915a0dc1f2f20c968c2177d4.tar.gz
rspamd-816987a0b25fd8e9915a0dc1f2f20c968c2177d4.zip
Merge pull request #2406 from negram/lua-coroutine-model
[Project] coroutine threaded model for API calls: thread pool
Diffstat (limited to 'src')
-rw-r--r--src/libmime/mime_expressions.c6
-rw-r--r--src/libserver/cfg_file.h1
-rw-r--r--src/libserver/cfg_utils.c3
-rw-r--r--src/libserver/composites.c15
-rw-r--r--src/libutil/expression.c24
-rw-r--r--src/libutil/expression.h22
-rw-r--r--src/lua/CMakeLists.txt6
-rw-r--r--src/lua/lua_common.c17
-rw-r--r--src/lua/lua_common.h28
-rw-r--r--src/lua/lua_config.c140
-rw-r--r--src/lua/lua_dns.c597
-rw-r--r--src/lua/lua_dns_resolver.c631
-rw-r--r--src/lua/lua_dns_resolver.h16
-rw-r--r--src/lua/lua_expression.c56
-rw-r--r--src/lua/lua_http.c61
-rw-r--r--src/lua/lua_redis.c41
-rw-r--r--src/lua/lua_tcp.c82
-rw-r--r--src/lua/lua_thread_pool.c148
-rw-r--r--src/lua/lua_thread_pool.h106
-rw-r--r--src/plugins/lua/multimap.lua30
-rw-r--r--src/plugins/lua/reputation.lua69
-rw-r--r--src/plugins/regexp.c7
22 files changed, 1387 insertions, 719 deletions
diff --git a/src/libmime/mime_expressions.c b/src/libmime/mime_expressions.c
index f9bfdc1bf..518b9a390 100644
--- a/src/libmime/mime_expressions.c
+++ b/src/libmime/mime_expressions.c
@@ -83,7 +83,7 @@ static gboolean rspamd_is_empty_body (struct rspamd_task *task,
static rspamd_expression_atom_t * rspamd_mime_expr_parse (const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
-static gdouble rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom);
+static gdouble rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
static gint rspamd_mime_expr_priority (rspamd_expression_atom_t *atom);
static void rspamd_mime_expr_destroy (rspamd_expression_atom_t *atom);
@@ -913,9 +913,9 @@ rspamd_mime_expr_process_function (struct rspamd_function_atom * func,
}
static gdouble
-rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom)
+rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom)
{
- struct rspamd_task *task = input;
+ struct rspamd_task *task = process_data->task;
struct rspamd_mime_atom *mime_atom;
lua_State *L;
gdouble ret = 0;
diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h
index 75b404530..c583766c4 100644
--- a/src/libserver/cfg_file.h
+++ b/src/libserver/cfg_file.h
@@ -380,6 +380,7 @@ struct rspamd_config {
gchar * checksum; /**< real checksum of config file */
gchar * dump_checksum; /**< dump checksum of config file */
gpointer lua_state; /**< pointer to lua state */
+ gpointer lua_thread_pool; /**< pointer to lua thread (coroutine) pool */
gchar * rrd_file; /**< rrd file to store statistics */
gchar * history_file; /**< file to save rolling history */
diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c
index b7b9dfdee..016556912 100644
--- a/src/libserver/cfg_utils.c
+++ b/src/libserver/cfg_utils.c
@@ -20,6 +20,7 @@
#include "uthash_strcase.h"
#include "filter.h"
#include "lua/lua_common.h"
+#include "lua/lua_thread_pool.h"
#include "map.h"
#include "map_helpers.h"
#include "map_private.h"
@@ -175,6 +176,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags)
if (!(flags & RSPAMD_CONFIG_INIT_SKIP_LUA)) {
cfg->lua_state = rspamd_lua_init ();
cfg->own_lua_state = TRUE;
+ cfg->lua_thread_pool = lua_thread_pool_new (cfg->lua_state);
}
cfg->cache = rspamd_symbols_cache_new (cfg);
@@ -259,6 +261,7 @@ rspamd_config_free (struct rspamd_config *cfg)
g_ptr_array_free (cfg->c_modules, TRUE);
if (cfg->lua_state && cfg->own_lua_state) {
+ lua_thread_pool_free (cfg->lua_thread_pool);
lua_close (cfg->lua_state);
}
REF_RELEASE (cfg->libs_ctx);
diff --git a/src/libserver/composites.c b/src/libserver/composites.c
index 8f3cb179d..83f3a35d4 100644
--- a/src/libserver/composites.c
+++ b/src/libserver/composites.c
@@ -66,7 +66,7 @@ struct symbol_remove_data {
static rspamd_expression_atom_t * rspamd_composite_expr_parse (const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
-static gdouble rspamd_composite_expr_process (gpointer input, rspamd_expression_atom_t *atom);
+static gdouble rspamd_composite_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
static gint rspamd_composite_expr_priority (rspamd_expression_atom_t *atom);
static void rspamd_composite_expr_destroy (rspamd_expression_atom_t *atom);
static void composites_foreach_callback (gpointer key, gpointer value, void *data);
@@ -173,9 +173,9 @@ rspamd_composite_process_single_symbol (struct composites_data *cd,
}
static gdouble
-rspamd_composite_expr_process (gpointer input, rspamd_expression_atom_t *atom)
+rspamd_composite_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom)
{
- struct composites_data *cd = (struct composites_data *)input;
+ struct composites_data *cd = process_data->cd;
const gchar *beg = atom->data, *sym = NULL;
gchar t;
struct symbol_remove_data *rd, *nrd;
@@ -344,8 +344,13 @@ composites_foreach_callback (gpointer key, gpointer value, void *data)
return;
}
- rc = rspamd_process_expression (comp->expr,
- RSPAMD_EXPRESSION_FLAG_NOOPT, cd);
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+
+ process_data.flags = RSPAMD_EXPRESSION_FLAG_NOOPT;
+ process_data.cd = cd;
+
+ rc = rspamd_process_expression (comp->expr, &process_data);
/* Checked bit */
setbit (cd->checked, comp->id * 2);
diff --git a/src/libutil/expression.c b/src/libutil/expression.c
index 21a137f43..2469d0415 100644
--- a/src/libutil/expression.c
+++ b/src/libutil/expression.c
@@ -984,8 +984,8 @@ rspamd_ast_do_op (struct rspamd_expression_elt *elt, gdouble val,
}
static gdouble
-rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node,
- gpointer data, GPtrArray *track)
+rspamd_ast_process_node (struct rspamd_expression *expr, GNode *node,
+ struct rspamd_expr_process_data *process_data)
{
struct rspamd_expression_elt *elt, *celt, *parelt = NULL;
GNode *cld;
@@ -1010,13 +1010,13 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node
t1 = rspamd_get_ticks (TRUE);
}
- elt->value = expr->subr->process (data, elt->p.atom);
+ elt->value = expr->subr->process (process_data, elt->p.atom);
if (fabs (elt->value) > 1e-9) {
elt->p.atom->hits ++;
- if (track) {
- g_ptr_array_add (track, elt->p.atom);
+ if (process_data->trace) {
+ g_ptr_array_add (process_data->trace, elt->p.atom);
}
}
@@ -1057,7 +1057,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node
continue;
}
- val = rspamd_ast_process_node (expr, flags, cld, data, track);
+ val = rspamd_ast_process_node (expr, cld, process_data);
if (isnan (acc)) {
acc = rspamd_ast_do_op (elt, val, 0, lim, TRUE);
@@ -1066,7 +1066,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node
acc = rspamd_ast_do_op (elt, val, acc, lim, FALSE);
}
- if (!(flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) {
+ if (!(process_data->flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) {
if (rspamd_ast_node_done (elt, parelt, acc, lim)) {
return acc;
}
@@ -1090,8 +1090,7 @@ rspamd_ast_cleanup_traverse (GNode *n, gpointer d)
}
gdouble
-rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
- gpointer data, GPtrArray *track)
+rspamd_process_expression_track (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data)
{
gdouble ret = 0;
@@ -1099,7 +1098,7 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
/* Ensure that stack is empty at this point */
g_assert (expr->expression_stack->len == 0);
- ret = rspamd_ast_process_node (expr, flags, expr->ast, data, track);
+ ret = rspamd_ast_process_node (expr, expr->ast, process_data);
/* Cleanup */
g_node_traverse (expr->ast, G_IN_ORDER, G_TRAVERSE_ALL, -1,
@@ -1124,10 +1123,9 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
}
gdouble
-rspamd_process_expression (struct rspamd_expression *expr, gint flags,
- gpointer data)
+rspamd_process_expression (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data)
{
- return rspamd_process_expression_track (expr, flags, data, NULL);
+ return rspamd_process_expression_track (expr, process_data);
}
static gboolean
diff --git a/src/libutil/expression.h b/src/libutil/expression.h
index fefde2974..7f7cb2dda 100644
--- a/src/libutil/expression.h
+++ b/src/libutil/expression.h
@@ -56,12 +56,24 @@ typedef struct rspamd_expression_atom_s {
gint priority;
} rspamd_expression_atom_t;
+struct rspamd_expr_process_data {
+ /* Current Lua state to run atom processing */
+ struct lua_State *L;
+ /* Parameter of lua-function processing atom*/
+ gint stack_item;
+ gint flags;
+ /* != NULL if trace is collected */
+ GPtrArray *trace;
+ struct composites_data *cd;
+ struct rspamd_task *task;
+};
+
struct rspamd_atom_subr {
/* Parses atom from string and returns atom structure */
rspamd_expression_atom_t * (*parse)(const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
/* Process atom via the opaque pointer (e.g. struct rspamd_task *) */
- gdouble (*process) (gpointer input, rspamd_expression_atom_t *atom);
+ gdouble (*process) (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
/* Calculates the relative priority of the expression */
gint (*priority) (rspamd_expression_atom_t *atom);
void (*destroy) (rspamd_expression_atom_t *atom);
@@ -92,8 +104,8 @@ gboolean rspamd_parse_expression (const gchar *line, gsize len,
* @param data opaque data pointer for all the atoms
* @return the value of expression
*/
-gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags,
- gpointer data);
+gdouble rspamd_process_expression (struct rspamd_expression *expr,
+ struct rspamd_expr_process_data *process_data);
/**
* Process the expression and return its value using atom 'process' functions with the specified data pointer.
@@ -103,8 +115,8 @@ gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags,
* @param track pointer array to atoms tracking
* @return the value of expression
*/
-gdouble rspamd_process_expression_track (struct rspamd_expression *expr, gint flags,
- gpointer data, GPtrArray *track);
+gdouble rspamd_process_expression_track (struct rspamd_expression *expr,
+ struct rspamd_expr_process_data *process_data);
/**
* Shows string representation of an expression
diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt
index 9c561c0e4..0d22027ce 100644
--- a/src/lua/CMakeLists.txt
+++ b/src/lua/CMakeLists.txt
@@ -12,7 +12,7 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_redis.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_upstream.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_mempool.c
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns_resolver.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_rsa.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_ip.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_expression.c
@@ -25,6 +25,8 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c)
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns.c)
SET(RSPAMD_LUA ${LUASRC} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index 63a95f177..f446ec9e8 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -740,6 +740,7 @@ rspamd_lua_init ()
luaopen_fann (L);
luaopen_sqlite3 (L);
luaopen_cryptobox (L);
+ luaopen_dns (L);
luaL_newmetatable (L, "rspamd{ev_base}");
lua_pushstring (L, "class");
@@ -1494,14 +1495,26 @@ rspamd_lua_traceback (lua_State *L)
{
GString *tb;
+
+ tb = rspamd_lua_get_traceback_string (L);
+
+ lua_pushlightuserdata (L, tb);
+
+ return 1;
+}
+
+GString *
+rspamd_lua_get_traceback_string (lua_State *L)
+{
+ GString *tb;
const gchar *msg = lua_tostring (L, 1);
tb = g_string_sized_new (100);
g_string_append_printf (tb, "%s; trace:", msg);
+
rspamd_lua_traceback_string (L, tb);
- lua_pushlightuserdata (L, tb);
- return 1;
+ return tb;
}
guint
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index 913ebcb60..0a9527bb0 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -280,6 +280,7 @@ void luaopen_html (lua_State * L);
void luaopen_fann (lua_State *L);
void luaopen_sqlite3 (lua_State *L);
void luaopen_cryptobox (lua_State *L);
+void luaopen_dns (lua_State *L);
void rspamd_lua_dostring (const gchar *line);
@@ -337,6 +338,14 @@ gboolean rspamd_lua_parse_table_arguments (lua_State *L, gint pos,
gint rspamd_lua_traceback (lua_State *L);
/**
+ * Returns stack trace as a string. Caller should clear memory.
+ * @param L
+ * @return
+ */
+GString *
+rspamd_lua_get_traceback_string (lua_State *L);
+
+/**
* Returns size of table at position `tbl_pos`
*/
guint rspamd_lua_table_size (lua_State *L, gint tbl_pos);
@@ -408,6 +417,25 @@ void rspamd_lua_add_ref_dtor (lua_State *L, rspamd_mempool_t *pool,
gboolean rspamd_lua_require_function (lua_State *L, const gchar *modname,
const gchar *funcname);
+struct thread_entry;
+/**
+ * Yields thread. should be only called in return statement
+ * @param thread_entry
+ * @param nresults
+ * @return
+ */
+gint
+lua_yield_thread (struct thread_entry *thread_entry, gint nresults);
+
+/**
+ * Resumes suspended by lua_yield_thread () thread
+ * @param task
+ * @param thread_entry
+ * @param narg
+ */
+void
+lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg);
+
/* Paths defs */
#define RSPAMD_CONFDIR_INDEX "CONFDIR"
#define RSPAMD_RUNDIR_INDEX "RUNDIR"
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 815b09e63..6957ded7c 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -19,6 +19,7 @@
#include "libserver/composites.h"
#include "libmime/lang_detection.h"
#include "lua/lua_map.h"
+#include "lua/lua_thread_pool.h"
#include "utlist.h"
#include <math.h>
@@ -1042,6 +1043,7 @@ struct lua_callback_data {
gint ref;
} callback;
gboolean cb_is_ref;
+ gint stack_level;
gint order;
};
@@ -1190,43 +1192,111 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
lua_settop (L, err_idx - 1);
}
+gint
+lua_do_resume (lua_State *L, gint narg)
+{
+#if LUA_VERSION_NUM < 503
+ return lua_resume (L, narg);
+#else
+ return lua_resume (L, NULL, narg);
+#endif
+}
+
+static void
+lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret);
+
static void
lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
{
struct lua_callback_data *cd = ud;
struct rspamd_task **ptask;
- gint level = lua_gettop (cd->L), nresults, err_idx, ret;
- lua_State *L = cd->L;
- GString *tb;
- struct rspamd_symbol_result *s;
+ struct thread_entry *thread_entry;
+ gint ret;
- lua_pushcfunction (L, &rspamd_lua_traceback);
- err_idx = lua_gettop (L);
+ thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool);
- level ++;
+ g_assert(thread_entry->cd == NULL);
+ thread_entry->cd = cd;
+
+ lua_State *thread = thread_entry->lua_state;
+ cd->stack_level = lua_gettop (cd->L);
if (cd->cb_is_ref) {
- lua_rawgeti (L, LUA_REGISTRYINDEX, cd->callback.ref);
+ lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
}
else {
- lua_getglobal (L, cd->callback.name);
+ lua_getglobal (thread, cd->callback.name);
}
- ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (L, "rspamd{task}", -1);
+ ptask = lua_newuserdata (thread, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (thread, "rspamd{task}", -1);
*ptask = task;
- if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) {
- tb = lua_touserdata (L, -1);
+ ret = lua_do_resume (thread, 1);
+
+ if (ret != LUA_YIELD) {
+ /*
+ LUA_YIELD state should not be handled here.
+ It should only happen when the thread initiated a asynchronous event and it will be restored as soon
+ the event is finished
+ */
+ lua_metric_symbol_callback_return (task, thread_entry, ud, ret);
+ }
+}
+
+gint
+lua_yield_thread (struct thread_entry *thread_entry, gint nresults)
+{
+ g_assert (thread_entry->cd != NULL);
+
+ return lua_yield (thread_entry->lua_state, nresults);
+}
+
+void
+lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg)
+{
+ g_assert (thread_entry->cd != NULL);
+
+ /*
+ * The only state where we can resume from is LUA_YIELD
+ * Another acceptable status is OK (0) but in that case we should push function on stack
+ * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
+ */
+ g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
+
+ gint ret;
+
+ lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry);
+ ret = lua_do_resume (thread_entry->lua_state, narg);
+
+ if (ret != LUA_YIELD) {
+ lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret);
+ }
+}
+
+static void
+lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret)
+{
+ GString *tb;
+ struct lua_callback_data *cd = ud;
+ int nresults;
+ struct rspamd_symbol_result *s;
+ lua_State *thread = thread_entry->lua_state;
+
+ if (ret != 0) {
+
+ tb = rspamd_lua_get_traceback_string (thread);
msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb);
if (tb) {
g_string_free (tb, TRUE);
- lua_pop (L, 1);
}
+ g_assert (lua_gettop (thread) >= cd->stack_level);
+ /* maybe there is a way to recover here. For now, just remove faulty thread */
+ lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry);
}
else {
- nresults = lua_gettop (L) - level;
+ nresults = lua_gettop (thread) - cd->stack_level;
if (nresults >= 1) {
/* Function returned boolean, so maybe we need to insert result? */
@@ -1236,16 +1306,16 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
gint type;
struct lua_watcher_data *wd;
- type = lua_type (cd->L, level + 1);
+ type = lua_type (thread, cd->stack_level + 1);
if (type == LUA_TBOOLEAN) {
- res = lua_toboolean (L, level + 1);
+ res = lua_toboolean (thread, cd->stack_level + 1);
}
else if (type == LUA_TFUNCTION) {
/* Function returned a closure that should be watched for */
wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
- lua_pushvalue (cd->L, level + 1);
- wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1);
+ wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX);
wd->cbd = cd;
rspamd_session_watcher_push_callback (task->s,
rspamd_session_get_watcher (task->s),
@@ -1258,14 +1328,14 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
rspamd_session_get_watcher (task->s));
}
else {
- res = lua_tonumber (L, level + 1);
+ res = lua_tonumber (thread, cd->stack_level + 1);
}
if (res) {
gint first_opt = 2;
- if (lua_type (L, level + 2) == LUA_TNUMBER) {
- flag = lua_tonumber (L, level + 2);
+ if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) {
+ flag = lua_tonumber (thread, cd->stack_level + 2);
/* Shift opt index */
first_opt = 3;
}
@@ -1276,35 +1346,39 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
if (s) {
- guint last_pos = lua_gettop (L);
+ guint last_pos = lua_gettop (thread);
- for (i = level + first_opt; i <= last_pos; i++) {
- if (lua_type (L, i) == LUA_TSTRING) {
- const char *opt = lua_tostring (L, i);
+ for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
+ if (lua_type (thread, i) == LUA_TSTRING) {
+ const char *opt = lua_tostring (thread, i);
rspamd_task_add_result_option (task, s, opt);
}
- else if (lua_type (L, i) == LUA_TTABLE) {
- lua_pushvalue (L, i);
+ else if (lua_type (thread, i) == LUA_TTABLE) {
+ lua_pushvalue (thread, i);
- for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
- const char *opt = lua_tostring (L, -1);
+ for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) {
+ const char *opt = lua_tostring (thread, -1);
rspamd_task_add_result_option (task, s, opt);
}
- lua_pop (L, 1);
+ lua_pop (thread, 1);
}
}
}
}
- lua_pop (L, nresults);
+ lua_pop (thread, nresults);
}
+
+ g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */
+
+ lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry);
}
- lua_pop (L, 1); /* Error function */
+ cd->stack_level = 0;
}
static gint
diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c
index 389b9d4f1..0d12a137c 100644
--- a/src/lua/lua_dns.c
+++ b/src/lua/lua_dns.c
@@ -1,6 +1,4 @@
/*-
- * Copyright 2016 Vsevolod Stakhov
- *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -14,301 +12,45 @@
* limitations under the License.
*/
#include "lua_common.h"
-#include "utlist.h"
-
-
-/***
- * @module rspamd_resolver
- * This module allows to resolve DNS names from LUA code. All resolving is executed
- * asynchronously. Here is an example of name resolution:
- * @example
-local function symbol_callback(task)
- local host = 'example.com'
+#include "lua_dns_resolver.h"
+#include "lua_thread_pool.h"
- local function dns_cb(resolver, to_resolve, results, err, _, authenticated)
- if not results then
- rspamd_logger.infox('DNS resolving of %1 failed: %2', host, err)
- return
- end
- for _,r in ipairs(results) do
- -- r is of type rspamd{ip} here, but it can be converted to string
- rspamd_logger.infox('Resolved %1 to %2', host, tostring(r))
- end
- end
-
- task:get_resolver():resolve_a(task:get_session(), task:get_mempool(),
- host, dns_cb)
-end
- */
-struct rspamd_dns_resolver * lua_check_dns_resolver (lua_State * L);
-void luaopen_dns_resolver (lua_State * L);
+LUA_FUNCTION_DEF (dns, request);
-/* Lua bindings */
-LUA_FUNCTION_DEF (dns_resolver, init);
-LUA_FUNCTION_DEF (dns_resolver, resolve_a);
-LUA_FUNCTION_DEF (dns_resolver, resolve_ptr);
-LUA_FUNCTION_DEF (dns_resolver, resolve_txt);
-LUA_FUNCTION_DEF (dns_resolver, resolve_mx);
-LUA_FUNCTION_DEF (dns_resolver, resolve_ns);
-LUA_FUNCTION_DEF (dns_resolver, resolve);
-
-static const struct luaL_reg dns_resolverlib_f[] = {
- LUA_INTERFACE_DEF (dns_resolver, init),
- {NULL, NULL}
-};
-
-static const struct luaL_reg dns_resolverlib_m[] = {
- LUA_INTERFACE_DEF (dns_resolver, resolve_a),
- LUA_INTERFACE_DEF (dns_resolver, resolve_ptr),
- LUA_INTERFACE_DEF (dns_resolver, resolve_txt),
- LUA_INTERFACE_DEF (dns_resolver, resolve_mx),
- LUA_INTERFACE_DEF (dns_resolver, resolve_ns),
- LUA_INTERFACE_DEF (dns_resolver, resolve),
- {"__tostring", rspamd_lua_class_tostring},
- {NULL, NULL}
+static const struct luaL_reg dns_f[] = {
+ LUA_INTERFACE_DEF (dns, request),
+ {"__tostring", rspamd_lua_class_tostring},
+ {NULL, NULL}
};
-struct rspamd_dns_resolver *
-lua_check_dns_resolver (lua_State * L)
-{
- void *ud = rspamd_lua_check_udata (L, 1, "rspamd{resolver}");
- luaL_argcheck (L, ud != NULL, 1, "'resolver' expected");
- return ud ? *((struct rspamd_dns_resolver **)ud) : NULL;
-}
+void
+lua_dns_callback (struct rdns_reply *reply, void *arg);
-struct lua_dns_cbdata {
- lua_State *L;
+struct lua_rspamd_dns_cbdata {
+ struct thread_entry *thread;
+ struct rspamd_task *task;
struct rspamd_dns_resolver *resolver;
- gint cbref;
- const gchar *to_resolve;
- const gchar *user_str;
struct rspamd_async_watcher *w;
struct rspamd_async_session *s;
};
-static int
-lua_dns_get_type (lua_State *L, int argno)
-{
- int type = RDNS_REQUEST_A;
- const gchar *strtype;
-
- if (lua_type (L, argno) != LUA_TSTRING) {
- lua_pushvalue (L, argno);
- lua_gettable (L, lua_upvalueindex (1));
-
- type = lua_tonumber (L, -1);
- lua_pop (L, 1);
- if (type == 0) {
- rspamd_lua_typerror (L, argno, "dns_request_type");
- }
- }
- else {
- strtype = lua_tostring (L, argno);
-
- if (g_ascii_strcasecmp (strtype, "a") == 0) {
- type = RDNS_REQUEST_A;
- }
- else if (g_ascii_strcasecmp (strtype, "aaaa") == 0) {
- type = RDNS_REQUEST_AAAA;
- }
- else if (g_ascii_strcasecmp (strtype, "mx") == 0) {
- type = RDNS_REQUEST_MX;
- }
- else if (g_ascii_strcasecmp (strtype, "txt") == 0) {
- type = RDNS_REQUEST_TXT;
- }
- else if (g_ascii_strcasecmp (strtype, "ptr") == 0) {
- type = RDNS_REQUEST_PTR;
- }
- else if (g_ascii_strcasecmp (strtype, "soa") == 0) {
- type = RDNS_REQUEST_SOA;
- }
- else {
- msg_err ("bad DNS type: %s", strtype);
- }
- }
-
- return type;
-}
-
-static void
-lua_dns_callback (struct rdns_reply *reply, gpointer arg)
-{
- struct lua_dns_cbdata *cd = arg;
- gint i = 0, naddrs = 0;
- struct rspamd_dns_resolver **presolver;
- struct rdns_reply_entry *elt;
- rspamd_inet_addr_t *addr;
-
- lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref);
- presolver = lua_newuserdata (cd->L, sizeof (gpointer));
- rspamd_lua_setclass (cd->L, "rspamd{resolver}", -1);
-
- *presolver = cd->resolver;
- lua_pushstring (cd->L, cd->to_resolve);
-
- /*
- * XXX: rework to handle different request types
- */
- if (reply->code == RDNS_RC_NOERROR) {
- LL_FOREACH (reply->entries, elt) {
- naddrs ++;
- }
-
- lua_createtable (cd->L, naddrs, 0);
-
- LL_FOREACH (reply->entries, elt)
- {
- switch (elt->type) {
- case RDNS_REQUEST_A:
- addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr);
- rspamd_lua_ip_push (cd->L, addr);
- rspamd_inet_address_free (addr);
- lua_rawseti (cd->L, -2, ++i);
- break;
- case RDNS_REQUEST_AAAA:
- addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr);
- rspamd_lua_ip_push (cd->L, addr);
- rspamd_inet_address_free (addr);
- lua_rawseti (cd->L, -2, ++i);
- break;
- case RDNS_REQUEST_NS:
- lua_pushstring (cd->L, elt->content.ns.name);
- lua_rawseti (cd->L, -2, ++i);
- break;
- case RDNS_REQUEST_PTR:
- lua_pushstring (cd->L, elt->content.ptr.name);
- lua_rawseti (cd->L, -2, ++i);
- break;
- case RDNS_REQUEST_TXT:
- case RDNS_REQUEST_SPF:
- lua_pushstring (cd->L, elt->content.txt.data);
- lua_rawseti (cd->L, -2, ++i);
- break;
- case RDNS_REQUEST_MX:
- /* mx['name'], mx['priority'] */
- lua_createtable (cd->L, 0, 2);
- rspamd_lua_table_set (cd->L, "name", elt->content.mx.name);
- lua_pushstring (cd->L, "priority");
- lua_pushinteger (cd->L, elt->content.mx.priority);
- lua_settable (cd->L, -3);
-
- lua_rawseti (cd->L, -2, ++i);
- break;
- case RDNS_REQUEST_SOA:
- lua_createtable (cd->L, 0, 7);
- rspamd_lua_table_set (cd->L, "ns", elt->content.soa.mname);
- rspamd_lua_table_set (cd->L, "contact", elt->content.soa.admin);
- lua_pushstring (cd->L, "serial");
- lua_pushinteger (cd->L, elt->content.soa.serial);
- lua_settable (cd->L, -3);
- lua_pushstring (cd->L, "refresh");
- lua_pushinteger (cd->L, elt->content.soa.refresh);
- lua_settable (cd->L, -3);
- lua_pushstring (cd->L, "retry");
- lua_pushinteger (cd->L, elt->content.soa.retry);
- lua_settable (cd->L, -3);
- lua_pushstring (cd->L, "expiry");
- lua_pushinteger (cd->L, elt->content.soa.expire);
- lua_settable (cd->L, -3);
- /* Negative TTL */
- lua_pushstring (cd->L, "nx");
- lua_pushinteger (cd->L, elt->content.soa.minimum);
- lua_settable (cd->L, -3);
-
- lua_rawseti (cd->L, -2, ++i);
- break;
- }
- }
- lua_pushnil (cd->L);
- }
- else {
- lua_pushnil (cd->L);
- lua_pushstring (cd->L, rdns_strerror (reply->code));
- }
-
- if (cd->user_str != NULL) {
- lua_pushstring (cd->L, cd->user_str);
- }
- else {
- lua_pushnil (cd->L);
- }
-
- lua_pushboolean (cd->L, reply->authenticated);
-
- if (lua_pcall (cd->L, 6, 0, 0) != 0) {
- msg_info ("call to dns callback failed: %s", lua_tostring (cd->L, -1));
- lua_pop (cd->L, 1);
- }
-
- /* Unref function */
- luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref);
-
- if (cd->s) {
- rspamd_session_watcher_pop (cd->s, cd->w);
- }
-}
-
-/***
- * @function rspamd_resolver.init(ev_base, config)
- * @param {event_base} ev_base event base used for asynchronous events
- * @param {rspamd_config} config rspamd configuration parameters
- * @return {rspamd_resolver} new resolver object associated with the specified base
- */
-static int
-lua_dns_resolver_init (lua_State *L)
-{
- struct rspamd_dns_resolver *resolver, **presolver;
- struct rspamd_config *cfg, **pcfg;
- struct event_base *base, **pbase;
-
- /* Check args */
- pbase = rspamd_lua_check_udata (L, 1, "rspamd{ev_base}");
- luaL_argcheck (L, pbase != NULL, 1, "'ev_base' expected");
- base = pbase ? *(pbase) : NULL;
- pcfg = rspamd_lua_check_udata (L, 2, "rspamd{config}");
- luaL_argcheck (L, pcfg != NULL, 2, "'config' expected");
- cfg = pcfg ? *(pcfg) : NULL;
-
- if (base != NULL && cfg != NULL) {
- resolver = dns_resolver_init (NULL, base, cfg);
- if (resolver) {
- presolver = lua_newuserdata (L, sizeof (gpointer));
- rspamd_lua_setclass (L, "rspamd{resolver}", -1);
- *presolver = resolver;
- }
- else {
- lua_pushnil (L);
- }
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-}
-
-static int
-lua_dns_resolver_resolve_common (lua_State *L,
- struct rspamd_dns_resolver *resolver,
- enum rdns_request_type type,
- int first)
+static gint
+lua_dns_request (lua_State *L)
{
- LUA_TRACE_POINT;
+ GError *err = NULL;
struct rspamd_async_session *session = NULL;
- rspamd_mempool_t *pool = NULL;
- const gchar *to_resolve = NULL, *user_str = NULL;
- struct lua_dns_cbdata *cbdata;
- gint cbref = -1, ret;
+ struct lua_rspamd_dns_cbdata *cbdata = NULL;
+ const gchar *to_resolve = NULL;
+ const gchar *type_str = NULL;
struct rspamd_task *task = NULL;
- GError *err = NULL;
+ rspamd_mempool_t *pool = NULL;
+ gint ret = 0;
gboolean forced = FALSE;
/* Check arguments */
- if (!rspamd_lua_parse_table_arguments (L, first, &err,
- "session=U{session};mempool=U{mempool};*name=S;*callback=F;"
- "option=S;task=U{task};forced=B",
- &session, &pool, &to_resolve, &cbref, &user_str, &task, &forced)) {
+ if (!rspamd_lua_parse_table_arguments (L, 1, &err,
+ "*name=S;*task=U{task};*type=S;forced=B",
+ &to_resolve, &task, &type_str, &forced)) {
if (err) {
ret = luaL_error (L, "invalid arguments: %s", err->message);
@@ -321,283 +63,108 @@ lua_dns_resolver_resolve_common (lua_State *L,
}
if (task) {
- pool = task->task_pool;
session = task->s;
+ pool = task->task_pool;
+ }
+ else {
+ return luaL_error (L, "invalid arguments: either task or session/config should be set");
}
- if (pool != NULL && to_resolve != NULL && cbref != -1) {
- cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata));
- cbdata->L = L;
- cbdata->resolver = resolver;
- cbdata->cbref = cbref;
- cbdata->user_str = rspamd_mempool_strdup (pool, user_str);
-
- if (type != RDNS_REQUEST_PTR) {
- cbdata->to_resolve = rspamd_mempool_strdup (pool, to_resolve);
- }
- else {
- char *ptr_str;
+ enum rdns_request_type type = rdns_type_fromstr (type_str);
- ptr_str = rdns_generate_ptr_from_str (to_resolve);
+ if (type == RDNS_REQUEST_INVALID) {
+ return luaL_error (L, "invalid arguments: this record type is not supported");
+ }
- if (ptr_str == NULL) {
- msg_err_task_check ("wrong resolve string to PTR request: %s",
- to_resolve);
- lua_pushnil (L);
+ cbdata = rspamd_mempool_alloc0 (pool, sizeof (*cbdata));
- return 1;
- }
+ cbdata->task = task;
- cbdata->to_resolve = rspamd_mempool_strdup (pool, ptr_str);
- to_resolve = cbdata->to_resolve;
- free (ptr_str);
- }
+ if (type == RDNS_REQUEST_PTR) {
+ char *ptr_str;
- if (task == NULL) {
- if (make_dns_request (resolver,
- session,
- pool,
- lua_dns_callback,
- cbdata,
- type,
- to_resolve)) {
+ ptr_str = rdns_generate_ptr_from_str (to_resolve);
- lua_pushboolean (L, TRUE);
+ if (ptr_str == NULL) {
+ msg_err_task_check ("wrong resolve string to PTR request: %s",
+ to_resolve);
+ lua_pushnil (L);
- if (session) {
- cbdata->s = session;
- cbdata->w = rspamd_session_get_watcher (session);
- rspamd_session_watcher_push (session);
- }
- }
- else {
- lua_pushnil (L);
- }
+ return 1;
}
- else {
- if (forced) {
- ret = make_dns_request_task_forced (task,
- lua_dns_callback,
- cbdata,
- type,
- to_resolve);
- }
- else {
- ret = make_dns_request_task (task,
- lua_dns_callback,
- cbdata,
- type,
- to_resolve);
- }
- if (ret) {
- lua_pushboolean (L, TRUE);
- cbdata->s = session;
- cbdata->w = rspamd_session_get_watcher (session);
- rspamd_session_watcher_push (session);
- }
- else {
- lua_pushnil (L);
- }
- }
+ to_resolve = rspamd_mempool_strdup (pool, ptr_str);
+ free (ptr_str);
}
- else {
- return luaL_error (L, "invalid arguments to lua_resolve");
- }
-
- return 1;
-}
-
-/***
- * @method resolver:resolve_a(session, pool, host, callback)
- * Resolve A record for a specified host.
- * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
- * @param {mempool} pool memory pool for storing intermediate data
- * @param {string} host name to resolve
- * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
- * @return {boolean} `true` if DNS request has been scheduled
- */
-static int
-lua_dns_resolver_resolve_a (lua_State *L)
-{
- struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
-
- if (dns_resolver) {
- return lua_dns_resolver_resolve_common (L,
- dns_resolver,
- RDNS_REQUEST_A,
- 2);
+ if (forced) {
+ ret = make_dns_request_task_forced (task,
+ lua_dns_callback,
+ cbdata,
+ type,
+ to_resolve);
}
else {
- lua_pushnil (L);
+ ret = make_dns_request_task (task,
+ lua_dns_callback,
+ cbdata,
+ type,
+ to_resolve);
}
- return 1;
-}
-
-/***
- * @method resolver:resolve_ptr(session, pool, ip, callback)
- * Resolve PTR record for a specified host.
- * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
- * @param {mempool} pool memory pool for storing intermediate data
- * @param {string} ip name to resolve in string form (e.g. '8.8.8.8' or '2001:dead::')
- * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
- * @return {boolean} `true` if DNS request has been scheduled
- */
-static int
-lua_dns_resolver_resolve_ptr (lua_State *L)
-{
- struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
-
- if (dns_resolver) {
- return lua_dns_resolver_resolve_common (L,
- dns_resolver,
- RDNS_REQUEST_PTR,
- 2);
- }
- else {
- lua_pushnil (L);
- }
-
- return 1;
-}
-
-/***
- * @method resolver:resolve_txt(session, pool, host, callback)
- * Resolve TXT record for a specified host.
- * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
- * @param {mempool} pool memory pool for storing intermediate data
- * @param {string} host name to get TXT record for
- * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
- * @return {boolean} `true` if DNS request has been scheduled
- */
-static int
-lua_dns_resolver_resolve_txt (lua_State *L)
-{
- struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
-
- if (dns_resolver) {
- return lua_dns_resolver_resolve_common (L,
- dns_resolver,
- RDNS_REQUEST_TXT,
- 2);
+ if (ret) {
+ cbdata->thread = lua_thread_pool_get_running_entry (task->cfg->lua_thread_pool);
+ cbdata->s = session;
+ cbdata->w = rspamd_session_get_watcher (session);
+ rspamd_session_watcher_push (session);
+ return lua_yield_thread (cbdata->thread, 0);
}
else {
lua_pushnil (L);
+ return 1;
}
-
- return 1;
}
-/***
- * @method resolver:resolve_mx(session, pool, host, callback)
- * Resolve MX record for a specified host.
- * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
- * @param {mempool} pool memory pool for storing intermediate data
- * @param {string} host name to get MX record for
- * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
- * @return {boolean} `true` if DNS request has been scheduled
- */
-static int
-lua_dns_resolver_resolve_mx (lua_State *L)
+void
+lua_dns_callback (struct rdns_reply *reply, void *arg)
{
- struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+ struct lua_rspamd_dns_cbdata *cbdata = arg;
+ lua_State *L = cbdata->thread->lua_state;
- if (dns_resolver) {
- return lua_dns_resolver_resolve_common (L,
- dns_resolver,
- RDNS_REQUEST_MX,
- 2);
+ if (reply->code != RDNS_RC_NOERROR) {
+ lua_pushboolean (L, false);
+ lua_pushstring (L, rdns_strerror (reply->code));
}
else {
- lua_pushnil (L);
- }
+ lua_push_dns_reply (L, reply);
- return 1;
-}
+ lua_pushboolean (L, reply->authenticated);
+ lua_setfield (L, -3, "authenticated");
-/***
- * @method resolver:resolve_ns(session, pool, host, callback)
- * Resolve NS records for a specified host.
- * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
- * @param {mempool} pool memory pool for storing intermediate data
- * @param {string} host name to get NS records for
- * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
- * @return {boolean} `true` if DNS request has been scheduled
- */
-static int
-lua_dns_resolver_resolve_ns (lua_State *L)
-{
- struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
-
- if (dns_resolver) {
- return lua_dns_resolver_resolve_common (L,
- dns_resolver,
- RDNS_REQUEST_NS,
- 2);
- }
- else {
- lua_pushnil (L);
+ /* result 1 - not and error */
+ lua_pushboolean (L, true);
+ /* push table into stack, result 2 - results itself */
+ lua_pushvalue (L, -3);
}
- return 1;
-}
-
-/* XXX: broken currently */
-static int
-lua_dns_resolver_resolve (lua_State *L)
-{
- struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
- int type;
+ lua_resume_thread (cbdata->task, cbdata->thread, 2);
- type = lua_dns_get_type (L, 2);
-
- if (dns_resolver && type != 0) {
- return lua_dns_resolver_resolve_common (L, dns_resolver, type, 3);
- }
- else {
- lua_pushnil (L);
+ if (cbdata->s) {
+ rspamd_session_watcher_pop (cbdata->s, cbdata->w);
}
-
- return 1;
}
static gint
-lua_load_dns (lua_State * L)
+lua_load_dns (lua_State *L)
{
lua_newtable (L);
- luaL_register (L, NULL, dns_resolverlib_f);
+ luaL_register (L, NULL, dns_f);
return 1;
}
void
-luaopen_dns_resolver (lua_State * L)
+luaopen_dns (lua_State *L)
{
-
- luaL_newmetatable (L, "rspamd{resolver}");
- lua_pushstring (L, "__index");
- lua_pushvalue (L, -2);
- lua_settable (L, -3);
-
- lua_pushstring (L, "class");
- lua_pushstring (L, "rspamd{resolver}");
- lua_rawset (L, -3);
-
- {
- LUA_ENUM (L, DNS_A, RDNS_REQUEST_A);
- LUA_ENUM (L, DNS_PTR, RDNS_REQUEST_PTR);
- LUA_ENUM (L, DNS_MX, RDNS_REQUEST_MX);
- LUA_ENUM (L, DNS_TXT, RDNS_REQUEST_TXT);
- LUA_ENUM (L, DNS_SRV, RDNS_REQUEST_SRV);
- LUA_ENUM (L, DNS_SPF, RDNS_REQUEST_SPF);
- LUA_ENUM (L, DNS_AAAA, RDNS_REQUEST_AAAA);
- LUA_ENUM (L, DNS_SOA, RDNS_REQUEST_SOA);
- }
-
- luaL_register (L, NULL, dns_resolverlib_m);
- rspamd_lua_add_preload (L, "rspamd_resolver", lua_load_dns);
-
- lua_pop (L, 1); /* remove metatable from stack */
+ rspamd_lua_add_preload (L, "rspamd_dns", lua_load_dns);
}
diff --git a/src/lua/lua_dns_resolver.c b/src/lua/lua_dns_resolver.c
new file mode 100644
index 000000000..dc475f2ae
--- /dev/null
+++ b/src/lua/lua_dns_resolver.c
@@ -0,0 +1,631 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "lua_common.h"
+#include "lua_thread_pool.h"
+#include "utlist.h"
+
+
+/***
+ * @module rspamd_resolver
+ * This module allows to resolve DNS names from LUA code. All resolving is executed
+ * asynchronously. Here is an example of name resolution:
+ * @example
+local function symbol_callback(task)
+ local host = 'example.com'
+
+ local function dns_cb(resolver, to_resolve, results, err, _, authenticated)
+ if not results then
+ rspamd_logger.infox('DNS resolving of %1 failed: %2', host, err)
+ return
+ end
+ for _,r in ipairs(results) do
+ -- r is of type rspamd{ip} here, but it can be converted to string
+ rspamd_logger.infox('Resolved %1 to %2', host, tostring(r))
+ end
+ end
+
+ task:get_resolver():resolve_a(task:get_session(), task:get_mempool(),
+ host, dns_cb)
+end
+ */
+struct rspamd_dns_resolver * lua_check_dns_resolver (lua_State * L);
+void luaopen_dns_resolver (lua_State * L);
+
+/* Lua bindings */
+LUA_FUNCTION_DEF (dns_resolver, init);
+LUA_FUNCTION_DEF (dns_resolver, resolve_a);
+LUA_FUNCTION_DEF (dns_resolver, resolve_ptr);
+LUA_FUNCTION_DEF (dns_resolver, resolve_txt);
+LUA_FUNCTION_DEF (dns_resolver, resolve_mx);
+LUA_FUNCTION_DEF (dns_resolver, resolve_ns);
+LUA_FUNCTION_DEF (dns_resolver, resolve);
+
+void lua_push_dns_reply (lua_State *L, const struct rdns_reply *reply);
+
+static const struct luaL_reg dns_resolverlib_f[] = {
+ LUA_INTERFACE_DEF (dns_resolver, init),
+ {NULL, NULL}
+};
+
+static const struct luaL_reg dns_resolverlib_m[] = {
+ LUA_INTERFACE_DEF (dns_resolver, resolve_a),
+ LUA_INTERFACE_DEF (dns_resolver, resolve_ptr),
+ LUA_INTERFACE_DEF (dns_resolver, resolve_txt),
+ LUA_INTERFACE_DEF (dns_resolver, resolve_mx),
+ LUA_INTERFACE_DEF (dns_resolver, resolve_ns),
+ LUA_INTERFACE_DEF (dns_resolver, resolve),
+ {"__tostring", rspamd_lua_class_tostring},
+ {NULL, NULL}
+};
+
+struct rspamd_dns_resolver *
+lua_check_dns_resolver (lua_State * L)
+{
+ void *ud = rspamd_lua_check_udata (L, 1, "rspamd{resolver}");
+ luaL_argcheck (L, ud != NULL, 1, "'resolver' expected");
+ return ud ? *((struct rspamd_dns_resolver **)ud) : NULL;
+}
+
+struct lua_dns_cbdata {
+ struct rspamd_task *task;
+ struct rspamd_dns_resolver *resolver;
+ gint cbref;
+ const gchar *to_resolve;
+ const gchar *user_str;
+ struct rspamd_async_watcher *w;
+ struct rspamd_async_session *s;
+};
+
+static int
+lua_dns_get_type (lua_State *L, int argno)
+{
+ int type = RDNS_REQUEST_A;
+ const gchar *strtype;
+
+ if (lua_type (L, argno) != LUA_TSTRING) {
+ lua_pushvalue (L, argno);
+ lua_gettable (L, lua_upvalueindex (1));
+
+ type = lua_tonumber (L, -1);
+ lua_pop (L, 1);
+ if (type == 0) {
+ rspamd_lua_typerror (L, argno, "dns_request_type");
+ }
+ }
+ else {
+ strtype = lua_tostring (L, argno);
+
+ if (g_ascii_strcasecmp (strtype, "a") == 0) {
+ type = RDNS_REQUEST_A;
+ }
+ else if (g_ascii_strcasecmp (strtype, "aaaa") == 0) {
+ type = RDNS_REQUEST_AAAA;
+ }
+ else if (g_ascii_strcasecmp (strtype, "mx") == 0) {
+ type = RDNS_REQUEST_MX;
+ }
+ else if (g_ascii_strcasecmp (strtype, "txt") == 0) {
+ type = RDNS_REQUEST_TXT;
+ }
+ else if (g_ascii_strcasecmp (strtype, "ptr") == 0) {
+ type = RDNS_REQUEST_PTR;
+ }
+ else if (g_ascii_strcasecmp (strtype, "soa") == 0) {
+ type = RDNS_REQUEST_SOA;
+ }
+ else {
+ msg_err ("bad DNS type: %s", strtype);
+ }
+ }
+
+ return type;
+}
+
+static void
+lua_dns_resolver_callback (struct rdns_reply *reply, gpointer arg)
+{
+ struct lua_dns_cbdata *cd = arg;
+ struct rspamd_dns_resolver **presolver;
+ lua_State *L;
+ struct lua_callback_state cbs;
+
+ lua_thread_pool_prepare_callback (cd->resolver->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cd->cbref);
+
+ presolver = lua_newuserdata (L, sizeof (gpointer));
+ rspamd_lua_setclass (L, "rspamd{resolver}", -1);
+
+ *presolver = cd->resolver;
+ lua_pushstring (L, cd->to_resolve);
+
+ lua_push_dns_reply (L, reply);
+
+ /*
+ * 1 - resolver
+ * 2 - to_resolve
+ * 3 - entries | nil
+ * 4 - error | nil
+ * 5 - user_str
+ * 6 - reply->authenticated
+ */
+ if (reply->code != RDNS_RC_NOERROR) {
+ lua_pushnil (L);
+ lua_pushstring (L, rdns_strerror (reply->code));
+ }
+ if (cd->user_str != NULL) {
+ lua_pushstring (L, cd->user_str);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ lua_pushboolean (L, reply->authenticated);
+
+ if (lua_pcall (L, 6, 0, 0) != 0) {
+ msg_info ("call to dns callback failed: %s", lua_tostring (L, -1));
+ lua_pop (L, 1);
+ }
+
+ /* Unref function */
+ luaL_unref (L, LUA_REGISTRYINDEX, cd->cbref);
+
+ lua_thread_pool_restore_callback (&cbs);
+
+ if (cd->s) {
+ rspamd_session_watcher_pop (cd->s, cd->w);
+ }
+}
+
+void
+lua_push_dns_reply (lua_State *L, const struct rdns_reply *reply)
+{
+ gint i = 0, naddrs = 0;
+ struct rdns_reply_entry *elt;
+ rspamd_inet_addr_t *addr;
+
+ if (reply->code == RDNS_RC_NOERROR) {
+ LL_FOREACH (reply->entries, elt) {
+ naddrs ++;
+ }
+
+ lua_createtable (L, naddrs, 0);
+
+ LL_FOREACH (reply->entries, elt)
+ {
+ switch (elt->type) {
+ case RDNS_REQUEST_A:
+ addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr);
+ rspamd_lua_ip_push (L, addr);
+ rspamd_inet_address_free (addr);
+ lua_rawseti (L, -2, ++i);
+ break;
+ case RDNS_REQUEST_AAAA:
+ addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr);
+ rspamd_lua_ip_push (L, addr);
+ rspamd_inet_address_free (addr);
+ lua_rawseti (L, -2, ++i);
+ break;
+ case RDNS_REQUEST_NS:
+ lua_pushstring (L, elt->content.ns.name);
+ lua_rawseti (L, -2, ++i);
+ break;
+ case RDNS_REQUEST_PTR:
+ lua_pushstring (L, elt->content.ptr.name);
+ lua_rawseti (L, -2, ++i);
+ break;
+ case RDNS_REQUEST_TXT:
+ case RDNS_REQUEST_SPF:
+ lua_pushstring (L, elt->content.txt.data);
+ lua_rawseti (L, -2, ++i);
+ break;
+ case RDNS_REQUEST_MX:
+ /* mx['name'], mx['priority'] */
+ lua_createtable (L, 0, 2);
+ rspamd_lua_table_set (L, "name", elt->content.mx.name);
+ lua_pushstring (L, "priority");
+ lua_pushinteger (L, elt->content.mx.priority);
+ lua_settable (L, -3);
+
+ lua_rawseti (L, -2, ++i);
+ break;
+ case RDNS_REQUEST_SOA:
+ lua_createtable (L, 0, 7);
+ rspamd_lua_table_set (L, "ns", elt->content.soa.mname);
+ rspamd_lua_table_set (L, "contact", elt->content.soa.admin);
+ lua_pushstring (L, "serial");
+ lua_pushinteger (L, elt->content.soa.serial);
+ lua_settable (L, -3);
+ lua_pushstring (L, "refresh");
+ lua_pushinteger (L, elt->content.soa.refresh);
+ lua_settable (L, -3);
+ lua_pushstring (L, "retry");
+ lua_pushinteger (L, elt->content.soa.retry);
+ lua_settable (L, -3);
+ lua_pushstring (L, "expiry");
+ lua_pushinteger (L, elt->content.soa.expire);
+ lua_settable (L, -3);
+ /* Negative TTL */
+ lua_pushstring (L, "nx");
+ lua_pushinteger (L, elt->content.soa.minimum);
+ lua_settable (L, -3);
+
+ lua_rawseti (L, -2, ++i);
+ break;
+ }
+ }
+ lua_pushnil (L);
+ }
+}
+
+/***
+ * @function rspamd_resolver.init(ev_base, config)
+ * @param {event_base} ev_base event base used for asynchronous events
+ * @param {rspamd_config} config rspamd configuration parameters
+ * @return {rspamd_resolver} new resolver object associated with the specified base
+ */
+static int
+lua_dns_resolver_init (lua_State *L)
+{
+ struct rspamd_dns_resolver *resolver, **presolver;
+ struct rspamd_config *cfg, **pcfg;
+ struct event_base *base, **pbase;
+
+ /* Check args */
+ pbase = rspamd_lua_check_udata (L, 1, "rspamd{ev_base}");
+ luaL_argcheck (L, pbase != NULL, 1, "'ev_base' expected");
+ base = pbase ? *(pbase) : NULL;
+ pcfg = rspamd_lua_check_udata (L, 2, "rspamd{config}");
+ luaL_argcheck (L, pcfg != NULL, 2, "'config' expected");
+ cfg = pcfg ? *(pcfg) : NULL;
+
+ if (base != NULL && cfg != NULL) {
+ resolver = dns_resolver_init (NULL, base, cfg);
+ if (resolver) {
+ presolver = lua_newuserdata (L, sizeof (gpointer));
+ rspamd_lua_setclass (L, "rspamd{resolver}", -1);
+ *presolver = resolver;
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static int
+lua_dns_resolver_resolve_common (lua_State *L,
+ struct rspamd_dns_resolver *resolver,
+ enum rdns_request_type type,
+ int first)
+{
+ LUA_TRACE_POINT;
+ struct rspamd_async_session *session = NULL;
+ rspamd_mempool_t *pool = NULL;
+ const gchar *to_resolve = NULL, *user_str = NULL;
+ struct lua_dns_cbdata *cbdata;
+ gint cbref = -1, ret;
+ struct rspamd_task *task = NULL;
+ GError *err = NULL;
+ gboolean forced = FALSE;
+
+ /* Check arguments */
+ if (!rspamd_lua_parse_table_arguments (L, first, &err,
+ "session=U{session};mempool=U{mempool};*name=S;*callback=F;"
+ "option=S;task=U{task};forced=B",
+ &session, &pool, &to_resolve, &cbref, &user_str, &task, &forced)) {
+
+ if (err) {
+ ret = luaL_error (L, "invalid arguments: %s", err->message);
+ g_error_free (err);
+
+ return ret;
+ }
+
+ return luaL_error (L, "invalid arguments");
+ }
+
+ if (task) {
+ pool = task->task_pool;
+ session = task->s;
+ }
+ else if (!session || !pool) {
+ return luaL_error (L, "invalid arguments: either 'task' or 'session'/'mempool' should be set");
+ }
+
+ if (pool != NULL && to_resolve != NULL) {
+ cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata));
+ cbdata->resolver = resolver;
+ cbdata->cbref = cbref;
+ cbdata->user_str = rspamd_mempool_strdup (pool, user_str);
+
+ if (type != RDNS_REQUEST_PTR) {
+ cbdata->to_resolve = rspamd_mempool_strdup (pool, to_resolve);
+ }
+ else {
+ char *ptr_str;
+
+ ptr_str = rdns_generate_ptr_from_str (to_resolve);
+
+ if (ptr_str == NULL) {
+ msg_err_task_check ("wrong resolve string to PTR request: %s",
+ to_resolve);
+ lua_pushnil (L);
+
+ return 1;
+ }
+
+ cbdata->to_resolve = rspamd_mempool_strdup (pool, ptr_str);
+ to_resolve = cbdata->to_resolve;
+ free (ptr_str);
+ }
+
+ if (task == NULL) {
+ if ( make_dns_request (resolver,
+ session,
+ pool,
+ lua_dns_resolver_callback,
+ cbdata,
+ type,
+ to_resolve)) {
+
+ lua_pushboolean (L, TRUE);
+
+ if (session) {
+ cbdata->s = session;
+ cbdata->w = rspamd_session_get_watcher (session);
+ rspamd_session_watcher_push (session);
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ else {
+ cbdata->task = task;
+
+ if (forced) {
+ ret = make_dns_request_task_forced (task,
+ lua_dns_resolver_callback,
+ cbdata,
+ type,
+ to_resolve);
+ }
+ else {
+ ret = make_dns_request_task (task,
+ lua_dns_resolver_callback,
+ cbdata,
+ type,
+ to_resolve);
+ }
+
+ if (ret) {
+ cbdata->s = session;
+ cbdata->w = rspamd_session_get_watcher (session);
+ rspamd_session_watcher_push (session);
+ /* callback was set up */
+ lua_pushboolean (L, TRUE);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ }
+ else {
+ return luaL_error (L, "invalid arguments to lua_resolve");
+ }
+
+ return 1;
+
+}
+
+/***
+ * @method resolver:resolve_a(session, pool, host, callback)
+ * Resolve A record for a specified host.
+ * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
+ * @param {mempool} pool memory pool for storing intermediate data
+ * @param {string} host name to resolve
+ * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
+ * @return {boolean} `true` if DNS request has been scheduled
+ */
+static int
+lua_dns_resolver_resolve_a (lua_State *L)
+{
+ struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+
+ if (dns_resolver) {
+ return lua_dns_resolver_resolve_common (L,
+ dns_resolver,
+ RDNS_REQUEST_A,
+ 2);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/***
+ * @method resolver:resolve_ptr(session, pool, ip, callback)
+ * Resolve PTR record for a specified host.
+ * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
+ * @param {mempool} pool memory pool for storing intermediate data
+ * @param {string} ip name to resolve in string form (e.g. '8.8.8.8' or '2001:dead::')
+ * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
+ * @return {boolean} `true` if DNS request has been scheduled
+ */
+static int
+lua_dns_resolver_resolve_ptr (lua_State *L)
+{
+ struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+
+ if (dns_resolver) {
+ return lua_dns_resolver_resolve_common (L,
+ dns_resolver,
+ RDNS_REQUEST_PTR,
+ 2);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/***
+ * @method resolver:resolve_txt(session, pool, host, callback)
+ * Resolve TXT record for a specified host.
+ * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
+ * @param {mempool} pool memory pool for storing intermediate data
+ * @param {string} host name to get TXT record for
+ * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
+ * @return {boolean} `true` if DNS request has been scheduled
+ */
+static int
+lua_dns_resolver_resolve_txt (lua_State *L)
+{
+ struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+
+ if (dns_resolver) {
+ return lua_dns_resolver_resolve_common (L,
+ dns_resolver,
+ RDNS_REQUEST_TXT,
+ 2);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/***
+ * @method resolver:resolve_mx(session, pool, host, callback)
+ * Resolve MX record for a specified host.
+ * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
+ * @param {mempool} pool memory pool for storing intermediate data
+ * @param {string} host name to get MX record for
+ * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
+ * @return {boolean} `true` if DNS request has been scheduled
+ */
+static int
+lua_dns_resolver_resolve_mx (lua_State *L)
+{
+ struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+
+ if (dns_resolver) {
+ return lua_dns_resolver_resolve_common (L,
+ dns_resolver,
+ RDNS_REQUEST_MX,
+ 2);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/***
+ * @method resolver:resolve_ns(session, pool, host, callback)
+ * Resolve NS records for a specified host.
+ * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`)
+ * @param {mempool} pool memory pool for storing intermediate data
+ * @param {string} host name to get NS records for
+ * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)`
+ * @return {boolean} `true` if DNS request has been scheduled
+ */
+static int
+lua_dns_resolver_resolve_ns (lua_State *L)
+{
+ struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+
+ if (dns_resolver) {
+ return lua_dns_resolver_resolve_common (L,
+ dns_resolver,
+ RDNS_REQUEST_NS,
+ 2);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/* XXX: broken currently */
+static int
+lua_dns_resolver_resolve (lua_State *L)
+{
+ struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L);
+ int type;
+
+ type = lua_dns_get_type (L, 2);
+
+ if (dns_resolver && type != 0) {
+ return lua_dns_resolver_resolve_common (L, dns_resolver, type, 3);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static gint
+lua_load_dns_resolver (lua_State *L)
+{
+ lua_newtable (L);
+ luaL_register (L, NULL, dns_resolverlib_f);
+
+ return 1;
+}
+
+void
+luaopen_dns_resolver (lua_State * L)
+{
+
+ luaL_newmetatable (L, "rspamd{resolver}");
+ lua_pushstring (L, "__index");
+ lua_pushvalue (L, -2);
+ lua_settable (L, -3);
+
+ lua_pushstring (L, "class");
+ lua_pushstring (L, "rspamd{resolver}");
+ lua_rawset (L, -3);
+
+ {
+ LUA_ENUM (L, DNS_A, RDNS_REQUEST_A);
+ LUA_ENUM (L, DNS_PTR, RDNS_REQUEST_PTR);
+ LUA_ENUM (L, DNS_MX, RDNS_REQUEST_MX);
+ LUA_ENUM (L, DNS_TXT, RDNS_REQUEST_TXT);
+ LUA_ENUM (L, DNS_SRV, RDNS_REQUEST_SRV);
+ LUA_ENUM (L, DNS_SPF, RDNS_REQUEST_SPF);
+ LUA_ENUM (L, DNS_AAAA, RDNS_REQUEST_AAAA);
+ LUA_ENUM (L, DNS_SOA, RDNS_REQUEST_SOA);
+ }
+
+ luaL_register (L, NULL, dns_resolverlib_m);
+ rspamd_lua_add_preload (L, "rspamd_resolver", lua_load_dns_resolver);
+
+ lua_pop (L, 1); /* remove metatable from stack */
+}
diff --git a/src/lua/lua_dns_resolver.h b/src/lua/lua_dns_resolver.h
new file mode 100644
index 000000000..f5c71aa0b
--- /dev/null
+++ b/src/lua/lua_dns_resolver.h
@@ -0,0 +1,16 @@
+#ifndef RSPAMD_LUA_DNS_H
+#define RSPAMD_LUA_DNS_H
+
+typedef struct lua_State lua_State;
+struct rdns_reply;
+
+/**
+ * Pushes dns reply onto Lua stack
+ *
+ * @param L
+ * @param reply
+ */
+void
+lua_push_dns_reply (lua_State *L, const struct rdns_reply *reply);
+
+#endif //RSPAMD_LUA_DNS_H
diff --git a/src/lua/lua_expression.c b/src/lua/lua_expression.c
index 03a667b8d..0d57a3bd8 100644
--- a/src/lua/lua_expression.c
+++ b/src/lua/lua_expression.c
@@ -98,7 +98,7 @@ static const struct luaL_reg exprlib_f[] = {
static rspamd_expression_atom_t * lua_atom_parse (const gchar *line, gsize len,
rspamd_mempool_t *pool, gpointer ud, GError **err);
-static gdouble lua_atom_process (gpointer input, rspamd_expression_atom_t *atom);
+static gdouble lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom);
static const struct rspamd_atom_subr lua_atom_subr = {
.parse = lua_atom_parse,
@@ -166,22 +166,22 @@ lua_atom_parse (const gchar *line, gsize len,
}
static gdouble
-lua_atom_process (gpointer input, rspamd_expression_atom_t *atom)
+lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom)
{
struct lua_expression *e = (struct lua_expression *)atom->data;
gdouble ret = 0;
- lua_rawgeti (e->L, LUA_REGISTRYINDEX, e->process_idx);
- lua_pushlstring (e->L, atom->str, atom->len);
- lua_pushvalue (e->L, GPOINTER_TO_INT (input));
+ lua_rawgeti (process_data->L, LUA_REGISTRYINDEX, e->process_idx);
+ lua_pushlstring (process_data->L, atom->str, atom->len);
+ lua_pushvalue (process_data->L, process_data->stack_item);
- if (lua_pcall (e->L, 2, 1, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (e->L, -1));
- lua_pop (e->L, 1);
+ if (lua_pcall (process_data->L, 2, 1, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (process_data->L, -1));
+ lua_pop (process_data->L, 1);
}
else {
- ret = lua_tonumber (e->L, -1);
- lua_pop (e->L, 1);
+ ret = lua_tonumber (process_data->L, -1);
+ lua_pop (process_data->L, 1);
}
return ret;
@@ -195,11 +195,16 @@ lua_expr_process (lua_State *L)
gdouble res;
gint flags = 0;
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+ process_data.L = L;
+ process_data.stack_item = 2;
+
if (lua_gettop (L) >= 3) {
- flags = lua_tonumber (L, 3);
+ process_data.flags = flags;
}
- res = rspamd_process_expression (e->expr, flags, GINT_TO_POINTER (2));
+ res = rspamd_process_expression (e->expr, &process_data);
lua_pushnumber (L, res);
@@ -214,29 +219,36 @@ lua_expr_process_traced (lua_State *L)
rspamd_expression_atom_t *atom;
gint res;
guint i;
- gint flags = 0;
- GPtrArray *trace;
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+
+ process_data.L = L;
+ /*
+ * stack:1 - self
+ * stack:2 - data, see process_traced() definition for details
+ */
+ process_data.stack_item = 2;
if (lua_gettop (L) >= 3) {
- flags = lua_tonumber (L, 3);
+ process_data.flags = lua_tonumber (L, 3);
}
- trace = g_ptr_array_sized_new (32);
- res = rspamd_process_expression_track (e->expr, flags, GINT_TO_POINTER (2),
- trace);
+ process_data.trace = g_ptr_array_sized_new (32);
+
+ res = rspamd_process_expression_track (e->expr, &process_data);
lua_pushnumber (L, res);
- lua_createtable (L, trace->len, 0);
+ lua_createtable (L, process_data.trace->len, 0);
- for (i = 0; i < trace->len; i ++) {
- atom = g_ptr_array_index (trace, i);
+ for (i = 0; i < process_data.trace->len; i ++) {
+ atom = g_ptr_array_index (process_data.trace, i);
lua_pushlstring (L, atom->str, atom->len);
lua_rawseti (L, -2, i + 1);
}
- g_ptr_array_free (trace, TRUE);
+ g_ptr_array_free (process_data.trace, TRUE);
return 2;
}
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index c292428c2..64617be9b 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "http_private.h"
#include "unix-std.h"
#include "zlib.h"
@@ -56,7 +57,6 @@ static const struct luaL_reg httplib_m[] = {
#define RSPAMD_LUA_HTTP_FLAG_NOVERIFY (1 << 1)
struct lua_http_cbdata {
- lua_State *L;
struct rspamd_http_connection *conn;
struct rspamd_async_session *session;
struct rspamd_async_watcher *w;
@@ -96,7 +96,7 @@ lua_http_fin (gpointer arg)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
@@ -152,13 +152,22 @@ lua_http_maybe_free (struct lua_http_cbdata *cbd)
static void
lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
{
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
- lua_pushstring (cbd->L, err);
+ struct lua_callback_state lcbd;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
+
+ L = lcbd.L;
- if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
- lua_pop (cbd->L, 1);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+ lua_pushstring (L, err);
+
+ if (lua_pcall (L, 1, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
+ lua_pop (L, 1);
}
+
+ lua_thread_pool_restore_callback (&lcbd);
}
static void
@@ -179,51 +188,60 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
const gchar *body;
gsize body_len;
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ struct lua_callback_state lcbd;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
+
+ L = lcbd.L;
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
/* Error */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Reply code */
- lua_pushinteger (cbd->L, msg->code);
+ lua_pushinteger (L, msg->code);
/* Body */
body = rspamd_http_message_get_body (msg, &body_len);
if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
struct rspamd_lua_text *t;
- t = lua_newuserdata (cbd->L, sizeof (*t));
- rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = body;
t->len = body_len;
t->flags = 0;
}
else {
if (body_len > 0) {
- lua_pushlstring (cbd->L, body, body_len);
+ lua_pushlstring (L, body, body_len);
}
else {
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
}
}
/* Headers */
- lua_newtable (cbd->L);
+ lua_newtable (L);
HASH_ITER (hh, msg->headers, h, htmp) {
/*
* Lowercase header name, as Lua cannot search in caseless matter
*/
rspamd_str_lc (h->combined->str, h->name.len);
- lua_pushlstring (cbd->L, h->name.begin, h->name.len);
- lua_pushlstring (cbd->L, h->value.begin, h->value.len);
- lua_settable (cbd->L, -3);
+ lua_pushlstring (L, h->name.begin, h->name.len);
+ lua_pushlstring (L, h->value.begin, h->value.len);
+ lua_settable (L, -3);
}
- if (lua_pcall (cbd->L, 4, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
- lua_pop (cbd->L, 1);
+ if (lua_pcall (L, 4, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
+ lua_pop (L, 1);
}
lua_http_maybe_free (cbd);
+ lua_thread_pool_restore_callback (&lcbd);
+
return 0;
}
@@ -707,7 +725,6 @@ lua_http_request (lua_State *L)
}
cbd = g_malloc0 (sizeof (*cbd));
- cbd->L = L;
cbd->cbref = cbref;
cbd->msg = msg;
cbd->ev_base = ev_base;
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index e5b97ebeb..0fc9c43b7 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "utlist.h"
#include "contrib/hiredis/hiredis.h"
@@ -92,7 +93,7 @@ struct lua_redis_specific_userdata;
*/
struct lua_redis_userdata {
redisAsyncContext *ctx;
- lua_State *L;
+ struct rspamd_task *task;
struct rspamd_async_session *s;
struct event_base *ev_base;
struct rspamd_config *cfg;
@@ -191,7 +192,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
if (cur->cbref != -1) {
- luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref);
+ luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
}
g_free (cur);
@@ -244,21 +245,27 @@ lua_redis_push_error (const gchar *err,
gboolean connected)
{
struct lua_redis_userdata *ud = sp_ud->c;
+ struct lua_callback_state cbs;
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
+
+ lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
+
/* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* String of error */
- lua_pushstring (ud->L, err);
+ lua_pushstring (cbs.L, err);
/* Data is nil */
- lua_pushnil (ud->L);
+ lua_pushnil (cbs.L);
- if (lua_pcall (ud->L, 2, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
+ lua_pop (cbs.L, 1);
}
+
+ lua_thread_pool_restore_callback (&cbs);
}
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -323,21 +330,25 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
struct lua_redis_specific_userdata *sp_ud)
{
struct lua_redis_userdata *ud = sp_ud->c;
+ struct lua_callback_state cbs;
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
+ lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
+
/* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* Error is nil */
- lua_pushnil (ud->L);
+ lua_pushnil (cbs.L);
/* Data */
- lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
+ lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);
- if (lua_pcall (ud->L, 2, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
+ lua_pop (cbs.L, 1);
}
+ lua_thread_pool_restore_callback (&cbs);
}
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -689,7 +700,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
ud->cfg = cfg;
ud->pool = cfg->redis_pool;
ud->ev_base = ev_base;
- ud->L = L;
+ ud->task = task;
ret = TRUE;
}
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 797bdcc4e..8d948c6d5 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "utlist.h"
#include "unix-std.h"
@@ -186,7 +187,6 @@ struct lua_tcp_dtor {
#define LUA_TCP_FLAG_FINISHED (1 << 4)
struct lua_tcp_cbdata {
- lua_State *L;
struct rspamd_async_session *session;
struct rspamd_async_event *async_ev;
struct event_base *ev_base;
@@ -203,6 +203,7 @@ struct lua_tcp_cbdata {
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
+ struct rspamd_task *task;
};
#define msg_debug_tcp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -249,7 +250,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
if (hdl->type == LUA_WANT_READ) {
if (hdl->h.r.cbref) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref);
}
if (hdl->h.r.stop_pattern) {
@@ -258,7 +259,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
}
else {
if (hdl->h.w.cbref) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref);
}
if (hdl->h.w.iov) {
@@ -280,7 +281,7 @@ lua_tcp_fin (gpointer arg)
msg_debug_tcp ("finishing TCP connection");
if (cbd->connect_cb) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
}
if (cbd->fd != -1) {
@@ -338,6 +339,11 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, top;
+ struct lua_callback_state cbs;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
va_start (ap, err);
@@ -356,27 +362,27 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
}
if (cbref != -1) {
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error message */
va_copy (ap_copy, ap);
- lua_pushvfstring (cbd->L, err, ap_copy);
+ lua_pushvfstring (L, err, ap_copy);
va_end (ap_copy);
/* Body */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, 3, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
@@ -391,6 +397,8 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
}
va_end (ap);
+
+ lua_thread_pool_restore_callback (&cbs);
}
static void
@@ -400,6 +408,11 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, arg_cnt, top;
+ struct lua_callback_state cbs;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
hdl = g_queue_peek_head (cbd->handlers);
@@ -413,15 +426,15 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
}
if (cbref != -1) {
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Body */
if (hdl->type == LUA_WANT_READ) {
- t = lua_newuserdata (cbd->L, sizeof (*t));
- rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = (const gchar *)str;
t->len = len;
t->flags = 0;
@@ -431,19 +444,21 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
arg_cnt = 2;
}
/* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
+
+ lua_thread_pool_restore_callback (&cbs);
}
static void
@@ -667,6 +682,8 @@ lua_tcp_handler (int fd, short what, gpointer ud)
gssize r;
gint so_error = 0;
socklen_t so_len = sizeof (so_error);
+ struct lua_callback_state cbs;
+ lua_State *L;
REF_RETAIN (cbd);
@@ -693,22 +710,25 @@ lua_tcp_handler (int fd, short what, gpointer ud)
else {
cbd->flags |= LUA_TCP_FLAG_CONNECTED;
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
+
if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
gint top;
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
REF_RETAIN (cbd);
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
- if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, 1, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
@@ -1174,7 +1194,7 @@ lua_tcp_request (lua_State *L)
return 1;
}
- cbd->L = L;
+ cbd->task = task;
h = rspamd_random_uint64_fast ();
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
cbd->handlers = g_queue_new ();
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
new file mode 100644
index 000000000..979b31f6b
--- /dev/null
+++ b/src/lua/lua_thread_pool.c
@@ -0,0 +1,148 @@
+#include "config.h"
+
+#include "lua_common.h"
+#include "lua_thread_pool.h"
+
+struct lua_thread_pool {
+ GQueue *available_items;
+ lua_State *L;
+ gint max_items;
+ struct thread_entry *running_entry;
+};
+
+static struct thread_entry *
+thread_entry_new (lua_State * L)
+{
+ struct thread_entry *ent;
+ ent = g_new0(struct thread_entry, 1);
+ ent->lua_state = lua_newthread (L);
+ ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX);
+
+ return ent;
+}
+
+static void
+thread_entry_free (lua_State * L, struct thread_entry *ent)
+{
+ luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index);
+ g_free (ent);
+}
+
+struct lua_thread_pool *
+lua_thread_pool_new (lua_State * L)
+{
+ struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1);
+
+ pool->L = L;
+ pool->max_items = 100;
+
+ pool->available_items = g_queue_new ();
+ int i;
+
+ struct thread_entry *ent;
+ for (i = 0; i < MAX(2, pool->max_items / 10); i ++) {
+ ent = thread_entry_new (pool->L);
+ g_queue_push_head (pool->available_items, ent);
+ }
+
+ return pool;
+}
+
+void
+lua_thread_pool_free (struct lua_thread_pool *pool)
+{
+ struct thread_entry *ent = NULL;
+ while (!g_queue_is_empty (pool->available_items)) {
+ ent = g_queue_pop_head (pool->available_items);
+ thread_entry_free (pool->L, ent);
+ }
+ g_queue_free (pool->available_items);
+ g_free (pool);
+}
+
+struct thread_entry *
+lua_thread_pool_get (struct lua_thread_pool *pool)
+{
+ gpointer cur;
+ struct thread_entry *ent = NULL;
+
+ cur = g_queue_pop_head (pool->available_items);
+
+ if (cur) {
+ ent = cur;
+ }
+ else {
+ ent = thread_entry_new (pool->L);
+ }
+
+ pool->running_entry = ent;
+
+ return ent;
+}
+
+void
+lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't return a running/yielded thread into the pool */
+
+ if (pool->running_entry == thread_entry) {
+ pool->running_entry = NULL;
+ }
+
+ if (g_queue_get_length (pool->available_items) <= pool->max_items) {
+ thread_entry->cd = NULL;
+ g_queue_push_head (pool->available_items, thread_entry);
+ }
+ else {
+ thread_entry_free (pool->L, thread_entry);
+ }
+}
+
+void
+lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ struct thread_entry *ent = NULL;
+
+ /* we should only terminate failed threads */
+ g_assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD);
+
+ if (pool->running_entry == thread_entry) {
+ pool->running_entry = NULL;
+ }
+
+ thread_entry_free (pool->L, thread_entry);
+
+ if (g_queue_get_length (pool->available_items) <= pool->max_items) {
+ ent = thread_entry_new (pool->L);
+ g_queue_push_head (pool->available_items, ent);
+ }
+}
+
+struct thread_entry *
+lua_thread_pool_get_running_entry (struct lua_thread_pool *pool)
+{
+ return pool->running_entry;
+}
+
+void
+lua_thread_pool_set_running_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ pool->running_entry = thread_entry;
+}
+
+
+void
+lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callback_state *cbs)
+{
+ cbs->thread_pool = pool;
+ cbs->previous_thread = lua_thread_pool_get_running_entry (pool);
+ cbs->my_thread = lua_thread_pool_get (pool);
+ cbs->L = cbs->my_thread->lua_state;
+}
+
+void
+lua_thread_pool_restore_callback (struct lua_callback_state *cbs)
+{
+ lua_thread_pool_return (cbs->thread_pool, cbs->my_thread);
+ lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread);
+}
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
new file mode 100644
index 000000000..b72b72e8d
--- /dev/null
+++ b/src/lua/lua_thread_pool.h
@@ -0,0 +1,106 @@
+#ifndef LUA_THREAD_POOL_H_
+#define LUA_THREAD_POOL_H_
+
+#include <lua.h>
+
+struct thread_entry {
+ lua_State *lua_state;
+ gint thread_index;
+ gpointer cd;
+};
+
+struct thread_pool;
+
+struct lua_callback_state {
+ lua_State *L;
+ struct thread_entry *my_thread;
+ struct thread_entry *previous_thread;
+ struct lua_thread_pool *thread_pool;
+};
+
+/**
+ * Allocates new thread pool on state L. Pre-creates number of lua-threads to use later on
+ *
+ * @param L
+ * @return
+ */
+struct lua_thread_pool *
+lua_thread_pool_new (lua_State * L);
+
+/**
+ * Destroys the pool
+ * @param pool
+ */
+void
+lua_thread_pool_free (struct lua_thread_pool *pool);
+
+/**
+ * Extracts a thread from the list of available ones.
+ * It immediately becames running one and should be used to run a Lua script/function straight away.
+ * as soon as the code is finished, it should be either returned into list of available threads by
+ * calling lua_thread_pool_return() or terminated by calling lua_thread_pool_terminate_entry()
+ * if the code finished with error.
+ *
+ * If the code performed YIELD, the thread is still running and it's live should be controlled by the callee
+ *
+ * @param pool
+ * @return
+ */
+struct thread_entry *
+lua_thread_pool_get(struct lua_thread_pool *pool);
+
+/**
+ * Return thread into the list of available ones. It can't be done with yielded or dead threads.
+ *
+ * @param pool
+ * @param thread_entry
+ */
+void
+lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+/**
+ * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only
+ *
+ * @param pool
+ * @param thread_entry
+ */
+void
+lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+/**
+ * Currently running thread. Typically needed in yielding point - to fill-up continuation.
+ *
+ * @param pool
+ * @return
+ */
+struct thread_entry *
+lua_thread_pool_get_running_entry (struct lua_thread_pool *pool);
+
+/**
+ * Updates currently running thread
+ *
+ * @param pool
+ * @param thread_entry
+ */
+void
+lua_thread_pool_set_running_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+/**
+ * Prevents yielded thread to be used for callback execution. lua_thread_pool_restore_callback() should be called afterwards.
+ *
+ * @param pool
+ * @param cbs
+ */
+void
+lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callback_state *cbs);
+
+/**
+ * Restores state after lua_thread_pool_prepare_callback () usage
+ *
+ * @param cbs
+ */
+void
+lua_thread_pool_restore_callback (struct lua_callback_state *cbs);
+
+#endif /* LUA_THREAD_POOL_H_ */
+
diff --git a/src/plugins/lua/multimap.lua b/src/plugins/lua/multimap.lua
index d20885525..909260681 100644
--- a/src/plugins/lua/multimap.lua
+++ b/src/plugins/lua/multimap.lua
@@ -28,6 +28,7 @@ local regexp = require "rspamd_regexp"
local rspamd_expression = require "rspamd_expression"
local rspamd_ip = require "rspamd_ip"
local lua_util = require "lua_util"
+local rspamd_dns = require "rspamd_dns"
local lua_selectors = require "lua_selectors"
local redis_params
local fun = require "fun"
@@ -706,24 +707,25 @@ local function multimap_callback(task, rule)
if rt == 'ip' then
match_rule(rule, ip)
else
- local cb = function (_, to_resolve, results, err)
- task:inc_dns_req()
- if err and (err ~= 'requested record is not found' and err ~= 'no records with this name') then
- rspamd_logger.errx(task, 'error looking up %s: %s', to_resolve, err)
- end
- if results then
- task:insert_result(rule['symbol'], 1, rule['map'])
+ local to_resolve = ip_to_rbl(ip, rule['map'])
+
+ local is_ok, results = rspamd_dns.request({
+ type = "a",
+ task = task,
+ name = to_resolve,
+ })
- if pre_filter then
- task:set_pre_result(rule['action'], 'Matched map: ' .. rule['symbol'])
- end
+ lua_util.debugm(N, rspamd_config, 'resolve() finished: results=%1, is_ok=%2, to_resolve=%3', results, is_ok, to_resolve)
+
+ if not is_ok and (results ~= 'requested record is not found' and results ~= 'no records with this name') then
+ rspamd_logger.errx(task, 'error looking up %s: %s', to_resolve, results)
+ elseif is_ok then
+ task:insert_result(rule['symbol'], 1, rule['map'])
+ if pre_filter then
+ task:set_pre_result(rule['action'], 'Matched map: ' .. rule['symbol'])
end
end
- task:get_resolver():resolve_a({task = task,
- name = ip_to_rbl(ip, rule['map']),
- callback = cb,
- })
end
end
end,
diff --git a/src/plugins/lua/reputation.lua b/src/plugins/lua/reputation.lua
index 138899417..f5b461d78 100644
--- a/src/plugins/lua/reputation.lua
+++ b/src/plugins/lua/reputation.lua
@@ -25,6 +25,7 @@ local N = 'reputation'
local rspamd_logger = require "rspamd_logger"
local rspamd_util = require "rspamd_util"
+local rspamd_dns = require "rspamd_dns"
local lua_util = require "lua_util"
local lua_maps = require "lua_maps"
local hash = require 'rspamd_cryptobox_hash'
@@ -716,49 +717,45 @@ end
--]]
local function reputation_dns_get_token(task, rule, token, continuation_cb)
- local r = task:get_resolver()
+ -- local r = task:get_resolver()
local key = gen_token_key(token, rule)
local dns_name = key .. '.' .. rule.backend.config.list
- local function dns_callback(_, to_resolve, results, err)
- if err and (err ~= 'requested record is not found' and err ~= 'no records with this name') then
- rspamd_logger.errx(task, 'error looking up %s: %s', to_resolve, err)
- end
- if not results then
- lua_util.debugm(N, task, 'DNS RESPONSE: label=%1 results=%2 error=%3 list=%4',
- to_resolve, false, err, rule.backend.config.list)
- else
- lua_util.debugm(N, task, 'DNS RESPONSE: label=%1 results=%2 error=%3 list=%4',
- to_resolve, true, err, rule.backend.config.list)
- end
-
- -- Now split tokens to list of values
- if not err and results then
- local values = {}
- -- Format: key1=num1;key2=num2...keyn=numn
- fun.each(function(e)
- local vals = lua_util.rspamd_str_split(e, "=")
- if vals and #vals == 2 then
- local nv = tonumber(vals[2])
- if nv then
- values[vals[1]] = nv
- end
- end
- end,
- lua_util.rspamd_str_split(results[1], ";"))
- continuation_cb(nil, to_resolve, values)
- else
- continuation_cb(err, to_resolve, nil)
- end
-
- task:inc_dns_req()
- end
- r:resolve_a({
+ local is_ok, results = rspamd_dns.request({
+ type = 'a',
task = task,
name = dns_name,
- callback = dns_callback,
forced = true,
})
+
+ if not is_ok and (results ~= 'requested record is not found' and results ~= 'no records with this name') then
+ rspamd_logger.errx(task, 'error looking up %s: %s', dns_name, results)
+ end
+
+ lua_util.debugm(N, task, 'DNS RESPONSE: label=%1 results=%2 is_ok=%3 list=%4',
+ dns_name, results, is_ok, rule.backend.config.list)
+
+ -- Now split tokens to list of values
+ if is_ok then
+ local values = {}
+ -- Format: key1=num1;key2=num2...keyn=numn
+ fun.each(function(e)
+ local vals = lua_util.rspamd_str_split(e, "=")
+ if vals and #vals == 2 then
+ local nv = tonumber(vals[2])
+ if nv then
+ values[vals[1]] = nv
+ end
+ end
+ end,
+ lua_util.rspamd_str_split(results[1], ";"))
+
+ continuation_cb(nil, dns_name, values)
+ else
+ continuation_cb(results, dns_name, nil)
+ end
+
+ task:inc_dns_req()
end
local function reputation_redis_init(rule, cfg, ev_base, worker)
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 915305aa3..92cccc338 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -433,7 +433,12 @@ process_regexp_item (struct rspamd_task *task, void *user_data)
else {
/* Process expression */
if (item->expr) {
- res = rspamd_process_expression (item->expr, 0, task);
+ struct rspamd_expr_process_data process_data;
+ memset (&process_data, 0, sizeof process_data);
+
+ process_data.task = task;
+
+ res = rspamd_process_expression (item->expr, &process_data);
}
else {
msg_warn_task ("FIXME: %s symbol is broken with new expressions",