]> source.dussan.org Git - rspamd.git/commitdiff
* Add lua worker type and lua worker bindings.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 26 Jul 2012 17:50:13 +0000 (21:50 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 26 Jul 2012 17:50:13 +0000 (21:50 +0400)
* Add lua utility library for basic utils.
* Fixes lua_buffer code.
Fix lua loading error.
Added some other lua utility functions.

15 files changed:
CMakeLists.txt
src/cfg_xml.c
src/lua/lua_buffer.c
src/lua/lua_cdb.c
src/lua/lua_classifier.c
src/lua/lua_common.c
src/lua/lua_common.h
src/lua/lua_config.c
src/lua/lua_mempool.c
src/lua/lua_message.c
src/lua/lua_session.c
src/lua/lua_task.c
src/lua/lua_upstream.c
src/lua_worker.c [new file with mode: 0644]
src/worker_util.c

index d01a6672223650c65f01d7518b6edb5b3c35555b..5379869ac83d529fec2d72602baeac19d0dd8a66 100644 (file)
@@ -930,6 +930,7 @@ SET(RSPAMDSRC       src/modules.c
                                src/fuzzy_storage.c
                                src/kvstorage_server.c
                                src/lmtp.c
+                               src/lua_worker.c
                                src/main.c
                                src/map.c
                                src/smtp.c
@@ -951,7 +952,7 @@ SET(PLUGINSSRC      src/plugins/surbl.c
                                src/plugins/dkim_check.c)
                                
 SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage)
+SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage lua)
 
 AddModules(MODULES_LIST WORKERS_LIST)
 
index ad542265d6561141dee69bddcafe556797b90c57..45b67854807d0ef668339a9959b7bebbca206d53 100644 (file)
@@ -862,7 +862,27 @@ worker_foreach_callback (gpointer k, gpointer v, gpointer ud)
 
        if (!worker_options || (worker_config = g_hash_table_lookup (worker_options, &cd->wrk->type)) == NULL ||
                        (cparam = g_hash_table_lookup (worker_config, k)) == NULL) {
-               msg_warn ("unregistered worker attribute '%s' for worker %s", k, g_quark_to_string (cd->wrk->type));
+               /* Try to use universal handler if there is no specific handler */
+               if ((cparam = g_hash_table_lookup (worker_config, "*")) != NULL) {
+                       if (cd->wrk->ctx != NULL) {
+                               if (param->is_list) {
+                                       cur = param->d.list;
+                                       while (cur) {
+                                               cparam->handler (cd->cfg, cd->ctx, NULL, cur->data, k, cd->wrk->ctx, cparam->offset);
+                                               cur = g_list_next (cur);
+                                       }
+                               }
+                               else {
+                                       cparam->handler (cd->cfg, cd->ctx, NULL, param->d.param, k, cd->wrk->ctx, cparam->offset);
+                               }
+                       }
+                       else {
+                               msg_err ("Bad error detected: worker %s has not initialized its context", g_quark_to_string (cd->wrk->type));
+                       }
+               }
+               else {
+                       msg_warn ("unregistered worker attribute '%s' for worker %s", k, g_quark_to_string (cd->wrk->type));
+               }
        }
        else {
 
index bb5b1ea8cf3497316a2c17272c02e2df96730ece..9c18ef5cf0068cd882f00a8a47f12418ed8a803a 100644 (file)
@@ -34,14 +34,14 @@ LUA_FUNCTION_DEF (io_dispatcher, set_policy);
 LUA_FUNCTION_DEF (io_dispatcher, write);
 LUA_FUNCTION_DEF (io_dispatcher, pause);
 LUA_FUNCTION_DEF (io_dispatcher, restore);
-LUA_FUNCTION_DEF (io_dispatcher, delete);
+LUA_FUNCTION_DEF (io_dispatcher, destroy);
 
 static const struct luaL_reg    io_dispatcherlib_m[] = {
        LUA_INTERFACE_DEF (io_dispatcher, set_policy),
        LUA_INTERFACE_DEF (io_dispatcher, write),
        LUA_INTERFACE_DEF (io_dispatcher, pause),
        LUA_INTERFACE_DEF (io_dispatcher, restore),
-       {"__gc", lua_io_dispatcher_delete},
+       LUA_INTERFACE_DEF (io_dispatcher, destroy),
        {"__tostring", lua_class_tostring},
        {NULL, NULL}
 };
@@ -90,12 +90,12 @@ lua_io_read_cb (f_str_t * in, void *arg)
                need_unlock = TRUE;
        }
        /* callback (dispatcher, data) */
+       lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
        pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
        lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
        *pdispatcher = cbdata->d;
        lua_pushlstring (cbdata->L, in->begin, in->len);
 
-       lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
        if (lua_pcall (cbdata->L, 2, 1, 0) != 0) {
                msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
        }
@@ -107,16 +107,6 @@ lua_io_read_cb (f_str_t * in, void *arg)
                g_mutex_unlock (lua_mtx);
        }
 
-       if (!res) {
-               /* Unref callbacks */
-               luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
-               if (cbdata->cbref_write) {
-                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
-               }
-               luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
-               g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
-       }
-
        return res;
 }
 
