Browse Source

Rework lua_redis to table form.

tags/0.9.0
Vsevolod Stakhov 9 years ago
parent
commit
f0803d058f
1 changed files with 154 additions and 76 deletions
  1. 154
    76
      src/lua/lua_redis.c

+ 154
- 76
src/lua/lua_redis.c View File

@@ -30,8 +30,8 @@

#define REDIS_DEFAULT_TIMEOUT 1.0

/**
* Redis access API for lua from task object
/***
* This module implements redis asynchronous client for rspamd LUA API.
*/

LUA_FUNCTION_DEF (redis, make_request);
@@ -54,6 +54,8 @@ struct lua_redis_userdata {
gchar *server;
gchar *reqline;
guint16 port;
gchar **args;
guint nargs;
};


@@ -61,11 +63,20 @@ static void
lua_redis_fin (void *arg)
{
struct lua_redis_userdata *ud = arg;
guint i;

if (ud->ctx) {
redisAsyncFree (ud->ctx);
event_del (&ud->timeout);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);

if (ud->args) {
for (i = 0; i < ud->nargs; i ++) {
g_free (ud->args[i]);
}

g_free (ud->args);
}
}
}

@@ -202,6 +213,49 @@ lua_redis_timeout (int fd, short what, gpointer u)
lua_redis_push_error ("timeout while connecting the server", ud, TRUE);
}


static void
lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
struct lua_redis_userdata *ud)
{
gchar **args = NULL;
gint top;

if (idx != 0 && lua_type (L, idx) == LUA_TTABLE) {
/* Get all arguments */
lua_pushvalue (L, 5);
lua_pushnil (L);
top = 0;

while (lua_next (L, -2) != 0) {
if (lua_isstring (L, -1)) {
top ++;
}
lua_pop (L, 1);
}

args = g_malloc ((top + 1) * sizeof (gchar *));
lua_pushnil (L);
args[0] = g_strdup (cmd);
top = 1;

while (lua_next (L, -2) != 0) {
args[top++] = g_strdup (lua_tostring (L, -1));
lua_pop (L, 1);
}

lua_pop (L, 1);
}
else {
/* Use merely cmd */
args = g_malloc (sizeof (gchar *));
args[0] = g_strdup (cmd);
top = 1;
}

ud->nargs = top;
ud->args = args;
}
/**
* Make request to redis server
* @param task worker task object
@@ -216,14 +270,69 @@ static int
lua_redis_make_request (lua_State *L)
{
struct lua_redis_userdata *ud;
struct rspamd_lua_ip *addr;
struct rspamd_task *task;
const gchar **args = NULL, *cmd;
gint top;
struct rspamd_lua_ip *addr = NULL;
struct rspamd_task *task = NULL;
const gchar *cmd = NULL;
gint top, cbref = -1;
struct timeval tv;
gboolean ret = FALSE;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;

if (lua_istable (L, 1)) {
/* Table version */
lua_pushstring (L, "task");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TUSERDATA) {
task = lua_check_task (L, -1);
}
lua_pop (L, 1);

if ((task = lua_check_task (L, 1)) != NULL) {
lua_pushstring (L, "callback");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TFUNCTION) {
/* This also pops function from the stack */
cbref = luaL_ref (L, LUA_REGISTRYINDEX);
}
else {
msg_err ("bad callback argument for lua redis");
lua_pop (L, 1);
}

lua_pushstring (L, "cmd");
lua_gettable (L, -2);
cmd = lua_tostring (L, -1);
lua_pop (L, 1);

lua_pushstring (L, "host");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TUSERDATA) {
addr = lua_check_ip (L, -1);
}
lua_pop (L, 1);

lua_pushstring (L, "timeout");
lua_gettable (L, -2);
timeout = lua_tonumber (L, -1);
lua_pop (L, 1);

