diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-20 11:57:04 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-20 11:57:04 +0100 |
commit | 816987a0b25fd8e9915a0dc1f2f20c968c2177d4 (patch) | |
tree | 2729860f7cd12a4d8b86c917b5b215be83914c79 /src | |
parent | 2bc77d7a877940589c2ae78b7521a39a5e5be97f (diff) | |
parent | f9606fe254533f1cb493230b5ab4ee075cd550f6 (diff) | |
download | rspamd-816987a0b25fd8e9915a0dc1f2f20c968c2177d4.tar.gz rspamd-816987a0b25fd8e9915a0dc1f2f20c968c2177d4.zip |
Merge pull request #2406 from negram/lua-coroutine-model
[Project] coroutine threaded model for API calls: thread pool
Diffstat (limited to 'src')
-rw-r--r-- | src/libmime/mime_expressions.c | 6 | ||||
-rw-r--r-- | src/libserver/cfg_file.h | 1 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 3 | ||||
-rw-r--r-- | src/libserver/composites.c | 15 | ||||
-rw-r--r-- | src/libutil/expression.c | 24 | ||||
-rw-r--r-- | src/libutil/expression.h | 22 | ||||
-rw-r--r-- | src/lua/CMakeLists.txt | 6 | ||||
-rw-r--r-- | src/lua/lua_common.c | 17 | ||||
-rw-r--r-- | src/lua/lua_common.h | 28 | ||||
-rw-r--r-- | src/lua/lua_config.c | 140 | ||||
-rw-r--r-- | src/lua/lua_dns.c | 597 | ||||
-rw-r--r-- | src/lua/lua_dns_resolver.c | 631 | ||||
-rw-r--r-- | src/lua/lua_dns_resolver.h | 16 | ||||
-rw-r--r-- | src/lua/lua_expression.c | 56 | ||||
-rw-r--r-- | src/lua/lua_http.c | 61 | ||||
-rw-r--r-- | src/lua/lua_redis.c | 41 | ||||
-rw-r--r-- | src/lua/lua_tcp.c | 82 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.c | 148 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.h | 106 | ||||
-rw-r--r-- | src/plugins/lua/multimap.lua | 30 | ||||
-rw-r--r-- | src/plugins/lua/reputation.lua | 69 | ||||
-rw-r--r-- | src/plugins/regexp.c | 7 |
22 files changed, 1387 insertions, 719 deletions
diff --git a/src/libmime/mime_expressions.c b/src/libmime/mime_expressions.c index f9bfdc1bf..518b9a390 100644 --- a/src/libmime/mime_expressions.c +++ b/src/libmime/mime_expressions.c @@ -83,7 +83,7 @@ static gboolean rspamd_is_empty_body (struct rspamd_task *task, static rspamd_expression_atom_t * rspamd_mime_expr_parse (const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); -static gdouble rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom); +static gdouble rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); static gint rspamd_mime_expr_priority (rspamd_expression_atom_t *atom); static void rspamd_mime_expr_destroy (rspamd_expression_atom_t *atom); @@ -913,9 +913,9 @@ rspamd_mime_expr_process_function (struct rspamd_function_atom * func, } static gdouble -rspamd_mime_expr_process (gpointer input, rspamd_expression_atom_t *atom) +rspamd_mime_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom) { - struct rspamd_task *task = input; + struct rspamd_task *task = process_data->task; struct rspamd_mime_atom *mime_atom; lua_State *L; gdouble ret = 0; diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 75b404530..c583766c4 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -380,6 +380,7 @@ struct rspamd_config { gchar * checksum; /**< real checksum of config file */ gchar * dump_checksum; /**< dump checksum of config file */ gpointer lua_state; /**< pointer to lua state */ + gpointer lua_thread_pool; /**< pointer to lua thread (coroutine) pool */ gchar * rrd_file; /**< rrd file to store statistics */ gchar * history_file; /**< file to save rolling history */ diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index b7b9dfdee..016556912 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -20,6 +20,7 @@ #include "uthash_strcase.h" #include "filter.h" #include "lua/lua_common.h" +#include "lua/lua_thread_pool.h" #include "map.h" #include "map_helpers.h" #include "map_private.h" @@ -175,6 +176,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags) if (!(flags & RSPAMD_CONFIG_INIT_SKIP_LUA)) { cfg->lua_state = rspamd_lua_init (); cfg->own_lua_state = TRUE; + cfg->lua_thread_pool = lua_thread_pool_new (cfg->lua_state); } cfg->cache = rspamd_symbols_cache_new (cfg); @@ -259,6 +261,7 @@ rspamd_config_free (struct rspamd_config *cfg) g_ptr_array_free (cfg->c_modules, TRUE); if (cfg->lua_state && cfg->own_lua_state) { + lua_thread_pool_free (cfg->lua_thread_pool); lua_close (cfg->lua_state); } REF_RELEASE (cfg->libs_ctx); diff --git a/src/libserver/composites.c b/src/libserver/composites.c index 8f3cb179d..83f3a35d4 100644 --- a/src/libserver/composites.c +++ b/src/libserver/composites.c @@ -66,7 +66,7 @@ struct symbol_remove_data { static rspamd_expression_atom_t * rspamd_composite_expr_parse (const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); -static gdouble rspamd_composite_expr_process (gpointer input, rspamd_expression_atom_t *atom); +static gdouble rspamd_composite_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); static gint rspamd_composite_expr_priority (rspamd_expression_atom_t *atom); static void rspamd_composite_expr_destroy (rspamd_expression_atom_t *atom); static void composites_foreach_callback (gpointer key, gpointer value, void *data); @@ -173,9 +173,9 @@ rspamd_composite_process_single_symbol (struct composites_data *cd, } static gdouble -rspamd_composite_expr_process (gpointer input, rspamd_expression_atom_t *atom) +rspamd_composite_expr_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom) { - struct composites_data *cd = (struct composites_data *)input; + struct composites_data *cd = process_data->cd; const gchar *beg = atom->data, *sym = NULL; gchar t; struct symbol_remove_data *rd, *nrd; @@ -344,8 +344,13 @@ composites_foreach_callback (gpointer key, gpointer value, void *data) return; } - rc = rspamd_process_expression (comp->expr, - RSPAMD_EXPRESSION_FLAG_NOOPT, cd); + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + + process_data.flags = RSPAMD_EXPRESSION_FLAG_NOOPT; + process_data.cd = cd; + + rc = rspamd_process_expression (comp->expr, &process_data); /* Checked bit */ setbit (cd->checked, comp->id * 2); diff --git a/src/libutil/expression.c b/src/libutil/expression.c index 21a137f43..2469d0415 100644 --- a/src/libutil/expression.c +++ b/src/libutil/expression.c @@ -984,8 +984,8 @@ rspamd_ast_do_op (struct rspamd_expression_elt *elt, gdouble val, } static gdouble -rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node, - gpointer data, GPtrArray *track) +rspamd_ast_process_node (struct rspamd_expression *expr, GNode *node, + struct rspamd_expr_process_data *process_data) { struct rspamd_expression_elt *elt, *celt, *parelt = NULL; GNode *cld; @@ -1010,13 +1010,13 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node t1 = rspamd_get_ticks (TRUE); } - elt->value = expr->subr->process (data, elt->p.atom); + elt->value = expr->subr->process (process_data, elt->p.atom); if (fabs (elt->value) > 1e-9) { elt->p.atom->hits ++; - if (track) { - g_ptr_array_add (track, elt->p.atom); + if (process_data->trace) { + g_ptr_array_add (process_data->trace, elt->p.atom); } } @@ -1057,7 +1057,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node continue; } - val = rspamd_ast_process_node (expr, flags, cld, data, track); + val = rspamd_ast_process_node (expr, cld, process_data); if (isnan (acc)) { acc = rspamd_ast_do_op (elt, val, 0, lim, TRUE); @@ -1066,7 +1066,7 @@ rspamd_ast_process_node (struct rspamd_expression *expr, gint flags, GNode *node acc = rspamd_ast_do_op (elt, val, acc, lim, FALSE); } - if (!(flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) { + if (!(process_data->flags & RSPAMD_EXPRESSION_FLAG_NOOPT)) { if (rspamd_ast_node_done (elt, parelt, acc, lim)) { return acc; } @@ -1090,8 +1090,7 @@ rspamd_ast_cleanup_traverse (GNode *n, gpointer d) } gdouble -rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, - gpointer data, GPtrArray *track) +rspamd_process_expression_track (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data) { gdouble ret = 0; @@ -1099,7 +1098,7 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, /* Ensure that stack is empty at this point */ g_assert (expr->expression_stack->len == 0); - ret = rspamd_ast_process_node (expr, flags, expr->ast, data, track); + ret = rspamd_ast_process_node (expr, expr->ast, process_data); /* Cleanup */ g_node_traverse (expr->ast, G_IN_ORDER, G_TRAVERSE_ALL, -1, @@ -1124,10 +1123,9 @@ rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, } gdouble -rspamd_process_expression (struct rspamd_expression *expr, gint flags, - gpointer data) +rspamd_process_expression (struct rspamd_expression *expr, struct rspamd_expr_process_data *process_data) { - return rspamd_process_expression_track (expr, flags, data, NULL); + return rspamd_process_expression_track (expr, process_data); } static gboolean diff --git a/src/libutil/expression.h b/src/libutil/expression.h index fefde2974..7f7cb2dda 100644 --- a/src/libutil/expression.h +++ b/src/libutil/expression.h @@ -56,12 +56,24 @@ typedef struct rspamd_expression_atom_s { gint priority; } rspamd_expression_atom_t; +struct rspamd_expr_process_data { + /* Current Lua state to run atom processing */ + struct lua_State *L; + /* Parameter of lua-function processing atom*/ + gint stack_item; + gint flags; + /* != NULL if trace is collected */ + GPtrArray *trace; + struct composites_data *cd; + struct rspamd_task *task; +}; + struct rspamd_atom_subr { /* Parses atom from string and returns atom structure */ rspamd_expression_atom_t * (*parse)(const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); /* Process atom via the opaque pointer (e.g. struct rspamd_task *) */ - gdouble (*process) (gpointer input, rspamd_expression_atom_t *atom); + gdouble (*process) (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); /* Calculates the relative priority of the expression */ gint (*priority) (rspamd_expression_atom_t *atom); void (*destroy) (rspamd_expression_atom_t *atom); @@ -92,8 +104,8 @@ gboolean rspamd_parse_expression (const gchar *line, gsize len, * @param data opaque data pointer for all the atoms * @return the value of expression */ -gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags, - gpointer data); +gdouble rspamd_process_expression (struct rspamd_expression *expr, + struct rspamd_expr_process_data *process_data); /** * Process the expression and return its value using atom 'process' functions with the specified data pointer. @@ -103,8 +115,8 @@ gdouble rspamd_process_expression (struct rspamd_expression *expr, gint flags, * @param track pointer array to atoms tracking * @return the value of expression */ -gdouble rspamd_process_expression_track (struct rspamd_expression *expr, gint flags, - gpointer data, GPtrArray *track); +gdouble rspamd_process_expression_track (struct rspamd_expression *expr, + struct rspamd_expr_process_data *process_data); /** * Shows string representation of an expression diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt index 9c561c0e4..0d22027ce 100644 --- a/src/lua/CMakeLists.txt +++ b/src/lua/CMakeLists.txt @@ -12,7 +12,7 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_redis.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_upstream.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_mempool.c - ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns.c + ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns_resolver.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_rsa.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_ip.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_expression.c @@ -25,6 +25,8 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c - ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c) + ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c + ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.c + ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns.c) SET(RSPAMD_LUA ${LUASRC} PARENT_SCOPE)
\ No newline at end of file diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index 63a95f177..f446ec9e8 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -740,6 +740,7 @@ rspamd_lua_init () luaopen_fann (L); luaopen_sqlite3 (L); luaopen_cryptobox (L); + luaopen_dns (L); luaL_newmetatable (L, "rspamd{ev_base}"); lua_pushstring (L, "class"); @@ -1494,14 +1495,26 @@ rspamd_lua_traceback (lua_State *L) { GString *tb; + + tb = rspamd_lua_get_traceback_string (L); + + lua_pushlightuserdata (L, tb); + + return 1; +} + +GString * +rspamd_lua_get_traceback_string (lua_State *L) +{ + GString *tb; const gchar *msg = lua_tostring (L, 1); tb = g_string_sized_new (100); g_string_append_printf (tb, "%s; trace:", msg); + rspamd_lua_traceback_string (L, tb); - lua_pushlightuserdata (L, tb); - return 1; + return tb; } guint diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index 913ebcb60..0a9527bb0 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -280,6 +280,7 @@ void luaopen_html (lua_State * L); void luaopen_fann (lua_State *L); void luaopen_sqlite3 (lua_State *L); void luaopen_cryptobox (lua_State *L); +void luaopen_dns (lua_State *L); void rspamd_lua_dostring (const gchar *line); @@ -337,6 +338,14 @@ gboolean rspamd_lua_parse_table_arguments (lua_State *L, gint pos, gint rspamd_lua_traceback (lua_State *L); /** + * Returns stack trace as a string. Caller should clear memory. + * @param L + * @return + */ +GString * +rspamd_lua_get_traceback_string (lua_State *L); + +/** * Returns size of table at position `tbl_pos` */ guint rspamd_lua_table_size (lua_State *L, gint tbl_pos); @@ -408,6 +417,25 @@ void rspamd_lua_add_ref_dtor (lua_State *L, rspamd_mempool_t *pool, gboolean rspamd_lua_require_function (lua_State *L, const gchar *modname, const gchar *funcname); +struct thread_entry; +/** + * Yields thread. should be only called in return statement + * @param thread_entry + * @param nresults + * @return + */ +gint +lua_yield_thread (struct thread_entry *thread_entry, gint nresults); + +/** + * Resumes suspended by lua_yield_thread () thread + * @param task + * @param thread_entry + * @param narg + */ +void +lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg); + /* Paths defs */ #define RSPAMD_CONFDIR_INDEX "CONFDIR" #define RSPAMD_RUNDIR_INDEX "RUNDIR" diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 815b09e63..6957ded7c 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -19,6 +19,7 @@ #include "libserver/composites.h" #include "libmime/lang_detection.h" #include "lua/lua_map.h" +#include "lua/lua_thread_pool.h" #include "utlist.h" #include <math.h> @@ -1042,6 +1043,7 @@ struct lua_callback_data { gint ref; } callback; gboolean cb_is_ref; + gint stack_level; gint order; }; @@ -1190,43 +1192,111 @@ lua_watcher_callback (gpointer session_data, gpointer ud) lua_settop (L, err_idx - 1); } +gint +lua_do_resume (lua_State *L, gint narg) +{ +#if LUA_VERSION_NUM < 503 + return lua_resume (L, narg); +#else + return lua_resume (L, NULL, narg); +#endif +} + +static void +lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret); + static void lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) { struct lua_callback_data *cd = ud; struct rspamd_task **ptask; - gint level = lua_gettop (cd->L), nresults, err_idx, ret; - lua_State *L = cd->L; - GString *tb; - struct rspamd_symbol_result *s; + struct thread_entry *thread_entry; + gint ret; - lua_pushcfunction (L, &rspamd_lua_traceback); - err_idx = lua_gettop (L); + thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool); - level ++; + g_assert(thread_entry->cd == NULL); + thread_entry->cd = cd; + + lua_State *thread = thread_entry->lua_state; + cd->stack_level = lua_gettop (cd->L); if (cd->cb_is_ref) { - lua_rawgeti (L, LUA_REGISTRYINDEX, cd->callback.ref); + lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref); } else { - lua_getglobal (L, cd->callback.name); + lua_getglobal (thread, cd->callback.name); } - ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (L, "rspamd{task}", -1); + ptask = lua_newuserdata (thread, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (thread, "rspamd{task}", -1); *ptask = task; - if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) { - tb = lua_touserdata (L, -1); + ret = lua_do_resume (thread, 1); + + if (ret != LUA_YIELD) { + /* + LUA_YIELD state should not be handled here. + It should only happen when the thread initiated a asynchronous event and it will be restored as soon + the event is finished + */ + lua_metric_symbol_callback_return (task, thread_entry, ud, ret); + } +} + +gint +lua_yield_thread (struct thread_entry *thread_entry, gint nresults) +{ + g_assert (thread_entry->cd != NULL); + + return lua_yield (thread_entry->lua_state, nresults); +} + +void +lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg) +{ + g_assert (thread_entry->cd != NULL); + + /* + * The only state where we can resume from is LUA_YIELD + * Another acceptable status is OK (0) but in that case we should push function on stack + * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function. + */ + g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); + + gint ret; + + lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry); + ret = lua_do_resume (thread_entry->lua_state, narg); + + if (ret != LUA_YIELD) { + lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret); + } +} + +static void +lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret) +{ + GString *tb; + struct lua_callback_data *cd = ud; + int nresults; + struct rspamd_symbol_result *s; + lua_State *thread = thread_entry->lua_state; + + if (ret != 0) { + + tb = rspamd_lua_get_traceback_string (thread); msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb); if (tb) { g_string_free (tb, TRUE); - lua_pop (L, 1); } + g_assert (lua_gettop (thread) >= cd->stack_level); + /* maybe there is a way to recover here. For now, just remove faulty thread */ + lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry); } else { - nresults = lua_gettop (L) - level; + nresults = lua_gettop (thread) - cd->stack_level; if (nresults >= 1) { /* Function returned boolean, so maybe we need to insert result? */ @@ -1236,16 +1306,16 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) gint type; struct lua_watcher_data *wd; - type = lua_type (cd->L, level + 1); + type = lua_type (thread, cd->stack_level + 1); if (type == LUA_TBOOLEAN) { - res = lua_toboolean (L, level + 1); + res = lua_toboolean (thread, cd->stack_level + 1); } else if (type == LUA_TFUNCTION) { /* Function returned a closure that should be watched for */ wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); - lua_pushvalue (cd->L, level + 1); - wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX); + lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1); + wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX); wd->cbd = cd; rspamd_session_watcher_push_callback (task->s, rspamd_session_get_watcher (task->s), @@ -1258,14 +1328,14 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) rspamd_session_get_watcher (task->s)); } else { - res = lua_tonumber (L, level + 1); + res = lua_tonumber (thread, cd->stack_level + 1); } if (res) { gint first_opt = 2; - if (lua_type (L, level + 2) == LUA_TNUMBER) { - flag = lua_tonumber (L, level + 2); + if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) { + flag = lua_tonumber (thread, cd->stack_level + 2); /* Shift opt index */ first_opt = 3; } @@ -1276,35 +1346,39 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) s = rspamd_task_insert_result (task, cd->symbol, flag, NULL); if (s) { - guint last_pos = lua_gettop (L); + guint last_pos = lua_gettop (thread); - for (i = level + first_opt; i <= last_pos; i++) { - if (lua_type (L, i) == LUA_TSTRING) { - const char *opt = lua_tostring (L, i); + for (i = cd->stack_level + first_opt; i <= last_pos; i++) { + if (lua_type (thread, i) == LUA_TSTRING) { + const char *opt = lua_tostring (thread, i); rspamd_task_add_result_option (task, s, opt); } - else if (lua_type (L, i) == LUA_TTABLE) { - lua_pushvalue (L, i); + else if (lua_type (thread, i) == LUA_TTABLE) { + lua_pushvalue (thread, i); - for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) { - const char *opt = lua_tostring (L, -1); + for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) { + const char *opt = lua_tostring (thread, -1); rspamd_task_add_result_option (task, s, opt); } - lua_pop (L, 1); + lua_pop (thread, 1); } } } } - lua_pop (L, nresults); + lua_pop (thread, nresults); } + + g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */ + + lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry); } - lua_pop (L, 1); /* Error function */ + cd->stack_level = 0; } static gint diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c index 389b9d4f1..0d12a137c 100644 --- a/src/lua/lua_dns.c +++ b/src/lua/lua_dns.c @@ -1,6 +1,4 @@ /*- - * Copyright 2016 Vsevolod Stakhov - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -14,301 +12,45 @@ * limitations under the License. */ #include "lua_common.h" -#include "utlist.h" - - -/*** - * @module rspamd_resolver - * This module allows to resolve DNS names from LUA code. All resolving is executed - * asynchronously. Here is an example of name resolution: - * @example -local function symbol_callback(task) - local host = 'example.com' +#include "lua_dns_resolver.h" +#include "lua_thread_pool.h" - local function dns_cb(resolver, to_resolve, results, err, _, authenticated) - if not results then - rspamd_logger.infox('DNS resolving of %1 failed: %2', host, err) - return - end - for _,r in ipairs(results) do - -- r is of type rspamd{ip} here, but it can be converted to string - rspamd_logger.infox('Resolved %1 to %2', host, tostring(r)) - end - end - - task:get_resolver():resolve_a(task:get_session(), task:get_mempool(), - host, dns_cb) -end - */ -struct rspamd_dns_resolver * lua_check_dns_resolver (lua_State * L); -void luaopen_dns_resolver (lua_State * L); +LUA_FUNCTION_DEF (dns, request); -/* Lua bindings */ -LUA_FUNCTION_DEF (dns_resolver, init); -LUA_FUNCTION_DEF (dns_resolver, resolve_a); -LUA_FUNCTION_DEF (dns_resolver, resolve_ptr); -LUA_FUNCTION_DEF (dns_resolver, resolve_txt); -LUA_FUNCTION_DEF (dns_resolver, resolve_mx); -LUA_FUNCTION_DEF (dns_resolver, resolve_ns); -LUA_FUNCTION_DEF (dns_resolver, resolve); - -static const struct luaL_reg dns_resolverlib_f[] = { - LUA_INTERFACE_DEF (dns_resolver, init), - {NULL, NULL} -}; - -static const struct luaL_reg dns_resolverlib_m[] = { - LUA_INTERFACE_DEF (dns_resolver, resolve_a), - LUA_INTERFACE_DEF (dns_resolver, resolve_ptr), - LUA_INTERFACE_DEF (dns_resolver, resolve_txt), - LUA_INTERFACE_DEF (dns_resolver, resolve_mx), - LUA_INTERFACE_DEF (dns_resolver, resolve_ns), - LUA_INTERFACE_DEF (dns_resolver, resolve), - {"__tostring", rspamd_lua_class_tostring}, - {NULL, NULL} +static const struct luaL_reg dns_f[] = { + LUA_INTERFACE_DEF (dns, request), + {"__tostring", rspamd_lua_class_tostring}, + {NULL, NULL} }; -struct rspamd_dns_resolver * -lua_check_dns_resolver (lua_State * L) -{ - void *ud = rspamd_lua_check_udata (L, 1, "rspamd{resolver}"); - luaL_argcheck (L, ud != NULL, 1, "'resolver' expected"); - return ud ? *((struct rspamd_dns_resolver **)ud) : NULL; -} +void +lua_dns_callback (struct rdns_reply *reply, void *arg); -struct lua_dns_cbdata { - lua_State *L; +struct lua_rspamd_dns_cbdata { + struct thread_entry *thread; + struct rspamd_task *task; struct rspamd_dns_resolver *resolver; - gint cbref; - const gchar *to_resolve; - const gchar *user_str; struct rspamd_async_watcher *w; struct rspamd_async_session *s; }; -static int -lua_dns_get_type (lua_State *L, int argno) -{ - int type = RDNS_REQUEST_A; - const gchar *strtype; - - if (lua_type (L, argno) != LUA_TSTRING) { - lua_pushvalue (L, argno); - lua_gettable (L, lua_upvalueindex (1)); - - type = lua_tonumber (L, -1); - lua_pop (L, 1); - if (type == 0) { - rspamd_lua_typerror (L, argno, "dns_request_type"); - } - } - else { - strtype = lua_tostring (L, argno); - - if (g_ascii_strcasecmp (strtype, "a") == 0) { - type = RDNS_REQUEST_A; - } - else if (g_ascii_strcasecmp (strtype, "aaaa") == 0) { - type = RDNS_REQUEST_AAAA; - } - else if (g_ascii_strcasecmp (strtype, "mx") == 0) { - type = RDNS_REQUEST_MX; - } - else if (g_ascii_strcasecmp (strtype, "txt") == 0) { - type = RDNS_REQUEST_TXT; - } - else if (g_ascii_strcasecmp (strtype, "ptr") == 0) { - type = RDNS_REQUEST_PTR; - } - else if (g_ascii_strcasecmp (strtype, "soa") == 0) { - type = RDNS_REQUEST_SOA; - } - else { - msg_err ("bad DNS type: %s", strtype); - } - } - - return type; -} - -static void -lua_dns_callback (struct rdns_reply *reply, gpointer arg) -{ - struct lua_dns_cbdata *cd = arg; - gint i = 0, naddrs = 0; - struct rspamd_dns_resolver **presolver; - struct rdns_reply_entry *elt; - rspamd_inet_addr_t *addr; - - lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref); - presolver = lua_newuserdata (cd->L, sizeof (gpointer)); - rspamd_lua_setclass (cd->L, "rspamd{resolver}", -1); - - *presolver = cd->resolver; - lua_pushstring (cd->L, cd->to_resolve); - - /* - * XXX: rework to handle different request types - */ - if (reply->code == RDNS_RC_NOERROR) { - LL_FOREACH (reply->entries, elt) { - naddrs ++; - } - - lua_createtable (cd->L, naddrs, 0); - - LL_FOREACH (reply->entries, elt) - { - switch (elt->type) { - case RDNS_REQUEST_A: - addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr); - rspamd_lua_ip_push (cd->L, addr); - rspamd_inet_address_free (addr); - lua_rawseti (cd->L, -2, ++i); - break; - case RDNS_REQUEST_AAAA: - addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr); - rspamd_lua_ip_push (cd->L, addr); - rspamd_inet_address_free (addr); - lua_rawseti (cd->L, -2, ++i); - break; - case RDNS_REQUEST_NS: - lua_pushstring (cd->L, elt->content.ns.name); - lua_rawseti (cd->L, -2, ++i); - break; - case RDNS_REQUEST_PTR: - lua_pushstring (cd->L, elt->content.ptr.name); - lua_rawseti (cd->L, -2, ++i); - break; - case RDNS_REQUEST_TXT: - case RDNS_REQUEST_SPF: - lua_pushstring (cd->L, elt->content.txt.data); - lua_rawseti (cd->L, -2, ++i); - break; - case RDNS_REQUEST_MX: - /* mx['name'], mx['priority'] */ - lua_createtable (cd->L, 0, 2); - rspamd_lua_table_set (cd->L, "name", elt->content.mx.name); - lua_pushstring (cd->L, "priority"); - lua_pushinteger (cd->L, elt->content.mx.priority); - lua_settable (cd->L, -3); - - lua_rawseti (cd->L, -2, ++i); - break; - case RDNS_REQUEST_SOA: - lua_createtable (cd->L, 0, 7); - rspamd_lua_table_set (cd->L, "ns", elt->content.soa.mname); - rspamd_lua_table_set (cd->L, "contact", elt->content.soa.admin); - lua_pushstring (cd->L, "serial"); - lua_pushinteger (cd->L, elt->content.soa.serial); - lua_settable (cd->L, -3); - lua_pushstring (cd->L, "refresh"); - lua_pushinteger (cd->L, elt->content.soa.refresh); - lua_settable (cd->L, -3); - lua_pushstring (cd->L, "retry"); - lua_pushinteger (cd->L, elt->content.soa.retry); - lua_settable (cd->L, -3); - lua_pushstring (cd->L, "expiry"); - lua_pushinteger (cd->L, elt->content.soa.expire); - lua_settable (cd->L, -3); - /* Negative TTL */ - lua_pushstring (cd->L, "nx"); - lua_pushinteger (cd->L, elt->content.soa.minimum); - lua_settable (cd->L, -3); - - lua_rawseti (cd->L, -2, ++i); - break; - } - } - lua_pushnil (cd->L); - } - else { - lua_pushnil (cd->L); - lua_pushstring (cd->L, rdns_strerror (reply->code)); - } - - if (cd->user_str != NULL) { - lua_pushstring (cd->L, cd->user_str); - } - else { - lua_pushnil (cd->L); - } - - lua_pushboolean (cd->L, reply->authenticated); - - if (lua_pcall (cd->L, 6, 0, 0) != 0) { - msg_info ("call to dns callback failed: %s", lua_tostring (cd->L, -1)); - lua_pop (cd->L, 1); - } - - /* Unref function */ - luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref); - - if (cd->s) { - rspamd_session_watcher_pop (cd->s, cd->w); - } -} - -/*** - * @function rspamd_resolver.init(ev_base, config) - * @param {event_base} ev_base event base used for asynchronous events - * @param {rspamd_config} config rspamd configuration parameters - * @return {rspamd_resolver} new resolver object associated with the specified base - */ -static int -lua_dns_resolver_init (lua_State *L) -{ - struct rspamd_dns_resolver *resolver, **presolver; - struct rspamd_config *cfg, **pcfg; - struct event_base *base, **pbase; - - /* Check args */ - pbase = rspamd_lua_check_udata (L, 1, "rspamd{ev_base}"); - luaL_argcheck (L, pbase != NULL, 1, "'ev_base' expected"); - base = pbase ? *(pbase) : NULL; - pcfg = rspamd_lua_check_udata (L, 2, "rspamd{config}"); - luaL_argcheck (L, pcfg != NULL, 2, "'config' expected"); - cfg = pcfg ? *(pcfg) : NULL; - - if (base != NULL && cfg != NULL) { - resolver = dns_resolver_init (NULL, base, cfg); - if (resolver) { - presolver = lua_newuserdata (L, sizeof (gpointer)); - rspamd_lua_setclass (L, "rspamd{resolver}", -1); - *presolver = resolver; - } - else { - lua_pushnil (L); - } - } - else { - lua_pushnil (L); - } - - return 1; -} - -static int -lua_dns_resolver_resolve_common (lua_State *L, - struct rspamd_dns_resolver *resolver, - enum rdns_request_type type, - int first) +static gint +lua_dns_request (lua_State *L) { - LUA_TRACE_POINT; + GError *err = NULL; struct rspamd_async_session *session = NULL; - rspamd_mempool_t *pool = NULL; - const gchar *to_resolve = NULL, *user_str = NULL; - struct lua_dns_cbdata *cbdata; - gint cbref = -1, ret; + struct lua_rspamd_dns_cbdata *cbdata = NULL; + const gchar *to_resolve = NULL; + const gchar *type_str = NULL; struct rspamd_task *task = NULL; - GError *err = NULL; + rspamd_mempool_t *pool = NULL; + gint ret = 0; gboolean forced = FALSE; /* Check arguments */ - if (!rspamd_lua_parse_table_arguments (L, first, &err, - "session=U{session};mempool=U{mempool};*name=S;*callback=F;" - "option=S;task=U{task};forced=B", - &session, &pool, &to_resolve, &cbref, &user_str, &task, &forced)) { + if (!rspamd_lua_parse_table_arguments (L, 1, &err, + "*name=S;*task=U{task};*type=S;forced=B", + &to_resolve, &task, &type_str, &forced)) { if (err) { ret = luaL_error (L, "invalid arguments: %s", err->message); @@ -321,283 +63,108 @@ lua_dns_resolver_resolve_common (lua_State *L, } if (task) { - pool = task->task_pool; session = task->s; + pool = task->task_pool; + } + else { + return luaL_error (L, "invalid arguments: either task or session/config should be set"); } - if (pool != NULL && to_resolve != NULL && cbref != -1) { - cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata)); - cbdata->L = L; - cbdata->resolver = resolver; - cbdata->cbref = cbref; - cbdata->user_str = rspamd_mempool_strdup (pool, user_str); - - if (type != RDNS_REQUEST_PTR) { - cbdata->to_resolve = rspamd_mempool_strdup (pool, to_resolve); - } - else { - char *ptr_str; + enum rdns_request_type type = rdns_type_fromstr (type_str); - ptr_str = rdns_generate_ptr_from_str (to_resolve); + if (type == RDNS_REQUEST_INVALID) { + return luaL_error (L, "invalid arguments: this record type is not supported"); + } - if (ptr_str == NULL) { - msg_err_task_check ("wrong resolve string to PTR request: %s", - to_resolve); - lua_pushnil (L); + cbdata = rspamd_mempool_alloc0 (pool, sizeof (*cbdata)); - return 1; - } + cbdata->task = task; - cbdata->to_resolve = rspamd_mempool_strdup (pool, ptr_str); - to_resolve = cbdata->to_resolve; - free (ptr_str); - } + if (type == RDNS_REQUEST_PTR) { + char *ptr_str; - if (task == NULL) { - if (make_dns_request (resolver, - session, - pool, - lua_dns_callback, - cbdata, - type, - to_resolve)) { + ptr_str = rdns_generate_ptr_from_str (to_resolve); - lua_pushboolean (L, TRUE); + if (ptr_str == NULL) { + msg_err_task_check ("wrong resolve string to PTR request: %s", + to_resolve); + lua_pushnil (L); - if (session) { - cbdata->s = session; - cbdata->w = rspamd_session_get_watcher (session); - rspamd_session_watcher_push (session); - } - } - else { - lua_pushnil (L); - } + return 1; } - else { - if (forced) { - ret = make_dns_request_task_forced (task, - lua_dns_callback, - cbdata, - type, - to_resolve); - } - else { - ret = make_dns_request_task (task, - lua_dns_callback, - cbdata, - type, - to_resolve); - } - if (ret) { - lua_pushboolean (L, TRUE); - cbdata->s = session; - cbdata->w = rspamd_session_get_watcher (session); - rspamd_session_watcher_push (session); - } - else { - lua_pushnil (L); - } - } + to_resolve = rspamd_mempool_strdup (pool, ptr_str); + free (ptr_str); } - else { - return luaL_error (L, "invalid arguments to lua_resolve"); - } - - return 1; -} - -/*** - * @method resolver:resolve_a(session, pool, host, callback) - * Resolve A record for a specified host. - * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) - * @param {mempool} pool memory pool for storing intermediate data - * @param {string} host name to resolve - * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` - * @return {boolean} `true` if DNS request has been scheduled - */ -static int -lua_dns_resolver_resolve_a (lua_State *L) -{ - struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); - - if (dns_resolver) { - return lua_dns_resolver_resolve_common (L, - dns_resolver, - RDNS_REQUEST_A, - 2); + if (forced) { + ret = make_dns_request_task_forced (task, + lua_dns_callback, + cbdata, + type, + to_resolve); } else { - lua_pushnil (L); + ret = make_dns_request_task (task, + lua_dns_callback, + cbdata, + type, + to_resolve); } - return 1; -} - -/*** - * @method resolver:resolve_ptr(session, pool, ip, callback) - * Resolve PTR record for a specified host. - * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) - * @param {mempool} pool memory pool for storing intermediate data - * @param {string} ip name to resolve in string form (e.g. '8.8.8.8' or '2001:dead::') - * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` - * @return {boolean} `true` if DNS request has been scheduled - */ -static int -lua_dns_resolver_resolve_ptr (lua_State *L) -{ - struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); - - if (dns_resolver) { - return lua_dns_resolver_resolve_common (L, - dns_resolver, - RDNS_REQUEST_PTR, - 2); - } - else { - lua_pushnil (L); - } - - return 1; -} - -/*** - * @method resolver:resolve_txt(session, pool, host, callback) - * Resolve TXT record for a specified host. - * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) - * @param {mempool} pool memory pool for storing intermediate data - * @param {string} host name to get TXT record for - * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` - * @return {boolean} `true` if DNS request has been scheduled - */ -static int -lua_dns_resolver_resolve_txt (lua_State *L) -{ - struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); - - if (dns_resolver) { - return lua_dns_resolver_resolve_common (L, - dns_resolver, - RDNS_REQUEST_TXT, - 2); + if (ret) { + cbdata->thread = lua_thread_pool_get_running_entry (task->cfg->lua_thread_pool); + cbdata->s = session; + cbdata->w = rspamd_session_get_watcher (session); + rspamd_session_watcher_push (session); + return lua_yield_thread (cbdata->thread, 0); } else { lua_pushnil (L); + return 1; } - - return 1; } -/*** - * @method resolver:resolve_mx(session, pool, host, callback) - * Resolve MX record for a specified host. - * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) - * @param {mempool} pool memory pool for storing intermediate data - * @param {string} host name to get MX record for - * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` - * @return {boolean} `true` if DNS request has been scheduled - */ -static int -lua_dns_resolver_resolve_mx (lua_State *L) +void +lua_dns_callback (struct rdns_reply *reply, void *arg) { - struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + struct lua_rspamd_dns_cbdata *cbdata = arg; + lua_State *L = cbdata->thread->lua_state; - if (dns_resolver) { - return lua_dns_resolver_resolve_common (L, - dns_resolver, - RDNS_REQUEST_MX, - 2); + if (reply->code != RDNS_RC_NOERROR) { + lua_pushboolean (L, false); + lua_pushstring (L, rdns_strerror (reply->code)); } else { - lua_pushnil (L); - } + lua_push_dns_reply (L, reply); - return 1; -} + lua_pushboolean (L, reply->authenticated); + lua_setfield (L, -3, "authenticated"); -/*** - * @method resolver:resolve_ns(session, pool, host, callback) - * Resolve NS records for a specified host. - * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) - * @param {mempool} pool memory pool for storing intermediate data - * @param {string} host name to get NS records for - * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` - * @return {boolean} `true` if DNS request has been scheduled - */ -static int -lua_dns_resolver_resolve_ns (lua_State *L) -{ - struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); - - if (dns_resolver) { - return lua_dns_resolver_resolve_common (L, - dns_resolver, - RDNS_REQUEST_NS, - 2); - } - else { - lua_pushnil (L); + /* result 1 - not and error */ + lua_pushboolean (L, true); + /* push table into stack, result 2 - results itself */ + lua_pushvalue (L, -3); } - return 1; -} - -/* XXX: broken currently */ -static int -lua_dns_resolver_resolve (lua_State *L) -{ - struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); - int type; + lua_resume_thread (cbdata->task, cbdata->thread, 2); - type = lua_dns_get_type (L, 2); - - if (dns_resolver && type != 0) { - return lua_dns_resolver_resolve_common (L, dns_resolver, type, 3); - } - else { - lua_pushnil (L); + if (cbdata->s) { + rspamd_session_watcher_pop (cbdata->s, cbdata->w); } - - return 1; } static gint -lua_load_dns (lua_State * L) +lua_load_dns (lua_State *L) { lua_newtable (L); - luaL_register (L, NULL, dns_resolverlib_f); + luaL_register (L, NULL, dns_f); return 1; } void -luaopen_dns_resolver (lua_State * L) +luaopen_dns (lua_State *L) { - - luaL_newmetatable (L, "rspamd{resolver}"); - lua_pushstring (L, "__index"); - lua_pushvalue (L, -2); - lua_settable (L, -3); - - lua_pushstring (L, "class"); - lua_pushstring (L, "rspamd{resolver}"); - lua_rawset (L, -3); - - { - LUA_ENUM (L, DNS_A, RDNS_REQUEST_A); - LUA_ENUM (L, DNS_PTR, RDNS_REQUEST_PTR); - LUA_ENUM (L, DNS_MX, RDNS_REQUEST_MX); - LUA_ENUM (L, DNS_TXT, RDNS_REQUEST_TXT); - LUA_ENUM (L, DNS_SRV, RDNS_REQUEST_SRV); - LUA_ENUM (L, DNS_SPF, RDNS_REQUEST_SPF); - LUA_ENUM (L, DNS_AAAA, RDNS_REQUEST_AAAA); - LUA_ENUM (L, DNS_SOA, RDNS_REQUEST_SOA); - } - - luaL_register (L, NULL, dns_resolverlib_m); - rspamd_lua_add_preload (L, "rspamd_resolver", lua_load_dns); - - lua_pop (L, 1); /* remove metatable from stack */ + rspamd_lua_add_preload (L, "rspamd_dns", lua_load_dns); } diff --git a/src/lua/lua_dns_resolver.c b/src/lua/lua_dns_resolver.c new file mode 100644 index 000000000..dc475f2ae --- /dev/null +++ b/src/lua/lua_dns_resolver.c @@ -0,0 +1,631 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "lua_common.h" +#include "lua_thread_pool.h" +#include "utlist.h" + + +/*** + * @module rspamd_resolver + * This module allows to resolve DNS names from LUA code. All resolving is executed + * asynchronously. Here is an example of name resolution: + * @example +local function symbol_callback(task) + local host = 'example.com' + + local function dns_cb(resolver, to_resolve, results, err, _, authenticated) + if not results then + rspamd_logger.infox('DNS resolving of %1 failed: %2', host, err) + return + end + for _,r in ipairs(results) do + -- r is of type rspamd{ip} here, but it can be converted to string + rspamd_logger.infox('Resolved %1 to %2', host, tostring(r)) + end + end + + task:get_resolver():resolve_a(task:get_session(), task:get_mempool(), + host, dns_cb) +end + */ +struct rspamd_dns_resolver * lua_check_dns_resolver (lua_State * L); +void luaopen_dns_resolver (lua_State * L); + +/* Lua bindings */ +LUA_FUNCTION_DEF (dns_resolver, init); +LUA_FUNCTION_DEF (dns_resolver, resolve_a); +LUA_FUNCTION_DEF (dns_resolver, resolve_ptr); +LUA_FUNCTION_DEF (dns_resolver, resolve_txt); +LUA_FUNCTION_DEF (dns_resolver, resolve_mx); +LUA_FUNCTION_DEF (dns_resolver, resolve_ns); +LUA_FUNCTION_DEF (dns_resolver, resolve); + +void lua_push_dns_reply (lua_State *L, const struct rdns_reply *reply); + +static const struct luaL_reg dns_resolverlib_f[] = { + LUA_INTERFACE_DEF (dns_resolver, init), + {NULL, NULL} +}; + +static const struct luaL_reg dns_resolverlib_m[] = { + LUA_INTERFACE_DEF (dns_resolver, resolve_a), + LUA_INTERFACE_DEF (dns_resolver, resolve_ptr), + LUA_INTERFACE_DEF (dns_resolver, resolve_txt), + LUA_INTERFACE_DEF (dns_resolver, resolve_mx), + LUA_INTERFACE_DEF (dns_resolver, resolve_ns), + LUA_INTERFACE_DEF (dns_resolver, resolve), + {"__tostring", rspamd_lua_class_tostring}, + {NULL, NULL} +}; + +struct rspamd_dns_resolver * +lua_check_dns_resolver (lua_State * L) +{ + void *ud = rspamd_lua_check_udata (L, 1, "rspamd{resolver}"); + luaL_argcheck (L, ud != NULL, 1, "'resolver' expected"); + return ud ? *((struct rspamd_dns_resolver **)ud) : NULL; +} + +struct lua_dns_cbdata { + struct rspamd_task *task; + struct rspamd_dns_resolver *resolver; + gint cbref; + const gchar *to_resolve; + const gchar *user_str; + struct rspamd_async_watcher *w; + struct rspamd_async_session *s; +}; + +static int +lua_dns_get_type (lua_State *L, int argno) +{ + int type = RDNS_REQUEST_A; + const gchar *strtype; + + if (lua_type (L, argno) != LUA_TSTRING) { + lua_pushvalue (L, argno); + lua_gettable (L, lua_upvalueindex (1)); + + type = lua_tonumber (L, -1); + lua_pop (L, 1); + if (type == 0) { + rspamd_lua_typerror (L, argno, "dns_request_type"); + } + } + else { + strtype = lua_tostring (L, argno); + + if (g_ascii_strcasecmp (strtype, "a") == 0) { + type = RDNS_REQUEST_A; + } + else if (g_ascii_strcasecmp (strtype, "aaaa") == 0) { + type = RDNS_REQUEST_AAAA; + } + else if (g_ascii_strcasecmp (strtype, "mx") == 0) { + type = RDNS_REQUEST_MX; + } + else if (g_ascii_strcasecmp (strtype, "txt") == 0) { + type = RDNS_REQUEST_TXT; + } + else if (g_ascii_strcasecmp (strtype, "ptr") == 0) { + type = RDNS_REQUEST_PTR; + } + else if (g_ascii_strcasecmp (strtype, "soa") == 0) { + type = RDNS_REQUEST_SOA; + } + else { + msg_err ("bad DNS type: %s", strtype); + } + } + + return type; +} + +static void +lua_dns_resolver_callback (struct rdns_reply *reply, gpointer arg) +{ + struct lua_dns_cbdata *cd = arg; + struct rspamd_dns_resolver **presolver; + lua_State *L; + struct lua_callback_state cbs; + + lua_thread_pool_prepare_callback (cd->resolver->cfg->lua_thread_pool, &cbs); + L = cbs.L; + + lua_rawgeti (L, LUA_REGISTRYINDEX, cd->cbref); + + presolver = lua_newuserdata (L, sizeof (gpointer)); + rspamd_lua_setclass (L, "rspamd{resolver}", -1); + + *presolver = cd->resolver; + lua_pushstring (L, cd->to_resolve); + + lua_push_dns_reply (L, reply); + + /* + * 1 - resolver + * 2 - to_resolve + * 3 - entries | nil + * 4 - error | nil + * 5 - user_str + * 6 - reply->authenticated + */ + if (reply->code != RDNS_RC_NOERROR) { + lua_pushnil (L); + lua_pushstring (L, rdns_strerror (reply->code)); + } + if (cd->user_str != NULL) { + lua_pushstring (L, cd->user_str); + } + else { + lua_pushnil (L); + } + + lua_pushboolean (L, reply->authenticated); + + if (lua_pcall (L, 6, 0, 0) != 0) { + msg_info ("call to dns callback failed: %s", lua_tostring (L, -1)); + lua_pop (L, 1); + } + + /* Unref function */ + luaL_unref (L, LUA_REGISTRYINDEX, cd->cbref); + + lua_thread_pool_restore_callback (&cbs); + + if (cd->s) { + rspamd_session_watcher_pop (cd->s, cd->w); + } +} + +void +lua_push_dns_reply (lua_State *L, const struct rdns_reply *reply) +{ + gint i = 0, naddrs = 0; + struct rdns_reply_entry *elt; + rspamd_inet_addr_t *addr; + + if (reply->code == RDNS_RC_NOERROR) { + LL_FOREACH (reply->entries, elt) { + naddrs ++; + } + + lua_createtable (L, naddrs, 0); + + LL_FOREACH (reply->entries, elt) + { + switch (elt->type) { + case RDNS_REQUEST_A: + addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr); + rspamd_lua_ip_push (L, addr); + rspamd_inet_address_free (addr); + lua_rawseti (L, -2, ++i); + break; + case RDNS_REQUEST_AAAA: + addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr); + rspamd_lua_ip_push (L, addr); + rspamd_inet_address_free (addr); + lua_rawseti (L, -2, ++i); + break; + case RDNS_REQUEST_NS: + lua_pushstring (L, elt->content.ns.name); + lua_rawseti (L, -2, ++i); + break; + case RDNS_REQUEST_PTR: + lua_pushstring (L, elt->content.ptr.name); + lua_rawseti (L, -2, ++i); + break; + case RDNS_REQUEST_TXT: + case RDNS_REQUEST_SPF: + lua_pushstring (L, elt->content.txt.data); + lua_rawseti (L, -2, ++i); + break; + case RDNS_REQUEST_MX: + /* mx['name'], mx['priority'] */ + lua_createtable (L, 0, 2); + rspamd_lua_table_set (L, "name", elt->content.mx.name); + lua_pushstring (L, "priority"); + lua_pushinteger (L, elt->content.mx.priority); + lua_settable (L, -3); + + lua_rawseti (L, -2, ++i); + break; + case RDNS_REQUEST_SOA: + lua_createtable (L, 0, 7); + rspamd_lua_table_set (L, "ns", elt->content.soa.mname); + rspamd_lua_table_set (L, "contact", elt->content.soa.admin); + lua_pushstring (L, "serial"); + lua_pushinteger (L, elt->content.soa.serial); + lua_settable (L, -3); + lua_pushstring (L, "refresh"); + lua_pushinteger (L, elt->content.soa.refresh); + lua_settable (L, -3); + lua_pushstring (L, "retry"); + lua_pushinteger (L, elt->content.soa.retry); + lua_settable (L, -3); + lua_pushstring (L, "expiry"); + lua_pushinteger (L, elt->content.soa.expire); + lua_settable (L, -3); + /* Negative TTL */ + lua_pushstring (L, "nx"); + lua_pushinteger (L, elt->content.soa.minimum); + lua_settable (L, -3); + + lua_rawseti (L, -2, ++i); + break; + } + } + lua_pushnil (L); + } +} + +/*** + * @function rspamd_resolver.init(ev_base, config) + * @param {event_base} ev_base event base used for asynchronous events + * @param {rspamd_config} config rspamd configuration parameters + * @return {rspamd_resolver} new resolver object associated with the specified base + */ +static int +lua_dns_resolver_init (lua_State *L) +{ + struct rspamd_dns_resolver *resolver, **presolver; + struct rspamd_config *cfg, **pcfg; + struct event_base *base, **pbase; + + /* Check args */ + pbase = rspamd_lua_check_udata (L, 1, "rspamd{ev_base}"); + luaL_argcheck (L, pbase != NULL, 1, "'ev_base' expected"); + base = pbase ? *(pbase) : NULL; + pcfg = rspamd_lua_check_udata (L, 2, "rspamd{config}"); + luaL_argcheck (L, pcfg != NULL, 2, "'config' expected"); + cfg = pcfg ? *(pcfg) : NULL; + + if (base != NULL && cfg != NULL) { + resolver = dns_resolver_init (NULL, base, cfg); + if (resolver) { + presolver = lua_newuserdata (L, sizeof (gpointer)); + rspamd_lua_setclass (L, "rspamd{resolver}", -1); + *presolver = resolver; + } + else { + lua_pushnil (L); + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_dns_resolver_resolve_common (lua_State *L, + struct rspamd_dns_resolver *resolver, + enum rdns_request_type type, + int first) +{ + LUA_TRACE_POINT; + struct rspamd_async_session *session = NULL; + rspamd_mempool_t *pool = NULL; + const gchar *to_resolve = NULL, *user_str = NULL; + struct lua_dns_cbdata *cbdata; + gint cbref = -1, ret; + struct rspamd_task *task = NULL; + GError *err = NULL; + gboolean forced = FALSE; + + /* Check arguments */ + if (!rspamd_lua_parse_table_arguments (L, first, &err, + "session=U{session};mempool=U{mempool};*name=S;*callback=F;" + "option=S;task=U{task};forced=B", + &session, &pool, &to_resolve, &cbref, &user_str, &task, &forced)) { + + if (err) { + ret = luaL_error (L, "invalid arguments: %s", err->message); + g_error_free (err); + + return ret; + } + + return luaL_error (L, "invalid arguments"); + } + + if (task) { + pool = task->task_pool; + session = task->s; + } + else if (!session || !pool) { + return luaL_error (L, "invalid arguments: either 'task' or 'session'/'mempool' should be set"); + } + + if (pool != NULL && to_resolve != NULL) { + cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata)); + cbdata->resolver = resolver; + cbdata->cbref = cbref; + cbdata->user_str = rspamd_mempool_strdup (pool, user_str); + + if (type != RDNS_REQUEST_PTR) { + cbdata->to_resolve = rspamd_mempool_strdup (pool, to_resolve); + } + else { + char *ptr_str; + + ptr_str = rdns_generate_ptr_from_str (to_resolve); + + if (ptr_str == NULL) { + msg_err_task_check ("wrong resolve string to PTR request: %s", + to_resolve); + lua_pushnil (L); + + return 1; + } + + cbdata->to_resolve = rspamd_mempool_strdup (pool, ptr_str); + to_resolve = cbdata->to_resolve; + free (ptr_str); + } + + if (task == NULL) { + if ( make_dns_request (resolver, + session, + pool, + lua_dns_resolver_callback, + cbdata, + type, + to_resolve)) { + + lua_pushboolean (L, TRUE); + + if (session) { + cbdata->s = session; + cbdata->w = rspamd_session_get_watcher (session); + rspamd_session_watcher_push (session); + } + } + else { + lua_pushnil (L); + } + } + else { + cbdata->task = task; + + if (forced) { + ret = make_dns_request_task_forced (task, + lua_dns_resolver_callback, + cbdata, + type, + to_resolve); + } + else { + ret = make_dns_request_task (task, + lua_dns_resolver_callback, + cbdata, + type, + to_resolve); + } + + if (ret) { + cbdata->s = session; + cbdata->w = rspamd_session_get_watcher (session); + rspamd_session_watcher_push (session); + /* callback was set up */ + lua_pushboolean (L, TRUE); + } + else { + lua_pushnil (L); + } + } + } + else { + return luaL_error (L, "invalid arguments to lua_resolve"); + } + + return 1; + +} + +/*** + * @method resolver:resolve_a(session, pool, host, callback) + * Resolve A record for a specified host. + * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) + * @param {mempool} pool memory pool for storing intermediate data + * @param {string} host name to resolve + * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` + * @return {boolean} `true` if DNS request has been scheduled + */ +static int +lua_dns_resolver_resolve_a (lua_State *L) +{ + struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + + if (dns_resolver) { + return lua_dns_resolver_resolve_common (L, + dns_resolver, + RDNS_REQUEST_A, + 2); + } + else { + lua_pushnil (L); + } + + return 1; +} + +/*** + * @method resolver:resolve_ptr(session, pool, ip, callback) + * Resolve PTR record for a specified host. + * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) + * @param {mempool} pool memory pool for storing intermediate data + * @param {string} ip name to resolve in string form (e.g. '8.8.8.8' or '2001:dead::') + * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` + * @return {boolean} `true` if DNS request has been scheduled + */ +static int +lua_dns_resolver_resolve_ptr (lua_State *L) +{ + struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + + if (dns_resolver) { + return lua_dns_resolver_resolve_common (L, + dns_resolver, + RDNS_REQUEST_PTR, + 2); + } + else { + lua_pushnil (L); + } + + return 1; +} + +/*** + * @method resolver:resolve_txt(session, pool, host, callback) + * Resolve TXT record for a specified host. + * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) + * @param {mempool} pool memory pool for storing intermediate data + * @param {string} host name to get TXT record for + * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` + * @return {boolean} `true` if DNS request has been scheduled + */ +static int +lua_dns_resolver_resolve_txt (lua_State *L) +{ + struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + + if (dns_resolver) { + return lua_dns_resolver_resolve_common (L, + dns_resolver, + RDNS_REQUEST_TXT, + 2); + } + else { + lua_pushnil (L); + } + + return 1; +} + +/*** + * @method resolver:resolve_mx(session, pool, host, callback) + * Resolve MX record for a specified host. + * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) + * @param {mempool} pool memory pool for storing intermediate data + * @param {string} host name to get MX record for + * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` + * @return {boolean} `true` if DNS request has been scheduled + */ +static int +lua_dns_resolver_resolve_mx (lua_State *L) +{ + struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + + if (dns_resolver) { + return lua_dns_resolver_resolve_common (L, + dns_resolver, + RDNS_REQUEST_MX, + 2); + } + else { + lua_pushnil (L); + } + + return 1; +} + +/*** + * @method resolver:resolve_ns(session, pool, host, callback) + * Resolve NS records for a specified host. + * @param {async_session} session asynchronous session normally associated with rspamd task (`task:get_session()`) + * @param {mempool} pool memory pool for storing intermediate data + * @param {string} host name to get NS records for + * @param {function} callback callback function to be called upon name resolution is finished; must be of type `function (resolver, to_resolve, results, err)` + * @return {boolean} `true` if DNS request has been scheduled + */ +static int +lua_dns_resolver_resolve_ns (lua_State *L) +{ + struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + + if (dns_resolver) { + return lua_dns_resolver_resolve_common (L, + dns_resolver, + RDNS_REQUEST_NS, + 2); + } + else { + lua_pushnil (L); + } + + return 1; +} + +/* XXX: broken currently */ +static int +lua_dns_resolver_resolve (lua_State *L) +{ + struct rspamd_dns_resolver *dns_resolver = lua_check_dns_resolver (L); + int type; + + type = lua_dns_get_type (L, 2); + + if (dns_resolver && type != 0) { + return lua_dns_resolver_resolve_common (L, dns_resolver, type, 3); + } + else { + lua_pushnil (L); + } + + return 1; +} + +static gint +lua_load_dns_resolver (lua_State *L) +{ + lua_newtable (L); + luaL_register (L, NULL, dns_resolverlib_f); + + return 1; +} + +void +luaopen_dns_resolver (lua_State * L) +{ + + luaL_newmetatable (L, "rspamd{resolver}"); + lua_pushstring (L, "__index"); + lua_pushvalue (L, -2); + lua_settable (L, -3); + + lua_pushstring (L, "class"); + lua_pushstring (L, "rspamd{resolver}"); + lua_rawset (L, -3); + + { + LUA_ENUM (L, DNS_A, RDNS_REQUEST_A); + LUA_ENUM (L, DNS_PTR, RDNS_REQUEST_PTR); + LUA_ENUM (L, DNS_MX, RDNS_REQUEST_MX); + LUA_ENUM (L, DNS_TXT, RDNS_REQUEST_TXT); + LUA_ENUM (L, DNS_SRV, RDNS_REQUEST_SRV); + LUA_ENUM (L, DNS_SPF, RDNS_REQUEST_SPF); + LUA_ENUM (L, DNS_AAAA, RDNS_REQUEST_AAAA); + LUA_ENUM (L, DNS_SOA, RDNS_REQUEST_SOA); + } + + luaL_register (L, NULL, dns_resolverlib_m); + rspamd_lua_add_preload (L, "rspamd_resolver", lua_load_dns_resolver); + + lua_pop (L, 1); /* remove metatable from stack */ +} diff --git a/src/lua/lua_dns_resolver.h b/src/lua/lua_dns_resolver.h new file mode 100644 index 000000000..f5c71aa0b --- /dev/null +++ b/src/lua/lua_dns_resolver.h @@ -0,0 +1,16 @@ +#ifndef RSPAMD_LUA_DNS_H +#define RSPAMD_LUA_DNS_H + +typedef struct lua_State lua_State; +struct rdns_reply; + +/** + * Pushes dns reply onto Lua stack + * + * @param L + * @param reply + */ +void +lua_push_dns_reply (lua_State *L, const struct rdns_reply *reply); + +#endif //RSPAMD_LUA_DNS_H diff --git a/src/lua/lua_expression.c b/src/lua/lua_expression.c index 03a667b8d..0d57a3bd8 100644 --- a/src/lua/lua_expression.c +++ b/src/lua/lua_expression.c @@ -98,7 +98,7 @@ static const struct luaL_reg exprlib_f[] = { static rspamd_expression_atom_t * lua_atom_parse (const gchar *line, gsize len, rspamd_mempool_t *pool, gpointer ud, GError **err); -static gdouble lua_atom_process (gpointer input, rspamd_expression_atom_t *atom); +static gdouble lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom); static const struct rspamd_atom_subr lua_atom_subr = { .parse = lua_atom_parse, @@ -166,22 +166,22 @@ lua_atom_parse (const gchar *line, gsize len, } static gdouble -lua_atom_process (gpointer input, rspamd_expression_atom_t *atom) +lua_atom_process (struct rspamd_expr_process_data *process_data, rspamd_expression_atom_t *atom) { struct lua_expression *e = (struct lua_expression *)atom->data; gdouble ret = 0; - lua_rawgeti (e->L, LUA_REGISTRYINDEX, e->process_idx); - lua_pushlstring (e->L, atom->str, atom->len); - lua_pushvalue (e->L, GPOINTER_TO_INT (input)); + lua_rawgeti (process_data->L, LUA_REGISTRYINDEX, e->process_idx); + lua_pushlstring (process_data->L, atom->str, atom->len); + lua_pushvalue (process_data->L, process_data->stack_item); - if (lua_pcall (e->L, 2, 1, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (e->L, -1)); - lua_pop (e->L, 1); + if (lua_pcall (process_data->L, 2, 1, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (process_data->L, -1)); + lua_pop (process_data->L, 1); } else { - ret = lua_tonumber (e->L, -1); - lua_pop (e->L, 1); + ret = lua_tonumber (process_data->L, -1); + lua_pop (process_data->L, 1); } return ret; @@ -195,11 +195,16 @@ lua_expr_process (lua_State *L) gdouble res; gint flags = 0; + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + process_data.L = L; + process_data.stack_item = 2; + if (lua_gettop (L) >= 3) { - flags = lua_tonumber (L, 3); + process_data.flags = flags; } - res = rspamd_process_expression (e->expr, flags, GINT_TO_POINTER (2)); + res = rspamd_process_expression (e->expr, &process_data); lua_pushnumber (L, res); @@ -214,29 +219,36 @@ lua_expr_process_traced (lua_State *L) rspamd_expression_atom_t *atom; gint res; guint i; - gint flags = 0; - GPtrArray *trace; + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + + process_data.L = L; + /* + * stack:1 - self + * stack:2 - data, see process_traced() definition for details + */ + process_data.stack_item = 2; if (lua_gettop (L) >= 3) { - flags = lua_tonumber (L, 3); + process_data.flags = lua_tonumber (L, 3); } - trace = g_ptr_array_sized_new (32); - res = rspamd_process_expression_track (e->expr, flags, GINT_TO_POINTER (2), - trace); + process_data.trace = g_ptr_array_sized_new (32); + + res = rspamd_process_expression_track (e->expr, &process_data); lua_pushnumber (L, res); - lua_createtable (L, trace->len, 0); + lua_createtable (L, process_data.trace->len, 0); - for (i = 0; i < trace->len; i ++) { - atom = g_ptr_array_index (trace, i); + for (i = 0; i < process_data.trace->len; i ++) { + atom = g_ptr_array_index (process_data.trace, i); lua_pushlstring (L, atom->str, atom->len); lua_rawseti (L, -2, i + 1); } - g_ptr_array_free (trace, TRUE); + g_ptr_array_free (process_data.trace, TRUE); return 2; } diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index c292428c2..64617be9b 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -14,6 +14,7 @@ * limitations under the License. */ #include "lua_common.h" +#include "lua_thread_pool.h" #include "http_private.h" #include "unix-std.h" #include "zlib.h" @@ -56,7 +57,6 @@ static const struct luaL_reg httplib_m[] = { #define RSPAMD_LUA_HTTP_FLAG_NOVERIFY (1 << 1) struct lua_http_cbdata { - lua_State *L; struct rspamd_http_connection *conn; struct rspamd_async_session *session; struct rspamd_async_watcher *w; @@ -96,7 +96,7 @@ lua_http_fin (gpointer arg) { struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg; - luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref); if (cbd->conn) { /* Here we already have a connection, so we need to unref it */ rspamd_http_connection_unref (cbd->conn); @@ -152,13 +152,22 @@ lua_http_maybe_free (struct lua_http_cbdata *cbd) static void lua_http_push_error (struct lua_http_cbdata *cbd, const char *err) { - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); - lua_pushstring (cbd->L, err); + struct lua_callback_state lcbd; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd); + + L = lcbd.L; - if (lua_pcall (cbd->L, 1, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); - lua_pop (cbd->L, 1); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref); + lua_pushstring (L, err); + + if (lua_pcall (L, 1, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); + lua_pop (L, 1); } + + lua_thread_pool_restore_callback (&lcbd); } static void @@ -179,51 +188,60 @@ lua_http_finish_handler (struct rspamd_http_connection *conn, const gchar *body; gsize body_len; - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + struct lua_callback_state lcbd; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd); + + L = lcbd.L; + + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref); /* Error */ - lua_pushnil (cbd->L); + lua_pushnil (L); /* Reply code */ - lua_pushinteger (cbd->L, msg->code); + lua_pushinteger (L, msg->code); /* Body */ body = rspamd_http_message_get_body (msg, &body_len); if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) { struct rspamd_lua_text *t; - t = lua_newuserdata (cbd->L, sizeof (*t)); - rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); t->start = body; t->len = body_len; t->flags = 0; } else { if (body_len > 0) { - lua_pushlstring (cbd->L, body, body_len); + lua_pushlstring (L, body, body_len); } else { - lua_pushnil (cbd->L); + lua_pushnil (L); } } /* Headers */ - lua_newtable (cbd->L); + lua_newtable (L); HASH_ITER (hh, msg->headers, h, htmp) { /* * Lowercase header name, as Lua cannot search in caseless matter */ rspamd_str_lc (h->combined->str, h->name.len); - lua_pushlstring (cbd->L, h->name.begin, h->name.len); - lua_pushlstring (cbd->L, h->value.begin, h->value.len); - lua_settable (cbd->L, -3); + lua_pushlstring (L, h->name.begin, h->name.len); + lua_pushlstring (L, h->value.begin, h->value.len); + lua_settable (L, -3); } - if (lua_pcall (cbd->L, 4, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); - lua_pop (cbd->L, 1); + if (lua_pcall (L, 4, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); + lua_pop (L, 1); } lua_http_maybe_free (cbd); + lua_thread_pool_restore_callback (&lcbd); + return 0; } @@ -707,7 +725,6 @@ lua_http_request (lua_State *L) } cbd = g_malloc0 (sizeof (*cbd)); - cbd->L = L; cbd->cbref = cbref; cbd->msg = msg; cbd->ev_base = ev_base; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index e5b97ebeb..0fc9c43b7 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -14,6 +14,7 @@ * limitations under the License. */ #include "lua_common.h" +#include "lua_thread_pool.h" #include "utlist.h" #include "contrib/hiredis/hiredis.h" @@ -92,7 +93,7 @@ struct lua_redis_specific_userdata; */ struct lua_redis_userdata { redisAsyncContext *ctx; - lua_State *L; + struct rspamd_task *task; struct rspamd_async_session *s; struct event_base *ev_base; struct rspamd_config *cfg; @@ -191,7 +192,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) lua_redis_free_args (cur->args, cur->arglens, cur->nargs); if (cur->cbref != -1) { - luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref); + luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); } g_free (cur); @@ -244,21 +245,27 @@ lua_redis_push_error (const gchar *err, gboolean connected) { struct lua_redis_userdata *ud = sp_ud->c; + struct lua_callback_state cbs; if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { + + lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); + /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* String of error */ - lua_pushstring (ud->L, err); + lua_pushstring (cbs.L, err); /* Data is nil */ - lua_pushnil (ud->L); + lua_pushnil (cbs.L); - if (lua_pcall (ud->L, 2, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + if (lua_pcall (cbs.L, 2, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1)); + lua_pop (cbs.L, 1); } + + lua_thread_pool_restore_callback (&cbs); } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; @@ -323,21 +330,25 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, struct lua_redis_specific_userdata *sp_ud) { struct lua_redis_userdata *ud = sp_ud->c; + struct lua_callback_state cbs; if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { + lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); + /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* Error is nil */ - lua_pushnil (ud->L); + lua_pushnil (cbs.L); /* Data */ - lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA); + lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA); - if (lua_pcall (ud->L, 2, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + if (lua_pcall (cbs.L, 2, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1)); + lua_pop (cbs.L, 1); } + lua_thread_pool_restore_callback (&cbs); } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; @@ -689,7 +700,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) ud->cfg = cfg; ud->pool = cfg->redis_pool; ud->ev_base = ev_base; - ud->L = L; + ud->task = task; ret = TRUE; } diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 797bdcc4e..8d948c6d5 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -14,6 +14,7 @@ * limitations under the License. */ #include "lua_common.h" +#include "lua_thread_pool.h" #include "utlist.h" #include "unix-std.h" @@ -186,7 +187,6 @@ struct lua_tcp_dtor { #define LUA_TCP_FLAG_FINISHED (1 << 4) struct lua_tcp_cbdata { - lua_State *L; struct rspamd_async_session *session; struct rspamd_async_event *async_ev; struct event_base *ev_base; @@ -203,6 +203,7 @@ struct lua_tcp_cbdata { struct event ev; struct lua_tcp_dtor *dtors; ref_entry_t ref; + struct rspamd_task *task; }; #define msg_debug_tcp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \ @@ -249,7 +250,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd) if (hdl->type == LUA_WANT_READ) { if (hdl->h.r.cbref) { - luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref); + luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref); } if (hdl->h.r.stop_pattern) { @@ -258,7 +259,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd) } else { if (hdl->h.w.cbref) { - luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref); + luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref); } if (hdl->h.w.iov) { @@ -280,7 +281,7 @@ lua_tcp_fin (gpointer arg) msg_debug_tcp ("finishing TCP connection"); if (cbd->connect_cb) { - luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb); + luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb); } if (cbd->fd != -1) { @@ -338,6 +339,11 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, struct lua_tcp_cbdata **pcbd; struct lua_tcp_handler *hdl; gint cbref, top; + struct lua_callback_state cbs; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs); + L = cbs.L; va_start (ap, err); @@ -356,27 +362,27 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, } if (cbref != -1) { - top = lua_gettop (cbd->L); - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref); + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbref); /* Error message */ va_copy (ap_copy, ap); - lua_pushvfstring (cbd->L, err, ap_copy); + lua_pushvfstring (L, err, ap_copy); va_end (ap_copy); /* Body */ - lua_pushnil (cbd->L); + lua_pushnil (L); /* Connection */ - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + pcbd = lua_newuserdata (L, sizeof (*pcbd)); *pcbd = cbd; - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + rspamd_lua_setclass (L, "rspamd{tcp}", -1); REF_RETAIN (cbd); - if (lua_pcall (cbd->L, 3, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + if (lua_pcall (L, 3, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - lua_settop (cbd->L, top); + lua_settop (L, top); REF_RELEASE (cbd); } @@ -391,6 +397,8 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, } va_end (ap); + + lua_thread_pool_restore_callback (&cbs); } static void @@ -400,6 +408,11 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) struct lua_tcp_cbdata **pcbd; struct lua_tcp_handler *hdl; gint cbref, arg_cnt, top; + struct lua_callback_state cbs; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs); + L = cbs.L; hdl = g_queue_peek_head (cbd->handlers); @@ -413,15 +426,15 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) } if (cbref != -1) { - top = lua_gettop (cbd->L); - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref); + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbref); /* Error */ - lua_pushnil (cbd->L); + lua_pushnil (L); /* Body */ if (hdl->type == LUA_WANT_READ) { - t = lua_newuserdata (cbd->L, sizeof (*t)); - rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); t->start = (const gchar *)str; t->len = len; t->flags = 0; @@ -431,19 +444,21 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) arg_cnt = 2; } /* Connection */ - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + pcbd = lua_newuserdata (L, sizeof (*pcbd)); *pcbd = cbd; - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + rspamd_lua_setclass (L, "rspamd{tcp}", -1); REF_RETAIN (cbd); - if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + if (lua_pcall (L, arg_cnt, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - lua_settop (cbd->L, top); + lua_settop (L, top); REF_RELEASE (cbd); } + + lua_thread_pool_restore_callback (&cbs); } static void @@ -667,6 +682,8 @@ lua_tcp_handler (int fd, short what, gpointer ud) gssize r; gint so_error = 0; socklen_t so_len = sizeof (so_error); + struct lua_callback_state cbs; + lua_State *L; REF_RETAIN (cbd); @@ -693,22 +710,25 @@ lua_tcp_handler (int fd, short what, gpointer ud) else { cbd->flags |= LUA_TCP_FLAG_CONNECTED; + lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs); + L = cbs.L; + if (cbd->connect_cb != -1) { struct lua_tcp_cbdata **pcbd; gint top; - top = lua_gettop (cbd->L); - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb); - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb); + pcbd = lua_newuserdata (L, sizeof (*pcbd)); *pcbd = cbd; REF_RETAIN (cbd); - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + rspamd_lua_setclass (L, "rspamd{tcp}", -1); - if (lua_pcall (cbd->L, 1, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + if (lua_pcall (L, 1, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - lua_settop (cbd->L, top); + lua_settop (L, top); REF_RELEASE (cbd); } @@ -1174,7 +1194,7 @@ lua_tcp_request (lua_State *L) return 1; } - cbd->L = L; + cbd->task = task; h = rspamd_random_uint64_fast (); rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h); cbd->handlers = g_queue_new (); diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c new file mode 100644 index 000000000..979b31f6b --- /dev/null +++ b/src/lua/lua_thread_pool.c @@ -0,0 +1,148 @@ +#include "config.h" + +#include "lua_common.h" +#include "lua_thread_pool.h" + +struct lua_thread_pool { + GQueue *available_items; + lua_State *L; + gint max_items; + struct thread_entry *running_entry; +}; + +static struct thread_entry * +thread_entry_new (lua_State * L) +{ + struct thread_entry *ent; + ent = g_new0(struct thread_entry, 1); + ent->lua_state = lua_newthread (L); + ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX); + + return ent; +} + +static void +thread_entry_free (lua_State * L, struct thread_entry *ent) +{ + luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index); + g_free (ent); +} + +struct lua_thread_pool * +lua_thread_pool_new (lua_State * L) +{ + struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1); + + pool->L = L; + pool->max_items = 100; + + pool->available_items = g_queue_new (); + int i; + + struct thread_entry *ent; + for (i = 0; i < MAX(2, pool->max_items / 10); i ++) { + ent = thread_entry_new (pool->L); + g_queue_push_head (pool->available_items, ent); + } + + return pool; +} + +void +lua_thread_pool_free (struct lua_thread_pool *pool) +{ + struct thread_entry *ent = NULL; + while (!g_queue_is_empty (pool->available_items)) { + ent = g_queue_pop_head (pool->available_items); + thread_entry_free (pool->L, ent); + } + g_queue_free (pool->available_items); + g_free (pool); +} + +struct thread_entry * +lua_thread_pool_get (struct lua_thread_pool *pool) +{ + gpointer cur; + struct thread_entry *ent = NULL; + + cur = g_queue_pop_head (pool->available_items); + + if (cur) { + ent = cur; + } + else { + ent = thread_entry_new (pool->L); + } + + pool->running_entry = ent; + + return ent; +} + +void +lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *thread_entry) +{ + g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't return a running/yielded thread into the pool */ + + if (pool->running_entry == thread_entry) { + pool->running_entry = NULL; + } + + if (g_queue_get_length (pool->available_items) <= pool->max_items) { + thread_entry->cd = NULL; + g_queue_push_head (pool->available_items, thread_entry); + } + else { + thread_entry_free (pool->L, thread_entry); + } +} + +void +lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry) +{ + struct thread_entry *ent = NULL; + + /* we should only terminate failed threads */ + g_assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD); + + if (pool->running_entry == thread_entry) { + pool->running_entry = NULL; + } + + thread_entry_free (pool->L, thread_entry); + + if (g_queue_get_length (pool->available_items) <= pool->max_items) { + ent = thread_entry_new (pool->L); + g_queue_push_head (pool->available_items, ent); + } +} + +struct thread_entry * +lua_thread_pool_get_running_entry (struct lua_thread_pool *pool) +{ + return pool->running_entry; +} + +void +lua_thread_pool_set_running_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry) +{ + pool->running_entry = thread_entry; +} + + +void +lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callback_state *cbs) +{ + cbs->thread_pool = pool; + cbs->previous_thread = lua_thread_pool_get_running_entry (pool); + cbs->my_thread = lua_thread_pool_get (pool); + cbs->L = cbs->my_thread->lua_state; +} + +void +lua_thread_pool_restore_callback (struct lua_callback_state *cbs) +{ + lua_thread_pool_return (cbs->thread_pool, cbs->my_thread); + lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread); +} diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h new file mode 100644 index 000000000..b72b72e8d --- /dev/null +++ b/src/lua/lua_thread_pool.h @@ -0,0 +1,106 @@ +#ifndef LUA_THREAD_POOL_H_ +#define LUA_THREAD_POOL_H_ + +#include <lua.h> + +struct thread_entry { + lua_State *lua_state; + gint thread_index; + gpointer cd; +}; + +struct thread_pool; + +struct lua_callback_state { + lua_State *L; + struct thread_entry *my_thread; + struct thread_entry *previous_thread; + struct lua_thread_pool *thread_pool; +}; + +/** + * Allocates new thread pool on state L. Pre-creates number of lua-threads to use later on + * + * @param L + * @return + */ +struct lua_thread_pool * +lua_thread_pool_new (lua_State * L); + +/** + * Destroys the pool + * @param pool + */ +void +lua_thread_pool_free (struct lua_thread_pool *pool); + +/** + * Extracts a thread from the list of available ones. + * It immediately becames running one and should be used to run a Lua script/function straight away. + * as soon as the code is finished, it should be either returned into list of available threads by + * calling lua_thread_pool_return() or terminated by calling lua_thread_pool_terminate_entry() + * if the code finished with error. + * + * If the code performed YIELD, the thread is still running and it's live should be controlled by the callee + * + * @param pool + * @return + */ +struct thread_entry * +lua_thread_pool_get(struct lua_thread_pool *pool); + +/** + * Return thread into the list of available ones. It can't be done with yielded or dead threads. + * + * @param pool + * @param thread_entry + */ +void +lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry); + +/** + * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only + * + * @param pool + * @param thread_entry + */ +void +lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); + +/** + * Currently running thread. Typically needed in yielding point - to fill-up continuation. + * + * @param pool + * @return + */ +struct thread_entry * +lua_thread_pool_get_running_entry (struct lua_thread_pool *pool); + +/** + * Updates currently running thread + * + * @param pool + * @param thread_entry + */ +void +lua_thread_pool_set_running_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry); + +/** + * Prevents yielded thread to be used for callback execution. lua_thread_pool_restore_callback() should be called afterwards. + * + * @param pool + * @param cbs + */ +void +lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callback_state *cbs); + +/** + * Restores state after lua_thread_pool_prepare_callback () usage + * + * @param cbs + */ +void +lua_thread_pool_restore_callback (struct lua_callback_state *cbs); + +#endif /* LUA_THREAD_POOL_H_ */ + diff --git a/src/plugins/lua/multimap.lua b/src/plugins/lua/multimap.lua index d20885525..909260681 100644 --- a/src/plugins/lua/multimap.lua +++ b/src/plugins/lua/multimap.lua @@ -28,6 +28,7 @@ local regexp = require "rspamd_regexp" local rspamd_expression = require "rspamd_expression" local rspamd_ip = require "rspamd_ip" local lua_util = require "lua_util" +local rspamd_dns = require "rspamd_dns" local lua_selectors = require "lua_selectors" local redis_params local fun = require "fun" @@ -706,24 +707,25 @@ local function multimap_callback(task, rule) if rt == 'ip' then match_rule(rule, ip) else - local cb = function (_, to_resolve, results, err) - task:inc_dns_req() - if err and (err ~= 'requested record is not found' and err ~= 'no records with this name') then - rspamd_logger.errx(task, 'error looking up %s: %s', to_resolve, err) - end - if results then - task:insert_result(rule['symbol'], 1, rule['map']) + local to_resolve = ip_to_rbl(ip, rule['map']) + + local is_ok, results = rspamd_dns.request({ + type = "a", + task = task, + name = to_resolve, + }) - if pre_filter then - task:set_pre_result(rule['action'], 'Matched map: ' .. rule['symbol']) - end + lua_util.debugm(N, rspamd_config, 'resolve() finished: results=%1, is_ok=%2, to_resolve=%3', results, is_ok, to_resolve) + + if not is_ok and (results ~= 'requested record is not found' and results ~= 'no records with this name') then + rspamd_logger.errx(task, 'error looking up %s: %s', to_resolve, results) + elseif is_ok then + task:insert_result(rule['symbol'], 1, rule['map']) + if pre_filter then + task:set_pre_result(rule['action'], 'Matched map: ' .. rule['symbol']) end end - task:get_resolver():resolve_a({task = task, - name = ip_to_rbl(ip, rule['map']), - callback = cb, - }) end end end, diff --git a/src/plugins/lua/reputation.lua b/src/plugins/lua/reputation.lua index 138899417..f5b461d78 100644 --- a/src/plugins/lua/reputation.lua +++ b/src/plugins/lua/reputation.lua @@ -25,6 +25,7 @@ local N = 'reputation' local rspamd_logger = require "rspamd_logger" local rspamd_util = require "rspamd_util" +local rspamd_dns = require "rspamd_dns" local lua_util = require "lua_util" local lua_maps = require "lua_maps" local hash = require 'rspamd_cryptobox_hash' @@ -716,49 +717,45 @@ end --]] local function reputation_dns_get_token(task, rule, token, continuation_cb) - local r = task:get_resolver() + -- local r = task:get_resolver() local key = gen_token_key(token, rule) local dns_name = key .. '.' .. rule.backend.config.list - local function dns_callback(_, to_resolve, results, err) - if err and (err ~= 'requested record is not found' and err ~= 'no records with this name') then - rspamd_logger.errx(task, 'error looking up %s: %s', to_resolve, err) - end - if not results then - lua_util.debugm(N, task, 'DNS RESPONSE: label=%1 results=%2 error=%3 list=%4', - to_resolve, false, err, rule.backend.config.list) - else - lua_util.debugm(N, task, 'DNS RESPONSE: label=%1 results=%2 error=%3 list=%4', - to_resolve, true, err, rule.backend.config.list) - end - - -- Now split tokens to list of values - if not err and results then - local values = {} - -- Format: key1=num1;key2=num2...keyn=numn - fun.each(function(e) - local vals = lua_util.rspamd_str_split(e, "=") - if vals and #vals == 2 then - local nv = tonumber(vals[2]) - if nv then - values[vals[1]] = nv - end - end - end, - lua_util.rspamd_str_split(results[1], ";")) - continuation_cb(nil, to_resolve, values) - else - continuation_cb(err, to_resolve, nil) - end - - task:inc_dns_req() - end - r:resolve_a({ + local is_ok, results = rspamd_dns.request({ + type = 'a', task = task, name = dns_name, - callback = dns_callback, forced = true, }) + + if not is_ok and (results ~= 'requested record is not found' and results ~= 'no records with this name') then + rspamd_logger.errx(task, 'error looking up %s: %s', dns_name, results) + end + + lua_util.debugm(N, task, 'DNS RESPONSE: label=%1 results=%2 is_ok=%3 list=%4', + dns_name, results, is_ok, rule.backend.config.list) + + -- Now split tokens to list of values + if is_ok then + local values = {} + -- Format: key1=num1;key2=num2...keyn=numn + fun.each(function(e) + local vals = lua_util.rspamd_str_split(e, "=") + if vals and #vals == 2 then + local nv = tonumber(vals[2]) + if nv then + values[vals[1]] = nv + end + end + end, + lua_util.rspamd_str_split(results[1], ";")) + + continuation_cb(nil, dns_name, values) + else + continuation_cb(results, dns_name, nil) + end + + task:inc_dns_req() end local function reputation_redis_init(rule, cfg, ev_base, worker) diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 915305aa3..92cccc338 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -433,7 +433,12 @@ process_regexp_item (struct rspamd_task *task, void *user_data) else { /* Process expression */ if (item->expr) { - res = rspamd_process_expression (item->expr, 0, task); + struct rspamd_expr_process_data process_data; + memset (&process_data, 0, sizeof process_data); + + process_data.task = task; + + res = rspamd_process_expression (item->expr, &process_data); } else { msg_warn_task ("FIXME: %s symbol is broken with new expressions", |