From: Vsevolod Stakhov Date: Tue, 24 Jul 2012 16:37:51 +0000 (+0400) Subject: * Add lua_buffer bindings to async buffered rspamd API. X-Git-Tag: 0.5.1~15 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=0d35a8f9cb4942e84d3e26ca4d010393a56e2904;p=rspamd.git * Add lua_buffer bindings to async buffered rspamd API. --- diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt index 93958674b..92cf3df4b 100644 --- a/src/lua/CMakeLists.txt +++ b/src/lua/CMakeLists.txt @@ -12,7 +12,8 @@ SET(LUASRC lua_common.c lua_redis.c lua_upstream.c lua_mempool.c - lua_session.c) + lua_session.c + lua_buffer.c) ADD_LIBRARY(rspamd-lua ${LINK_TYPE} ${LUASRC}) SET_TARGET_PROPERTIES(rspamd-lua PROPERTIES VERSION ${RSPAMD_VERSION}) diff --git a/src/lua/lua_buffer.c b/src/lua/lua_buffer.c new file mode 100644 index 000000000..bb5b1ea8c --- /dev/null +++ b/src/lua/lua_buffer.c @@ -0,0 +1,384 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "lua_common.h" +#include "buffer.h" + +/* Public prototypes */ +struct rspamd_io_dispatcher_s *lua_check_io_dispatcher (lua_State * L); +gint luaopen_io_dispatcher (lua_State * L); + +/* Lua bindings */ +LUA_FUNCTION_DEF (io_dispatcher, create); +LUA_FUNCTION_DEF (io_dispatcher, set_policy); +LUA_FUNCTION_DEF (io_dispatcher, write); +LUA_FUNCTION_DEF (io_dispatcher, pause); +LUA_FUNCTION_DEF (io_dispatcher, restore); +LUA_FUNCTION_DEF (io_dispatcher, delete); + +static const struct luaL_reg io_dispatcherlib_m[] = { + LUA_INTERFACE_DEF (io_dispatcher, set_policy), + LUA_INTERFACE_DEF (io_dispatcher, write), + LUA_INTERFACE_DEF (io_dispatcher, pause), + LUA_INTERFACE_DEF (io_dispatcher, restore), + {"__gc", lua_io_dispatcher_delete}, + {"__tostring", lua_class_tostring}, + {NULL, NULL} +}; + +static const struct luaL_reg io_dispatcherlib_f[] = { + LUA_INTERFACE_DEF (io_dispatcher, create), + {NULL, NULL} +}; + +struct lua_dispatcher_cbdata { + lua_State *L; + rspamd_io_dispatcher_t *d; + struct event_base *base; + gint cbref_read; + gint cbref_write; + gint cbref_err; +}; + +struct rspamd_io_dispatcher_s * +lua_check_io_dispatcher (lua_State * L) +{ + void *ud = luaL_checkudata (L, 1, "rspamd{io_dispatcher}"); + luaL_argcheck (L, ud != NULL, 1, "'io_dispatcher' expected"); + return ud ? *((struct rspamd_io_dispatcher_s **)ud) : NULL; +} + +struct event_base * +lua_check_event_base (lua_State *L) +{ + void *ud = luaL_checkudata (L, 1, "rspamd{ev_base}"); + luaL_argcheck (L, ud != NULL, 1, "'ev_base' expected"); + return ud ? *((struct event_base **)ud) : NULL; +} + +/* Dispatcher callbacks */ + +static gboolean +lua_io_read_cb (f_str_t * in, void *arg) +{ + struct lua_dispatcher_cbdata *cbdata = arg; + gboolean need_unlock = FALSE, res; + rspamd_io_dispatcher_t **pdispatcher; + + /* Avoid LOR here as mutex can be acquired before in lua_call */ + if (g_mutex_trylock (lua_mtx)) { + need_unlock = TRUE; + } + /* callback (dispatcher, data) */ + pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); + lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); + *pdispatcher = cbdata->d; + lua_pushlstring (cbdata->L, in->begin, in->len); + + lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + if (lua_pcall (cbdata->L, 2, 1, 0) != 0) { + msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); + } + + res = lua_toboolean (cbdata->L, 1); + lua_pop (cbdata->L, 1); + + if (need_unlock) { + g_mutex_unlock (lua_mtx); + } + + if (!res) { + /* Unref callbacks */ + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + if (cbdata->cbref_write) { + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write); + } + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); + g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata); + } + + return res; +} + +static gboolean +lua_io_write_cb (void *arg) +{ + struct lua_dispatcher_cbdata *cbdata = arg; + gboolean need_unlock = FALSE, res; + rspamd_io_dispatcher_t **pdispatcher; + + if (cbdata->cbref_write) { + /* Avoid LOR here as mutex can be acquired before in lua_call */ + if (g_mutex_trylock (lua_mtx)) { + need_unlock = TRUE; + } + /* callback (dispatcher) */ + pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); + lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); + *pdispatcher = cbdata->d; + + lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + if (lua_pcall (cbdata->L, 1, 1, 0) != 0) { + msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); + } + + res = lua_toboolean (cbdata->L, 1); + lua_pop (cbdata->L, 1); + + if (need_unlock) { + g_mutex_unlock (lua_mtx); + } + + if (!res) { + /* Unref callbacks */ + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write); + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); + g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata); + } + } + + return res; +} + +static void +lua_io_err_cb (GError * err, void *arg) +{ + struct lua_dispatcher_cbdata *cbdata = arg; + gboolean need_unlock = FALSE; + rspamd_io_dispatcher_t **pdispatcher; + + /* Avoid LOR here as mutex can be acquired before in lua_call */ + if (g_mutex_trylock (lua_mtx)) { + need_unlock = TRUE; + } + /* callback (dispatcher, err) */ + pdispatcher = lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); + lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); + *pdispatcher = cbdata->d; + lua_pushstring (cbdata->L, err->message); + + lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + if (lua_pcall (cbdata->L, 2, 0, 0) != 0) { + msg_info ("call to session finalizer failed: %s", lua_tostring (cbdata->L, -1)); + } + + if (need_unlock) { + g_mutex_unlock (lua_mtx); + } + /* Unref callbacks */ + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); + if (cbdata->cbref_write) { + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write); + } + luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); + + g_error_free (err); + g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata); +} + +/* + * rspamd_dispatcher.create(base,fd, read_cb, write_cb, err_cb[, timeout]) + */ +static int +lua_io_dispatcher_create (lua_State *L) +{ + struct rspamd_io_dispatcher_s *io_dispatcher, **pdispatcher; + gint fd; + struct lua_dispatcher_cbdata *cbdata; + struct timeval tv = {0, 0}; + double tv_num, tmp; + + if (lua_gettop (L) >= 5 && lua_isfunction (L, 3) && lua_isfunction (L, 5)) { + cbdata = g_slice_alloc0 (sizeof (struct lua_dispatcher_cbdata)); + cbdata->base = lua_check_event_base (L); + if (cbdata->base == NULL) { + /* Create new event base */ + msg_warn ("create new event base as it is not specified"); + cbdata->base = event_init (); + } + cbdata->L = L; + fd = lua_tointeger (L, 2); + lua_pushvalue (L, 3); + cbdata->cbref_read = luaL_ref (L, LUA_REGISTRYINDEX); + if (lua_isfunction (L, 4)) { + /* Push write callback as well */ + lua_pushvalue (L, 4); + cbdata->cbref_write = luaL_ref (L, LUA_REGISTRYINDEX); + } + /* Error callback */ + lua_pushvalue (L, 5); + cbdata->cbref_err = luaL_ref (L, LUA_REGISTRYINDEX); + + if (lua_gettop (L) > 5) { + tv_num = lua_tonumber (L, 6); + tv.tv_sec = trunc (tv_num); + tv.tv_usec = modf (tv_num, &tmp) * 1000.; + } + io_dispatcher = rspamd_create_dispatcher (cbdata->base, fd, BUFFER_LINE, lua_io_read_cb, lua_io_write_cb, lua_io_err_cb, &tv, cbdata); + cbdata->d = io_dispatcher; + /* Push result */ + pdispatcher = lua_newuserdata (L, sizeof (struct rspamd_io_dispatcher_s *)); + lua_setclass (L, "rspamd{io_dispatcher}", -1); + *pdispatcher = io_dispatcher; + } + else { + msg_err ("invalid number of arguments to io_dispatcher.create: %d", lua_gettop (L)); + lua_pushnil (L); + } + + return 1; +} + +static int +lua_io_dispatcher_set_policy (lua_State *L) +{ + struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); + gint policy, limit = -1; + + if (io_dispatcher) { + policy = lua_tonumber (L, 2); + if (policy > BUFFER_ANY || policy < BUFFER_LINE) { + msg_err ("invalid policy: %d", policy); + } + else { + if (lua_gettop (L) > 2) { + limit = lua_tonumber (L, 3); + } + rspamd_set_dispatcher_policy (io_dispatcher, policy, limit); + return 0; + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_io_dispatcher_write (lua_State *L) +{ + struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); + gboolean delayed = FALSE, res; + const gchar *data; + size_t len; + + if (io_dispatcher) { + if (lua_gettop (L) < 2) { + msg_err ("invalid number of arguments to io_dispatcher.create: %d", lua_gettop (L)); + lua_pushboolean (L, FALSE); + } + else { + data = lua_tolstring (L, 2, &len); + if (lua_gettop (L) > 2) { + delayed = lua_toboolean (L, 3); + } + res = rspamd_dispatcher_write (io_dispatcher, (void *)data, len, delayed, FALSE); + lua_pushboolean (L, res); + } + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_io_dispatcher_pause (lua_State *L) +{ + struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); + + if (io_dispatcher) { + rspamd_dispatcher_pause (io_dispatcher); + return 0; + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_io_dispatcher_restore (lua_State *L) +{ + struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); + + if (io_dispatcher) { + rspamd_dispatcher_restore (io_dispatcher); + return 0; + } + else { + lua_pushnil (L); + } + + return 1; +} + +static int +lua_io_dispatcher_delete (lua_State *L) +{ + struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); + + if (io_dispatcher) { + rspamd_remove_dispatcher (io_dispatcher); + return 0; + } + else { + lua_pushnil (L); + } + + return 1; +} + + +gint +luaopen_io_dispatcher (lua_State * L) +{ + luaL_newmetatable (L, "rspamd{io_dispatcher}"); + lua_pushstring (L, "__index"); + lua_pushvalue (L, -2); + lua_settable (L, -3); + + lua_pushstring (L, "class"); + lua_pushstring (L, "rspamd{io_dispatcher}"); + lua_rawset (L, -3); + + luaL_openlib (L, NULL, io_dispatcherlib_m, 0); + luaL_openlib(L, "rspamd_io_dispatcher", io_dispatcherlib_f, 0); + + /* Simple event class */ + lua_newclass (L, "rspamd{ev_base}", null_reg); + luaL_openlib (L, "rspamd_ev_base", null_reg, 0); + + /* Set buffer types globals */ + lua_pushnumber (L, BUFFER_LINE); + lua_setglobal (L, "IO_BUFFER_LINE"); + lua_pushnumber (L, BUFFER_CHARACTER); + lua_setglobal (L, "IO_BUFFER_CHARACTER"); + lua_pushnumber (L, BUFFER_ANY); + lua_setglobal (L, "IO_BUFFER_ANY"); + return 1; +} diff --git a/src/lua/lua_session.c b/src/lua/lua_session.c index 37a41964e..5ccfc76d3 100644 --- a/src/lua/lua_session.c +++ b/src/lua/lua_session.c @@ -157,6 +157,8 @@ lua_session_cleanup (gpointer ud) } } + + static int lua_session_create (lua_State *L) { @@ -207,7 +209,7 @@ lua_session_create (lua_State *L) } session = new_async_session (mempool, lua_session_finalizer, lua_session_restore, lua_session_cleanup, cbdata); cbdata->session = session; - psession = lua_newuserdata (L, sizeof (struct real_name *)); + psession = lua_newuserdata (L, sizeof (struct rspamd_async_session *)); lua_setclass (L, "rspamd{session}", -1); *psession = session; diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index 3ef88e390..3aa204597 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -45,6 +45,7 @@ extern stat_file_t* get_statfile_by_symbol (statfile_pool_t *pool, struct classi /* Task methods */ LUA_FUNCTION_DEF (task, get_message); LUA_FUNCTION_DEF (task, get_mempool); +LUA_FUNCTION_DEF (task, get_ev_base); LUA_FUNCTION_DEF (task, insert_result); LUA_FUNCTION_DEF (task, set_pre_result); LUA_FUNCTION_DEF (task, get_urls); @@ -79,6 +80,7 @@ LUA_FUNCTION_DEF (task, learn_statfile); static const struct luaL_reg tasklib_m[] = { LUA_INTERFACE_DEF (task, get_message), LUA_INTERFACE_DEF (task, get_mempool), + LUA_INTERFACE_DEF (task, get_ev_base), LUA_INTERFACE_DEF (task, insert_result), LUA_INTERFACE_DEF (task, set_pre_result), LUA_INTERFACE_DEF (task, get_urls), @@ -230,6 +232,20 @@ lua_task_get_mempool (lua_State * L) return 1; } +static int +lua_task_get_ev_base (lua_State * L) +{ + struct event_base **pbase; + struct worker_task *task = lua_check_task (L); + + if (task != NULL) { + pbase = lua_newuserdata (L, sizeof (struct event_base *)); + lua_setclass (L, "rspamd{ev_base}", -1); + *pbase = task->ev_base; + } + return 1; +} + static gint lua_task_insert_result (lua_State * L) {