]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement pipelining for redis async interface
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 26 Apr 2016 13:49:28 +0000 (14:49 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 26 Apr 2016 13:49:47 +0000 (14:49 +0100)
src/lua/lua_redis.c

index 952bfb8834f8f7f07c15a7080020c0d4da148c87..3341b0a7bf542da2790b44307a448d40a6e99fa5 100644 (file)
@@ -15,6 +15,7 @@
  */
 #include "lua_common.h"
 #include "dns.h"
+#include "utlist.h"
 
 #ifdef WITH_HIREDIS
 #include "hiredis.h"
@@ -71,6 +72,7 @@ static const struct luaL_reg redislib_m[] = {
 };
 
 #ifdef WITH_HIREDIS
+struct lua_redis_specific_userdata;
 /**
  * Struct for userdata representation
  */
@@ -78,16 +80,24 @@ struct lua_redis_userdata {
        redisAsyncContext *ctx;
        lua_State *L;
        struct rspamd_task *task;
-       struct event timeout;
        gchar *server;
        gchar *reqline;
-       gchar **args;
-       gint cbref;
-       guint nargs;
+       struct lua_redis_specific_userdata *specific;
+       gdouble timeout;
        guint16 port;
        guint16 terminated;
 };
 
+struct lua_redis_specific_userdata {
+       gint cbref;
+       guint nargs;
+       gchar **args;
+       struct event timeout;
+       struct lua_redis_userdata *c;
+       struct lua_redis_ctx *ctx;
+       struct lua_redis_specific_userdata *next;
+};
+
 struct lua_redis_ctx {
        gboolean async;
        union {
@@ -124,6 +134,7 @@ static void
 lua_redis_dtor (struct lua_redis_ctx *ctx)
 {
        struct lua_redis_userdata *ud;
+       struct lua_redis_specific_userdata *cur, *tmp;
 
        if (ctx->async) {
                ud = &ctx->d.async;
@@ -138,9 +149,17 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
                        ctx->ref.refcount = 100500;
                        redisAsyncFree (ud->ctx);
                        ctx->ref.refcount = 0;
-                       lua_redis_free_args (ud->args, ud->nargs);
-                       event_del (&ud->timeout);
-                       luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
+
+                       LL_FOREACH_SAFE (ud->specific, cur, tmp) {
+                               lua_redis_free_args (cur->args, cur->nargs);
+                               event_del (&cur->timeout);
+
+                               if (cur->cbref != -1) {
+                                       luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref);
+                               }
+
+                               g_slice_free1 (sizeof (*cur), cur);
+                       }
                }
        }
        else {
@@ -167,8 +186,10 @@ lua_redis_gc (lua_State *L)
 static void
 lua_redis_fin (void *arg)
 {
-       struct lua_redis_ctx *ctx = arg;
+       struct lua_redis_specific_userdata *sp_ud = arg;
+       struct lua_redis_ctx *ctx;
 
+       ctx = sp_ud->ctx;
        REF_RELEASE (ctx);
 }
 
@@ -180,28 +201,31 @@ lua_redis_fin (void *arg)
 static void
 lua_redis_push_error (const gchar *err,
        struct lua_redis_ctx *ctx,
+       struct lua_redis_specific_userdata *sp_ud,
        gboolean connected)
 {
        struct rspamd_task **ptask;
-       struct lua_redis_userdata *ud = &ctx->d.async;
-
-       /* Push error */
-       lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
-       ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
-       rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
-
-       *ptask = ud->task;
-       /* String of error */
-       lua_pushstring (ud->L, err);
-       /* Data is nil */
-       lua_pushnil (ud->L);
-       if (lua_pcall (ud->L, 3, 0, 0) != 0) {
-               msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
-               lua_pop (ud->L, 1);
+       struct lua_redis_userdata *ud = sp_ud->c;
+
+       if (sp_ud->cbref != -1) {
+               /* Push error */
+               lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+               ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+               rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+
+               *ptask = ud->task;
+               /* String of error */
+               lua_pushstring (ud->L, err);
+               /* Data is nil */
+               lua_pushnil (ud->L);
+               if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+                       msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+                       lua_pop (ud->L, 1);
+               }
        }
 
        if (connected) {
-               rspamd_session_remove_event (ud->task->s, lua_redis_fin, ctx);
+               rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
        }
 }
 
@@ -241,28 +265,31 @@ lua_redis_push_reply (lua_State *L, const redisReply *r)
  * @param ud
  */
 static void
-lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx)
+lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
+               struct lua_redis_specific_userdata *sp_ud)
 {
        struct rspamd_task **ptask;
-       struct lua_redis_userdata *ud = &ctx->d.async;
-
-       /* Push error */
-       lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
-       ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
-       rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
-
-       *ptask = ud->task;
-       /* Error is nil */
-       lua_pushnil (ud->L);
-       /* Data */
-       lua_redis_push_reply (ud->L, r);
-
-       if (lua_pcall (ud->L, 3, 0, 0) != 0) {
-               msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
-               lua_pop (ud->L, 1);
+       struct lua_redis_userdata *ud = sp_ud->c;
+
+       if (sp_ud->cbref != -1) {
+               /* Push error */
+               lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+               ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+               rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+
+               *ptask = ud->task;
+               /* Error is nil */
+               lua_pushnil (ud->L);
+               /* Data */
+               lua_redis_push_reply (ud->L, r);
+
+               if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+                       msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+                       lua_pop (ud->L, 1);
+               }
        }
 
-       rspamd_session_remove_event (ud->task->s, lua_redis_fin, ctx);
+       rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
 }
 
 /**
@@ -275,13 +302,14 @@ static void
 lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 {
        redisReply *reply = r;
-       struct lua_redis_ctx *ctx = priv;
+       struct lua_redis_specific_userdata *sp_ud = priv;
+       struct lua_redis_ctx *ctx;
        struct lua_redis_userdata *ud;
 
+       ctx = sp_ud->ctx;
+       ud = sp_ud->c;
        REF_RETAIN (ctx);
 
-       ud = &ctx->d.async;
-
        if (ud->terminated) {
                /* We are already at the termination stage, just go out */
                REF_RELEASE (ctx);
@@ -291,22 +319,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
        if (c->err == 0) {
                if (r != NULL) {
                        if (reply->type != REDIS_REPLY_ERROR) {
-                               lua_redis_push_data (reply, ctx);
+                               lua_redis_push_data (reply, ctx, sp_ud);
                        }
                        else {
-                               lua_redis_push_error (reply->str, ctx, TRUE);
+                               lua_redis_push_error (reply->str, ctx, sp_ud, TRUE);
                        }
                }
                else {
-                       lua_redis_push_error ("received no data from server", ctx, TRUE);
+                       lua_redis_push_error ("received no data from server", ctx, sp_ud, TRUE);
                }
        }
        else {
                if (c->err == REDIS_ERR_IO) {
-                       lua_redis_push_error (strerror (errno), ctx, TRUE);
+                       lua_redis_push_error (strerror (errno), ctx, sp_ud, TRUE);
                }
                else {
-                       lua_redis_push_error (c->errstr, ctx, TRUE);
+                       lua_redis_push_error (c->errstr, ctx, sp_ud, TRUE);
                }
        }
 
@@ -316,11 +344,13 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 static void
 lua_redis_timeout (int fd, short what, gpointer u)
 {
-       struct lua_redis_ctx *ctx = u;
+       struct lua_redis_specific_userdata *sp_ud = u;
+       struct lua_redis_ctx *ctx;
+       ctx = sp_ud->ctx;
 
        REF_RETAIN (ctx);
        msg_info ("timeout while querying redis server");
-       lua_redis_push_error ("timeout while connecting the server", ctx, TRUE);
+       lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, TRUE);
        REF_RELEASE (ctx);
 }
 
