From e1a8ed50131891516f5da6e22aae69a306147d38 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 26 Jul 2012 21:50:13 +0400 Subject: [PATCH] * Add lua worker type and lua worker bindings. * Add lua utility library for basic utils. * Fixes lua_buffer code. Fix lua loading error. Added some other lua utility functions. --- CMakeLists.txt | 3 +- src/cfg_xml.c | 22 +- src/lua/lua_buffer.c | 41 ++-- src/lua/lua_cdb.c | 2 + src/lua/lua_classifier.c | 4 + src/lua/lua_common.c | 183 +++++++++++++- src/lua/lua_common.h | 8 +- src/lua/lua_config.c | 8 + src/lua/lua_mempool.c | 2 + src/lua/lua_message.c | 2 + src/lua/lua_session.c | 4 + src/lua/lua_task.c | 117 ++++++++- src/lua/lua_upstream.c | 4 + src/lua_worker.c | 516 +++++++++++++++++++++++++++++++++++++++ src/worker_util.c | 4 +- 15 files changed, 880 insertions(+), 40 deletions(-) create mode 100644 src/lua_worker.c diff --git a/CMakeLists.txt b/CMakeLists.txt index d01a66722..5379869ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -930,6 +930,7 @@ SET(RSPAMDSRC src/modules.c src/fuzzy_storage.c src/kvstorage_server.c src/lmtp.c + src/lua_worker.c src/main.c src/map.c src/smtp.c @@ -951,7 +952,7 @@ SET(PLUGINSSRC src/plugins/surbl.c src/plugins/dkim_check.c) SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim) -SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage) +SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage lua) AddModules(MODULES_LIST WORKERS_LIST) diff --git a/src/cfg_xml.c b/src/cfg_xml.c index ad542265d..45b678548 100644 --- a/src/cfg_xml.c +++ b/src/cfg_xml.c @@ -862,7 +862,27 @@ worker_foreach_callback (gpointer k, gpointer v, gpointer ud) if (!worker_options || (worker_config = g_hash_table_lookup (worker_options, &cd->wrk->type)) == NULL || (cparam = g_hash_table_lookup (worker_config, k)) == NULL) { - msg_warn ("unregistered worker attribute '%s' for worker %s", k, g_quark_to_string (cd->wrk->type)); + /* Try to use universal handler if there is no specific handler */ + if ((cparam = g_hash_table_lookup (worker_config, "*")) != NULL) { + if (cd->wrk->ctx != NULL) { + if (param->is_list) { + cur = param->d.list; + while (cur) { + cparam->handler (cd->cfg, cd->ctx, NULL, cur->data, k, cd->wrk->ctx, cparam->offset); + cur = g_list_next (cur); + } + } + else { + cparam->handler (cd->cfg, cd->ctx, NULL, param->d.param, k, cd->wrk->ctx, cparam->offset); + } + } + else { + msg_err ("Bad error detected: worker %s has not initialized its context", g_quark_to_string (cd->wrk->type)); + } + } + else { + msg_warn ("unregistered worker attribute '%s' for worker %s", k, g_quark_to_string (cd->wrk->type)); + } } else { diff --git a/src/lua/lua_buffer.c b/src/lua/lua_buffer.c index bb5b1ea8c..9c18ef5cf 100644 --- a/src/lua/lua_buffer.c +++ b/src/lua/lua_buffer.c @@ -34,14 +34,14 @@ LUA_FUNCTION_DEF (io_dispatcher, set_policy); LUA_FUNCTION_DEF (io_dispatcher, write); LUA_FUNCTION_DEF (io_dispatcher, pause); LUA_FUNCTION_DEF (io_dispatcher, restore); -LUA_FUNCTION_DEF (io_dispatcher, delete); +LUA_FUNCTION_DEF (io_dispatcher, destroy); static const struct luaL_reg io_dispatcherlib_m[] = { LUA_INTERFACE_DEF (io_dispatcher, set_policy), LUA_INTERFACE_DEF (io_dispatcher, write), LUA_INTERFACE_DEF (io_dispatcher, pause), LUA_INTERFACE_DEF (io_dispatcher, restore), - {"__gc", lua_io_dispatcher_delete}, + LUA_INTERFACE_DEF (io_dispatcher, destroy), {"__tostring", lua_class_tostring}, {NULL, NULL} }; @@ -90,12 +90,12 @@ lua_io_read_cb (f_str_t * in, void *arg) need_unlock = TRUE; } /* callback (dispatcher, data) */ + lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); *pdispatcher = cbdata->d; lua_pushlstring (cbdata->L, in->begin, in->len); - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); if (lua_pcall (cbdata->L, 2, 1, 0) != 0) { msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); } @@ -107,16 +107,6 @@ lua_io_read_cb (f_str_t * in, void *arg) g_mutex_unlock (lua_mtx); } - if (!res) { - /* Unref callbacks */ - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); - if (cbdata->cbref_write) { - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write); - } - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); - g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata); - } - return res; } @@ -132,12 +122,13 @@ lua_io_write_cb (void *arg) if (g_mutex_trylock (lua_mtx)) { need_unlock = TRUE; } + lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); /* callback (dispatcher) */ pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); *pdispatcher = cbdata->d; - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + if (lua_pcall (cbdata->L, 1, 1, 0) != 0) { msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); } @@ -148,14 +139,6 @@ lua_io_write_cb (void *arg) if (need_unlock) { g_mutex_unlock (lua_mtx); } - - if (!res) { - /* Unref callbacks */ - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write); - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); - g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata); - } } return res; @@ -173,12 +156,12 @@ lua_io_err_cb (GError * err, void *arg) need_unlock = TRUE; } /* callback (dispatcher, err) */ + lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); *pdispatcher = cbdata->d; lua_pushstring (cbdata->L, err->message); - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); if (lua_pcall (cbdata->L, 2, 0, 0) != 0) { msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); } @@ -234,8 +217,12 @@ lua_io_dispatcher_create (lua_State *L) tv_num = lua_tonumber (L, 6); tv.tv_sec = trunc (tv_num); tv.tv_usec = modf (tv_num, &tmp) * 1000.; + io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata); + } + else { + io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, NULL, cbdata); } - io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata); + cbdata->d = io_dispatcher; /* Push result */ pdispatcher = lua_newuserdata (L, sizeof (struct rspamd_io_dispatcher_s *)); @@ -338,7 +325,7 @@ lua_io_dispatcher_restore (lua_State *L) } static int -lua_io_dispatcher_delete (lua_State *L) +lua_io_dispatcher_destroy (lua_State *L) { struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); @@ -369,10 +356,14 @@ luaopen_io_dispatcher (lua_State * L) luaL_openlib (L, NULL, io_dispatcherlib_m, 0); luaL_openlib(L, "rspamd_io_dispatcher", io_dispatcherlib_f, 0); + lua_pop(L, 1); /* remove metatable from stack */ + /* Simple event class */ lua_newclass (L, "rspamd{ev_base}", null_reg); luaL_openlib (L, "rspamd_ev_base", null_reg, 0); + lua_pop(L, 1); /* remove metatable from stack */ + /* Set buffer types globals */ lua_pushnumber (L, BUFFER_LINE); lua_setglobal (L, "IO_BUFFER_LINE"); diff --git a/src/lua/lua_cdb.c b/src/lua/lua_cdb.c index 7adfeeac5..84fef66d9 100644 --- a/src/lua/lua_cdb.c +++ b/src/lua/lua_cdb.c @@ -161,5 +161,7 @@ luaopen_cdb (lua_State * L) luaL_openlib (L, NULL, cdblib_m, 0); luaL_openlib (L, "cdb", cdblib_f, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_classifier.c b/src/lua/lua_classifier.c index edaf4e7a6..e929c6b50 100644 --- a/src/lua/lua_classifier.c +++ b/src/lua/lua_classifier.c @@ -371,6 +371,8 @@ luaopen_classifier (lua_State * L) lua_newclass (L, "rspamd{classifier}", classifierlib_m); luaL_openlib (L, "rspamd_classifier", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } @@ -380,6 +382,8 @@ luaopen_statfile (lua_State * L) lua_newclass (L, "rspamd{statfile}", statfilelib_m); luaL_openlib (L, "rspamd_statfile", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index 62fb5389b..6a671cf00 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -42,7 +42,7 @@ LUA_FUNCTION_DEF (logger, warn); LUA_FUNCTION_DEF (logger, info); LUA_FUNCTION_DEF (logger, debug); -static const struct luaL_reg loggerlib_m[] = { +static const struct luaL_reg loggerlib_f[] = { LUA_INTERFACE_DEF (logger, err), LUA_INTERFACE_DEF (logger, warn), LUA_INTERFACE_DEF (logger, info), @@ -51,9 +51,159 @@ static const struct luaL_reg loggerlib_m[] = { {NULL, NULL} }; +/* util module */ +LUA_FUNCTION_DEF (util, gethostbyname); +LUA_FUNCTION_DEF (util, strsplit); +LUA_FUNCTION_DEF (util, close); +LUA_FUNCTION_DEF (util, ip_to_str); +LUA_FUNCTION_DEF (util, str_to_ip); + +static const struct luaL_reg utillib_f[] = { + LUA_INTERFACE_DEF (util, gethostbyname), + LUA_INTERFACE_DEF (util, strsplit), + LUA_INTERFACE_DEF (util, close), + LUA_INTERFACE_DEF (util, ip_to_str), + LUA_INTERFACE_DEF (util, str_to_ip), + {"__tostring", lua_class_tostring}, + {NULL, NULL} +}; + +/** + * Get numeric ip presentation of hostname + */ +static gint +lua_util_gethostbyname (lua_State *L) +{ + const gchar *name; + struct hostent *hent; + struct in_addr ina; + + name = luaL_checkstring (L, 1); + if (name) { + hent = gethostbyname (name); + if (hent) { + memcpy (&ina, hent->h_addr, sizeof (struct in_addr)); + lua_pushinteger (L, ina.s_addr); + } + else { + lua_pushnil (L); + } + } + else { + lua_pushnil (L); + } + return 1; +} + +/** + * Close file descriptor + */ +static gint +lua_util_close (lua_State *L) +{ + gint fd; + + fd = lua_tointeger (L, 1); + + if (fd >= 0) { + close (fd); + } + + return 0; +} + +/** + * Convert numeric ip to string + */ +static gint +lua_util_ip_to_str (lua_State *L) +{ + struct in_addr ina; + + ina.s_addr = lua_tointeger (L, 1); + if (ina.s_addr) { + lua_pushstring (L, inet_ntoa (ina)); + } + else { + lua_pushnil (L); + } + + return 1; +} + +/** + * Convert string ip to numeric + */ +static gint +lua_util_str_to_ip (lua_State *L) +{ + struct in_addr ina; + const gchar *ip; + + ip = luaL_checkstring (L, 1); + if (ip) { + if (inet_aton (ip, &ina) != 0) { + lua_pushinteger (L, ina.s_addr); + } + else { + lua_pushnil (L); + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +/** + * Split string to the portions using separators as the second argument + */ +static gint +lua_util_strsplit (lua_State *L) +{ + gchar **list, **cur; + const gchar *in, *separators = " ;,"; + gint i; + + in = luaL_checkstring (L, 1); + + if (in) { + if (lua_gettop (L) > 1) { + separators = luaL_checkstring (L, 2); + } + list = g_strsplit_set (in, separators, -1); + if (list) { + cur = list; + lua_newtable (L); + i = 1; + while (*cur) { + lua_pushstring (L, *cur); + lua_rawseti (L, -2, i++); + cur ++; + } + g_strfreev (list); + } + else { + lua_pushnil (L); + } + } + else { + lua_pushnil (L); + } + + return 1; +} + /* Util functions */ +/** + * Create new class and store metatable on top of the stack + * @param L + * @param classname name of class + * @param func table of class methods + */ void -lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *func) +lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *methods) { luaL_newmetatable (L, classname); /* mt */ lua_pushstring (L, "__index"); @@ -64,7 +214,17 @@ lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *func lua_pushstring (L, classname); /* mt,"__index",it,"class",classname */ lua_rawset (L, -3); /* mt,"__index",it */ - luaL_openlib (L, NULL, func, 0); + luaL_openlib (L, NULL, methods, 0); +} + +/** + * Create and register new class with static methods and store metatable on top of the stack + */ +void +lua_newclass_full (lua_State *L, const gchar *classname, const gchar *static_name, const struct luaL_reg *methods, const struct luaL_reg *func) +{ + lua_newclass (L, classname, methods); + luaL_openlib(L, static_name, func, 0); } gint @@ -210,11 +370,20 @@ luaopen_rspamd (lua_State * L) return 1; } -gint +static gint luaopen_logger (lua_State * L) { - luaL_openlib (L, "rspamd_logger", loggerlib_m, 0); + luaL_openlib (L, "rspamd_logger", loggerlib_f, 0); + + return 1; +} + + +static gint +luaopen_util (lua_State *L) +{ + luaL_openlib (L, "rspamd_util", utillib_f, 0); return 1; } @@ -252,9 +421,9 @@ init_lua (struct config_file *cfg) (void)luaopen_rspamd (L); (void)luaopen_logger (L); + (void)luaopen_util (L); (void)luaopen_mempool (L); (void)luaopen_config (L); - (void)luaopen_session (L); (void)luaopen_radix (L); (void)luaopen_hash_table (L); (void)luaopen_trie (L); @@ -272,6 +441,8 @@ init_lua (struct config_file *cfg) (void)luaopen_redis (L); (void)luaopen_upstream (L); (void)lua_add_actions_global (L); + (void)luaopen_session (L); + (void)luaopen_io_dispatcher (L); cfg->lua_state = L; memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)lua_close, L); diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index daab695fc..dc417fa68 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -24,7 +24,12 @@ extern GMutex *lua_mtx; /** * Create and register new class */ -void lua_newclass (lua_State *L, const gchar *classname, const struct luaL_reg *func); +void lua_newclass (lua_State *L, const gchar *classname, const struct luaL_reg *methods); + +/** + * Create and register new class with static methods + */ +void lua_newclass_full (lua_State *L, const gchar *classname, const gchar *static_name, const struct luaL_reg *methods, const struct luaL_reg *func); /** * Set class name for object at @param objidx position @@ -64,6 +69,7 @@ gint luaopen_redis (lua_State * L); gint luaopen_upstream (lua_State * L); gint luaopen_mempool (lua_State * L); gint luaopen_session (lua_State * L); +gint luaopen_io_dispatcher (lua_State * L); void init_lua (struct config_file *cfg); gboolean init_lua_filters (struct config_file *cfg); diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 2258c03aa..217bfc761 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -967,6 +967,8 @@ luaopen_config (lua_State * L) lua_newclass (L, "rspamd{config}", configlib_m); luaL_openlib (L, "rspamd_config", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } @@ -976,6 +978,8 @@ luaopen_radix (lua_State * L) lua_newclass (L, "rspamd{radix}", radixlib_m); luaL_openlib (L, "rspamd_radix", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } @@ -985,6 +989,8 @@ luaopen_hash_table (lua_State * L) lua_newclass (L, "rspamd{hash_table}", hashlib_m); luaL_openlib (L, "rspamd_hash_table", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } @@ -1003,5 +1009,7 @@ luaopen_trie (lua_State * L) luaL_openlib (L, NULL, trielib_m, 0); luaL_openlib(L, "rspamd_trie", trielib_f, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_mempool.c b/src/lua/lua_mempool.c index 44fb3fe85..40094abc4 100644 --- a/src/lua/lua_mempool.c +++ b/src/lua/lua_mempool.c @@ -239,5 +239,7 @@ luaopen_mempool (lua_State * L) luaL_openlib (L, NULL, mempoollib_m, 0); luaL_openlib(L, "rspamd_mempool", mempoollib_f, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_message.c b/src/lua/lua_message.c index 0dedb9068..6c91938ab 100644 --- a/src/lua/lua_message.c +++ b/src/lua/lua_message.c @@ -206,5 +206,7 @@ luaopen_message (lua_State * L) lua_newclass (L, "rspamd{message}", msglib_m); luaL_openlib (L, "rspamd_message", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_session.c b/src/lua/lua_session.c index 5ccfc76d3..9f4af2882 100644 --- a/src/lua/lua_session.c +++ b/src/lua/lua_session.c @@ -334,9 +334,13 @@ luaopen_session (lua_State * L) luaL_openlib (L, NULL, sessionlib_m, 0); luaL_openlib(L, "rspamd_session", sessionlib_f, 0); + lua_pop (L, 1); /* remove metatable from stack */ + /* Simple event class */ lua_newclass (L, "rspamd{event}", eventlib_m); luaL_openlib (L, "rspamd_event", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index 3aa204597..abb8cc15d 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -42,8 +42,14 @@ extern stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf, const gchar *symbol, struct statfile **st, gboolean try_create); +/* Task creation */ +LUA_FUNCTION_DEF (task, create_empty); +LUA_FUNCTION_DEF (task, create_from_buffer); /* Task methods */ LUA_FUNCTION_DEF (task, get_message); +LUA_FUNCTION_DEF (task, process_message); +LUA_FUNCTION_DEF (task, set_cfg); +LUA_FUNCTION_DEF (task, destroy); LUA_FUNCTION_DEF (task, get_mempool); LUA_FUNCTION_DEF (task, get_ev_base); LUA_FUNCTION_DEF (task, insert_result); @@ -77,8 +83,17 @@ LUA_FUNCTION_DEF (task, get_metric_score); LUA_FUNCTION_DEF (task, get_metric_action); LUA_FUNCTION_DEF (task, learn_statfile); +static const struct luaL_reg tasklib_f[] = { + LUA_INTERFACE_DEF (task, create_empty), + LUA_INTERFACE_DEF (task, create_from_buffer), + {NULL, NULL} +}; + static const struct luaL_reg tasklib_m[] = { LUA_INTERFACE_DEF (task, get_message), + LUA_INTERFACE_DEF (task, destroy), + LUA_INTERFACE_DEF (task, process_message), + LUA_INTERFACE_DEF (task, set_cfg), LUA_INTERFACE_DEF (task, get_mempool), LUA_INTERFACE_DEF (task, get_ev_base), LUA_INTERFACE_DEF (task, insert_result), @@ -204,17 +219,96 @@ lua_check_url (lua_State * L) } /*** Task interface ***/ + +static int +lua_task_create_empty (lua_State *L) +{ + struct worker_task **ptask, *task; + + task = construct_task (NULL); + ptask = lua_newuserdata (L, sizeof (gpointer)); + lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + return 1; +} + +static int +lua_task_create_from_buffer (lua_State *L) +{ + struct worker_task **ptask, *task; + const gchar *data; + size_t len; + + data = luaL_checklstring (L, 1, &len); + if (data) { + task = construct_task (NULL); + ptask = lua_newuserdata (L, sizeof (gpointer)); + lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); + task->msg->begin = memory_pool_alloc (task->task_pool, len); + memcpy (task->msg->begin, data, len); + task->msg->len = len; + } + return 1; +} + +static int +lua_task_process_message (lua_State *L) +{ + struct worker_task *task = lua_check_task (L); + + if (task != NULL && task->msg != NULL && task->msg->len > 0) { + if (process_message (task) == 0) { + lua_pushboolean (L, TRUE); + } + else { + lua_pushboolean (L, FALSE); + } + } + else { + lua_pushboolean (L, FALSE); + } + + return 1; +} +static int +lua_task_set_cfg (lua_State *L) +{ + struct worker_task *task = lua_check_task (L); + void *ud = luaL_checkudata (L, 2, "rspamd{config}"); + + luaL_argcheck (L, ud != NULL, 1, "'config' expected"); + task->cfg = ud ? *((struct config_file **)ud) : NULL; + return 0; +} + +static int +lua_task_destroy (lua_State *L) +{ + struct worker_task *task = lua_check_task (L); + + if (task != NULL) { + free_task (task, FALSE); + } + + return 0; +} + static int lua_task_get_message (lua_State * L) { GMimeMessage **pmsg; struct worker_task *task = lua_check_task (L); - if (task != NULL) { + if (task != NULL && task->message != NULL) { pmsg = lua_newuserdata (L, sizeof (GMimeMessage *)); lua_setclass (L, "rspamd{message}", -1); *pmsg = task->message; } + else { + lua_pushnil (L); + } return 1; } @@ -229,6 +323,9 @@ lua_task_get_mempool (lua_State * L) lua_setclass (L, "rspamd{mempool}", -1); *ppool = task->task_pool; } + else { + lua_pushnil (L); + } return 1; } @@ -243,6 +340,9 @@ lua_task_get_ev_base (lua_State * L) lua_setclass (L, "rspamd{ev_base}", -1); *pbase = task->ev_base; } + else { + lua_pushnil (L); + } return 1; } @@ -267,7 +367,7 @@ lua_task_insert_result (lua_State * L) insert_result (task, symbol_name, flag, params); } - return 1; + return 0; } static gint @@ -290,7 +390,7 @@ lua_task_set_pre_result (lua_State * L) } } } - return 1; + return 0; } struct lua_tree_cb_data { @@ -1740,8 +1840,9 @@ lua_url_get_phished (lua_State *L) gint luaopen_task (lua_State * L) { - lua_newclass (L, "rspamd{task}", tasklib_m); - luaL_openlib (L, "rspamd_task", null_reg, 0); + lua_newclass_full (L, "rspamd{task}", "rspamd_task", tasklib_m, tasklib_f); + + lua_pop (L, 1); /* remove metatable from stack */ return 1; } @@ -1752,6 +1853,8 @@ luaopen_textpart (lua_State * L) lua_newclass (L, "rspamd{textpart}", textpartlib_m); luaL_openlib (L, "rspamd_textpart", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } @@ -1761,6 +1864,8 @@ luaopen_image (lua_State * L) lua_newclass (L, "rspamd{image}", imagelib_m); luaL_openlib (L, "rspamd_image", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } @@ -1770,6 +1875,8 @@ luaopen_url (lua_State * L) lua_newclass (L, "rspamd{url}", urllib_m); luaL_openlib (L, "rspamd_url", null_reg, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua/lua_upstream.c b/src/lua/lua_upstream.c index f9e74e027..6780a756a 100644 --- a/src/lua/lua_upstream.c +++ b/src/lua/lua_upstream.c @@ -505,6 +505,8 @@ luaopen_upstream (lua_State * L) luaL_openlib (L, NULL, upstream_list_m, 0); luaL_openlib (L, "upstream_list", upstream_list_f, 0); + lua_pop (L, 1); /* remove metatable from stack */ + luaL_newmetatable (L, "rspamd{upstream}"); lua_pushstring (L, "__index"); lua_pushvalue (L, -2); @@ -517,5 +519,7 @@ luaopen_upstream (lua_State * L) luaL_openlib (L, NULL, upstream_m, 0); luaL_openlib (L, "upstream", upstream_f, 0); + lua_pop (L, 1); /* remove metatable from stack */ + return 1; } diff --git a/src/lua_worker.c b/src/lua_worker.c new file mode 100644 index 000000000..b7daf7ce7 --- /dev/null +++ b/src/lua_worker.c @@ -0,0 +1,516 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "config.h" +#include "util.h" +#include "main.h" +#include "protocol.h" +#include "upstream.h" +#include "cfg_file.h" +#include "cfg_xml.h" +#include "url.h" +#include "message.h" +#include "map.h" +#include "dns.h" + +#include "lua/lua_common.h" + +#ifdef WITH_GPERF_TOOLS +# include +#endif + +/* 60 seconds for worker's IO */ +#define DEFAULT_WORKER_IO_TIMEOUT 60000 + +gpointer init_lua_worker (void); +void start_lua_worker (struct rspamd_worker *worker); + +worker_t lua_worker = { + "lua", /* Name */ + init_lua_worker, /* Init function */ + start_lua_worker, /* Start function */ + TRUE, /* Has socket */ + FALSE, /* Non unique */ + FALSE, /* Non threaded */ + TRUE /* Killable */ +}; + +/* + * Worker's context + */ +struct rspamd_lua_worker_ctx { + /* DNS resolver */ + struct rspamd_dns_resolver *resolver; + /* Events base */ + struct event_base *ev_base; + /* Other params */ + GHashTable *params; + /* Lua script to load */ + gchar *file; + /* Lua state */ + lua_State *L; + /* Callback for accept */ + gint cbref_accept; + /* Callback for finishing */ + gint cbref_fin; + /* Config file */ + struct config_file *cfg; +}; + +/* Lua bindings */ +LUA_FUNCTION_DEF (worker, get_ev_base); +LUA_FUNCTION_DEF (worker, register_accept_callback); +LUA_FUNCTION_DEF (worker, register_exit_callback); +LUA_FUNCTION_DEF (worker, get_option); +LUA_FUNCTION_DEF (worker, get_resolver); +LUA_FUNCTION_DEF (worker, get_cfg); + +static const struct luaL_reg lua_workerlib_m[] = { + LUA_INTERFACE_DEF (worker, get_ev_base), + LUA_INTERFACE_DEF (worker, register_accept_callback), + LUA_INTERFACE_DEF (worker, register_exit_callback), + LUA_INTERFACE_DEF (worker, get_option), + LUA_INTERFACE_DEF (worker, get_resolver), + LUA_INTERFACE_DEF (worker, get_cfg), + {"__tostring", lua_class_tostring}, + {NULL, NULL} +}; + +static sig_atomic_t wanna_die = 0; + +/* Basic functions of LUA API for worker object */ +static gint +luaopen_lua_worker (lua_State * L) +{ + lua_newclass (L, "rspamd{worker}", lua_workerlib_m); + luaL_openlib (L, "rspamd_worker", null_reg, 0); + + lua_pop (L, 1); /* remove metatable from stack */ + + return 1; +} + +struct rspamd_lua_worker_ctx * +lua_check_lua_worker (lua_State * L) +{ + void *ud = luaL_checkudata (L, 1, "rspamd{worker}"); + luaL_argcheck (L, ud != NULL, 1, "'worker' expected"); + return ud ? *((struct rspamd_lua_worker_ctx **)ud) : NULL; +} + +static int +lua_worker_get_ev_base (lua_State *L) +{ + struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L); + struct event_base **pbase; + + if (ctx) { + pbase = lua_newuserdata (L, sizeof (struct event_base *)); + lua_setclass (L, "rspamd{ev_base}", -1); + *pbase = ctx->ev_base; + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_worker_register_accept_callback (lua_State *L) +{ + struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L); + + if (ctx) { + if (!lua_isfunction (L, 2)) { + msg_err ("invalid callback passed"); + lua_pushnil (L); + } + else { + lua_pushvalue (L, 2); + ctx->cbref_accept = luaL_ref (L, LUA_REGISTRYINDEX); + return 0; + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_worker_register_exit_callback (lua_State *L) +{ + struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L); + + if (ctx) { + if (!lua_isfunction (L, 2)) { + msg_err ("invalid callback passed"); + lua_pushnil (L); + } + else { + lua_pushvalue (L, 2); + ctx->cbref_fin = luaL_ref (L, LUA_REGISTRYINDEX); + return 0; + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_worker_get_option (lua_State *L) +{ + struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L); + GList *val; + gint i; + const gchar *name; + + if (ctx) { + name = luaL_checkstring (L, 2); + if (name == NULL) { + msg_err ("no name specified"); + lua_pushnil (L); + } + else { + val = g_hash_table_lookup (ctx->params, name); + if (val == NULL) { + lua_pushnil (L); + } + else { + /* Push the array */ + i = 1; + lua_newtable (L); + while (val) { + lua_pushstring (L, val->data); + lua_rawseti (L, -2, i++); + val = g_list_next (val); + } + } + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_worker_get_resolver (lua_State *L) +{ + struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L); + + if (ctx) { + /* XXX: implement resolver API */ + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_worker_get_cfg (lua_State *L) +{ + struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L); + struct config_file **pcfg; + + if (ctx) { + pcfg = lua_newuserdata (L, sizeof (gpointer)); + lua_setclass (L, "rspamd{config}", -1); + *pcfg = ctx->cfg; + } + else { + lua_pushnil (L); + } + + return 1; +} + +/* End of lua API */ +/* Signal handlers */ + +#ifndef HAVE_SA_SIGINFO +static void +sig_handler (gint signo) +#else +static void +sig_handler (gint signo, siginfo_t * info, void *unused) +#endif +{ + struct timeval tv; + + switch (signo) { + case SIGINT: + case SIGTERM: + if (!wanna_die) { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + event_loopexit (&tv); + +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif + } + break; + } +} + +/* + * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them + */ +static void +sigusr2_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + + if (!wanna_die) { + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev_usr1); + event_del (&worker->sig_ev_usr2); + event_del (&worker->bind_ev); + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + } + return; +} + +/* + * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them + */ +static void +sigusr1_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + + reopen_log (worker->srv->logger); + + return; +} + +static gboolean +handle_lua_param (struct config_file *cfg, struct rspamd_xml_userdata *unused, GHashTable *attrs, gchar *data, gpointer user_data, gpointer dest_struct, gint offset) +{ + struct rspamd_lua_worker_ctx *ctx = dest_struct; + GList *val; + gchar *tag = user_data; + + val = g_hash_table_lookup (ctx->params, tag); + if (val == NULL) { + g_hash_table_insert (ctx->params, g_strdup (tag), g_list_prepend (NULL, data)); + } + else { + val = g_list_append (val, data); + } + + return TRUE; +} + + +/* + * Accept new connection and construct task + */ +static void +lua_accept_socket (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_lua_worker_ctx *ctx, **pctx; + union sa_union su; + socklen_t addrlen = sizeof (su.ss); + gint nfd; + struct in_addr addr; + gchar *addr_str = NULL; + lua_State *L; + + ctx = worker->ctx; + L = ctx->L; + + if ((nfd = + accept_from_socket (fd, (struct sockaddr *) &su.ss, &addrlen)) == -1) { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0){ + return; + } + + if (su.ss.ss_family == AF_UNIX) { + msg_info ("accepted connection from unix socket"); + addr.s_addr = INADDR_NONE; + addr_str = "127.0.0.1"; + } + else if (su.ss.ss_family == AF_INET) { + msg_info ("accepted connection from %s port %d", + inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + memcpy (&addr, &su.s4.sin_addr, + sizeof (struct in_addr)); + addr_str = g_strdup (inet_ntoa (su.s4.sin_addr)); + } + + /* Call finalizer function */ + lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_accept); + pctx = lua_newuserdata (L, sizeof (gpointer)); + lua_setclass (L, "rspamd{worker}", -1); + *pctx = ctx; + lua_pushinteger (L, nfd); + lua_pushstring (L, addr_str); + lua_pushinteger (L, addr.s_addr); + + + if (lua_pcall (L, 4, 0, 0) != 0) { + msg_info ("call to worker accept failed: %s", lua_tostring (L, -1)); + } + + if (addr_str) { + g_free (addr_str); + } +} + +gpointer +init_lua_worker (void) +{ + struct rspamd_lua_worker_ctx *ctx; + GQuark type; + + type = g_quark_try_string ("lua"); + + ctx = g_malloc0 (sizeof (struct rspamd_lua_worker_ctx)); + ctx->params = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, (GDestroyNotify)g_list_free); + + + register_worker_opt (type, "file", xml_handle_string, ctx, G_STRUCT_OFFSET (struct rspamd_lua_worker_ctx, file)); + register_worker_opt (type, "*", handle_lua_param, ctx, 0); + + return ctx; +} + +/* + * Start worker process + */ +void +start_lua_worker (struct rspamd_worker *worker) +{ + struct sigaction signals; + struct rspamd_lua_worker_ctx *ctx = worker->ctx, **pctx; + lua_State *L; + +#ifdef WITH_PROFILER + extern void _start (void), etext (void); + monstartup ((u_long) & _start, (u_long) & etext); +#endif + + gperf_profiler_init (worker->srv->cfg, "lua_worker"); + + worker->srv->pid = getpid (); + + ctx->ev_base = event_init (); + + L = worker->srv->cfg->lua_state; + ctx->L = L; + ctx->cfg = worker->srv->cfg; + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev_usr2); + signal_add (&worker->sig_ev_usr2, NULL); + + /* SIGUSR1 handler */ + signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev_usr1); + signal_add (&worker->sig_ev_usr1, NULL); + + /* Accept event */ + event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, + lua_accept_socket, (void *) worker); + event_base_set (ctx->ev_base, &worker->bind_ev); + event_add (&worker->bind_ev, NULL); + + /* Open worker's lib */ + luaopen_lua_worker (L); + + if (ctx->file == NULL) { + msg_err ("No lua script defined, so no reason to exist"); + exit (EXIT_SUCCESS); + } + if (access (ctx->file, R_OK) == -1) { + msg_err ("Error reading lua script %s: %s", ctx->file, strerror (errno)); + exit (EXIT_SUCCESS); + } + + pctx = lua_newuserdata (L, sizeof (gpointer)); + lua_setclass (L, "rspamd{worker}", -1); + lua_setglobal (L, "rspamd_worker"); + *pctx = ctx; + + if (luaL_dofile (L, ctx->file) != 0) { + msg_err ("Error executing lua script %s: %s", ctx->file, lua_tostring (L, -1)); + exit (EXIT_SUCCESS); + } + + if (ctx->cbref_accept == 0) { + msg_err ("No accept function defined, so no reason to exist"); + exit (EXIT_SUCCESS); + } + + /* Maps events */ + start_map_watch (ctx->ev_base); + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); + + event_base_loop (ctx->ev_base, 0); + luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_accept); + if (ctx->cbref_fin != 0) { + /* Call finalizer function */ + lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_fin); + pctx = lua_newuserdata (L, sizeof (gpointer)); + lua_setclass (L, "rspamd{worker}", -1); + *pctx = ctx; + if (lua_pcall (L, 1, 0, 0) != 0) { + msg_info ("call to worker finalizer failed: %s", lua_tostring (L, -1)); + } + /* Free resources */ + luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_fin); + } + + close_log (rspamd_main->logger); + exit (EXIT_SUCCESS); +} + +/* + * vi:ts=4 + */ + diff --git a/src/worker_util.c b/src/worker_util.c index ba4444485..976fe58c3 100644 --- a/src/worker_util.c +++ b/src/worker_util.c @@ -52,7 +52,9 @@ construct_task (struct rspamd_worker *worker) new_task->worker = worker; new_task->state = READ_COMMAND; - new_task->cfg = worker->srv->cfg; + if (worker) { + new_task->cfg = worker->srv->cfg; + } new_task->view_checked = FALSE; #ifdef HAVE_CLOCK_GETTIME # ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID -- 2.39.5