From f0803d058f4d24652e185aa410cd5acbcf3f6e10 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 9 Apr 2015 16:16:09 +0100 Subject: [PATCH] Rework lua_redis to table form. --- src/lua/lua_redis.c | 230 +++++++++++++++++++++++++++++--------------- 1 file changed, 154 insertions(+), 76 deletions(-) diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 2fa366ffe..47608fa36 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -30,8 +30,8 @@ #define REDIS_DEFAULT_TIMEOUT 1.0 -/** - * Redis access API for lua from task object +/*** + * This module implements redis asynchronous client for rspamd LUA API. */ LUA_FUNCTION_DEF (redis, make_request); @@ -54,6 +54,8 @@ struct lua_redis_userdata { gchar *server; gchar *reqline; guint16 port; + gchar **args; + guint nargs; }; @@ -61,11 +63,20 @@ static void lua_redis_fin (void *arg) { struct lua_redis_userdata *ud = arg; + guint i; if (ud->ctx) { redisAsyncFree (ud->ctx); event_del (&ud->timeout); luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); + + if (ud->args) { + for (i = 0; i < ud->nargs; i ++) { + g_free (ud->args[i]); + } + + g_free (ud->args); + } } } @@ -202,6 +213,49 @@ lua_redis_timeout (int fd, short what, gpointer u) lua_redis_push_error ("timeout while connecting the server", ud, TRUE); } + +static void +lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, + struct lua_redis_userdata *ud) +{ + gchar **args = NULL; + gint top; + + if (idx != 0 && lua_type (L, idx) == LUA_TTABLE) { + /* Get all arguments */ + lua_pushvalue (L, 5); + lua_pushnil (L); + top = 0; + + while (lua_next (L, -2) != 0) { + if (lua_isstring (L, -1)) { + top ++; + } + lua_pop (L, 1); + } + + args = g_malloc ((top + 1) * sizeof (gchar *)); + lua_pushnil (L); + args[0] = g_strdup (cmd); + top = 1; + + while (lua_next (L, -2) != 0) { + args[top++] = g_strdup (lua_tostring (L, -1)); + lua_pop (L, 1); + } + + lua_pop (L, 1); + } + else { + /* Use merely cmd */ + args = g_malloc (sizeof (gchar *)); + args[0] = g_strdup (cmd); + top = 1; + } + + ud->nargs = top; + ud->args = args; +} /** * Make request to redis server * @param task worker task object @@ -216,14 +270,69 @@ static int lua_redis_make_request (lua_State *L) { struct lua_redis_userdata *ud; - struct rspamd_lua_ip *addr; - struct rspamd_task *task; - const gchar **args = NULL, *cmd; - gint top; + struct rspamd_lua_ip *addr = NULL; + struct rspamd_task *task = NULL; + const gchar *cmd = NULL; + gint top, cbref = -1; struct timeval tv; gboolean ret = FALSE; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; + + if (lua_istable (L, 1)) { + /* Table version */ + lua_pushstring (L, "task"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TUSERDATA) { + task = lua_check_task (L, -1); + } + lua_pop (L, 1); - if ((task = lua_check_task (L, 1)) != NULL) { + lua_pushstring (L, "callback"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TFUNCTION) { + /* This also pops function from the stack */ + cbref = luaL_ref (L, LUA_REGISTRYINDEX); + } + else { + msg_err ("bad callback argument for lua redis"); + lua_pop (L, 1); + } + + lua_pushstring (L, "cmd"); + lua_gettable (L, -2); + cmd = lua_tostring (L, -1); + lua_pop (L, 1); + + lua_pushstring (L, "host"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TUSERDATA) { + addr = lua_check_ip (L, -1); + } + lua_pop (L, 1); + + lua_pushstring (L, "timeout"); + lua_gettable (L, -2); + timeout = lua_tonumber (L, -1); + lua_pop (L, 1); + + if (task != NULL && addr != NULL && cbref != -1 && cmd != NULL) { + ud = + rspamd_mempool_alloc (task->task_pool, + sizeof (struct lua_redis_userdata)); + ud->task = task; + ud->L = L; + ud->cbref = cbref; + lua_pushstring (L, "args"); + lua_redis_parse_args (L, -1, cmd, ud); + } + else { + if (cbref != -1) { + luaL_unref (L, LUA_REGISTRYINDEX, cbref); + } + msg_err ("incorrect function invocation"); + } + } + else if ((task = lua_check_task (L, 1)) != NULL) { addr = lua_check_ip (L, 2); top = lua_gettop (L); /* Now get callback */ @@ -234,17 +343,7 @@ lua_redis_make_request (lua_State *L) sizeof (struct lua_redis_userdata)); ud->task = task; ud->L = L; - ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr), - rspamd_inet_address_get_port (addr->addr)); - if (ud->ctx == NULL || ud->ctx->err) { - redisAsyncFree (ud->ctx); - lua_pushboolean (L, FALSE); - - return 1; - } - - redisLibeventAttach (ud->ctx, ud->task->ev_base); /* Pop other arguments */ lua_pushvalue (L, 3); /* Get a reference */ @@ -252,71 +351,50 @@ lua_redis_make_request (lua_State *L) cmd = luaL_checkstring (L, 4); if (top > 4) { - if (lua_istable (L, 5)) { - /* Get all arguments */ - lua_pushvalue (L, 5); - lua_pushnil (L); - top = 0; - - while (lua_next (L, -2) != 0) { - if (lua_isstring (L, -1)) { - top ++; - } - lua_pop (L, 1); - } - - args = g_alloca ((top + 1) * sizeof (gchar *)); - lua_pushnil (L); - args[0] = cmd; - top = 1; - - while (lua_next (L, -2) != 0) { - args[top++] = lua_tostring (L, -1); - lua_pop (L, 1); - } - - lua_pop (L, 1); - } - else { - msg_warn ("bad arguments format"); - args = g_alloca (sizeof (gchar *)); - args[0] = cmd; - top = 1; - } + lua_redis_parse_args (L, 5, cmd, ud); } else { - args = g_alloca (sizeof (gchar *)); - args[0] = cmd; - top = 1; + lua_redis_parse_args (L, 0, cmd, ud); } + } + else { + msg_err ("incorrect function invocation"); + } + } - ret = redisAsyncCommandArgv (ud->ctx, - lua_redis_callback, - ud, - top, - args, - NULL); - if (ret == REDIS_OK) { - register_async_event (ud->task->s, - lua_redis_fin, - ud, - g_quark_from_static_string ("lua redis")); - /* - * TODO: cannot handle more than fixed timeout here - */ - double_to_tv (REDIS_DEFAULT_TIMEOUT, &tv); - event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ud); - event_base_set (ud->task->ev_base, &ud->timeout); - event_add (&ud->timeout, &tv); - } - else { - msg_info ("call to redis failed: %s", ud->ctx->errstr); - redisAsyncFree (ud->ctx); - luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); - } + if (ret) { + ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr), + rspamd_inet_address_get_port (addr->addr)); + + if (ud->ctx == NULL || ud->ctx->err) { + redisAsyncFree (ud->ctx); + luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); + lua_pushboolean (L, FALSE); + + return 1; + } + redisLibeventAttach (ud->ctx, ud->task->ev_base); + ret = redisAsyncCommandArgv (ud->ctx, + lua_redis_callback, + ud, + ud->nargs, + (const gchar **)ud->args, + NULL); + if (ret == REDIS_OK) { + register_async_event (ud->task->s, + lua_redis_fin, + ud, + g_quark_from_static_string ("lua redis")); + + double_to_tv (timeout, &tv); + event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ud); + event_base_set (ud->task->ev_base, &ud->timeout); + event_add (&ud->timeout, &tv); } else { - msg_info ("incorrect function invocation"); + msg_info ("call to redis failed: %s", ud->ctx->errstr); + redisAsyncFree (ud->ctx); + luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); } } -- 2.39.5