@@ -404,11 +434,12 @@ lua_redis_make_request (lua_State *L)
        struct lua_redis_ctx *ctx;
        rspamd_inet_addr_t *ip = NULL;
        struct lua_redis_userdata *ud;
+       struct lua_redis_specific_userdata *sp_ud;
        struct rspamd_lua_ip *addr = NULL;
        struct rspamd_task *task = NULL;
        const gchar *cmd = NULL, *host;
        const gchar *password = NULL, *dbname = NULL;
-       gint top, cbref = -1;
+       gint top, cbref = -1, args_pos;
        struct timeval tv;
        gboolean ret = FALSE;
        gdouble timeout = REDIS_DEFAULT_TIMEOUT;
@@ -487,18 +518,24 @@ lua_redis_make_request (lua_State *L)
                lua_pop (L, 1);
 
 
-               if (task != NULL && addr != NULL && cbref != -1 && cmd != NULL) {
+               if (task != NULL && addr != NULL && cmd != NULL) {
                        ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
                        REF_INIT_RETAIN (ctx, lua_redis_dtor);
                        ctx->async = TRUE;
                        ud = &ctx->d.async;
                        ud->task = task;
                        ud->L = L;
-                       ud->cbref = cbref;
+
+                       sp_ud = g_slice_alloc (sizeof (*sp_ud));
+                       sp_ud->cbref = cbref;
+                       sp_ud->c = ud;
+
                        lua_pushstring (L, "args");
                        lua_gettable (L, -2);
-                       lua_redis_parse_args (L, -1, cmd, &ud->args, &ud->nargs);
+                       lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->nargs);
                        lua_pop (L, 1);
+                       LL_PREPEND (ud->specific, sp_ud);
+
                        ret = TRUE;
                }
                else {
@@ -514,7 +551,7 @@ lua_redis_make_request (lua_State *L)
                top = lua_gettop (L);
 
                /* Now get callback */
-               if (lua_isfunction (L, 3) && addr != NULL && addr->addr && top >= 4) {
+               if (addr != NULL && addr->addr && top >= 4) {
                        /* Create userdata */
                        ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
                        REF_INIT_RETAIN (ctx, lua_redis_dtor);
@@ -523,19 +560,34 @@ lua_redis_make_request (lua_State *L)
                        ud->task = task;
                        ud->L = L;
 
-                       /* Pop other arguments */
-                       lua_pushvalue (L, 3);
-                       /* Get a reference */
-                       ud->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+                       args_pos = 3;
+
+                       if (lua_isfunction (L, 3)) {
+                               /* Pop other arguments */
+                               lua_pushvalue (L, 3);
+                               /* Get a reference */
+                               cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+                               args_pos = 4;
+                       }
+                       else {
+                               cbref = -1;
+                       }
+
 
-                       cmd = luaL_checkstring (L, 4);
+                       sp_ud = g_slice_alloc (sizeof (*sp_ud));
+                       sp_ud->cbref = cbref;
+                       sp_ud->c = ud;
+                       cmd = luaL_checkstring (L, args_pos);
                        if (top > 4) {
-                               lua_redis_parse_args (L, 5, cmd, &ud->args, &ud->nargs);
+                               lua_redis_parse_args (L, args_pos + 1, cmd, &sp_ud->args,
+                                               &sp_ud->nargs);
                        }
                        else {
-                               lua_redis_parse_args (L, 0, cmd, &ud->args, &ud->nargs);
+                               lua_redis_parse_args (L, 0, cmd, &sp_ud->args, &sp_ud->nargs);
                        }
 
+                       LL_PREPEND (ud->specific, sp_ud);
+
                        ret = TRUE;
                }
                else {
@@ -545,6 +597,7 @@ lua_redis_make_request (lua_State *L)
 
        if (ret) {
                ud->terminated = 0;
+               ud->timeout = timeout;
                ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
                                rspamd_inet_address_get_port (addr->addr));
 
@@ -572,21 +625,22 @@ lua_redis_make_request (lua_State *L)
 
                ret = redisAsyncCommandArgv (ud->ctx,
                                        lua_redis_callback,
-                                       ctx,
-                                       ud->nargs,
-                                       (const gchar **)ud->args,
+                                       sp_ud,
+                                       sp_ud->nargs,
+                                       (const gchar **)sp_ud->args,
                                        NULL);
 
                if (ret == REDIS_OK) {
                        rspamd_session_add_event (ud->task->s,
                                        lua_redis_fin,
-                                       ctx,
+                                       sp_ud,
                                        g_quark_from_static_string ("lua redis"));
 
+                       sp_ud->ctx = ctx;
                        double_to_tv (timeout, &tv);
-                       event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ctx);
-                       event_base_set (ud->task->ev_base, &ud->timeout);
-                       event_add (&ud->timeout, &tv);
+                       event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
+                       event_base_set (ud->task->ev_base, &sp_ud->timeout);
+                       event_add (&sp_ud->timeout, &tv);
                }
                else {
                        msg_info ("call to redis failed: %s", ud->ctx->errstr);
@@ -735,9 +789,11 @@ lua_redis_connect (lua_State *L)
        rspamd_inet_addr_t *ip = NULL;
        const gchar *host;
        struct lua_redis_ctx *ctx = NULL, **pctx;
+       struct lua_redis_specific_userdata *sp_ud;
        struct lua_redis_userdata *ud;
        struct rspamd_task *task = NULL;
        gboolean ret = FALSE;
+       gdouble timeout = REDIS_DEFAULT_TIMEOUT;
 
        if (lua_istable (L, 1)) {
                /* Table version */
@@ -748,6 +804,12 @@ lua_redis_connect (lua_State *L)
                }
                lua_pop (L, 1);
 
+               lua_pushstring (L, "timeout");
+               lua_gettable (L, -2);
+               if (lua_type (L, -1) == LUA_TNUMBER) {
+                       timeout = lua_tonumber (L, -1);
+               }
+               lua_pop (L, 1);
 
                lua_pushstring (L, "host");
                lua_gettable (L, -2);
@@ -783,13 +845,17 @@ lua_redis_connect (lua_State *L)
                        ud = &ctx->d.async;
                        ud->task = task;
                        ud->L = L;
-                       ud->cbref = -1;
+                       sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
+                       sp_ud->cbref = -1;
+                       sp_ud->c = ud;
+                       LL_PREPEND (ud->specific, sp_ud);
                        ret = TRUE;
                }
        }
 
        if (ret && ctx) {
                ud->terminated = 0;
+               ud->timeout = timeout;
                ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
                                rspamd_inet_address_get_port (addr->addr));
 
@@ -919,23 +985,79 @@ static int
 lua_redis_add_cmd (lua_State *L)
 {
        struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
+       struct lua_redis_specific_userdata *sp_ud;
        const gchar *cmd = NULL;
        gint args_pos = 2;
        gchar **args = NULL;
        guint nargs = 0;
+       gint cbref = -1, ret;
+       struct timeval tv;
 
        if (ctx) {
-               if (lua_type (L, 2) == LUA_TSTRING) {
-                       cmd = lua_tostring (L, 2);
-                       args_pos = 3;
-               }
 
                if (ctx->async) {
-                       lua_pushstring (L, "Async redis pipelining is not implemented");
-                       lua_error (L);
-                       return 0;
+                       /* Async version */
+                       if (lua_type (L, 2) == LUA_TSTRING) {
+                               /* No callback version */
+                               cmd = lua_tostring (L, 2);
+                               args_pos = 3;
+                       }
+                       else if (lua_type (L, 2) == LUA_TFUNCTION) {
+                               lua_pushvalue (L, 2);
+                               cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+                               cmd = lua_tostring (L, 3);
+                               args_pos = 4;
+                       }
+                       else {
+                               return luaL_error (L, "invalid arguments");
+                       }
+
+                       sp_ud = g_slice_alloc (sizeof (*sp_ud));
+                       sp_ud->cbref = cbref;
+                       sp_ud->c = &ctx->d.async;
+                       sp_ud->ctx = ctx;
+
+                       lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
+                                               &sp_ud->nargs);
+
+                       LL_PREPEND (sp_ud->c->specific, sp_ud);
+
+                       ret = redisAsyncCommandArgv (sp_ud->c->ctx,
+                                       lua_redis_callback,
+                                       sp_ud,
+                                       sp_ud->nargs,
+                                       (const gchar **)sp_ud->args,
+                                       NULL);
+
+                       if (ret == REDIS_OK) {
+                               rspamd_session_add_event (sp_ud->c->task->s,
+                                               lua_redis_fin,
+                                               sp_ud,
+                                               g_quark_from_static_string ("lua redis"));
+
+                               double_to_tv (sp_ud->c->timeout, &tv);
+                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
+                               event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout);
+                               event_add (&sp_ud->timeout, &tv);
+                               REF_RETAIN (ctx);
+                       }
+                       else {
+                               msg_info ("call to redis failed: %s", sp_ud->c->ctx->errstr);
+                               lua_pushboolean (L, 0);
+                               lua_pushstring (L, sp_ud->c->ctx->errstr);
+                               return 2;
+                       }
                }
                else {
+                       /* Synchronous version */
+                       if (lua_type (L, 2) == LUA_TSTRING) {
+                               cmd = lua_tostring (L, 2);
+                               args_pos = 3;
+                       }
+                       else {
+                               return luaL_error (L, "invalid arguments");
+                       }
+
                        if (ctx->d.sync) {
                                lua_redis_parse_args (L, args_pos, cmd, &args, &nargs);
 
@@ -947,15 +1069,13 @@ lua_redis_add_cmd (lua_State *L)
                                }
                                else {
                                        lua_pushstring (L, "cannot append commands when not connected");
-                                       lua_error (L);
-                                       return 0;
+                                       return lua_error (L);
                                }
 
                        }
                        else {
                                lua_pushstring (L, "cannot append commands when not connected");
-                               lua_error (L);
-                               return 0;
+                               return lua_error (L);
                        }
                }
        }