@@ -132,12 +122,13 @@ lua_io_write_cb (void *arg)
                if (g_mutex_trylock (lua_mtx)) {
                        need_unlock = TRUE;
                }
+               lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
                /* callback (dispatcher) */
                pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
                lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
                *pdispatcher = cbdata->d;
 
-               lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+
                if (lua_pcall (cbdata->L, 1, 1, 0) != 0) {
                        msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
                }
@@ -148,14 +139,6 @@ lua_io_write_cb (void *arg)
                if (need_unlock) {
                        g_mutex_unlock (lua_mtx);
                }
-
-               if (!res) {
-                       /* Unref callbacks */
-                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
-                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
-                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
-                       g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
-               }
        }
 
        return res;
@@ -173,12 +156,12 @@ lua_io_err_cb (GError * err, void *arg)
                need_unlock = TRUE;
        }
        /* callback (dispatcher, err) */
+       lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
        pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
        lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
        *pdispatcher = cbdata->d;
        lua_pushstring (cbdata->L, err->message);
 
-       lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
        if (lua_pcall (cbdata->L, 2, 0, 0) != 0) {
                msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
        }
@@ -234,8 +217,12 @@ lua_io_dispatcher_create (lua_State *L)
                        tv_num = lua_tonumber (L, 6);
                        tv.tv_sec = trunc (tv_num);
                        tv.tv_usec = modf (tv_num, &tmp) * 1000.;
+                       io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata);
+               }
+               else {
+                       io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, NULL, cbdata);
                }
-               io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata);
+
                cbdata->d = io_dispatcher;
                /* Push result */
                pdispatcher = lua_newuserdata (L, sizeof (struct rspamd_io_dispatcher_s *));
@@ -338,7 +325,7 @@ lua_io_dispatcher_restore (lua_State *L)
 }
 
 static int