if (task != NULL && addr != NULL && cbref != -1 && cmd != NULL) {
ud =
rspamd_mempool_alloc (task->task_pool,
sizeof (struct lua_redis_userdata));
ud->task = task;
ud->L = L;
ud->cbref = cbref;
lua_pushstring (L, "args");
lua_redis_parse_args (L, -1, cmd, ud);
}
else {
if (cbref != -1) {
luaL_unref (L, LUA_REGISTRYINDEX, cbref);
}
msg_err ("incorrect function invocation");
}
}
else if ((task = lua_check_task (L, 1)) != NULL) {
addr = lua_check_ip (L, 2);
top = lua_gettop (L);
/* Now get callback */
@@ -234,17 +343,7 @@ lua_redis_make_request (lua_State *L)
sizeof (struct lua_redis_userdata));
ud->task = task;
ud->L = L;
ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));

if (ud->ctx == NULL || ud->ctx->err) {
redisAsyncFree (ud->ctx);
lua_pushboolean (L, FALSE);

return 1;
}

redisLibeventAttach (ud->ctx, ud->task->ev_base);
/* Pop other arguments */
lua_pushvalue (L, 3);
/* Get a reference */
@@ -252,71 +351,50 @@ lua_redis_make_request (lua_State *L)

cmd = luaL_checkstring (L, 4);
if (top > 4) {
if (lua_istable (L, 5)) {
/* Get all arguments */
lua_pushvalue (L, 5);
lua_pushnil (L);
top = 0;

while (lua_next (L, -2) != 0) {
if (lua_isstring (L, -1)) {
top ++;
}
lua_pop (L, 1);
}

args = g_alloca ((top + 1) * sizeof (gchar *));
lua_pushnil (L);
args[0] = cmd;
top = 1;

while (lua_next (L, -2) != 0) {
args[top++] = lua_tostring (L, -1);
lua_pop (L, 1);
}

lua_pop (L, 1);
}
else {
msg_warn ("bad arguments format");
args = g_alloca (sizeof (gchar *));
args[0] = cmd;
top = 1;
}
lua_redis_parse_args (L, 5, cmd, ud);
}
else {
args = g_alloca (sizeof (gchar *));
args[0] = cmd;
top = 1;
lua_redis_parse_args (L, 0, cmd, ud);
}
}
else {
msg_err ("incorrect function invocation");
}
}

ret = redisAsyncCommandArgv (ud->ctx,
lua_redis_callback,
ud,
top,
args,
NULL);
if (ret == REDIS_OK) {
register_async_event (ud->task->s,
lua_redis_fin,
ud,
g_quark_from_static_string ("lua redis"));
/*
* TODO: cannot handle more than fixed timeout here
*/
double_to_tv (REDIS_DEFAULT_TIMEOUT, &tv);
event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ud);
event_base_set (ud->task->ev_base, &ud->timeout);
event_add (&ud->timeout, &tv);
}
else {
msg_info ("call to redis failed: %s", ud->ctx->errstr);
redisAsyncFree (ud->ctx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
}
if (ret) {
ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));

if (ud->ctx == NULL || ud->ctx->err) {
redisAsyncFree (ud->ctx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
lua_pushboolean (L, FALSE);

return 1;
}
redisLibeventAttach (ud->ctx, ud->task->ev_base);
ret = redisAsyncCommandArgv (ud->ctx,
lua_redis_callback,
ud,
ud->nargs,
(const gchar **)ud->args,
NULL);
if (ret == REDIS_OK) {
register_async_event (ud->task->s,
lua_redis_fin,
ud,
g_quark_from_static_string ("lua redis"));

double_to_tv (timeout, &tv);
event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ud);
event_base_set (ud->task->ev_base, &ud->timeout);
event_add (&ud->timeout, &tv);
}
else {
msg_info ("incorrect function invocation");
msg_info ("call to redis failed: %s", ud->ctx->errstr);
redisAsyncFree (ud->ctx);
luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
}
}


Loading…
Cancel
Save