From d23f1ee2f96e00c108a63c9849aabc1608b81e8a Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 5 May 2021 14:59:56 +0100 Subject: [PATCH] [Rework] Use C++ version of the lua threads pool --- config.h.in | 7 + src/lua/CMakeLists.txt | 2 +- src/lua/lua_common.h | 568 ++++++++++++++++++------------------ src/lua/lua_thread_pool.c | 349 ---------------------- src/lua/lua_thread_pool.cxx | 365 +++++++++++++++++++++++ 5 files changed, 662 insertions(+), 629 deletions(-) delete mode 100644 src/lua/lua_thread_pool.c create mode 100644 src/lua/lua_thread_pool.cxx diff --git a/config.h.in b/config.h.in index 85a1fe829..c52506f8e 100644 --- a/config.h.in +++ b/config.h.in @@ -419,6 +419,12 @@ extern uint64_t ottery_rand_uint64(void); #endif #endif +#ifdef __cplusplus + #define RSPAMD_CONSTRUCTOR(f) \ + static void f(void); \ + struct f##_t_ { f##_t_(void) { f(); } }; static f##_t_ f##_; \ + static void f(void) +#else #if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 7) #define RSPAMD_CONSTRUCTOR(f) \ static void f(void) __attribute__((constructor)); \ @@ -430,6 +436,7 @@ extern uint64_t ottery_rand_uint64(void); /* In fact, everything else is not supported ¯\_(ツ)_/¯ */ #error incompatible compiler found, need gcc > 2.7 or clang #endif +#endif /* __cplusplus */ #ifdef __GNUC__ #define RSPAMD_CONST_FUNCTION __attribute__ ((const)) diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt index 73ccc609b..4782d3f8e 100644 --- a/src/lua/CMakeLists.txt +++ b/src/lua/CMakeLists.txt @@ -25,7 +25,7 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c - ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.c + ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.cxx ${CMAKE_CURRENT_SOURCE_DIR}/lua_dns.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_udp.c ${CMAKE_CURRENT_SOURCE_DIR}/lua_text.c diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index bf918274c..08f98d7f1 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -3,19 +3,33 @@ #include "config.h" +/* Lua headers do not have __cplusplus guards... */ +#ifdef __cplusplus +extern "C" { +#endif + #include #include #include -#include - #ifdef WITH_LUAJIT #include #endif +#ifdef __cplusplus +} +#endif +#include + + + #include "rspamd.h" #include "ucl.h" #include "lua_ucl.h" +#ifdef __cplusplus +extern "C" { +#endif + #ifndef lua_open #define lua_open() luaL_newstate () #endif @@ -33,14 +47,14 @@ static inline void luaL_register (lua_State *L, const gchar *name, const struct luaL_reg *methods) { - if (name != NULL) { - lua_newtable (L); - } - luaL_setfuncs (L, methods, 0); - if (name != NULL) { - lua_pushvalue (L, -1); - lua_setglobal (L, name); - } +if (name != NULL) { + lua_newtable (L); +} +luaL_setfuncs (L, methods, 0); +if (name != NULL) { + lua_pushvalue (L, -1); + lua_setglobal (L, name); +} } #endif @@ -49,25 +63,25 @@ luaL_register (lua_State *L, const gchar *name, const struct luaL_reg *methods) /* Special hack to work with moonjit of specific version */ #if !defined(MOONJIT_VERSION) && (!defined(LUAJIT_VERSION_NUM) || LUAJIT_VERSION_NUM != 20200) static inline int lua_absindex (lua_State *L, int i) { - if (i < 0 && i > LUA_REGISTRYINDEX) - i += lua_gettop(L) + 1; - return i; +if (i < 0 && i > LUA_REGISTRYINDEX) + i += lua_gettop(L) + 1; +return i; } #endif static inline int lua_rawgetp (lua_State *L, int i, const void *p) { - int abs_i = lua_absindex(L, i); - lua_pushlightuserdata(L, (void*)p); - lua_rawget(L, abs_i); - return lua_type(L, -1); +int abs_i = lua_absindex(L, i); +lua_pushlightuserdata(L, (void*)p); +lua_rawget(L, abs_i); +return lua_type(L, -1); } static inline void lua_rawsetp (lua_State *L, int i, const void *p) { - int abs_i = lua_absindex(L, i); - luaL_checkstack(L, 1, "not enough stack slots"); - lua_pushlightuserdata(L, (void*)p); - lua_insert(L, -2); - lua_rawset(L, abs_i); +int abs_i = lua_absindex(L, i); +luaL_checkstack(L, 1, "not enough stack slots"); +lua_pushlightuserdata(L, (void*)p); +lua_insert(L, -2); +lua_rawset(L, abs_i); } #endif @@ -76,25 +90,21 @@ static inline void lua_rawsetp (lua_State *L, int i, const void *p) { #define LUA_PUBLIC_FUNCTION_DEF(class, name) int lua_##class##_##name (lua_State * L) #define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name } -#ifdef __cplusplus -extern "C" { -#endif - extern const luaL_reg null_reg[]; #define RSPAMD_LUA_API_VERSION 12 /* Locked lua state with mutex */ struct lua_locked_state { - lua_State *L; - rspamd_mutex_t *m; +lua_State *L; +rspamd_mutex_t *m; }; /** - * Lua IP address structure - */ +* Lua IP address structure +*/ struct rspamd_lua_ip { - rspamd_inet_addr_t *addr; +rspamd_inet_addr_t *addr; }; #define RSPAMD_TEXT_FLAG_OWN (1u << 0u) @@ -103,21 +113,21 @@ struct rspamd_lua_ip { #define RSPAMD_TEXT_FLAG_SYSMALLOC (1u << 3u) #define RSPAMD_TEXT_FLAG_FAKE (1u << 4u) struct rspamd_lua_text { - const gchar *start; - guint len; - guint flags; +const gchar *start; +guint len; +guint flags; }; struct rspamd_lua_url { - struct rspamd_url *url; +struct rspamd_url *url; }; struct rspamd_lua_regexp { - rspamd_regexp_t *re; - gchar *module; - gchar *re_pattern; - gsize match_limit; - gint re_flags; +rspamd_regexp_t *re; +gchar *module; +gchar *re_pattern; +gsize match_limit; +gint re_flags; }; struct rspamd_map; @@ -126,174 +136,174 @@ struct radix_tree_compressed; struct rspamd_mime_header; enum rspamd_lua_map_type { - RSPAMD_LUA_MAP_RADIX = 0, - RSPAMD_LUA_MAP_SET, - RSPAMD_LUA_MAP_HASH, - RSPAMD_LUA_MAP_REGEXP, - RSPAMD_LUA_MAP_REGEXP_MULTIPLE, - RSPAMD_LUA_MAP_CALLBACK, - RSPAMD_LUA_MAP_CDB, - RSPAMD_LUA_MAP_UNKNOWN, +RSPAMD_LUA_MAP_RADIX = 0, +RSPAMD_LUA_MAP_SET, +RSPAMD_LUA_MAP_HASH, +RSPAMD_LUA_MAP_REGEXP, +RSPAMD_LUA_MAP_REGEXP_MULTIPLE, +RSPAMD_LUA_MAP_CALLBACK, +RSPAMD_LUA_MAP_CDB, +RSPAMD_LUA_MAP_UNKNOWN, }; struct rspamd_lua_map { - struct rspamd_map *map; - enum rspamd_lua_map_type type; - guint flags; - - union { - struct rspamd_radix_map_helper *radix; - struct rspamd_hash_map_helper *hash; - struct rspamd_regexp_map_helper *re_map; - struct rspamd_cdb_map_helper *cdb_map; - struct lua_map_callback_data *cbdata; - } data; +struct rspamd_map *map; +enum rspamd_lua_map_type type; +guint flags; + +union { + struct rspamd_radix_map_helper *radix; + struct rspamd_hash_map_helper *hash; + struct rspamd_regexp_map_helper *re_map; + struct rspamd_cdb_map_helper *cdb_map; + struct lua_map_callback_data *cbdata; +} data; }; struct rspamd_lua_cached_entry { - gint ref; - guint id; +gint ref; +guint id; }; /* Common utility functions */ /** - * Create and register new class - */ +* Create and register new class +*/ void rspamd_lua_new_class (lua_State *L, - const gchar *classname, - const struct luaL_reg *methods); + const gchar *classname, + const struct luaL_reg *methods); /** - * Set class name for object at @param objidx position - */ +* Set class name for object at @param objidx position +*/ void rspamd_lua_setclass (lua_State *L, const gchar *classname, gint objidx); /** - * Pushes the metatable for specific class on top of the stack - * @param L - * @param classname - */ +* Pushes the metatable for specific class on top of the stack +* @param L +* @param classname +*/ void rspamd_lua_class_metatable (lua_State *L, const gchar *classname); /** - * Adds a new field to the class (metatable) identified by `classname` - * @param L - * @param classname - * @param meth - */ +* Adds a new field to the class (metatable) identified by `classname` +* @param L +* @param classname +* @param meth +*/ void rspamd_lua_add_metamethod (lua_State *L, const gchar *classname, - luaL_Reg *meth); + luaL_Reg *meth); /** - * Set index of table to value (like t['index'] = value) - */ +* Set index of table to value (like t['index'] = value) +*/ void rspamd_lua_table_set (lua_State *L, const gchar *index, const gchar *value); /** - * Get string value of index in a table (return t['index']) - */ +* Get string value of index in a table (return t['index']) +*/ const gchar *rspamd_lua_table_get (lua_State *L, const gchar *index); /** - * Convert classname to string - */ +* Convert classname to string +*/ gint rspamd_lua_class_tostring (lua_State *L); /** - * Check whether the argument at specified index is of the specified class - */ +* Check whether the argument at specified index is of the specified class +*/ gpointer rspamd_lua_check_class (lua_State *L, gint index, const gchar *name); /** - * Initialize lua and bindings - */ +* Initialize lua and bindings +*/ lua_State *rspamd_lua_init (bool wipe_mem); void rspamd_lua_start_gc (struct rspamd_config *cfg); /** - * Sets field in a global variable - * @param L - * @param global_name - * @param field_name - * @param new_elt - */ +* Sets field in a global variable +* @param L +* @param global_name +* @param field_name +* @param new_elt +*/ void rspamd_plugins_table_push_elt (lua_State *L, const gchar *field_name, - const gchar *new_elt); + const gchar *new_elt); /** - * Load and initialize lua plugins - */ +* Load and initialize lua plugins +*/ gboolean rspamd_init_lua_filters (struct rspamd_config *cfg, bool force_load, bool strict); /** - * Initialize new locked lua_State structure - */ +* Initialize new locked lua_State structure +*/ struct lua_locked_state *rspamd_init_lua_locked (struct rspamd_config *cfg); /** - * Free locked state structure - */ +* Free locked state structure +*/ void rspamd_free_lua_locked (struct lua_locked_state *st); /** - * Push lua ip address - */ +* Push lua ip address +*/ void rspamd_lua_ip_push (lua_State *L, rspamd_inet_addr_t *addr); /** - * Push rspamd task structure to lua - */ +* Push rspamd task structure to lua +*/ void rspamd_lua_task_push (lua_State *L, struct rspamd_task *task); /** - * Return lua ip structure at the specified address - */ +* Return lua ip structure at the specified address +*/ struct rspamd_lua_ip *lua_check_ip (lua_State *L, gint pos); struct rspamd_lua_text *lua_check_text (lua_State *L, gint pos); /** - * Checks for a text or a string. In case of string a pointer to static structure is returned. - * So it should not be reused or placed to Lua stack anyhow! - * However, you can use this function up to 4 times and have distinct static structures - * @param L - * @param pos - * @return - */ +* Checks for a text or a string. In case of string a pointer to static structure is returned. +* So it should not be reused or placed to Lua stack anyhow! +* However, you can use this function up to 4 times and have distinct static structures +* @param L +* @param pos +* @return +*/ struct rspamd_lua_text *lua_check_text_or_string (lua_State *L, gint pos); /* Creates and *pushes* new rspamd text, data is copied if RSPAMD_TEXT_FLAG_OWN is in flags*/ struct rspamd_lua_text *lua_new_text (lua_State *L, const gchar *start, - gsize len, gboolean own); + gsize len, gboolean own); struct rspamd_lua_regexp *lua_check_regexp (lua_State *L, gint pos); enum rspamd_lua_task_header_type { - RSPAMD_TASK_HEADER_PUSH_SIMPLE = 0, - RSPAMD_TASK_HEADER_PUSH_RAW, - RSPAMD_TASK_HEADER_PUSH_FULL, - RSPAMD_TASK_HEADER_PUSH_COUNT, - RSPAMD_TASK_HEADER_PUSH_HAS, +RSPAMD_TASK_HEADER_PUSH_SIMPLE = 0, +RSPAMD_TASK_HEADER_PUSH_RAW, +RSPAMD_TASK_HEADER_PUSH_FULL, +RSPAMD_TASK_HEADER_PUSH_COUNT, +RSPAMD_TASK_HEADER_PUSH_HAS, }; gint rspamd_lua_push_header (lua_State *L, - struct rspamd_mime_header *h, - enum rspamd_lua_task_header_type how); + struct rspamd_mime_header *h, + enum rspamd_lua_task_header_type how); /** - * Push specific header to lua - */ +* Push specific header to lua +*/ gint rspamd_lua_push_header_array (lua_State *L, - const gchar *name, - struct rspamd_mime_header *rh, - enum rspamd_lua_task_header_type how, - gboolean strong); + const gchar *name, + struct rspamd_mime_header *rh, + enum rspamd_lua_task_header_type how, + gboolean strong); /** - * Check for task at the specified position - */ +* Check for task at the specified position +*/ struct rspamd_task *lua_check_task (lua_State *L, gint pos); struct rspamd_task *lua_check_task_maybe (lua_State *L, gint pos); @@ -301,21 +311,21 @@ struct rspamd_task *lua_check_task_maybe (lua_State *L, gint pos); struct rspamd_lua_map *lua_check_map (lua_State *L, gint pos); /** - * Push ip address from a string (nil is pushed if a string cannot be converted) - */ +* Push ip address from a string (nil is pushed if a string cannot be converted) +*/ void rspamd_lua_ip_push_fromstring (lua_State *L, const gchar *ip_str); /** - * Create type error - */ +* Create type error +*/ int rspamd_lua_typerror (lua_State *L, int narg, const char *tname); /** - * Open libraries functions - */ +* Open libraries functions +*/ /** - * Add preload function - */ +* Add preload function +*/ void rspamd_lua_add_preload (lua_State *L, const gchar *name, lua_CFunction func); void luaopen_task (lua_State *L); @@ -391,30 +401,30 @@ void luaopen_parsers (lua_State *L); void rspamd_lua_dostring (const gchar *line); double rspamd_lua_normalize (struct rspamd_config *cfg, - long double score, - void *params); + long double score, + void *params); /* Config file functions */ void rspamd_lua_post_load_config (struct rspamd_config *cfg); gboolean rspamd_lua_handle_param (struct rspamd_task *task, - gchar *mname, - gchar *optname, - enum lua_var_type expected_type, - gpointer *res); + gchar *mname, + gchar *optname, + enum lua_var_type expected_type, + gpointer *res); gboolean rspamd_lua_check_condition (struct rspamd_config *cfg, - const gchar *condition); + const gchar *condition); void rspamd_lua_dumpstack (lua_State *L); /* Set lua path according to the configuration */ void rspamd_lua_set_path (lua_State *L, const ucl_object_t *cfg_obj, - GHashTable *vars); + GHashTable *vars); /* Set some lua globals */ gboolean rspamd_lua_set_env (lua_State *L, GHashTable *vars, char **lua_env, - GError **err); + GError **err); void rspamd_lua_set_globals (struct rspamd_config *cfg, lua_State *L); @@ -431,51 +441,51 @@ struct rspamd_dns_resolver *lua_check_dns_resolver (lua_State *L, gint pos); struct rspamd_lua_url *lua_check_url (lua_State * L, gint pos); enum rspamd_lua_parse_arguments_flags { - RSPAMD_LUA_PARSE_ARGUMENTS_DEFAULT = 0, - RSPAMD_LUA_PARSE_ARGUMENTS_IGNORE_MISSING, +RSPAMD_LUA_PARSE_ARGUMENTS_DEFAULT = 0, +RSPAMD_LUA_PARSE_ARGUMENTS_IGNORE_MISSING, }; /** - * Extract an arguments from lua table according to format string. Supported arguments are: - * [*]key=S|I|N|B|V|U{a-z};[key=...] - * - S - const char * - * - I - gint64_t - * - i - int32_t - * - N - double - * - B - gboolean - * - V - size_t + const char * - * - U{classname} - userdata of the following class (stored in gpointer) - * - F - function - * - O - ucl_object_t * - * - D - same as N but argument is set to NAN not to 0.0 - * - u{classname} - userdata of the following class (stored directly) - * - * If any of keys is prefixed with `*` then it is treated as required argument - * @param L lua state - * @param pos at which pos start extraction - * @param err error pointer - * @param how extraction type (IGNORE_MISSING means that default values will not be set) - * @param extraction_pattern static pattern - * @return TRUE if a table has been parsed - */ +* Extract an arguments from lua table according to format string. Supported arguments are: +* [*]key=S|I|N|B|V|U{a-z};[key=...] +* - S - const char * +* - I - gint64_t +* - i - int32_t +* - N - double +* - B - gboolean +* - V - size_t + const char * +* - U{classname} - userdata of the following class (stored in gpointer) +* - F - function +* - O - ucl_object_t * +* - D - same as N but argument is set to NAN not to 0.0 +* - u{classname} - userdata of the following class (stored directly) +* +* If any of keys is prefixed with `*` then it is treated as required argument +* @param L lua state +* @param pos at which pos start extraction +* @param err error pointer +* @param how extraction type (IGNORE_MISSING means that default values will not be set) +* @param extraction_pattern static pattern +* @return TRUE if a table has been parsed +*/ gboolean rspamd_lua_parse_table_arguments (lua_State *L, gint pos, - GError **err, - enum rspamd_lua_parse_arguments_flags how, - const gchar *extraction_pattern, ...); + GError **err, + enum rspamd_lua_parse_arguments_flags how, + const gchar *extraction_pattern, ...); gint rspamd_lua_traceback (lua_State *L); /** - * Returns stack trace as a string. Caller should clear memory. - * @param L - * @return - */ +* Returns stack trace as a string. Caller should clear memory. +* @param L +* @return +*/ void rspamd_lua_get_traceback_string (lua_State *L, luaL_Buffer *buf); /** - * Returns size of table at position `tbl_pos` - */ +* Returns size of table at position `tbl_pos` +*/ guint rspamd_lua_table_size (lua_State *L, gint tbl_pos); void lua_push_emails_address_list (lua_State *L, GPtrArray *addrs, int flags); @@ -484,160 +494,160 @@ void lua_push_emails_address_list (lua_State *L, GPtrArray *addrs, int flags); #define TRACE_POINTS 6 struct lua_logger_trace { - gint cur_level; - gconstpointer traces[TRACE_POINTS]; +gint cur_level; +gconstpointer traces[TRACE_POINTS]; }; enum lua_logger_escape_type { - LUA_ESCAPE_NONE = (0u), - LUA_ESCAPE_UNPRINTABLE = (1u << 0u), - LUA_ESCAPE_NEWLINES = (1u << 1u), - LUA_ESCAPE_8BIT = (1u << 2u), +LUA_ESCAPE_NONE = (0u), +LUA_ESCAPE_UNPRINTABLE = (1u << 0u), +LUA_ESCAPE_NEWLINES = (1u << 1u), +LUA_ESCAPE_8BIT = (1u << 2u), }; #define LUA_ESCAPE_LOG (LUA_ESCAPE_UNPRINTABLE|LUA_ESCAPE_NEWLINES) #define LUA_ESCAPE_ALL (LUA_ESCAPE_UNPRINTABLE|LUA_ESCAPE_NEWLINES|LUA_ESCAPE_8BIT) /** - * Log lua object to string - * @param L - * @param pos - * @param outbuf - * @param len - * @return - */ +* Log lua object to string +* @param L +* @param pos +* @param outbuf +* @param len +* @return +*/ gsize lua_logger_out_type (lua_State *L, gint pos, gchar *outbuf, - gsize len, struct lua_logger_trace *trace, - enum lua_logger_escape_type esc_type); + gsize len, struct lua_logger_trace *trace, + enum lua_logger_escape_type esc_type); /** - * Safely checks userdata to match specified class - * @param L - * @param pos - * @param classname - */ +* Safely checks userdata to match specified class +* @param L +* @param pos +* @param classname +*/ void *rspamd_lua_check_udata (lua_State *L, gint pos, const gchar *classname); /** - * Safely checks userdata to match specified class - * @param L - * @param pos - * @param classname - */ +* Safely checks userdata to match specified class +* @param L +* @param pos +* @param classname +*/ void *rspamd_lua_check_udata_maybe (lua_State *L, gint pos, const gchar *classname); /** - * Call finishing script with the specified task - * @param sc - * @param task - */ +* Call finishing script with the specified task +* @param sc +* @param task +*/ void lua_call_finish_script (struct rspamd_config_cfg_lua_script *sc, - struct rspamd_task *task); + struct rspamd_task *task); /** - * Run post-load operations - * @param L - * @param cfg - * @param ev_base - */ +* Run post-load operations +* @param L +* @param cfg +* @param ev_base +*/ void rspamd_lua_run_postloads (lua_State *L, struct rspamd_config *cfg, - struct ev_loop *ev_base, struct rspamd_worker *w); + struct ev_loop *ev_base, struct rspamd_worker *w); void rspamd_lua_run_config_post_init (lua_State *L, struct rspamd_config *cfg); void rspamd_lua_run_config_unload (lua_State *L, struct rspamd_config *cfg); /** - * Adds new destructor for a local function for specific pool - * @param L - * @param pool - * @param ref - */ +* Adds new destructor for a local function for specific pool +* @param L +* @param pool +* @param ref +*/ void rspamd_lua_add_ref_dtor (lua_State *L, rspamd_mempool_t *pool, - gint ref); + gint ref); /** - * Tries to load some module using `require` and get some method from it - * @param L - * @param modname - * @param funcname - * @return TRUE if function exists in that module, the function is pushed in stack, otherwise stack is unchanged and FALSE is returned - */ +* Tries to load some module using `require` and get some method from it +* @param L +* @param modname +* @param funcname +* @return TRUE if function exists in that module, the function is pushed in stack, otherwise stack is unchanged and FALSE is returned +*/ gboolean rspamd_lua_require_function (lua_State *L, const gchar *modname, - const gchar *funcname); + const gchar *funcname); /** - * Tries to load redis server definition from ucl object specified - * @param L - * @param obj - * @param cfg - * @return - */ +* Tries to load redis server definition from ucl object specified +* @param L +* @param obj +* @param cfg +* @return +*/ gboolean rspamd_lua_try_load_redis (lua_State *L, const ucl_object_t *obj, - struct rspamd_config *cfg, gint *ref_id); + struct rspamd_config *cfg, gint *ref_id); struct rspamd_stat_token_s; /** - * Pushes a single word into Lua - * @param L - * @param word - */ +* Pushes a single word into Lua +* @param L +* @param word +*/ void rspamd_lua_push_full_word (lua_State *L, struct rspamd_stat_token_s *word); enum rspamd_lua_words_type { - RSPAMD_LUA_WORDS_STEM = 0, - RSPAMD_LUA_WORDS_NORM, - RSPAMD_LUA_WORDS_RAW, - RSPAMD_LUA_WORDS_FULL, - RSPAMD_LUA_WORDS_MAX +RSPAMD_LUA_WORDS_STEM = 0, +RSPAMD_LUA_WORDS_NORM, +RSPAMD_LUA_WORDS_RAW, +RSPAMD_LUA_WORDS_FULL, +RSPAMD_LUA_WORDS_MAX }; /** - * Pushes words (rspamd_stat_token_t) to Lua - * @param L - * @param words - * @param how - */ +* Pushes words (rspamd_stat_token_t) to Lua +* @param L +* @param words +* @param how +*/ gint rspamd_lua_push_words (lua_State *L, GArray *words, - enum rspamd_lua_words_type how); + enum rspamd_lua_words_type how); /** - * Returns newly allocated name for caller module name - * @param L - * @return - */ +* Returns newly allocated name for caller module name +* @param L +* @return +*/ gchar *rspamd_lua_get_module_name (lua_State *L); /** - * Call Lua function in a universal way. Arguments string: - * - i - lua_integer, argument - gint64 - * - n - lua_number, argument - gdouble - * - s - lua_string, argument - const gchar * (zero terminated) - * - l - lua_lstring, argument - (size_t + const gchar *) pair - * - u - lua_userdata, argument - (const char * + void *) - classname + pointer - * - b - lua_boolean, argument - gboolean (not bool due to varargs promotion) - * - f - lua_function, argument - int - position of the function on stack (not lua_registry) - * - t - lua_text, argument - int - position of the lua_text on stack (not lua_registry) - * @param L lua state - * @param cbref LUA_REGISTRY reference - * @param strloc where this function is called from - * @param nret number of results (or LUA_MULTRET) - * @param args arguments format string - * @param err error to promote - * @param ... arguments - * @return true of pcall returned 0, false + err otherwise - */ +* Call Lua function in a universal way. Arguments string: +* - i - lua_integer, argument - gint64 +* - n - lua_number, argument - gdouble +* - s - lua_string, argument - const gchar * (zero terminated) +* - l - lua_lstring, argument - (size_t + const gchar *) pair +* - u - lua_userdata, argument - (const char * + void *) - classname + pointer +* - b - lua_boolean, argument - gboolean (not bool due to varargs promotion) +* - f - lua_function, argument - int - position of the function on stack (not lua_registry) +* - t - lua_text, argument - int - position of the lua_text on stack (not lua_registry) +* @param L lua state +* @param cbref LUA_REGISTRY reference +* @param strloc where this function is called from +* @param nret number of results (or LUA_MULTRET) +* @param args arguments format string +* @param err error to promote +* @param ... arguments +* @return true of pcall returned 0, false + err otherwise +*/ bool rspamd_lua_universal_pcall (lua_State *L, gint cbref, const gchar* strloc, - gint nret, const gchar *args, GError **err, ...); + gint nret, const gchar *args, GError **err, ...); /** - * Wrapper for lua_geti from lua 5.3 - * @param L - * @param index - * @param i - * @return - */ +* Wrapper for lua_geti from lua 5.3 +* @param L +* @param index +* @param i +* @return +*/ #if defined( LUA_VERSION_NUM ) && LUA_VERSION_NUM <= 502 gint rspamd_lua_geti (lua_State *L, int index, int i); #else diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c deleted file mode 100644 index 50c53698c..000000000 --- a/src/lua/lua_thread_pool.c +++ /dev/null @@ -1,349 +0,0 @@ -/*- - * Copyright 2018 Mikhail Galanin - * Copyright 2019 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "config.h" - -#include "lua_common.h" -#include "lua_thread_pool.h" - -#define msg_debug_lua_threads(...) rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_lua_threads_log_id, "lua_threads", NULL, \ - G_STRFUNC, \ - __VA_ARGS__) - -INIT_LOG_MODULE(lua_threads) - -struct lua_thread_pool { - GQueue *available_items; - lua_State *L; - gint max_items; - struct thread_entry *running_entry; -}; - -static struct thread_entry * -thread_entry_new (lua_State * L) -{ - struct thread_entry *ent; - ent = g_new0(struct thread_entry, 1); - ent->lua_state = lua_newthread (L); - ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX); - - return ent; -} - -static void -thread_entry_free (lua_State * L, struct thread_entry *ent) -{ - luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index); - g_free (ent); -} - -struct lua_thread_pool * -lua_thread_pool_new (lua_State * L) -{ - struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1); - - pool->L = L; - pool->max_items = 100; - - pool->available_items = g_queue_new (); - int i; - - struct thread_entry *ent; - for (i = 0; i < MAX(2, pool->max_items / 10); i ++) { - ent = thread_entry_new (pool->L); - g_queue_push_head (pool->available_items, ent); - } - - return pool; -} - -void -lua_thread_pool_free (struct lua_thread_pool *pool) -{ - struct thread_entry *ent = NULL; - while (!g_queue_is_empty (pool->available_items)) { - ent = g_queue_pop_head (pool->available_items); - thread_entry_free (pool->L, ent); - } - g_queue_free (pool->available_items); - g_free (pool); -} - -static struct thread_entry *lua_thread_pool_get (struct lua_thread_pool *pool); - -struct thread_entry * -lua_thread_pool_get_for_task (struct rspamd_task *task) -{ - struct thread_entry *ent = lua_thread_pool_get (task->cfg->lua_thread_pool); - - ent->task = task; - - return ent; -} - -struct thread_entry * -lua_thread_pool_get_for_config (struct rspamd_config *cfg) -{ - struct thread_entry *ent = lua_thread_pool_get (cfg->lua_thread_pool); - - ent->cfg = cfg; - - return ent; -} - -static struct thread_entry * -lua_thread_pool_get (struct lua_thread_pool *pool) -{ - gpointer cur; - struct thread_entry *ent = NULL; - cur = g_queue_pop_head (pool->available_items); - - if (cur) { - ent = cur; - } - else { - ent = thread_entry_new (pool->L); - } - - pool->running_entry = ent; - - return ent; -} - -void -lua_thread_pool_return_full (struct lua_thread_pool *pool, - struct thread_entry *thread_entry, const gchar *loc) -{ - /* we can't return a running/yielded thread into the pool */ - g_assert (lua_status (thread_entry->lua_state) == 0); - - if (pool->running_entry == thread_entry) { - pool->running_entry = NULL; - } - - if (g_queue_get_length (pool->available_items) <= pool->max_items) { - thread_entry->cd = NULL; - thread_entry->finish_callback = NULL; - thread_entry->error_callback = NULL; - thread_entry->task = NULL; - thread_entry->cfg = NULL; - - msg_debug_lua_threads ("%s: returned thread to the threads pool %ud items", - loc, - g_queue_get_length (pool->available_items)); - - g_queue_push_head (pool->available_items, thread_entry); - } - else { - msg_debug_lua_threads ("%s: removed thread as thread pool has %ud items", - loc, - g_queue_get_length (pool->available_items)); - thread_entry_free (pool->L, thread_entry); - } -} - -void -lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool, - struct thread_entry *thread_entry, const gchar *loc, - bool enforce) -{ - struct thread_entry *ent = NULL; - - - if (!enforce) { - /* we should only terminate failed threads */ - g_assert (lua_status(thread_entry->lua_state) != 0 && - lua_status(thread_entry->lua_state) != LUA_YIELD); - } - - if (pool->running_entry == thread_entry) { - pool->running_entry = NULL; - } - - msg_debug_lua_threads ("%s: terminated thread entry", loc); - thread_entry_free (pool->L, thread_entry); - - if (g_queue_get_length (pool->available_items) <= pool->max_items) { - ent = thread_entry_new (pool->L); - g_queue_push_head (pool->available_items, ent); - } -} - -struct thread_entry * -lua_thread_pool_get_running_entry_full (struct lua_thread_pool *pool, - const gchar *loc) -{ - msg_debug_lua_threads ("%s: lua_thread_pool_get_running_entry_full", loc); - return pool->running_entry; -} - -void -lua_thread_pool_set_running_entry_full (struct lua_thread_pool *pool, - struct thread_entry *thread_entry, - const gchar *loc) -{ - msg_debug_lua_threads ("%s: lua_thread_pool_set_running_entry_full", loc); - pool->running_entry = thread_entry; -} - -static void -lua_thread_pool_set_running_entry_for_thread (struct thread_entry *thread_entry, - const gchar *loc) -{ - struct lua_thread_pool *pool; - - if (thread_entry->task) { - pool = thread_entry->task->cfg->lua_thread_pool; - } - else { - pool = thread_entry->cfg->lua_thread_pool; - } - - lua_thread_pool_set_running_entry_full (pool, thread_entry, loc); -} - -void -lua_thread_pool_prepare_callback_full (struct lua_thread_pool *pool, - struct lua_callback_state *cbs, const gchar *loc) -{ - msg_debug_lua_threads ("%s: lua_thread_pool_prepare_callback_full", loc); - cbs->thread_pool = pool; - cbs->previous_thread = lua_thread_pool_get_running_entry_full (pool, loc); - cbs->my_thread = lua_thread_pool_get (pool); - cbs->L = cbs->my_thread->lua_state; -} - -void -lua_thread_pool_restore_callback_full (struct lua_callback_state *cbs, - const gchar *loc) -{ - lua_thread_pool_return_full (cbs->thread_pool, cbs->my_thread, loc); - lua_thread_pool_set_running_entry_full (cbs->thread_pool, - cbs->previous_thread, loc); -} - -static gint -lua_do_resume_full (lua_State *L, gint narg, const gchar *loc) -{ - msg_debug_lua_threads ("%s: lua_do_resume_full", loc); -#if LUA_VERSION_NUM < 502 - return lua_resume (L, narg); -#else - #if LUA_VERSION_NUM >= 504 - return lua_resume (L, NULL, narg, NULL); - #else - return lua_resume (L, NULL, narg); - #endif -#endif -} - -static void lua_resume_thread_internal_full (struct thread_entry *thread_entry, - gint narg, const gchar *loc); - -void -lua_thread_call_full (struct thread_entry *thread_entry, - int narg, const gchar *loc) -{ - g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ - g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */ - - lua_resume_thread_internal_full (thread_entry, narg, loc); -} - -void -lua_thread_resume_full (struct thread_entry *thread_entry, gint narg, - const gchar *loc) -{ - /* - * The only state where we can resume from is LUA_YIELD - * Another acceptable status is OK (0) but in that case we should push function on stack - * to start the thread from, which is happening in lua_thread_call(), not in this function. - */ - g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); - msg_debug_lua_threads ("%s: lua_thread_resume_full", loc); - lua_thread_pool_set_running_entry_for_thread (thread_entry, loc); - - lua_resume_thread_internal_full (thread_entry, narg, loc); -} - -static void -lua_resume_thread_internal_full (struct thread_entry *thread_entry, - gint narg, const gchar *loc) -{ - gint ret; - struct lua_thread_pool *pool; - struct rspamd_task *task; - - msg_debug_lua_threads ("%s: lua_resume_thread_internal_full", loc); - ret = lua_do_resume_full (thread_entry->lua_state, narg, loc); - - if (ret != LUA_YIELD) { - /* - LUA_YIELD state should not be handled here. - It should only happen when the thread initiated a asynchronous event and it will be restored as soon - the event is finished - */ - - if (thread_entry->task) { - pool = thread_entry->task->cfg->lua_thread_pool; - } - else { - pool = thread_entry->cfg->lua_thread_pool; - } - - if (ret == 0) { - if (thread_entry->finish_callback) { - thread_entry->finish_callback (thread_entry, ret); - } - lua_thread_pool_return_full (pool, thread_entry, loc); - } - else { - rspamd_lua_traceback (thread_entry->lua_state); - if (thread_entry->error_callback) { - thread_entry->error_callback (thread_entry, ret, - lua_tostring (thread_entry->lua_state, -1)); - } - else if (thread_entry->task) { - task = thread_entry->task; - msg_err_task ("lua call failed (%d): %s", ret, - lua_tostring (thread_entry->lua_state, -1)); - } - else { - msg_err ("lua call failed (%d): %s", ret, - lua_tostring (thread_entry->lua_state, -1)); - } - - /* - * Maybe there is a way to recover here. - * For now, just remove faulty thread - */ - lua_thread_pool_terminate_entry_full (pool, thread_entry, loc, false); - } - } -} - -gint -lua_thread_yield_full (struct thread_entry *thread_entry, - gint nresults, - const gchar *loc) -{ - g_assert (lua_status (thread_entry->lua_state) == 0); - - msg_debug_lua_threads ("%s: lua_thread_yield_full", loc); - return lua_yield (thread_entry->lua_state, nresults); -} diff --git a/src/lua/lua_thread_pool.cxx b/src/lua/lua_thread_pool.cxx new file mode 100644 index 000000000..62da49482 --- /dev/null +++ b/src/lua/lua_thread_pool.cxx @@ -0,0 +1,365 @@ +/*- + * Copyright 2018 Mikhail Galanin + * Copyright 2019 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" + +#include "lua_common.h" +#include "lua_thread_pool.h" + +#include + +#define msg_debug_lua_threads(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_lua_threads_log_id, "lua_threads", NULL, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(lua_threads) + +static struct thread_entry *thread_entry_new(lua_State *L); +static void thread_entry_free(lua_State *L, struct thread_entry *ent); + +#define CFG_POOL_GET(cfg) (reinterpret_cast((cfg)->lua_thread_pool)) + +struct lua_thread_pool { + std::vector available_items; + lua_State *L; + gint max_items; + struct thread_entry *running_entry; + static const int default_max_items = 100; + + lua_thread_pool(lua_State *L, gint max_items = default_max_items) : + L(L), max_items(max_items) { + running_entry = nullptr; + available_items.reserve(max_items); + + for (auto i = 0; i < MAX(2, max_items / 10); i++) { + auto *ent = thread_entry_new(L); + available_items.push_back(ent); + } + } + + ~lua_thread_pool() { + for (auto *ent : available_items) { + thread_entry_free(L, ent); + } + } + + auto get_thread() -> struct thread_entry * { + struct thread_entry *ent; + + if (!available_items.empty()) { + ent = available_items.back(); + available_items.pop_back(); + } + else { + ent = thread_entry_new(L); + } + + running_entry = ent; + + return ent; + } + + auto return_thread(struct thread_entry *thread_entry, const gchar *loc) -> void { + /* we can't return a running/yielded thread into the pool */ + g_assert (lua_status(thread_entry->lua_state) == 0); + + if (running_entry == thread_entry) { + running_entry = NULL; + } + + if (available_items.size() <= max_items) { + thread_entry->cd = NULL; + thread_entry->finish_callback = NULL; + thread_entry->error_callback = NULL; + thread_entry->task = NULL; + thread_entry->cfg = NULL; + + msg_debug_lua_threads ("%s: returned thread to the threads pool %ud items", + loc, + available_items.size()); + + available_items.push_back(thread_entry); + } + else { + msg_debug_lua_threads ("%s: removed thread as thread pool has %ud items", + loc, + available_items.size()); + thread_entry_free(L, thread_entry); + } + } + + auto terminate_thread(struct thread_entry *thread_entry, + const gchar *loc, + bool enforce) -> void { + struct thread_entry *ent = NULL; + + if (!enforce) { + /* we should only terminate failed threads */ + g_assert (lua_status(thread_entry->lua_state) != 0 && + lua_status(thread_entry->lua_state) != LUA_YIELD); + } + + if (running_entry == thread_entry) { + running_entry = NULL; + } + + msg_debug_lua_threads ("%s: terminated thread entry", loc); + thread_entry_free(L, thread_entry); + + if (available_items.size() <= max_items) { + ent = thread_entry_new(L); + available_items.push_back(ent); + } + } + + auto get_running_entry(void) -> struct thread_entry * { + return running_entry; + }; + + auto set_running_entry(struct thread_entry *ent) -> struct thread_entry * { + auto *old_entry = running_entry; + running_entry = ent; + return old_entry; + }; +}; + +static struct thread_entry * +thread_entry_new(lua_State *L) +{ + struct thread_entry *ent; + ent = g_new0(struct thread_entry, 1); + ent->lua_state = lua_newthread(L); + ent->thread_index = luaL_ref(L, LUA_REGISTRYINDEX); + + return ent; +} + +static void +thread_entry_free(lua_State *L, struct thread_entry *ent) +{ + luaL_unref(L, LUA_REGISTRYINDEX, ent->thread_index); + g_free(ent); +} + +struct lua_thread_pool * +lua_thread_pool_new(lua_State *L) +{ + auto *pool = new lua_thread_pool(L); + return pool; +} + +void +lua_thread_pool_free(struct lua_thread_pool *pool) +{ + delete pool; +} + + +struct thread_entry * +lua_thread_pool_get_for_task(struct rspamd_task *task) +{ + struct thread_entry *ent = CFG_POOL_GET(task->cfg)->get_thread(); + + ent->task = task; + + return ent; +} + +struct thread_entry * +lua_thread_pool_get_for_config(struct rspamd_config *cfg) +{ + struct thread_entry *ent = CFG_POOL_GET(cfg)->get_thread(); + + ent->cfg = cfg; + + return ent; +} + +void +lua_thread_pool_return_full(struct lua_thread_pool *pool, + struct thread_entry *thread_entry, const gchar *loc) +{ + pool->return_thread(thread_entry, loc); +} + +void +lua_thread_pool_terminate_entry_full(struct lua_thread_pool *pool, + struct thread_entry *thread_entry, const gchar *loc, + bool enforce) +{ + pool->terminate_thread(thread_entry, loc, enforce); +} + +struct thread_entry * +lua_thread_pool_get_running_entry_full(struct lua_thread_pool *pool, + const gchar *loc) +{ + msg_debug_lua_threads ("%s: lua_thread_pool_get_running_entry_full", loc); + return pool->get_running_entry(); +} + +void +lua_thread_pool_set_running_entry_full(struct lua_thread_pool *pool, + struct thread_entry *thread_entry, + const gchar *loc) +{ + msg_debug_lua_threads ("%s: lua_thread_pool_set_running_entry_full", loc); + pool->set_running_entry(thread_entry); +} + +static void +lua_thread_pool_set_running_entry_for_thread(struct thread_entry *thread_entry, + const gchar *loc) +{ + struct lua_thread_pool *pool; + + if (thread_entry->task) { + pool = CFG_POOL_GET(thread_entry->task->cfg); + } + else { + pool = CFG_POOL_GET(thread_entry->cfg); + } + + lua_thread_pool_set_running_entry_full(pool, thread_entry, loc); +} + +void +lua_thread_pool_prepare_callback_full(struct lua_thread_pool *pool, + struct lua_callback_state *cbs, + const gchar *loc) +{ + msg_debug_lua_threads ("%s: lua_thread_pool_prepare_callback_full", loc); + cbs->thread_pool = pool; + cbs->previous_thread = lua_thread_pool_get_running_entry_full(pool, loc); + cbs->my_thread = pool->get_thread(); + cbs->L = cbs->my_thread->lua_state; +} + +void +lua_thread_pool_restore_callback_full(struct lua_callback_state *cbs, + const gchar *loc) +{ + lua_thread_pool_return_full(cbs->thread_pool, cbs->my_thread, loc); + lua_thread_pool_set_running_entry_full(cbs->thread_pool, + cbs->previous_thread, loc); +} + +static gint +lua_do_resume_full(lua_State *L, gint narg, const gchar *loc) { + msg_debug_lua_threads ("%s: lua_do_resume_full", loc); +#if LUA_VERSION_NUM < 502 + return lua_resume(L, narg); +#else +#if LUA_VERSION_NUM >= 504 + return lua_resume (L, NULL, narg, NULL); +#else + return lua_resume (L, NULL, narg); +#endif +#endif +} + +static void +lua_resume_thread_internal_full(struct thread_entry *thread_entry, + gint narg, const gchar *loc) { + gint ret; + struct lua_thread_pool *pool; + struct rspamd_task *task; + + msg_debug_lua_threads ("%s: lua_resume_thread_internal_full", loc); + ret = lua_do_resume_full(thread_entry->lua_state, narg, loc); + + if (ret != LUA_YIELD) { + /* + LUA_YIELD state should not be handled here. + It should only happen when the thread initiated a asynchronous event and it will be restored as soon + the event is finished + */ + + if (thread_entry->task) { + pool = CFG_POOL_GET(thread_entry->task->cfg); + } + else { + pool = CFG_POOL_GET(thread_entry->cfg); + } + + if (ret == 0) { + if (thread_entry->finish_callback) { + thread_entry->finish_callback(thread_entry, ret); + } + + pool->return_thread(thread_entry, loc); + } + else { + rspamd_lua_traceback(thread_entry->lua_state); + if (thread_entry->error_callback) { + thread_entry->error_callback(thread_entry, ret, + lua_tostring (thread_entry->lua_state, -1)); + } + else if (thread_entry->task) { + task = thread_entry->task; + msg_err_task ("lua call failed (%d): %s", ret, + lua_tostring(thread_entry->lua_state, -1)); + } + else { + msg_err ("lua call failed (%d): %s", ret, + lua_tostring(thread_entry->lua_state, -1)); + } + + /* + * Maybe there is a way to recover here. + * For now, just remove faulty thread + */ + pool->terminate_thread(thread_entry, loc, false); + } + } +} + +void +lua_thread_resume_full(struct thread_entry *thread_entry, gint narg, + const gchar *loc) +{ + /* + * The only state where we can resume from is LUA_YIELD + * Another acceptable status is OK (0) but in that case we should push function on stack + * to start the thread from, which is happening in lua_thread_call(), not in this function. + */ + g_assert (lua_status(thread_entry->lua_state) == LUA_YIELD); + msg_debug_lua_threads ("%s: lua_thread_resume_full", loc); + lua_thread_pool_set_running_entry_for_thread(thread_entry, loc); + lua_resume_thread_internal_full(thread_entry, narg, loc); +} + +void +lua_thread_call_full(struct thread_entry *thread_entry, + int narg, const gchar *loc) +{ + g_assert (lua_status(thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ + g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */ + + lua_resume_thread_internal_full(thread_entry, narg, loc); +} + +gint +lua_thread_yield_full(struct thread_entry *thread_entry, + gint nresults, + const gchar *loc) { + g_assert (lua_status(thread_entry->lua_state) == 0); + + msg_debug_lua_threads ("%s: lua_thread_yield_full", loc); + return lua_yield(thread_entry->lua_state, nresults); +} -- 2.39.5