-lua_io_dispatcher_delete (lua_State *L)
+lua_io_dispatcher_destroy (lua_State *L)
 {
        struct rspamd_io_dispatcher_s                                   *io_dispatcher = lua_check_io_dispatcher (L);
 
@@ -369,10 +356,14 @@ luaopen_io_dispatcher (lua_State * L)
        luaL_openlib (L, NULL, io_dispatcherlib_m, 0);
        luaL_openlib(L, "rspamd_io_dispatcher", io_dispatcherlib_f, 0);
 
+       lua_pop(L, 1);                      /* remove metatable from stack */
+
        /* Simple event class */
        lua_newclass (L, "rspamd{ev_base}", null_reg);
        luaL_openlib (L, "rspamd_ev_base", null_reg, 0);
 
+       lua_pop(L, 1);                      /* remove metatable from stack */
+
        /* Set buffer types globals */
        lua_pushnumber (L, BUFFER_LINE);
        lua_setglobal (L, "IO_BUFFER_LINE");
index 7adfeeac539f7e72b5b2b0ee77cbbde3cf36f13d..84fef66d98f65a027770c886202391de427092db 100644 (file)
@@ -161,5 +161,7 @@ luaopen_cdb (lua_State * L)
        luaL_openlib (L, NULL, cdblib_m, 0);
        luaL_openlib (L, "cdb", cdblib_f, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
index edaf4e7a61553accce99f3fb16713ce54b928267..e929c6b50f0f23c64e20108ad3f9eb27778745a2 100644 (file)
@@ -371,6 +371,8 @@ luaopen_classifier (lua_State * L)
        lua_newclass (L, "rspamd{classifier}", classifierlib_m);
        luaL_openlib (L, "rspamd_classifier", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
@@ -380,6 +382,8 @@ luaopen_statfile (lua_State * L)
        lua_newclass (L, "rspamd{statfile}", statfilelib_m);
        luaL_openlib (L, "rspamd_statfile", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
index 62fb5389b664ba7f96f132e0b80962e1b3bb6257..6a671cf00f900dc1807f690178917fa6bd8f9f4b 100644 (file)
@@ -42,7 +42,7 @@ LUA_FUNCTION_DEF (logger, warn);
 LUA_FUNCTION_DEF (logger, info);
 LUA_FUNCTION_DEF (logger, debug);
 
-static const struct luaL_reg    loggerlib_m[] = {
+static const struct luaL_reg    loggerlib_f[] = {
        LUA_INTERFACE_DEF (logger, err),
        LUA_INTERFACE_DEF (logger, warn),
        LUA_INTERFACE_DEF (logger, info),
@@ -51,9 +51,159 @@ static const struct luaL_reg    loggerlib_m[] = {
        {NULL, NULL}
 };
 
+/* util module */
+LUA_FUNCTION_DEF (util, gethostbyname);
+LUA_FUNCTION_DEF (util, strsplit);
+LUA_FUNCTION_DEF (util, close);
+LUA_FUNCTION_DEF (util, ip_to_str);
+LUA_FUNCTION_DEF (util, str_to_ip);
+
+static const struct luaL_reg    utillib_f[] = {
+       LUA_INTERFACE_DEF (util, gethostbyname),
+       LUA_INTERFACE_DEF (util, strsplit),
+       LUA_INTERFACE_DEF (util, close),
+       LUA_INTERFACE_DEF (util, ip_to_str),
+       LUA_INTERFACE_DEF (util, str_to_ip),
+       {"__tostring", lua_class_tostring},
+       {NULL, NULL}
+};
+
+/**
+ * Get numeric ip presentation of hostname
+ */
+static gint
+lua_util_gethostbyname (lua_State *L)
+{
+       const gchar                                             *name;
+       struct hostent                                  *hent;
+       struct in_addr                                   ina;
+
+       name = luaL_checkstring (L, 1);
+       if (name) {
+               hent = gethostbyname (name);
+               if (hent) {
+                       memcpy (&ina, hent->h_addr, sizeof (struct in_addr));
+                       lua_pushinteger (L, ina.s_addr);
+               }
+               else {
+                       lua_pushnil (L);
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+       return 1;
+}
+
+/**
+ * Close file descriptor
+ */
+static gint
+lua_util_close (lua_State *L)
+{
+       gint                                                     fd;
+
+       fd = lua_tointeger (L, 1);
+
+       if (fd >= 0) {
+               close (fd);
+       }
+
+       return 0;
+}
+
+/**
+ * Convert numeric ip to string
+ */
+static gint
+lua_util_ip_to_str (lua_State *L)
+{
+       struct in_addr                                   ina;
+
+       ina.s_addr = lua_tointeger (L, 1);
+       if (ina.s_addr) {
+               lua_pushstring (L, inet_ntoa (ina));
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+/**
+ * Convert string ip to numeric
+ */
+static gint
+lua_util_str_to_ip (lua_State *L)
+{
+       struct in_addr                                   ina;
+       const gchar                                             *ip;
+
+       ip = luaL_checkstring (L, 1);
+       if (ip) {
+               if (inet_aton (ip, &ina) != 0) {
+                       lua_pushinteger (L, ina.s_addr);
+               }
+               else {
+                       lua_pushnil (L);
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+/**
+ * Split string to the portions using separators as the second argument
+ */
+static gint
+lua_util_strsplit (lua_State *L)
+{
+       gchar                                                   **list, **cur;
+       const gchar                                             *in, *separators = " ;,";
+       gint                                                     i;
+
+       in = luaL_checkstring (L, 1);
+
+       if (in) {
+               if (lua_gettop (L) > 1) {
+                       separators = luaL_checkstring (L, 2);
+               }
+               list = g_strsplit_set (in, separators, -1);
+               if (list) {
+                       cur = list;
+                       lua_newtable (L);
+                       i = 1;
+                       while (*cur) {
+                               lua_pushstring (L, *cur);
+                               lua_rawseti (L, -2, i++);
+                               cur ++;
+                       }
+                       g_strfreev (list);
+               }
+               else {
+                       lua_pushnil (L);
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
 /* Util functions */
+/**
+ * Create new class and store metatable on top of the stack
+ * @param L
+ * @param classname name of class
+ * @param func table of class methods
+ */
 void
-lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *func)
+lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *methods)
 {
        luaL_newmetatable (L, classname);       /* mt */
        lua_pushstring (L, "__index");
@@ -64,7 +214,17 @@ lua_newclass (lua_State * L, const gchar *classname, const struct luaL_reg *func
        lua_pushstring (L, classname);  /* mt,"__index",it,"class",classname */
        lua_rawset (L, -3);                     /* mt,"__index",it */
 
-       luaL_openlib (L, NULL, func, 0);
+       luaL_openlib (L, NULL, methods, 0);
+}
+
+/**
+ * Create and register new class with static methods and store metatable on top of the stack
+ */
+void
+lua_newclass_full (lua_State *L, const gchar *classname, const gchar *static_name, const struct luaL_reg *methods, const struct luaL_reg *func)
+{
+       lua_newclass (L, classname, methods);
+       luaL_openlib(L, static_name, func, 0);
 }
 
 gint
@@ -210,11 +370,20 @@ luaopen_rspamd (lua_State * L)
        return 1;
 }
 
-gint
+static gint
 luaopen_logger (lua_State * L)
 {
 
-       luaL_openlib (L, "rspamd_logger", loggerlib_m, 0);
+       luaL_openlib (L, "rspamd_logger", loggerlib_f, 0);
+
+       return 1;
+}
+
+
+static gint
+luaopen_util (lua_State *L)
+{
+       luaL_openlib (L, "rspamd_util", utillib_f, 0);
 
        return 1;
 }
@@ -252,9 +421,9 @@ init_lua (struct config_file *cfg)
 
        (void)luaopen_rspamd (L);
        (void)luaopen_logger (L);
+       (void)luaopen_util (L);
        (void)luaopen_mempool (L);
        (void)luaopen_config (L);
-       (void)luaopen_session (L);
        (void)luaopen_radix (L);
        (void)luaopen_hash_table (L);
        (void)luaopen_trie (L);
@@ -272,6 +441,8 @@ init_lua (struct config_file *cfg)
        (void)luaopen_redis (L);
        (void)luaopen_upstream (L);
        (void)lua_add_actions_global (L);
+       (void)luaopen_session (L);
+       (void)luaopen_io_dispatcher (L);
 
        cfg->lua_state = L;
        memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)lua_close, L);
index daab695fc32708fd8a032968c4a536c2169beaf4..dc417fa68ce202e5870b422188416969b265ba10 100644 (file)
@@ -24,7 +24,12 @@ extern GMutex *lua_mtx;
 /**
  * Create and register new class
  */
-void lua_newclass (lua_State *L, const gchar *classname, const struct luaL_reg *func);
+void lua_newclass (lua_State *L, const gchar *classname, const struct luaL_reg *methods);
+
+/**
+ * Create and register new class with static methods
+ */
+void lua_newclass_full (lua_State *L, const gchar *classname, const gchar *static_name, const struct luaL_reg *methods, const struct luaL_reg *func);
 
 /**
  * Set class name for object at @param objidx position
@@ -64,6 +69,7 @@ gint luaopen_redis (lua_State * L);
 gint luaopen_upstream (lua_State * L);
 gint luaopen_mempool (lua_State * L);
 gint luaopen_session (lua_State * L);
+gint luaopen_io_dispatcher (lua_State * L);
 
 void init_lua (struct config_file *cfg);
 gboolean init_lua_filters (struct config_file *cfg);
index 2258c03aa06bdf399bf43086c3b52493f4ad1f5d..217bfc761afa237230488a987d5a177018ad6f4c 100644 (file)
@@ -967,6 +967,8 @@ luaopen_config (lua_State * L)
        lua_newclass (L, "rspamd{config}", configlib_m);
        luaL_openlib (L, "rspamd_config", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
@@ -976,6 +978,8 @@ luaopen_radix (lua_State * L)
        lua_newclass (L, "rspamd{radix}", radixlib_m);
        luaL_openlib (L, "rspamd_radix", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
@@ -985,6 +989,8 @@ luaopen_hash_table (lua_State * L)
        lua_newclass (L, "rspamd{hash_table}", hashlib_m);
        luaL_openlib (L, "rspamd_hash_table", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
@@ -1003,5 +1009,7 @@ luaopen_trie (lua_State * L)
        luaL_openlib (L, NULL, trielib_m, 0);
        luaL_openlib(L, "rspamd_trie", trielib_f, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
index 44fb3fe85e3d22dfc630b685d9e85ef33a2a9aff..40094abc45dbb5acf7c200fd5ed7c82046efdc87 100644 (file)
@@ -239,5 +239,7 @@ luaopen_mempool (lua_State * L)
        luaL_openlib (L, NULL, mempoollib_m, 0);
        luaL_openlib(L, "rspamd_mempool", mempoollib_f, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;       
 }
index 0dedb9068030b01e366b7d7eee20cb9da3703d3d..6c91938ab57f91ec7e62b79095753cd0b5a417a7 100644 (file)
@@ -206,5 +206,7 @@ luaopen_message (lua_State * L)
        lua_newclass (L, "rspamd{message}", msglib_m);
        luaL_openlib (L, "rspamd_message", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
index 5ccfc76d3261c16e175894414c04f81556868323..9f4af288230a6812599c5a90ee772f8d8a90c8fe 100644 (file)
@@ -334,9 +334,13 @@ luaopen_session (lua_State * L)
        luaL_openlib (L, NULL, sessionlib_m, 0);
        luaL_openlib(L, "rspamd_session", sessionlib_f, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        /* Simple event class */
        lua_newclass (L, "rspamd{event}", eventlib_m);
        luaL_openlib (L, "rspamd_event", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;       
 }
index 3aa2045977f09fb1d25f740c4fb178860a7374b3..abb8cc15d121536de85701f7936904a8fd083b63 100644 (file)
 extern stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classifier_config *ccf,
                const gchar *symbol, struct statfile **st, gboolean try_create);
 
+/* Task creation */
+LUA_FUNCTION_DEF (task, create_empty);
+LUA_FUNCTION_DEF (task, create_from_buffer);
 /* Task methods */
 LUA_FUNCTION_DEF (task, get_message);
+LUA_FUNCTION_DEF (task, process_message);
+LUA_FUNCTION_DEF (task, set_cfg);
+LUA_FUNCTION_DEF (task, destroy);
 LUA_FUNCTION_DEF (task, get_mempool);
 LUA_FUNCTION_DEF (task, get_ev_base);
 LUA_FUNCTION_DEF (task, insert_result);
@@ -77,8 +83,17 @@ LUA_FUNCTION_DEF (task, get_metric_score);
 LUA_FUNCTION_DEF (task, get_metric_action);
 LUA_FUNCTION_DEF (task, learn_statfile);
 
+static const struct luaL_reg    tasklib_f[] = {
+       LUA_INTERFACE_DEF (task, create_empty),
+       LUA_INTERFACE_DEF (task, create_from_buffer),
+       {NULL, NULL}
+};
+
 static const struct luaL_reg    tasklib_m[] = {
        LUA_INTERFACE_DEF (task, get_message),
+       LUA_INTERFACE_DEF (task, destroy),
+       LUA_INTERFACE_DEF (task, process_message),
+       LUA_INTERFACE_DEF (task, set_cfg),
        LUA_INTERFACE_DEF (task, get_mempool),
        LUA_INTERFACE_DEF (task, get_ev_base),
        LUA_INTERFACE_DEF (task, insert_result),
@@ -204,17 +219,96 @@ lua_check_url (lua_State * L)
 }
 
 /*** Task interface    ***/
+
+static int
+lua_task_create_empty (lua_State *L)
+{
+       struct worker_task             **ptask, *task;
+
+       task = construct_task (NULL);
+       ptask = lua_newuserdata (L, sizeof (gpointer));
+       lua_setclass (L, "rspamd{task}", -1);
+       *ptask = task;
+       return 1;
+}
+
+static int
+lua_task_create_from_buffer (lua_State *L)
+{
+       struct worker_task             **ptask, *task;
+       const gchar                                             *data;
+       size_t                                                   len;
+
+       data = luaL_checklstring (L, 1, &len);
+       if (data) {
+               task = construct_task (NULL);
+               ptask = lua_newuserdata (L, sizeof (gpointer));
+               lua_setclass (L, "rspamd{task}", -1);
+               *ptask = task;
+               task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
+               task->msg->begin = memory_pool_alloc (task->task_pool, len);
+               memcpy (task->msg->begin, data, len);
+               task->msg->len = len;
+       }
+       return 1;
+}
+
+static int
+lua_task_process_message (lua_State *L)
+{
+       struct worker_task             *task = lua_check_task (L);
+
+       if (task != NULL && task->msg != NULL && task->msg->len > 0) {
+               if (process_message (task) == 0) {
+                       lua_pushboolean (L, TRUE);
+               }
+               else {
+                       lua_pushboolean (L, FALSE);
+               }
+       }
+       else {
+               lua_pushboolean (L, FALSE);
+       }
+
+       return 1;
+}
+static int
+lua_task_set_cfg (lua_State *L)
+{
+       struct worker_task             *task = lua_check_task (L);
+       void                           *ud = luaL_checkudata (L, 2, "rspamd{config}");
+
+       luaL_argcheck (L, ud != NULL, 1, "'config' expected");
+       task->cfg = ud ? *((struct config_file **)ud) : NULL;
+       return 0;
+}
+
+static int
+lua_task_destroy (lua_State *L)
+{
+       struct worker_task             *task = lua_check_task (L);
+
+       if (task != NULL) {
+               free_task (task, FALSE);
+       }
+
+       return 0;
+}
+
 static int
 lua_task_get_message (lua_State * L)
 {
        GMimeMessage                  **pmsg;
        struct worker_task             *task = lua_check_task (L);
 
-       if (task != NULL) {
+       if (task != NULL && task->message != NULL) {
                pmsg = lua_newuserdata (L, sizeof (GMimeMessage *));
                lua_setclass (L, "rspamd{message}", -1);
                *pmsg = task->message;
        }
+       else {
+               lua_pushnil (L);
+       }
        return 1;
 }
 
@@ -229,6 +323,9 @@ lua_task_get_mempool (lua_State * L)
                lua_setclass (L, "rspamd{mempool}", -1);
                *ppool = task->task_pool;
        }
+       else {
+               lua_pushnil (L);
+       }
        return 1;
 }
 
@@ -243,6 +340,9 @@ lua_task_get_ev_base (lua_State * L)
                lua_setclass (L, "rspamd{ev_base}", -1);
                *pbase = task->ev_base;
        }
+       else {
+               lua_pushnil (L);
+       }
        return 1;
 }
 
@@ -267,7 +367,7 @@ lua_task_insert_result (lua_State * L)
 
                insert_result (task, symbol_name, flag, params);
        }
-       return 1;
+       return 0;
 }
 
 static gint
@@ -290,7 +390,7 @@ lua_task_set_pre_result (lua_State * L)
                        }
                }
        }
-       return 1;
+       return 0;
 }
 
 struct lua_tree_cb_data {
@@ -1740,8 +1840,9 @@ lua_url_get_phished (lua_State *L)
 gint
 luaopen_task (lua_State * L)
 {
-       lua_newclass (L, "rspamd{task}", tasklib_m);
-       luaL_openlib (L, "rspamd_task", null_reg, 0);
+       lua_newclass_full (L, "rspamd{task}", "rspamd_task", tasklib_m, tasklib_f);
+
+       lua_pop (L, 1);                      /* remove metatable from stack */
 
        return 1;
 }
@@ -1752,6 +1853,8 @@ luaopen_textpart (lua_State * L)
        lua_newclass (L, "rspamd{textpart}", textpartlib_m);
        luaL_openlib (L, "rspamd_textpart", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
@@ -1761,6 +1864,8 @@ luaopen_image (lua_State * L)
        lua_newclass (L, "rspamd{image}", imagelib_m);
        luaL_openlib (L, "rspamd_image", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
@@ -1770,6 +1875,8 @@ luaopen_url (lua_State * L)
        lua_newclass (L, "rspamd{url}", urllib_m);
        luaL_openlib (L, "rspamd_url", null_reg, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
 
index f9e74e02708b6bc12c72c534ea0d4cfa289aaaef..6780a756ab71c7e67829185050689272087fc688 100644 (file)
@@ -505,6 +505,8 @@ luaopen_upstream (lua_State * L)
        luaL_openlib (L, NULL, upstream_list_m, 0);
        luaL_openlib (L, "upstream_list", upstream_list_f, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        luaL_newmetatable (L, "rspamd{upstream}");
        lua_pushstring (L, "__index");
        lua_pushvalue (L, -2);
@@ -517,5 +519,7 @@ luaopen_upstream (lua_State * L)
        luaL_openlib (L, NULL, upstream_m, 0);
        luaL_openlib (L, "upstream", upstream_f, 0);
 
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
        return 1;
 }
diff --git a/src/lua_worker.c b/src/lua_worker.c
new file mode 100644 (file)
index 0000000..b7daf7c
--- /dev/null
@@ -0,0 +1,516 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include "config.h"
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "cfg_xml.h"
+#include "url.h"
+#include "message.h"
+#include "map.h"
+#include "dns.h"
+
+#include "lua/lua_common.h"
+
+#ifdef WITH_GPERF_TOOLS
+#   include <glib/gprintf.h>
+#endif
+
+/* 60 seconds for worker's IO */
+#define DEFAULT_WORKER_IO_TIMEOUT 60000
+
+gpointer init_lua_worker (void);
+void start_lua_worker (struct rspamd_worker *worker);
+
+worker_t lua_worker = {
+       "lua",                                  /* Name */
+       init_lua_worker,                /* Init function */
+       start_lua_worker,               /* Start function */
+       TRUE,                                   /* Has socket */
+       FALSE,                                  /* Non unique */
+       FALSE,                                  /* Non threaded */
+       TRUE                                    /* Killable */
+};
+
+/*
+ * Worker's context
+ */
+struct rspamd_lua_worker_ctx {
+       /* DNS resolver */
+       struct rspamd_dns_resolver     *resolver;
+       /* Events base */
+       struct event_base              *ev_base;
+       /* Other params */
+       GHashTable                                         *params;
+       /* Lua script to load */
+       gchar                                              *file;
+       /* Lua state */
+       lua_State                                          *L;
+       /* Callback for accept */
+       gint                                                    cbref_accept;
+       /* Callback for finishing */
+       gint                                                    cbref_fin;
+       /* Config file */
+       struct config_file                         *cfg;
+};
+
+/* Lua bindings */
+LUA_FUNCTION_DEF (worker, get_ev_base);
+LUA_FUNCTION_DEF (worker, register_accept_callback);
+LUA_FUNCTION_DEF (worker, register_exit_callback);
+LUA_FUNCTION_DEF (worker, get_option);
+LUA_FUNCTION_DEF (worker, get_resolver);
+LUA_FUNCTION_DEF (worker, get_cfg);
+
+static const struct luaL_reg    lua_workerlib_m[] = {
+       LUA_INTERFACE_DEF (worker, get_ev_base),
+       LUA_INTERFACE_DEF (worker, register_accept_callback),
+       LUA_INTERFACE_DEF (worker, register_exit_callback),
+       LUA_INTERFACE_DEF (worker, get_option),
+       LUA_INTERFACE_DEF (worker, get_resolver),
+       LUA_INTERFACE_DEF (worker, get_cfg),
+       {"__tostring", lua_class_tostring},
+       {NULL, NULL}
+};
+
+static sig_atomic_t             wanna_die = 0;
+
+/* Basic functions of LUA API for worker object */
+static gint
+luaopen_lua_worker (lua_State * L)
+{
+       lua_newclass (L, "rspamd{worker}", lua_workerlib_m);
+       luaL_openlib (L, "rspamd_worker", null_reg, 0);
+
+       lua_pop (L, 1);                      /* remove metatable from stack */
+
+       return 1;
+}
+
+struct rspamd_lua_worker_ctx      *
+lua_check_lua_worker (lua_State * L)
+{
+       void                                                            *ud = luaL_checkudata (L, 1, "rspamd{worker}");
+       luaL_argcheck (L, ud != NULL, 1, "'worker' expected");
+       return ud ? *((struct rspamd_lua_worker_ctx **)ud) : NULL;
+}
+
+static int
+lua_worker_get_ev_base (lua_State *L)
+{
+       struct rspamd_lua_worker_ctx                                    *ctx = lua_check_lua_worker (L);
+       struct event_base                                                               **pbase;
+
+       if (ctx) {
+               pbase = lua_newuserdata (L, sizeof (struct event_base *));
+               lua_setclass (L, "rspamd{ev_base}", -1);
+               *pbase = ctx->ev_base;
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_worker_register_accept_callback (lua_State *L)
+{
+       struct rspamd_lua_worker_ctx                                    *ctx = lua_check_lua_worker (L);
+
+       if (ctx) {
+               if (!lua_isfunction (L, 2)) {
+                       msg_err ("invalid callback passed");
+                       lua_pushnil (L);
+               }
+               else {
+                       lua_pushvalue (L, 2);
+                       ctx->cbref_accept = luaL_ref (L, LUA_REGISTRYINDEX);
+                       return 0;
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_worker_register_exit_callback (lua_State *L)
+{
+       struct rspamd_lua_worker_ctx                                    *ctx = lua_check_lua_worker (L);
+
+       if (ctx) {
+               if (!lua_isfunction (L, 2)) {
+                       msg_err ("invalid callback passed");
+                       lua_pushnil (L);
+               }
+               else {
+                       lua_pushvalue (L, 2);
+                       ctx->cbref_fin = luaL_ref (L, LUA_REGISTRYINDEX);
+                       return 0;
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_worker_get_option (lua_State *L)
+{
+       struct rspamd_lua_worker_ctx                                    *ctx = lua_check_lua_worker (L);
+       GList                                                                                   *val;
+       gint                                                                                     i;
+       const gchar                                                                             *name;
+
+       if (ctx) {
+               name = luaL_checkstring (L, 2);
+               if (name == NULL) {
+                       msg_err ("no name specified");
+                       lua_pushnil (L);
+               }
+               else {
+                       val = g_hash_table_lookup (ctx->params, name);
+                       if (val == NULL) {
+                               lua_pushnil (L);
+                       }
+                       else {
+                               /* Push the array */
+                               i = 1;
+                               lua_newtable (L);
+                               while (val) {
+                                       lua_pushstring (L, val->data);
+                                       lua_rawseti (L, -2, i++);
+                                       val = g_list_next (val);
+                               }
+                       }
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_worker_get_resolver (lua_State *L)
+{
+       struct rspamd_lua_worker_ctx                                    *ctx = lua_check_lua_worker (L);
+
+       if (ctx) {
+               /* XXX: implement resolver API */
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_worker_get_cfg (lua_State *L)
+{
+       struct rspamd_lua_worker_ctx                                    *ctx = lua_check_lua_worker (L);
+       struct config_file                                                              **pcfg;
+
+       if (ctx) {
+               pcfg = lua_newuserdata (L, sizeof (gpointer));
+               lua_setclass (L, "rspamd{config}", -1);
+               *pcfg = ctx->cfg;
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+/* End of lua API */
+/* Signal handlers */
+
+#ifndef HAVE_SA_SIGINFO
+static void
+sig_handler (gint signo)
+#else
+static void
+sig_handler (gint signo, siginfo_t * info, void *unused)
+#endif
+{
+       struct timeval                  tv;
+
+       switch (signo) {
+       case SIGINT:
+       case SIGTERM:
+               if (!wanna_die) {
+                       wanna_die = 1;
+                       tv.tv_sec = 0;
+                       tv.tv_usec = 0;
+                       event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+                       ProfilerStop ();
+#endif
+               }
+               break;
+       }
+}
+
+/*
+ * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
+ */
+static void
+sigusr2_handler (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
+       /* Do not accept new connections, preparing to end worker's process */
+       struct timeval                  tv;
+
+       if (!wanna_die) {
+               tv.tv_sec = SOFT_SHUTDOWN_TIME;
+               tv.tv_usec = 0;
+               event_del (&worker->sig_ev_usr1);
+               event_del (&worker->sig_ev_usr2);
+               event_del (&worker->bind_ev);
+               msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+               event_loopexit (&tv);
+       }
+       return;
+}
+
+/*
+ * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
+ */
+static void
+sigusr1_handler (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
+
+       reopen_log (worker->srv->logger);
+
+       return;
+}
+
+static gboolean
+handle_lua_param (struct config_file *cfg, struct rspamd_xml_userdata *unused, GHashTable *attrs, gchar *data, gpointer user_data, gpointer dest_struct, gint offset)
+{
+       struct rspamd_lua_worker_ctx       *ctx = dest_struct;
+       GList                                                      *val;
+       gchar                                                      *tag = user_data;
+
+       val = g_hash_table_lookup (ctx->params, tag);
+       if (val == NULL) {
+               g_hash_table_insert (ctx->params, g_strdup (tag), g_list_prepend (NULL, data));
+       }
+       else {
+               val = g_list_append (val, data);
+       }
+
+       return TRUE;
+}
+
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+lua_accept_socket (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
+       struct rspamd_lua_worker_ctx   *ctx, **pctx;
+       union sa_union                  su;
+       socklen_t                       addrlen = sizeof (su.ss);
+       gint                            nfd;
+       struct in_addr                              addr;
+       gchar                                              *addr_str = NULL;
+       lua_State                                          *L;
+
+       ctx = worker->ctx;
+       L = ctx->L;
+
+       if ((nfd =
+                       accept_from_socket (fd, (struct sockaddr *) &su.ss, &addrlen)) == -1) {
+               msg_warn ("accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0){
+               return;
+       }
+
+       if (su.ss.ss_family == AF_UNIX) {
+               msg_info ("accepted connection from unix socket");
+               addr.s_addr = INADDR_NONE;
+               addr_str = "127.0.0.1";
+       }
+       else if (su.ss.ss_family == AF_INET) {
+               msg_info ("accepted connection from %s port %d",
+                               inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+               memcpy (&addr, &su.s4.sin_addr,
+                               sizeof (struct in_addr));
+               addr_str = g_strdup (inet_ntoa (su.s4.sin_addr));
+       }
+
+       /* Call finalizer function */
+       lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_accept);
+       pctx = lua_newuserdata (L, sizeof (gpointer));
+       lua_setclass (L, "rspamd{worker}", -1);
+       *pctx = ctx;
+       lua_pushinteger (L, nfd);
+       lua_pushstring (L, addr_str);
+       lua_pushinteger (L, addr.s_addr);
+
+
+       if (lua_pcall (L, 4, 0, 0) != 0) {
+               msg_info ("call to worker accept failed: %s", lua_tostring (L, -1));
+       }
+
+       if (addr_str) {
+               g_free (addr_str);
+       }
+}
+
+gpointer
+init_lua_worker (void)
+{
+       struct rspamd_lua_worker_ctx       *ctx;
+       GQuark                                                          type;
+
+       type = g_quark_try_string ("lua");
+
+       ctx = g_malloc0 (sizeof (struct rspamd_lua_worker_ctx));
+       ctx->params = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, (GDestroyNotify)g_list_free);
+
+
+       register_worker_opt (type, "file", xml_handle_string, ctx, G_STRUCT_OFFSET (struct rspamd_lua_worker_ctx, file));
+       register_worker_opt (type, "*", handle_lua_param, ctx, 0);
+
+       return ctx;
+}
+
+/*
+ * Start worker process
+ */
+void
+start_lua_worker (struct rspamd_worker *worker)
+{
+       struct sigaction                signals;
+       struct rspamd_lua_worker_ctx   *ctx = worker->ctx, **pctx;
+       lua_State                                          *L;
+
+#ifdef WITH_PROFILER
+       extern void                     _start (void), etext (void);
+       monstartup ((u_long) & _start, (u_long) & etext);
+#endif
+
+       gperf_profiler_init (worker->srv->cfg, "lua_worker");
+
+       worker->srv->pid = getpid ();
+
+       ctx->ev_base = event_init ();
+
+       L = worker->srv->cfg->lua_state;
+       ctx->L = L;
+       ctx->cfg = worker->srv->cfg;
+
+       init_signals (&signals, sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* SIGUSR2 handler */
+       signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
+       signal_add (&worker->sig_ev_usr2, NULL);
+
+       /* SIGUSR1 handler */
+       signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
+       signal_add (&worker->sig_ev_usr1, NULL);
+
+       /* Accept event */
+       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST,
+                       lua_accept_socket, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->bind_ev);
+       event_add (&worker->bind_ev, NULL);
+
+       /* Open worker's lib */
+       luaopen_lua_worker (L);
+
+       if (ctx->file == NULL) {
+               msg_err ("No lua script defined, so no reason to exist");
+               exit (EXIT_SUCCESS);
+       }
+       if (access (ctx->file, R_OK) == -1) {
+               msg_err ("Error reading lua script %s: %s", ctx->file, strerror (errno));
+               exit (EXIT_SUCCESS);
+       }
+
+       pctx = lua_newuserdata (L, sizeof (gpointer));
+       lua_setclass (L, "rspamd{worker}", -1);
+       lua_setglobal (L, "rspamd_worker");
+       *pctx = ctx;
+
+       if (luaL_dofile (L, ctx->file) != 0) {
+               msg_err ("Error executing lua script %s: %s", ctx->file, lua_tostring (L, -1));
+               exit (EXIT_SUCCESS);
+       }
+
+       if (ctx->cbref_accept == 0) {
+               msg_err ("No accept function defined, so no reason to exist");
+               exit (EXIT_SUCCESS);
+       }
+
+       /* Maps events */
+       start_map_watch (ctx->ev_base);
+       ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
+
+       event_base_loop (ctx->ev_base, 0);
+       luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_accept);
+       if (ctx->cbref_fin != 0) {
+               /* Call finalizer function */
+               lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_fin);
+               pctx = lua_newuserdata (L, sizeof (gpointer));
+               lua_setclass (L, "rspamd{worker}", -1);
+               *pctx = ctx;
+               if (lua_pcall (L, 1, 0, 0) != 0) {
+                       msg_info ("call to worker finalizer failed: %s", lua_tostring (L, -1));
+               }
+               /* Free resources */
+               luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_fin);
+       }
+
+       close_log (rspamd_main->logger);
+       exit (EXIT_SUCCESS);
+}
+
+/*
+ * vi:ts=4
+ */
+
index ba44444850860013eb22c838f0d26b1a2c10cc2e..976fe58c3cb3d022f67225b97b28739e75341b06 100644 (file)
@@ -52,7 +52,9 @@ construct_task (struct rspamd_worker *worker)
 
        new_task->worker = worker;
        new_task->state = READ_COMMAND;
-       new_task->cfg = worker->srv->cfg;
+       if (worker) {
+               new_task->cfg = worker->srv->cfg;
+       }
        new_task->view_checked = FALSE;
 #ifdef HAVE_CLOCK_GETTIME
 # ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID