]> source.dussan.org Git - rspamd.git/commitdiff
* Add lua_buffer bindings to async buffered rspamd API.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 24 Jul 2012 16:37:51 +0000 (20:37 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 24 Jul 2012 16:37:51 +0000 (20:37 +0400)
src/lua/CMakeLists.txt
src/lua/lua_buffer.c [new file with mode: 0644]
src/lua/lua_session.c
src/lua/lua_task.c

index 93958674bae56b6e14197c29e906789d4a503ec1..92cf3df4b7f3d2a3f2ca227923f6f3a6786bdcdd 100644 (file)
@@ -12,7 +12,8 @@ SET(LUASRC                      lua_common.c
                                          lua_redis.c
                                          lua_upstream.c
                                          lua_mempool.c
-                                         lua_session.c)
+                                         lua_session.c
+                                         lua_buffer.c)
 
 ADD_LIBRARY(rspamd-lua ${LINK_TYPE} ${LUASRC})
 SET_TARGET_PROPERTIES(rspamd-lua PROPERTIES VERSION ${RSPAMD_VERSION})
diff --git a/src/lua/lua_buffer.c b/src/lua/lua_buffer.c
new file mode 100644 (file)
index 0000000..bb5b1ea
--- /dev/null
@@ -0,0 +1,384 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "lua_common.h"
+#include "buffer.h"
+
+/* Public prototypes */
+struct rspamd_io_dispatcher_s *lua_check_io_dispatcher (lua_State * L);
+gint luaopen_io_dispatcher (lua_State * L);
+
+/* Lua bindings */
+LUA_FUNCTION_DEF (io_dispatcher, create);
+LUA_FUNCTION_DEF (io_dispatcher, set_policy);
+LUA_FUNCTION_DEF (io_dispatcher, write);
+LUA_FUNCTION_DEF (io_dispatcher, pause);
+LUA_FUNCTION_DEF (io_dispatcher, restore);
+LUA_FUNCTION_DEF (io_dispatcher, delete);
+
+static const struct luaL_reg    io_dispatcherlib_m[] = {
+       LUA_INTERFACE_DEF (io_dispatcher, set_policy),
+       LUA_INTERFACE_DEF (io_dispatcher, write),
+       LUA_INTERFACE_DEF (io_dispatcher, pause),
+       LUA_INTERFACE_DEF (io_dispatcher, restore),
+       {"__gc", lua_io_dispatcher_delete},
+       {"__tostring", lua_class_tostring},
+       {NULL, NULL}
+};
+
+static const struct luaL_reg    io_dispatcherlib_f[] = {
+       LUA_INTERFACE_DEF (io_dispatcher, create),
+       {NULL, NULL}
+};
+
+struct lua_dispatcher_cbdata {
+       lua_State *L;
+       rspamd_io_dispatcher_t *d;
+       struct event_base *base;
+       gint cbref_read;
+       gint cbref_write;
+       gint cbref_err;
+};
+
+struct rspamd_io_dispatcher_s      *
+lua_check_io_dispatcher (lua_State * L)
+{
+       void                                                            *ud = luaL_checkudata (L, 1, "rspamd{io_dispatcher}");
+       luaL_argcheck (L, ud != NULL, 1, "'io_dispatcher' expected");
+       return ud ? *((struct rspamd_io_dispatcher_s **)ud) : NULL;
+}
+
+struct event_base *
+lua_check_event_base (lua_State *L)
+{
+       void                                                            *ud = luaL_checkudata (L, 1, "rspamd{ev_base}");
+       luaL_argcheck (L, ud != NULL, 1, "'ev_base' expected");
+       return ud ? *((struct event_base **)ud) : NULL;
+}
+
+/* Dispatcher callbacks */
+
+static                          gboolean
+lua_io_read_cb (f_str_t * in, void *arg)
+{
+       struct lua_dispatcher_cbdata                                    *cbdata = arg;
+       gboolean                                                                                 need_unlock = FALSE, res;
+       rspamd_io_dispatcher_t                                                  **pdispatcher;
+
+       /* Avoid LOR here as mutex can be acquired before in lua_call */
+       if (g_mutex_trylock (lua_mtx)) {
+               need_unlock = TRUE;
+       }
+       /* callback (dispatcher, data) */
+       pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
+       lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
+       *pdispatcher = cbdata->d;
+       lua_pushlstring (cbdata->L, in->begin, in->len);
+
+       lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+       if (lua_pcall (cbdata->L, 2, 1, 0) != 0) {
+               msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
+       }
+
+       res = lua_toboolean (cbdata->L, 1);
+       lua_pop (cbdata->L, 1);
+
+       if (need_unlock) {
+               g_mutex_unlock (lua_mtx);
+       }
+
+       if (!res) {
+               /* Unref callbacks */
+               luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+               if (cbdata->cbref_write) {
+                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
+               }
+               luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
+               g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
+       }
+
+       return res;
+}
+
+static                          gboolean
+lua_io_write_cb (void *arg)
+{
+       struct lua_dispatcher_cbdata                                    *cbdata = arg;
+       gboolean                                                                                 need_unlock = FALSE, res;
+       rspamd_io_dispatcher_t                                                  **pdispatcher;
+
+       if (cbdata->cbref_write) {
+               /* Avoid LOR here as mutex can be acquired before in lua_call */
+               if (g_mutex_trylock (lua_mtx)) {
+                       need_unlock = TRUE;
+               }
+               /* callback (dispatcher) */
+               pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
+               lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
+               *pdispatcher = cbdata->d;
+
+               lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+               if (lua_pcall (cbdata->L, 1, 1, 0) != 0) {
+                       msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
+               }
+
+               res = lua_toboolean (cbdata->L, 1);
+               lua_pop (cbdata->L, 1);
+
+               if (need_unlock) {
+                       g_mutex_unlock (lua_mtx);
+               }
+
+               if (!res) {
+                       /* Unref callbacks */
+                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
+                       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
+                       g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
+               }
+       }
+
+       return res;
+}
+
+static void
+lua_io_err_cb (GError * err, void *arg)
+{
+       struct lua_dispatcher_cbdata                                    *cbdata = arg;
+       gboolean                                                                                 need_unlock = FALSE;
+       rspamd_io_dispatcher_t                                                  **pdispatcher;
+
+       /* Avoid LOR here as mutex can be acquired before in lua_call */
+       if (g_mutex_trylock (lua_mtx)) {
+               need_unlock = TRUE;
+       }
+       /* callback (dispatcher, err) */
+       pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *));
+       lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1);
+       *pdispatcher = cbdata->d;
+       lua_pushstring (cbdata->L, err->message);
+
+       lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+       if (lua_pcall (cbdata->L, 2, 0, 0) != 0) {
+               msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1));
+       }
+
+       if (need_unlock) {
+               g_mutex_unlock (lua_mtx);
+       }
+       /* Unref callbacks */
+       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read);
+       if (cbdata->cbref_write) {
+               luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write);
+       }
+       luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err);
+
+       g_error_free (err);
+       g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata);
+}
+
+/*
+ * rspamd_dispatcher.create(base,fd, read_cb, write_cb, err_cb[, timeout])
+ */
+static int
+lua_io_dispatcher_create (lua_State *L)
+{
+       struct rspamd_io_dispatcher_s                                   *io_dispatcher, **pdispatcher;
+       gint                                                                                     fd;
+       struct lua_dispatcher_cbdata                                    *cbdata;
+       struct timeval                                                                   tv = {0, 0};
+       double                                                                                   tv_num, tmp;
+
+       if (lua_gettop (L) >= 5 && lua_isfunction (L, 3) && lua_isfunction (L, 5)) {
+               cbdata = g_slice_alloc0 (sizeof (struct lua_dispatcher_cbdata));
+               cbdata->base = lua_check_event_base (L);
+               if (cbdata->base == NULL) {
+                       /* Create new event base */
+                       msg_warn ("create new event base as it is not specified");
+                       cbdata->base = event_init ();
+               }
+               cbdata->L = L;
+               fd = lua_tointeger (L, 2);
+               lua_pushvalue (L, 3);
+               cbdata->cbref_read = luaL_ref (L, LUA_REGISTRYINDEX);
+               if (lua_isfunction (L, 4)) {
+                       /* Push write callback as well */
+                       lua_pushvalue (L, 4);
+                       cbdata->cbref_write = luaL_ref (L, LUA_REGISTRYINDEX);
+               }
+               /* Error callback */
+               lua_pushvalue (L, 5);
+               cbdata->cbref_err = luaL_ref (L, LUA_REGISTRYINDEX);
+
+               if (lua_gettop (L) > 5) {
+                       tv_num = lua_tonumber (L, 6);
+                       tv.tv_sec = trunc (tv_num);
+                       tv.tv_usec = modf (tv_num, &tmp) * 1000.;
+               }
+               io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata);
+               cbdata->d = io_dispatcher;
+               /* Push result */
+               pdispatcher = lua_newuserdata (L, sizeof (struct rspamd_io_dispatcher_s *));
+               lua_setclass (L, "rspamd{io_dispatcher}", -1);
+               *pdispatcher = io_dispatcher;
+       }
+       else {
+               msg_err ("invalid number of arguments to io_dispatcher.create: %d", lua_gettop (L));
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_io_dispatcher_set_policy (lua_State *L)
+{
+       struct rspamd_io_dispatcher_s                                   *io_dispatcher = lua_check_io_dispatcher (L);
+       gint                                                                                     policy, limit = -1;
+
+       if (io_dispatcher) {
+               policy = lua_tonumber (L, 2);
+               if (policy > BUFFER_ANY || policy < BUFFER_LINE) {
+                       msg_err ("invalid policy: %d", policy);
+               }
+               else {
+                       if (lua_gettop (L) > 2) {
+                               limit = lua_tonumber (L, 3);
+                       }
+                       rspamd_set_dispatcher_policy (io_dispatcher, policy, limit);
+                       return 0;
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_io_dispatcher_write (lua_State *L)
+{
+       struct rspamd_io_dispatcher_s                                   *io_dispatcher = lua_check_io_dispatcher (L);
+       gboolean                                                                                 delayed = FALSE, res;
+       const gchar                                                                             *data;
+       size_t                                                                                   len;
+
+       if (io_dispatcher) {
+               if (lua_gettop (L) < 2) {
+                       msg_err ("invalid number of arguments to io_dispatcher.create: %d", lua_gettop (L));
+                       lua_pushboolean (L, FALSE);
+               }
+               else {
+                       data = lua_tolstring (L, 2, &len);
+                       if (lua_gettop (L) > 2) {
+                               delayed = lua_toboolean (L, 3);
+                       }
+                       res = rspamd_dispatcher_write (io_dispatcher, (void *)data, len, delayed, FALSE);
+                       lua_pushboolean (L, res);
+               }
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_io_dispatcher_pause (lua_State *L)
+{
+       struct rspamd_io_dispatcher_s                                   *io_dispatcher = lua_check_io_dispatcher (L);
+
+       if (io_dispatcher) {
+               rspamd_dispatcher_pause (io_dispatcher);
+               return 0;
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_io_dispatcher_restore (lua_State *L)
+{
+       struct rspamd_io_dispatcher_s                                   *io_dispatcher = lua_check_io_dispatcher (L);
+
+       if (io_dispatcher) {
+               rspamd_dispatcher_restore (io_dispatcher);
+               return 0;
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+static int
+lua_io_dispatcher_delete (lua_State *L)
+{
+       struct rspamd_io_dispatcher_s                                   *io_dispatcher = lua_check_io_dispatcher (L);
+
+       if (io_dispatcher) {
+               rspamd_remove_dispatcher (io_dispatcher);
+               return 0;
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       return 1;
+}
+
+
+gint
+luaopen_io_dispatcher (lua_State * L)
+{
+       luaL_newmetatable (L, "rspamd{io_dispatcher}");
+       lua_pushstring (L, "__index");
+       lua_pushvalue (L, -2);
+       lua_settable (L, -3);
+
+       lua_pushstring (L, "class");
+       lua_pushstring (L, "rspamd{io_dispatcher}");
+       lua_rawset (L, -3);
+
+       luaL_openlib (L, NULL, io_dispatcherlib_m, 0);
+       luaL_openlib(L, "rspamd_io_dispatcher", io_dispatcherlib_f, 0);
+
+       /* Simple event class */
+       lua_newclass (L, "rspamd{ev_base}", null_reg);
+       luaL_openlib (L, "rspamd_ev_base", null_reg, 0);
+
+       /* Set buffer types globals */
+       lua_pushnumber (L, BUFFER_LINE);
+       lua_setglobal (L, "IO_BUFFER_LINE");
+       lua_pushnumber (L, BUFFER_CHARACTER);
+       lua_setglobal (L, "IO_BUFFER_CHARACTER");
+       lua_pushnumber (L, BUFFER_ANY);
+       lua_setglobal (L, "IO_BUFFER_ANY");
+       return 1;       
+}
index 37a41964ec8a21bef9b55b5b8fdfe706ac292a69..5ccfc76d3261c16e175894414c04f81556868323 100644 (file)
@@ -157,6 +157,8 @@ lua_session_cleanup (gpointer ud)
        }
 }
 
+
+
 static int
 lua_session_create (lua_State *L)
 {
@@ -207,7 +209,7 @@ lua_session_create (lua_State *L)
        }
        session = new_async_session (mempool, lua_session_finalizer, lua_session_restore, lua_session_cleanup, cbdata);
        cbdata->session = session;
-       psession = lua_newuserdata (L, sizeof (struct real_name *));
+       psession = lua_newuserdata (L, sizeof (struct rspamd_async_session *));
        lua_setclass (L, "rspamd{session}", -1);
        *psession = session;
 
index 3ef88e3906e156e57d60f671b50483a35e606ace..3aa2045977f09fb1d25f740c4fb178860a7374b3 100644 (file)
@@ -45,6 +45,7 @@ extern stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classi
 /* Task methods */
 LUA_FUNCTION_DEF (task, get_message);
 LUA_FUNCTION_DEF (task, get_mempool);
+LUA_FUNCTION_DEF (task, get_ev_base);
 LUA_FUNCTION_DEF (task, insert_result);
 LUA_FUNCTION_DEF (task, set_pre_result);
 LUA_FUNCTION_DEF (task, get_urls);
@@ -79,6 +80,7 @@ LUA_FUNCTION_DEF (task, learn_statfile);
 static const struct luaL_reg    tasklib_m[] = {
        LUA_INTERFACE_DEF (task, get_message),
        LUA_INTERFACE_DEF (task, get_mempool),
+       LUA_INTERFACE_DEF (task, get_ev_base),
        LUA_INTERFACE_DEF (task, insert_result),
        LUA_INTERFACE_DEF (task, set_pre_result),
        LUA_INTERFACE_DEF (task, get_urls),
@@ -230,6 +232,20 @@ lua_task_get_mempool (lua_State * L)
        return 1;
 }
 
+static int
+lua_task_get_ev_base (lua_State * L)
+{
+       struct event_base             **pbase;
+       struct worker_task             *task = lua_check_task (L);
+
+       if (task != NULL) {
+               pbase = lua_newuserdata (L, sizeof (struct event_base *));
+               lua_setclass (L, "rspamd{ev_base}", -1);
+               *pbase = task->ev_base;
+       }
+       return 1;
+}
+
 static gint
 lua_task_insert_result (lua_State * L)
 {