aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_redis.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-04-09 16:16:09 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-04-09 16:16:09 +0100
commitf0803d058f4d24652e185aa410cd5acbcf3f6e10 (patch)
treed1394ffc383dd29e734f5dcfa10700cd84187c52 /src/lua/lua_redis.c
parenta79164da5127eb03bf681af935053c1dd3075581 (diff)
downloadrspamd-f0803d058f4d24652e185aa410cd5acbcf3f6e10.tar.gz
rspamd-f0803d058f4d24652e185aa410cd5acbcf3f6e10.zip
Rework lua_redis to table form.
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r--src/lua/lua_redis.c230
1 files changed, 154 insertions, 76 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 2fa366ffe..47608fa36 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -30,8 +30,8 @@
#define REDIS_DEFAULT_TIMEOUT 1.0
-/**
- * Redis access API for lua from task object
+/***
+ * This module implements redis asynchronous client for rspamd LUA API.
*/
LUA_FUNCTION_DEF (redis, make_request);
@@ -54,6 +54,8 @@ struct lua_redis_userdata {
gchar *server;
gchar *reqline;
guint16 port;
+ gchar **args;
+ guint nargs;
};
@@ -61,11 +63,20 @@ static void
lua_redis_fin (void *arg)
{
struct lua_redis_userdata *ud = arg;
+ guint i;
if (ud->ctx) {
redisAsyncFree (ud->ctx);
event_del (&ud->timeout);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
+
+ if (ud->args) {
+ for (i = 0; i < ud->nargs; i ++) {
+ g_free (ud->args[i]);
+ }
+
+ g_free (ud->args);
+ }
}
}
@@ -202,6 +213,49 @@ lua_redis_timeout (int fd, short what, gpointer u)
lua_redis_push_error ("timeout while connecting the server", ud, TRUE);
}
+
+static void
+lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
+ struct lua_redis_userdata *ud)
+{
+ gchar **args = NULL;
+ gint top;
+
+ if (idx != 0 && lua_type (L, idx) == LUA_TTABLE) {
+ /* Get all arguments */
+ lua_pushvalue (L, 5);
+ lua_pushnil (L);
+ top = 0;
+
+ while (lua_next (L, -2) != 0) {
+ if (lua_isstring (L, -1)) {
+ top ++;
+ }
+ lua_pop (L, 1);
+ }
+
+ args = g_malloc ((top + 1) * sizeof (gchar *));
+ lua_pushnil (L);
+ args[0] = g_strdup (cmd);
+ top = 1;
+
+ while (lua_next (L, -2) != 0) {
+ args[top++] = g_strdup (lua_tostring (L, -1));
+ lua_pop (L, 1);
+ }
+
+ lua_pop (L, 1);
+ }
+ else {
+ /* Use merely cmd */
+ args = g_malloc (sizeof (gchar *));
+ args[0] = g_strdup (cmd);
+ top = 1;
+ }
+
+ ud->nargs = top;
+ ud->args = args;
+}
/**
* Make request to redis server
* @param task worker task object
@@ -216,14 +270,69 @@ static int
lua_redis_make_request (lua_State *L)
{
struct lua_redis_userdata *ud;
- struct rspamd_lua_ip *addr;
- struct rspamd_task *task;
- const gchar **args = NULL, *cmd;
- gint top;
+ struct rspamd_lua_ip *addr = NULL;
+ struct rspamd_task *task = NULL;
+ const gchar *cmd = NULL;
+ gint top, cbref = -1;
struct timeval tv;
gboolean ret = FALSE;
+ gdouble timeout = REDIS_DEFAULT_TIMEOUT;
+
+ if (lua_istable (L, 1)) {
+ /* Table version */
+ lua_pushstring (L, "task");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ task = lua_check_task (L, -1);
+ }
+ lua_pop (L, 1);
- if ((task = lua_check_task (L, 1)) != NULL) {
+ lua_pushstring (L, "callback");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TFUNCTION) {
+ /* This also pops function from the stack */
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ }
+ else {
+ msg_err ("bad callback argument for lua redis");
+ lua_pop (L, 1);
+ }
+
+ lua_pushstring (L, "cmd");
+ lua_gettable (L, -2);
+ cmd = lua_tostring (L, -1);
+ lua_pop (L, 1);
+
+ lua_pushstring (L, "host");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ addr = lua_check_ip (L, -1);
+ }
+ lua_pop (L, 1);
+
+ lua_pushstring (L, "timeout");
+ lua_gettable (L, -2);
+ timeout = lua_tonumber (L, -1);
+ lua_pop (L, 1);
+
+ if (task != NULL && addr != NULL && cbref != -1 && cmd != NULL) {
+ ud =
+ rspamd_mempool_alloc (task->task_pool,
+ sizeof (struct lua_redis_userdata));
+ ud->task = task;
+ ud->L = L;
+ ud->cbref = cbref;
+ lua_pushstring (L, "args");
+ lua_redis_parse_args (L, -1, cmd, ud);
+ }
+ else {
+ if (cbref != -1) {
+ luaL_unref (L, LUA_REGISTRYINDEX, cbref);
+ }
+ msg_err ("incorrect function invocation");
+ }
+ }
+ else if ((task = lua_check_task (L, 1)) != NULL) {
addr = lua_check_ip (L, 2);
top = lua_gettop (L);
/* Now get callback */
@@ -234,17 +343,7 @@ lua_redis_make_request (lua_State *L)
sizeof (struct lua_redis_userdata));
ud->task = task;
ud->L = L;
- ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
- rspamd_inet_address_get_port (addr->addr));
- if (ud->ctx == NULL || ud->ctx->err) {
- redisAsyncFree (ud->ctx);
- lua_pushboolean (L, FALSE);
-
- return 1;
- }
-
- redisLibeventAttach (ud->ctx, ud->task->ev_base);
/* Pop other arguments */
lua_pushvalue (L, 3);
/* Get a reference */
@@ -252,71 +351,50 @@ lua_redis_make_request (lua_State *L)
cmd = luaL_checkstring (L, 4);
if (top > 4) {
- if (lua_istable (L, 5)) {
- /* Get all arguments */
- lua_pushvalue (L, 5);
- lua_pushnil (L);
- top = 0;
-
- while (lua_next (L, -2) != 0) {
- if (lua_isstring (L, -1)) {
- top ++;
- }
- lua_pop (L, 1);
- }
-
- args = g_alloca ((top + 1) * sizeof (gchar *));
- lua_pushnil (L);
- args[0] = cmd;
- top = 1;
-
- while (lua_next (L, -2) != 0) {
- args[top++] = lua_tostring (L, -1);
- lua_pop (L, 1);
- }
-
- lua_pop (L, 1);
- }
- else {
- msg_warn ("bad arguments format");
- args = g_alloca (sizeof (gchar *));
- args[0] = cmd;
- top = 1;
- }
+ lua_redis_parse_args (L, 5, cmd, ud);
}
else {
- args = g_alloca (sizeof (gchar *));
- args[0] = cmd;
- top = 1;
+ lua_redis_parse_args (L, 0, cmd, ud);
}
+ }
+ else {
+ msg_err ("incorrect function invocation");
+ }
+ }
- ret = redisAsyncCommandArgv (ud->ctx,
- lua_redis_callback,
- ud,
- top,
- args,
- NULL);
- if (ret == REDIS_OK) {
- register_async_event (ud->task->s,
- lua_redis_fin,
- ud,
- g_quark_from_static_string ("lua redis"));
- /*
- * TODO: cannot handle more than fixed timeout here
- */
- double_to_tv (REDIS_DEFAULT_TIMEOUT, &tv);
- event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ud);
- event_base_set (ud->task->ev_base, &ud->timeout);
- event_add (&ud->timeout, &tv);
- }
- else {
- msg_info ("call to redis failed: %s", ud->ctx->errstr);
- redisAsyncFree (ud->ctx);
- luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
- }
+ if (ret) {
+ ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
+ rspamd_inet_address_get_port (addr->addr));
+
+ if (ud->ctx == NULL || ud->ctx->err) {
+ redisAsyncFree (ud->ctx);
+ luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
+ lua_pushboolean (L, FALSE);
+
+ return 1;
+ }
+ redisLibeventAttach (ud->ctx, ud->task->ev_base);
+ ret = redisAsyncCommandArgv (ud->ctx,
+ lua_redis_callback,
+ ud,
+ ud->nargs,
+ (const gchar **)ud->args,
+ NULL);
+ if (ret == REDIS_OK) {
+ register_async_event (ud->task->s,
+ lua_redis_fin,
+ ud,
+ g_quark_from_static_string ("lua redis"));
+
+ double_to_tv (timeout, &tv);
+ event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ud);
+ event_base_set (ud->task->ev_base, &ud->timeout);
+ event_add (&ud->timeout, &tv);
}
else {
- msg_info ("incorrect function invocation");
+ msg_info ("call to redis failed: %s", ud->ctx->errstr);
+ redisAsyncFree (ud->ctx);
+ luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
}
}