aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-07-26 21:50:13 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-07-26 21:50:13 +0400
commite1a8ed50131891516f5da6e22aae69a306147d38 (patch)
tree61002cf909d2d41553b94b8ed654d0f2f3567a67
parent2e615083e475c7390c667695b9e659fa5ba4da5f (diff)
downloadrspamd-e1a8ed50131891516f5da6e22aae69a306147d38.tar.gz
rspamd-e1a8ed50131891516f5da6e22aae69a306147d38.zip
* Add lua worker type and lua worker bindings.
* Add lua utility library for basic utils. * Fixes lua_buffer code. Fix lua loading error. Added some other lua utility functions.
-rw-r--r--CMakeLists.txt3
-rw-r--r--src/cfg_xml.c22
-rw-r--r--src/lua/lua_buffer.c41
-rw-r--r--src/lua/lua_cdb.c2
-rw-r--r--src/lua/lua_classifier.c4
-rw-r--r--src/lua/lua_common.c183
-rw-r--r--src/lua/lua_common.h8
-rw-r--r--src/lua/lua_config.c8
-rw-r--r--src/lua/lua_mempool.c2
-rw-r--r--src/lua/lua_message.c2
-rw-r--r--src/lua/lua_session.c4
-rw-r--r--src/lua/lua_task.c117
-rw-r--r--src/lua/lua_upstream.c4
-rw-r--r--src/lua_worker.c516
-rw-r--r--src/worker_util.c4
15 files changed, 880 insertions, 40 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d01a66722..5379869ac 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -930,6 +930,7 @@ SET(RSPAMDSRC src/modules.c
src/fuzzy_storage.c
src/kvstorage_server.c
src/lmtp.c
+ src/lua_worker.c
src/main.c
src/map.c
src/smtp.c
@@ -951,7 +952,7 @@ SET(PLUGINSSRC src/plugins/surbl.c
src/plugins/dkim_check.c)
SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage)
+SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage lua)
AddModules(MODULES_LIST WORKERS_LIST)
diff --git a/src/cfg_xml.c b/src/cfg_xml.c
index ad542265d..45b678548 100644
--- a/src/cfg_xml.c
+++ b/src/cfg_xml.c
@@ -862,7 +862,27 @@ worker_foreach_callback (gpointer k, gpointer v, gpointer ud)
if (!worker_options || (worker_config = g_hash_table_lookup (worker_options, &cd->wrk->type)) == NULL ||
(cparam = g_hash_table_lookup (worker_config, k)) == NULL) {
- msg_warn ("unregistered worker attribute '%s' for worker %s", k, g_quark_to_string (cd->wrk->type));
+ /* Try to use universal handler if there is no specific handler */
+ if ((cparam = g_hash_table_lookup (worker_config, "*")) != NULL) {
+ if (cd->wrk->ctx != NULL) {
+ if (param->is_list) {
+ cur = param->d.list;
+ while (cur) {
+ cparam->handler (cd->cfg, cd->ctx, NULL, cur->data, k, cd->wrk->ctx, cparam->offset);
+ cur = g_list_next (cur);
+ }
+ }
+ else {
+ cparam->handler (cd->cfg, cd->ctx, NULL, param->d.param, k, cd->wrk->ctx, cparam->offset);
+ }
+ }
+ else {
+ msg_err ("Bad error detected: worker %s has not initialized its context", g_quark_to_string (cd->wrk->type));
+ }
+ }
+ else {
+ msg_warn ("unregistered worker attribute '%s' for worker %s", k, g_quark_to_string (cd->wrk->type));
+ }
}
else {
diff --git a/src/lua/lua_buffer.c b/src/lua/lua_buffer.c
index bb5b1ea8c..9c18ef5cf 100644
--- a/src/lua/lua_buffer.c
+++ b/src/lua/lua_buffer.c
@@ -34,14 +34,14 @@ LUA_FUNCTION_DEF (io_dispatcher, set_policy);
LUA_FUNCTION_DEF (io_dispatcher, write);
LUA_FUNCTION_DEF (io_dispatcher, pause);
LUA_FUNCTION_DEF (io_dispatcher, restore);
-LUA_FUNCTION_DEF (io_dispatcher, delete);
+LUA_FUNCTION_DEF (io_dispatcher, destroy);
static const struct luaL_reg io_dispatcherlib_m[] = {
LUA_INTERFACE_DEF (io_dispatcher, set_policy),
LUA_INTERFACE_DEF (io_dispatcher, write),
LUA_INTERFACE_DEF (io_dispatcher, pause),
LUA_INTERFACE_DEF (io_dispatcher, restore),
- {"__gc", lua_io_dispatcher_delete},
+ LUA_INTERFACE_DEF (io_dispatcher, destroy),
{"__tostring", lua_class_tostring},
{NULL, NULL}
};
@@ -90,12 +90,12 @@ lua_io_read_cb (f_str_t * in, void *arg)
need_unlock = TRUE;
}
/* callback (dispatcher, data) */
+ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
*pdispatcher = cbdata->d;
lua_pushlstring (cbdata->L, in->begin, in->len);
- lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
if (lua_pcall (cbdata->L, 2, 1, 0) != 0) {
msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
}
@@ -107,16 +107,6 @@ lua_io_read_cb (f_str_t * in, void *arg)
g_mutex_unlock (lua_mtx);
}
- if (!res) {
- /* Unref callbacks */
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
- if (cbdata->cbref_write) {
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
- }
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
- g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
- }
-
return res;
}
@@ -132,12 +122,13 @@ lua_io_write_cb (void *arg)
if (g_mutex_trylock (lua_mtx)) {
need_unlock = TRUE;
}
+ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
/* callback (dispatcher) */
pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
*pdispatcher = cbdata->d;
- lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+
if (lua_pcall (cbdata->L, 1, 1, 0) != 0) {
msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
}
@@ -148,14 +139,6 @@ lua_io_write_cb (void *arg)
if (need_unlock) {
g_mutex_unlock (lua_mtx);
}
-
- if (!res) {
- /* Unref callbacks */
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
- luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
- g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
- }
}
return res;
@@ -173,12 +156,12 @@ lua_io_err_cb (GError * err, void *arg)
need_unlock = TRUE;
}
/* callback (dispatcher, err) */
+ lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
*pdispatcher = cbdata->d;
lua_pushstring (cbdata->L, err->message);
- lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
if (lua_pcall (cbdata->L, 2, 0, 0) != 0) {
msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
}
@@ -234,8 +217,12 @@ lua_io_dispatcher_create (lua_State *L)
tv_num = lua_tonumber (L, 6);
tv.tv_sec = trunc (tv_num);
tv.tv_usec = modf (tv_num, &tmp) * 1000.;
+ io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata);
+ }
+ else {
+ io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, NULL, cbdata);
}
- io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata);
+
cbdata->d = io_dispatcher;
/* Push result */
pdispatcher = lua_newuserdata (L, sizeof (struct rspamd_io_dispatcher_s *));
@@ -338,7 +325,7 @@ lua_io_dispatcher_restore (lua_State *L)
}
static int
-lua_io_dispatcher_delete (lua_State *L)
+lua_io_dispatcher_destroy (lua_State *L)
{
struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L);
@@ -369,10 +356,14 @@ luaopen_io_dispatcher (lua_State * L)
luaL_openlib (L, NULL, io_dispatcherlib_m, 0);
luaL_openlib(L, "rspamd_io_dispatcher", io_dispatcherlib_f, 0);
+ lua_pop(L, 1); /* remove metatable from stack */
+
/* Simple event class */
lua_newclass (L, "rspamd{ev_base}", null_reg);
luaL_openlib (L, "rspamd_ev_base", null_reg, 0);
+ lua_pop(L, 1); /* remove metatable from stack */
+
/* Set buffer types globals */
lua_pushnumber (L, BUFFER_LINE);
lua_setglobal (L, "IO_BUFFER_LINE");
diff --git a/src/lua/lua_cdb.c b/src/lua/lua_cdb.c
index 7adfeeac5..84fef66d9 100644
--- a/src/lua/lua_cdb.c
+++ b/src/lua/lua_cdb.c
@@ -161,5 +161,7 @@ luaopen_cdb (lua_State * L)
luaL_openlib (L, NULL, cdblib_m, 0);
luaL_openlib (L, "cdb", cdblib_f, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_classifier.c b/src/lua/lua_classifier.c
index edaf4e7a6..e929c6b50 100644
--- a/src/lua/lua_classifier.c
+++ b/src/lua/lua_classifier.c
@@ -371,6 +371,8 @@ luaopen_classifier (lua_State * L)
lua_newclass (L, "rspamd{classifier}", classifierlib_m);
luaL_openlib (L, "rspamd_classifier", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
@@ -380,6 +382,8 @@ luaopen_statfile (lua_State * L)
lua_newclass (L, "rspamd{statfile}", statfilelib_m);
luaL_openlib (L, "rspamd_statfile", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index 62fb5389b..6a671cf00 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -42,7 +42,7 @@ LUA_FUNCTION_DEF (logger, warn);
LUA_FUNCTION_DEF (logger, info);
LUA_FUNCTION_DEF (logger, debug);
-static const struct luaL_reg loggerlib_m[] = {
+static const struct luaL_reg loggerlib_f[] = {
LUA_INTERFACE_DEF (logger, err),
LUA_INTERFACE_DEF (logger, warn),
LUA_INTERFACE_DEF (logger, info),
@@ -51,9 +51,159 @@ static const struct luaL_reg loggerlib_m[] = {
{NULL, NULL}
};
+/* util module */
+LUA_FUNCTION_DEF (util, gethostbyname);
+LUA_FUNCTION_DEF (util, strsplit);
+LUA_FUNCTION_DEF (util, close);
+LUA_FUNCTION_DEF (util, ip_to_str);
+LUA_FUNCTION_DEF (util, str_to_ip);
+
+static const struct luaL_reg utillib_f[] = {
+ LUA_INTERFACE_DEF (util, gethostbyname),
+ LUA_INTERFACE_DEF (util, strsplit),
+ LUA_INTERFACE_DEF (util, close),
+ LUA_INTERFACE_DEF (util, ip_to_str),
+ LUA_INTERFACE_DEF (util, str_to_ip),
+ {"__tostring", lua_class_tostring},
+ {NULL, NULL}
+};
+
+/**
+ * Get numeric ip presentation of hostname
+ */
+static gint
+lua_util_gethostbyname (lua_State *L)
+{
+ const gchar *name;
+ struct hostent *hent;
+ struct in_addr ina;
+
+ name = luaL_checkstring (L, 1);
+ if (name) {
+ hent = gethostbyname (name);
+ if (hent) {
+ memcpy (&ina, hent->h_addr, sizeof (struct in_addr));
+ lua_pushinteger (L, ina.s_addr);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+ return 1;
+}
+
+/**
+ * Close file descriptor
+ */
+static gint
+lua_util_close (lua_State *L)
+{
+ gint fd;
+
+ fd = lua_tointeger (L, 1);
+
+ if (fd >= 0) {
+ close (fd);
+ }
+
+ return 0;
+}
+
+/**
+ * Convert numeric ip to string
+ */
+static gint
+lua_util_ip_to_str (lua_State *L)
+{
+ struct in_addr ina;
+
+ ina.s_addr = lua_tointeger (L, 1);
+ if (ina.s_addr) {
+ lua_pushstring (L, inet_ntoa (ina));
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/**
+ * Convert string ip to numeric
+ */
+static gint
+lua_util_str_to_ip (lua_State *L)
+{
+ struct in_addr ina;
+ const gchar *ip;
+
+ ip = luaL_checkstring (L, 1);
+ if (ip) {
+ if (inet_aton (ip, &ina) != 0) {
+ lua_pushinteger (L, ina.s_addr);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/**
+ * Split string to the portions using separators as the second argument
+ */
+static gint
+lua_util_strsplit (lua_State *L)
+{
+ gchar **list, **cur;
+ const gchar *in, *separators = " ;,";
+ gint i;
+
+ in = luaL_checkstring (L, 1);
+
+ if (in) {
+ if (lua_gettop (L) > 1) {
+ separators = luaL_checkstring (L, 2);
+ }
+ list = g_strsplit_set (in, separators, -1);
+ if (list) {
+ cur = list;
+ lua_newtable (L);
+ i = 1;
+ while (*cur) {
+ lua_pushstring (L, *cur);
+ lua_rawseti (L, -2, i++);
+ cur ++;
+ }
+ g_strfreev (list);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
/* Util functions */
+/**
+ * Create new class and store metatable on top of the stack
+ * @param L
+ * @param classname name of class
+ * @param func table of class methods
+ */
void
-lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *func)
+lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *methods)
{
luaL_newmetatable (L, classname); /* mt */
lua_pushstring (L, "__index");
@@ -64,7 +214,17 @@ lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *func
lua_pushstring (L, classname); /* mt,"__index",it,"class",classname */
lua_rawset (L, -3); /* mt,"__index",it */
- luaL_openlib (L, NULL, func, 0);
+ luaL_openlib (L, NULL, methods, 0);
+}
+
+/**
+ * Create and register new class with static methods and store metatable on top of the stack
+ */
+void
+lua_newclass_full (lua_State *L, const gchar *classname, const gchar *static_name, const struct luaL_reg *methods, const struct luaL_reg *func)
+{
+ lua_newclass (L, classname, methods);
+ luaL_openlib(L, static_name, func, 0);
}
gint
@@ -210,11 +370,20 @@ luaopen_rspamd (lua_State * L)
return 1;
}
-gint
+static gint
luaopen_logger (lua_State * L)
{
- luaL_openlib (L, "rspamd_logger", loggerlib_m, 0);
+ luaL_openlib (L, "rspamd_logger", loggerlib_f, 0);
+
+ return 1;
+}
+
+
+static gint
+luaopen_util (lua_State *L)
+{
+ luaL_openlib (L, "rspamd_util", utillib_f, 0);
return 1;
}
@@ -252,9 +421,9 @@ init_lua (struct config_file *cfg)
(void)luaopen_rspamd (L);
(void)luaopen_logger (L);
+ (void)luaopen_util (L);
(void)luaopen_mempool (L);
(void)luaopen_config (L);
- (void)luaopen_session (L);
(void)luaopen_radix (L);
(void)luaopen_hash_table (L);
(void)luaopen_trie (L);
@@ -272,6 +441,8 @@ init_lua (struct config_file *cfg)
(void)luaopen_redis (L);
(void)luaopen_upstream (L);
(void)lua_add_actions_global (L);
+ (void)luaopen_session (L);
+ (void)luaopen_io_dispatcher (L);
cfg->lua_state = L;
memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)lua_close, L);
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index daab695fc..dc417fa68 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -24,7 +24,12 @@ extern GMutex *lua_mtx;
/**
* Create and register new class
*/
-void lua_newclass (lua_State *L, const gchar *classname, const struct luaL_reg *func);
+void lua_newclass (lua_State *L, const gchar *classname, const struct luaL_reg *methods);
+
+/**
+ * Create and register new class with static methods
+ */
+void lua_newclass_full (lua_State *L, const gchar *classname, const gchar *static_name, const struct luaL_reg *methods, const struct luaL_reg *func);
/**
* Set class name for object at @param objidx position
@@ -64,6 +69,7 @@ gint luaopen_redis (lua_State * L);
gint luaopen_upstream (lua_State * L);
gint luaopen_mempool (lua_State * L);
gint luaopen_session (lua_State * L);
+gint luaopen_io_dispatcher (lua_State * L);
void init_lua (struct config_file *cfg);
gboolean init_lua_filters (struct config_file *cfg);
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 2258c03aa..217bfc761 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -967,6 +967,8 @@ luaopen_config (lua_State * L)
lua_newclass (L, "rspamd{config}", configlib_m);
luaL_openlib (L, "rspamd_config", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
@@ -976,6 +978,8 @@ luaopen_radix (lua_State * L)
lua_newclass (L, "rspamd{radix}", radixlib_m);
luaL_openlib (L, "rspamd_radix", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
@@ -985,6 +989,8 @@ luaopen_hash_table (lua_State * L)
lua_newclass (L, "rspamd{hash_table}", hashlib_m);
luaL_openlib (L, "rspamd_hash_table", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
@@ -1003,5 +1009,7 @@ luaopen_trie (lua_State * L)
luaL_openlib (L, NULL, trielib_m, 0);
luaL_openlib(L, "rspamd_trie", trielib_f, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_mempool.c b/src/lua/lua_mempool.c
index 44fb3fe85..40094abc4 100644
--- a/src/lua/lua_mempool.c
+++ b/src/lua/lua_mempool.c
@@ -239,5 +239,7 @@ luaopen_mempool (lua_State * L)
luaL_openlib (L, NULL, mempoollib_m, 0);
luaL_openlib(L, "rspamd_mempool", mempoollib_f, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_message.c b/src/lua/lua_message.c
index 0dedb9068..6c91938ab 100644
--- a/src/lua/lua_message.c
+++ b/src/lua/lua_message.c
@@ -206,5 +206,7 @@ luaopen_message (lua_State * L)
lua_newclass (L, "rspamd{message}", msglib_m);
luaL_openlib (L, "rspamd_message", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_session.c b/src/lua/lua_session.c
index 5ccfc76d3..9f4af2882 100644
--- a/src/lua/lua_session.c
+++ b/src/lua/lua_session.c
@@ -334,9 +334,13 @@ luaopen_session (lua_State * L)
luaL_openlib (L, NULL, sessionlib_m, 0);
luaL_openlib(L, "rspamd_session", sessionlib_f, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
/* Simple event class */
lua_newclass (L, "rspamd{event}", eventlib_m);
luaL_openlib (L, "rspamd_event", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c
index 3aa204597..abb8cc15d 100644
--- a/src/lua/lua_task.c
+++ b/src/lua/lua_task.c
@@ -42,8 +42,14 @@
extern stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
const gchar *symbol, struct statfile **st, gboolean try_create);
+/* Task creation */
+LUA_FUNCTION_DEF (task, create_empty);
+LUA_FUNCTION_DEF (task, create_from_buffer);
/* Task methods */
LUA_FUNCTION_DEF (task, get_message);
+LUA_FUNCTION_DEF (task, process_message);
+LUA_FUNCTION_DEF (task, set_cfg);
+LUA_FUNCTION_DEF (task, destroy);
LUA_FUNCTION_DEF (task, get_mempool);
LUA_FUNCTION_DEF (task, get_ev_base);
LUA_FUNCTION_DEF (task, insert_result);
@@ -77,8 +83,17 @@ LUA_FUNCTION_DEF (task, get_metric_score);
LUA_FUNCTION_DEF (task, get_metric_action);
LUA_FUNCTION_DEF (task, learn_statfile);
+static const struct luaL_reg tasklib_f[] = {
+ LUA_INTERFACE_DEF (task, create_empty),
+ LUA_INTERFACE_DEF (task, create_from_buffer),
+ {NULL, NULL}
+};
+
static const struct luaL_reg tasklib_m[] = {
LUA_INTERFACE_DEF (task, get_message),
+ LUA_INTERFACE_DEF (task, destroy),
+ LUA_INTERFACE_DEF (task, process_message),
+ LUA_INTERFACE_DEF (task, set_cfg),
LUA_INTERFACE_DEF (task, get_mempool),
LUA_INTERFACE_DEF (task, get_ev_base),
LUA_INTERFACE_DEF (task, insert_result),
@@ -204,17 +219,96 @@ lua_check_url (lua_State * L)
}
/*** Task interface ***/
+
+static int
+lua_task_create_empty (lua_State *L)
+{
+ struct worker_task **ptask, *task;
+
+ task = construct_task (NULL);
+ ptask = lua_newuserdata (L, sizeof (gpointer));
+ lua_setclass (L, "rspamd{task}", -1);
+ *ptask = task;
+ return 1;
+}
+
+static int
+lua_task_create_from_buffer (lua_State *L)
+{
+ struct worker_task **ptask, *task;
+ const gchar *data;
+ size_t len;
+
+ data = luaL_checklstring (L, 1, &len);
+ if (data) {
+ task = construct_task (NULL);
+ ptask = lua_newuserdata (L, sizeof (gpointer));
+ lua_setclass (L, "rspamd{task}", -1);
+ *ptask = task;
+ task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
+ task->msg->begin = memory_pool_alloc (task->task_pool, len);
+ memcpy (task->msg->begin, data, len);
+ task->msg->len = len;
+ }
+ return 1;
+}
+
+static int
+lua_task_process_message (lua_State *L)
+{
+ struct worker_task *task = lua_check_task (L);
+
+ if (task != NULL && task->msg != NULL && task->msg->len > 0) {
+ if (process_message (task) == 0) {
+ lua_pushboolean (L, TRUE);
+ }
+ else {
+ lua_pushboolean (L, FALSE);
+ }
+ }
+ else {
+ lua_pushboolean (L, FALSE);
+ }
+
+ return 1;
+}
+static int
+lua_task_set_cfg (lua_State *L)
+{
+ struct worker_task *task = lua_check_task (L);
+ void *ud = luaL_checkudata (L, 2, "rspamd{config}");
+
+ luaL_argcheck (L, ud != NULL, 1, "'config' expected");
+ task->cfg = ud ? *((struct config_file **)ud) : NULL;
+ return 0;
+}
+
+static int
+lua_task_destroy (lua_State *L)
+{
+ struct worker_task *task = lua_check_task (L);
+
+ if (task != NULL) {
+ free_task (task, FALSE);
+ }
+
+ return 0;
+}
+
static int
lua_task_get_message (lua_State * L)
{
GMimeMessage **pmsg;
struct worker_task *task = lua_check_task (L);
- if (task != NULL) {
+ if (task != NULL && task->message != NULL) {
pmsg = lua_newuserdata (L, sizeof (GMimeMessage *));
lua_setclass (L, "rspamd{message}", -1);
*pmsg = task->message;
}
+ else {
+ lua_pushnil (L);
+ }
return 1;
}
@@ -229,6 +323,9 @@ lua_task_get_mempool (lua_State * L)
lua_setclass (L, "rspamd{mempool}", -1);
*ppool = task->task_pool;
}
+ else {
+ lua_pushnil (L);
+ }
return 1;
}
@@ -243,6 +340,9 @@ lua_task_get_ev_base (lua_State * L)
lua_setclass (L, "rspamd{ev_base}", -1);
*pbase = task->ev_base;
}
+ else {
+ lua_pushnil (L);
+ }
return 1;
}
@@ -267,7 +367,7 @@ lua_task_insert_result (lua_State * L)
insert_result (task, symbol_name, flag, params);
}
- return 1;
+ return 0;
}
static gint
@@ -290,7 +390,7 @@ lua_task_set_pre_result (lua_State * L)
}
}
}
- return 1;
+ return 0;
}
struct lua_tree_cb_data {
@@ -1740,8 +1840,9 @@ lua_url_get_phished (lua_State *L)
gint
luaopen_task (lua_State * L)
{
- lua_newclass (L, "rspamd{task}", tasklib_m);
- luaL_openlib (L, "rspamd_task", null_reg, 0);
+ lua_newclass_full (L, "rspamd{task}", "rspamd_task", tasklib_m, tasklib_f);
+
+ lua_pop (L, 1); /* remove metatable from stack */
return 1;
}
@@ -1752,6 +1853,8 @@ luaopen_textpart (lua_State * L)
lua_newclass (L, "rspamd{textpart}", textpartlib_m);
luaL_openlib (L, "rspamd_textpart", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
@@ -1761,6 +1864,8 @@ luaopen_image (lua_State * L)
lua_newclass (L, "rspamd{image}", imagelib_m);
luaL_openlib (L, "rspamd_image", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
@@ -1770,6 +1875,8 @@ luaopen_url (lua_State * L)
lua_newclass (L, "rspamd{url}", urllib_m);
luaL_openlib (L, "rspamd_url", null_reg, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua/lua_upstream.c b/src/lua/lua_upstream.c
index f9e74e027..6780a756a 100644
--- a/src/lua/lua_upstream.c
+++ b/src/lua/lua_upstream.c
@@ -505,6 +505,8 @@ luaopen_upstream (lua_State * L)
luaL_openlib (L, NULL, upstream_list_m, 0);
luaL_openlib (L, "upstream_list", upstream_list_f, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
luaL_newmetatable (L, "rspamd{upstream}");
lua_pushstring (L, "__index");
lua_pushvalue (L, -2);
@@ -517,5 +519,7 @@ luaopen_upstream (lua_State * L)
luaL_openlib (L, NULL, upstream_m, 0);
luaL_openlib (L, "upstream", upstream_f, 0);
+ lua_pop (L, 1); /* remove metatable from stack */
+
return 1;
}
diff --git a/src/lua_worker.c b/src/lua_worker.c
new file mode 100644
index 000000000..b7daf7ce7
--- /dev/null
+++ b/src/lua_worker.c
@@ -0,0 +1,516 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include "config.h"
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "cfg_xml.h"
+#include "url.h"
+#include "message.h"
+#include "map.h"
+#include "dns.h"
+
+#include "lua/lua_common.h"
+
+#ifdef WITH_GPERF_TOOLS
+# include <glib/gprintf.h>
+#endif
+
+/* 60 seconds for worker's IO */
+#define DEFAULT_WORKER_IO_TIMEOUT 60000
+
+gpointer init_lua_worker (void);
+void start_lua_worker (struct rspamd_worker *worker);
+
+worker_t lua_worker = {
+ "lua", /* Name */
+ init_lua_worker, /* Init function */
+ start_lua_worker, /* Start function */
+ TRUE, /* Has socket */
+ FALSE, /* Non unique */
+ FALSE, /* Non threaded */
+ TRUE /* Killable */
+};
+
+/*
+ * Worker's context
+ */
+struct rspamd_lua_worker_ctx {
+ /* DNS resolver */
+ struct rspamd_dns_resolver *resolver;
+ /* Events base */
+ struct event_base *ev_base;
+ /* Other params */
+ GHashTable *params;
+ /* Lua script to load */
+ gchar *file;
+ /* Lua state */
+ lua_State *L;
+ /* Callback for accept */
+ gint cbref_accept;
+ /* Callback for finishing */
+ gint cbref_fin;
+ /* Config file */
+ struct config_file *cfg;
+};
+
+/* Lua bindings */
+LUA_FUNCTION_DEF (worker, get_ev_base);
+LUA_FUNCTION_DEF (worker, register_accept_callback);
+LUA_FUNCTION_DEF (worker, register_exit_callback);
+LUA_FUNCTION_DEF (worker, get_option);
+LUA_FUNCTION_DEF (worker, get_resolver);
+LUA_FUNCTION_DEF (worker, get_cfg);
+
+static const struct luaL_reg lua_workerlib_m[] = {
+ LUA_INTERFACE_DEF (worker, get_ev_base),
+ LUA_INTERFACE_DEF (worker, register_accept_callback),
+ LUA_INTERFACE_DEF (worker, register_exit_callback),
+ LUA_INTERFACE_DEF (worker, get_option),
+ LUA_INTERFACE_DEF (worker, get_resolver),
+ LUA_INTERFACE_DEF (worker, get_cfg),
+ {"__tostring", lua_class_tostring},
+ {NULL, NULL}
+};
+
+static sig_atomic_t wanna_die = 0;
+
+/* Basic functions of LUA API for worker object */
+static gint
+luaopen_lua_worker (lua_State * L)
+{
+ lua_newclass (L, "rspamd{worker}", lua_workerlib_m);
+ luaL_openlib (L, "rspamd_worker", null_reg, 0);
+
+ lua_pop (L, 1); /* remove metatable from stack */
+
+ return 1;
+}
+
+struct rspamd_lua_worker_ctx *
+lua_check_lua_worker (lua_State * L)
+{
+ void *ud = luaL_checkudata (L, 1, "rspamd{worker}");
+ luaL_argcheck (L, ud != NULL, 1, "'worker' expected");
+ return ud ? *((struct rspamd_lua_worker_ctx **)ud) : NULL;
+}
+
+static int
+lua_worker_get_ev_base (lua_State *L)
+{
+ struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
+ struct event_base **pbase;
+
+ if (ctx) {
+ pbase = lua_newuserdata (L, sizeof (struct event_base *));
+ lua_setclass (L, "rspamd{ev_base}", -1);
+ *pbase = ctx->ev_base;
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static int
+lua_worker_register_accept_callback (lua_State *L)
+{
+ struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
+
+ if (ctx) {
+ if (!lua_isfunction (L, 2)) {
+ msg_err ("invalid callback passed");
+ lua_pushnil (L);
+ }
+ else {
+ lua_pushvalue (L, 2);
+ ctx->cbref_accept = luaL_ref (L, LUA_REGISTRYINDEX);
+ return 0;
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static int
+lua_worker_register_exit_callback (lua_State *L)
+{
+ struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
+
+ if (ctx) {
+ if (!lua_isfunction (L, 2)) {
+ msg_err ("invalid callback passed");
+ lua_pushnil (L);
+ }
+ else {
+ lua_pushvalue (L, 2);
+ ctx->cbref_fin = luaL_ref (L, LUA_REGISTRYINDEX);
+ return 0;
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static int
+lua_worker_get_option (lua_State *L)
+{
+ struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
+ GList *val;
+ gint i;
+ const gchar *name;
+
+ if (ctx) {
+ name = luaL_checkstring (L, 2);
+ if (name == NULL) {
+ msg_err ("no name specified");
+ lua_pushnil (L);
+ }
+ else {
+ val = g_hash_table_lookup (ctx->params, name);
+ if (val == NULL) {
+ lua_pushnil (L);
+ }
+ else {
+ /* Push the array */
+ i = 1;
+ lua_newtable (L);
+ while (val) {
+ lua_pushstring (L, val->data);
+ lua_rawseti (L, -2, i++);
+ val = g_list_next (val);
+ }
+ }
+ }
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static int
+lua_worker_get_resolver (lua_State *L)
+{
+ struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
+
+ if (ctx) {
+ /* XXX: implement resolver API */
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+static int
+lua_worker_get_cfg (lua_State *L)
+{
+ struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
+ struct config_file **pcfg;
+
+ if (ctx) {
+ pcfg = lua_newuserdata (L, sizeof (gpointer));
+ lua_setclass (L, "rspamd{config}", -1);
+ *pcfg = ctx->cfg;
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ return 1;
+}
+
+/* End of lua API */
+/* Signal handlers */
+
+#ifndef HAVE_SA_SIGINFO
+static void
+sig_handler (gint signo)
+#else
+static void
+sig_handler (gint signo, siginfo_t * info, void *unused)
+#endif
+{
+ struct timeval tv;
+
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ if (!wanna_die) {
+ wanna_die = 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+ ProfilerStop ();
+#endif
+ }
+ break;
+ }
+}
+
+/*
+ * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
+ */
+static void
+sigusr2_handler (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ /* Do not accept new connections, preparing to end worker's process */
+ struct timeval tv;
+
+ if (!wanna_die) {
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ event_del (&worker->sig_ev_usr1);
+ event_del (&worker->sig_ev_usr2);
+ event_del (&worker->bind_ev);
+ msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ event_loopexit (&tv);
+ }
+ return;
+}
+
+/*
+ * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
+ */
+static void
+sigusr1_handler (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+
+ reopen_log (worker->srv->logger);
+
+ return;
+}
+
+static gboolean
+handle_lua_param (struct config_file *cfg, struct rspamd_xml_userdata *unused, GHashTable *attrs, gchar *data, gpointer user_data, gpointer dest_struct, gint offset)
+{
+ struct rspamd_lua_worker_ctx *ctx = dest_struct;
+ GList *val;
+ gchar *tag = user_data;
+
+ val = g_hash_table_lookup (ctx->params, tag);
+ if (val == NULL) {
+ g_hash_table_insert (ctx->params, g_strdup (tag), g_list_prepend (NULL, data));
+ }
+ else {
+ val = g_list_append (val, data);
+ }
+
+ return TRUE;
+}
+
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+lua_accept_socket (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_lua_worker_ctx *ctx, **pctx;
+ union sa_union su;
+ socklen_t addrlen = sizeof (su.ss);
+ gint nfd;
+ struct in_addr addr;
+ gchar *addr_str = NULL;
+ lua_State *L;
+
+ ctx = worker->ctx;
+ L = ctx->L;
+
+ if ((nfd =
+ accept_from_socket (fd, (struct sockaddr *) &su.ss, &addrlen)) == -1) {
+ msg_warn ("accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0){
+ return;
+ }
+
+ if (su.ss.ss_family == AF_UNIX) {
+ msg_info ("accepted connection from unix socket");
+ addr.s_addr = INADDR_NONE;
+ addr_str = "127.0.0.1";
+ }
+ else if (su.ss.ss_family == AF_INET) {
+ msg_info ("accepted connection from %s port %d",
+ inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+ memcpy (&addr, &su.s4.sin_addr,
+ sizeof (struct in_addr));
+ addr_str = g_strdup (inet_ntoa (su.s4.sin_addr));
+ }
+
+ /* Call finalizer function */
+ lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_accept);
+ pctx = lua_newuserdata (L, sizeof (gpointer));
+ lua_setclass (L, "rspamd{worker}", -1);
+ *pctx = ctx;
+ lua_pushinteger (L, nfd);
+ lua_pushstring (L, addr_str);
+ lua_pushinteger (L, addr.s_addr);
+
+
+ if (lua_pcall (L, 4, 0, 0) != 0) {
+ msg_info ("call to worker accept failed: %s", lua_tostring (L, -1));
+ }
+
+ if (addr_str) {
+ g_free (addr_str);
+ }
+}
+
+gpointer
+init_lua_worker (void)
+{
+ struct rspamd_lua_worker_ctx *ctx;
+ GQuark type;
+
+ type = g_quark_try_string ("lua");
+
+ ctx = g_malloc0 (sizeof (struct rspamd_lua_worker_ctx));
+ ctx->params = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, (GDestroyNotify)g_list_free);
+
+
+ register_worker_opt (type, "file", xml_handle_string, ctx, G_STRUCT_OFFSET (struct rspamd_lua_worker_ctx, file));
+ register_worker_opt (type, "*", handle_lua_param, ctx, 0);
+
+ return ctx;
+}
+
+/*
+ * Start worker process
+ */
+void
+start_lua_worker (struct rspamd_worker *worker)
+{
+ struct sigaction signals;
+ struct rspamd_lua_worker_ctx *ctx = worker->ctx, **pctx;
+ lua_State *L;
+
+#ifdef WITH_PROFILER
+ extern void _start (void), etext (void);
+ monstartup ((u_long) & _start, (u_long) & etext);
+#endif
+
+ gperf_profiler_init (worker->srv->cfg, "lua_worker");
+
+ worker->srv->pid = getpid ();
+
+ ctx->ev_base = event_init ();
+
+ L = worker->srv->cfg->lua_state;
+ ctx->L = L;
+ ctx->cfg = worker->srv->cfg;
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
+ signal_add (&worker->sig_ev_usr2, NULL);
+
+ /* SIGUSR1 handler */
+ signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
+ signal_add (&worker->sig_ev_usr1, NULL);
+
+ /* Accept event */
+ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
+ lua_accept_socket, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
+ event_add (&worker->bind_ev, NULL);
+
+ /* Open worker's lib */
+ luaopen_lua_worker (L);
+
+ if (ctx->file == NULL) {
+ msg_err ("No lua script defined, so no reason to exist");
+ exit (EXIT_SUCCESS);
+ }
+ if (access (ctx->file, R_OK) == -1) {
+ msg_err ("Error reading lua script %s: %s", ctx->file, strerror (errno));
+ exit (EXIT_SUCCESS);
+ }
+
+ pctx = lua_newuserdata (L, sizeof (gpointer));
+ lua_setclass (L, "rspamd{worker}", -1);
+ lua_setglobal (L, "rspamd_worker");
+ *pctx = ctx;
+
+ if (luaL_dofile (L, ctx->file) != 0) {
+ msg_err ("Error executing lua script %s: %s", ctx->file, lua_tostring (L, -1));
+ exit (EXIT_SUCCESS);
+ }
+
+ if (ctx->cbref_accept == 0) {
+ msg_err ("No accept function defined, so no reason to exist");
+ exit (EXIT_SUCCESS);
+ }
+
+ /* Maps events */
+ start_map_watch (ctx->ev_base);
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
+
+ event_base_loop (ctx->ev_base, 0);
+ luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_accept);
+ if (ctx->cbref_fin != 0) {
+ /* Call finalizer function */
+ lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_fin);
+ pctx = lua_newuserdata (L, sizeof (gpointer));
+ lua_setclass (L, "rspamd{worker}", -1);
+ *pctx = ctx;
+ if (lua_pcall (L, 1, 0, 0) != 0) {
+ msg_info ("call to worker finalizer failed: %s", lua_tostring (L, -1));
+ }
+ /* Free resources */
+ luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_fin);
+ }
+
+ close_log (rspamd_main->logger);
+ exit (EXIT_SUCCESS);
+}
+
+/*
+ * vi:ts=4
+ */
+
diff --git a/src/worker_util.c b/src/worker_util.c
index ba4444485..976fe58c3 100644
--- a/src/worker_util.c
+++ b/src/worker_util.c
@@ -52,7 +52,9 @@ construct_task (struct rspamd_worker *worker)
new_task->worker = worker;
new_task->state = READ_COMMAND;
- new_task->cfg = worker->srv->cfg;
+ if (worker) {
+ new_task->cfg = worker->srv->cfg;
+ }
new_task->view_checked = FALSE;
#ifdef HAVE_CLOCK_GETTIME
# ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID