]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Added coroutines to redis API
authorMikhail Galanin <mgalanin@mimecast.com>
Mon, 10 Sep 2018 14:12:22 +0000 (15:12 +0100)
committerMikhail Galanin <mgalanin@mimecast.com>
Mon, 10 Sep 2018 14:12:22 +0000 (15:12 +0100)
src/lua/lua_redis.c

index 0fc9c43b7e4f3266bae57a2dea0fb3233b8bf886..1c407b2fd72a1f8b911c8cff6c0794c73311511d 100644 (file)
@@ -87,7 +87,7 @@ static const struct luaL_reg redislib_m[] = {
 #endif
 
 #ifdef WITH_HIREDIS
-struct lua_redis_specific_userdata;
+struct lua_redis_request_specific_userdata;
 /**
  * Struct for userdata representation
  */
@@ -100,19 +100,21 @@ struct lua_redis_userdata {
        struct rspamd_redis_pool *pool;
        gchar *server;
        gchar *reqline;
-       struct lua_redis_specific_userdata *specific;
+       struct lua_redis_request_specific_userdata *specific;
        gdouble timeout;
        guint16 port;
        guint16 terminated;
 };
 
 #define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
+/* session was finished */
 #define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
 #define LUA_REDIS_ASYNC (1 << 0)
 #define LUA_REDIS_TEXTDATA (1 << 1)
+#define LUA_REDIS_TERMINATED (1 << 2)
 #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
 
-struct lua_redis_specific_userdata {
+struct lua_redis_request_specific_userdata {
        gint cbref;
        guint nargs;
        gchar **args;
@@ -120,19 +122,27 @@ struct lua_redis_specific_userdata {
        struct rspamd_async_watcher *w;
        struct lua_redis_userdata *c;
        struct lua_redis_ctx *ctx;
-       struct lua_redis_specific_userdata *next;
+       struct lua_redis_request_specific_userdata *next;
        struct event timeout;
        guint flags;
 };
 
 struct lua_redis_ctx {
        guint flags;
-       union {
-               struct lua_redis_userdata async;
-               redisContext *sync;
-       } d;
+       struct lua_redis_userdata async;
        guint cmds_pending;
        ref_entry_t ref;
+       GQueue *replies; /* for sync connection only */
+       GQueue *events_cleanup; /* for sync connection only */
+       struct thread_entry *thread; /* for sync mode, set only if there was yield */
+};
+
+struct lua_redis_result {
+       gboolean is_error;
+       gint result_ref;
+       struct rspamd_async_watcher *w;
+       struct rspamd_async_session *s;
+       struct lua_redis_request_specific_userdata *sp_ud;
 };
 
 static struct lua_redis_ctx *
@@ -162,46 +172,50 @@ static void
 lua_redis_dtor (struct lua_redis_ctx *ctx)
 {
        struct lua_redis_userdata *ud;
-       struct lua_redis_specific_userdata *cur, *tmp;
+       struct lua_redis_request_specific_userdata *cur, *tmp;
        gboolean is_successful = TRUE;
        struct redisAsyncContext *ac;
 
-       if (IS_ASYNC (ctx)) {
-               msg_debug ("desctructing %p", ctx);
-               ud = &ctx->d.async;
-
-               if (ud->ctx) {
+       ud = &ctx->async;
+       msg_debug ("desctructing %p", ctx);
 
-                       LL_FOREACH_SAFE (ud->specific, cur, tmp) {
-                               event_del (&cur->timeout);
+       if (ud->ctx) {
 
-                               if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
-                                       is_successful = FALSE;
-                               }
+               LL_FOREACH_SAFE (ud->specific, cur, tmp) {
+                       event_del (&cur->timeout);
 
-                               cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
+                       if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
+                               is_successful = FALSE;
                        }
 
-                       ud->terminated = 1;
-                       ac = ud->ctx;
-                       ud->ctx = NULL;
-                       rspamd_redis_pool_release_connection (ud->pool, ac, is_successful);
+                       cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
                }
 
-               LL_FOREACH_SAFE (ud->specific, cur, tmp) {
-                       lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
+               ctx->flags |= LUA_REDIS_TERMINATED;
 
-                       if (cur->cbref != -1) {
-                               luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
-                       }
+               ud->terminated = 1;
+               ac = ud->ctx;
+               ud->ctx = NULL;
+               rspamd_redis_pool_release_connection (ud->pool, ac, is_successful);
+       }
+
+       LL_FOREACH_SAFE (ud->specific, cur, tmp) {
+               lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
 
-                       g_free (cur);
+               if (cur->cbref != -1) {
+                       luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
                }
+
+               g_free (cur);
        }
-       else {
-               if (ctx->d.sync) {
-                       redisFree (ctx->d.sync);
-               }
+
+       if (ctx->events_cleanup) {
+               g_queue_free (ctx->events_cleanup);
+               ctx->events_cleanup = NULL;
+       }
+       if (ctx->replies) {
+               g_queue_free (ctx->replies);
+               ctx->replies = NULL;
        }
 
        g_free (ctx);
@@ -222,7 +236,7 @@ lua_redis_gc (lua_State *L)
 static void
 lua_redis_fin (void *arg)
 {
-       struct lua_redis_specific_userdata *sp_ud = arg;
+       struct lua_redis_request_specific_userdata *sp_ud = arg;
        struct lua_redis_ctx *ctx;
 
        ctx = sp_ud->ctx;
@@ -241,7 +255,7 @@ lua_redis_fin (void *arg)
 static void
 lua_redis_push_error (const gchar *err,
        struct lua_redis_ctx *ctx,
-       struct lua_redis_specific_userdata *sp_ud,
+       struct lua_redis_request_specific_userdata *sp_ud,
        gboolean connected)
 {
        struct lua_redis_userdata *ud = sp_ud->c;
@@ -327,7 +341,7 @@ lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
  */
 static void
 lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
-               struct lua_redis_specific_userdata *sp_ud)
+               struct lua_redis_request_specific_userdata *sp_ud)
 {
        struct lua_redis_userdata *ud = sp_ud->c;
        struct lua_callback_state cbs;
@@ -373,7 +387,7 @@ static void
 lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 {
        redisReply *reply = r;
-       struct lua_redis_specific_userdata *sp_ud = priv;
+       struct lua_redis_request_specific_userdata *sp_ud = priv;
        struct lua_redis_ctx *ctx;
        struct lua_redis_userdata *ud;
        redisAsyncContext *ac;
@@ -431,10 +445,180 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
        REDIS_RELEASE (ctx);
 }
 
+static gint
+lua_redis_push_results (struct lua_redis_ctx *ctx, lua_State *L)
+{
+       gint results = g_queue_get_length (ctx->replies);
+       gint i;
+       gboolean can_use_lua = TRUE;
+
+       results = g_queue_get_length (ctx->replies);
+
+       if (!lua_checkstack (L, (results * 2) + 1)) {
+               luaL_error (L, "cannot resize stack to fit %d commands",
+                               ctx->cmds_pending);
+
+               can_use_lua = FALSE;
+       }
+
+       for (i = 0; i < results; i ++) {
+               struct lua_redis_result *result = g_queue_pop_head (ctx->replies);
+
+               if (can_use_lua) {
+                       lua_pushboolean (L, !result->is_error);
+                       lua_rawgeti (L, LUA_REGISTRYINDEX, result->result_ref);
+               }
+
+               luaL_unref (L, LUA_REGISTRYINDEX, result->result_ref);
+
+               g_queue_push_tail (ctx->events_cleanup, result);
+       }
+
+       return can_use_lua ? results * 2 : 0;
+}
+
+static void
+lua_redis_cleanup_events (struct lua_redis_ctx *ctx)
+{
+       while (!g_queue_is_empty (ctx->events_cleanup)) {
+               struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup);
+
+               rspamd_session_watcher_pop (result->s, result->w);
+               rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud);
+
+               g_free (result);
+       }
+}
+
+/**
+ * Callback for redis replies
+ * @param c context of redis connection
+ * @param r redis reply
+ * @param priv userdata
+ */
+static void
+lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
+{
+       redisReply *reply = r;
+
+       struct lua_redis_request_specific_userdata *sp_ud = priv;
+       struct lua_redis_ctx *ctx;
+       struct lua_redis_userdata *ud;
+       struct thread_entry* thread;
+       gint results;
+
+       ctx = sp_ud->ctx;
+       ud = sp_ud->c;
+       lua_State *L = ctx->async.cfg->lua_state;
+
+       sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
+
+       if (ud->terminated) {
+               /* We are already at the termination stage, just go out */
+               /* TODO:
+                  if somebody is waiting for us (ctx->thread), return result,
+                  otherwise, indeed, ignore
+                */
+               return;
+       }
+
+       event_del (&sp_ud->timeout);
+
+       msg_debug ("got reply from redis %p for query %p", ctx, sp_ud);
+
+       struct lua_redis_result *result = g_malloc0 (sizeof *result);
+
+       /* If session is finished, we cannot call lua callbacks */
+       if (ac->err == 0) {
+               if (r != NULL) {
+                       if (reply->type != REDIS_REPLY_ERROR) {
+                               result->is_error = FALSE;
+                               lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+                       }
+                       else {
+                               result->is_error = TRUE;
+                               lua_pushstring (L, reply->str);
+                       }
+               }
+               else {
+                       result->is_error = TRUE;
+                       lua_pushliteral (L, "received no data from server");
+               }
+       }
+       else {
+               result->is_error = TRUE;
+               if (ac->err == REDIS_ERR_IO) {
+                       lua_pushstring (L, strerror (errno));
+               }
+               else {
+                       lua_pushstring (L, ac->errstr);
+               }
+       }
+       /* if error happened, we should terminate the connection,
+          and release it */
+
+       if (result->is_error) {
+               /* Set to NULL to avoid double free in dtor */
+               sp_ud->c->ctx = NULL;
+               ctx->flags |= LUA_REDIS_TERMINATED;
+
+               /*
+                * This will call all callbacks pending so the entire context
+                * will be destructed
+                */
+               rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE);
+       }
+
+       result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+       result->s = ud->s;
+       result->w = sp_ud->w;
+       result->sp_ud = sp_ud;
+
+       g_queue_push_tail (ctx->replies, result);
+
+       ctx->cmds_pending --;
+
+       if (ctx->cmds_pending == 0) {
+               if (ctx->thread) {
+                       /* somebody yielded and waits for results */
+                       thread = ctx->thread;
+                       ctx->thread = NULL;
+
+                       results = lua_redis_push_results (ctx, thread->lua_state);
+
+                       lua_thread_resume (thread, results);
+
+                       lua_redis_cleanup_events (ctx);
+               }
+       }
+}
+
+static void
+lua_redis_timeout_sync (int fd, short what, gpointer priv)
+{
+       struct lua_redis_request_specific_userdata *sp_ud = priv;
+       struct lua_redis_ctx *ctx = sp_ud->ctx;
+       redisAsyncContext *ac;
+
+       ac = sp_ud->c->ctx;
+
+       /* Set to NULL to avoid double free in dtor */
+       sp_ud->c->ctx = NULL;
+       ac->err = REDIS_ERR_IO;
+       errno = ETIMEDOUT;
+       ctx->flags |= LUA_REDIS_TERMINATED;
+
+       /*
+        * This will call all callbacks pending so the entire context
+        * will be destructed
+        */
+       rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE);
+}
+
 static void
 lua_redis_timeout (int fd, short what, gpointer u)
 {
-       struct lua_redis_specific_userdata *sp_ud = u;
+       struct lua_redis_request_specific_userdata *sp_ud = u;
        struct lua_redis_ctx *ctx;
        redisAsyncContext *ac;
 
@@ -562,7 +746,7 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
 }
 
 static struct lua_redis_ctx *
-rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
+rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async)
 {
        struct lua_redis_ctx *ctx;
        rspamd_inet_addr_t *ip = NULL;
@@ -621,6 +805,12 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
                        if (cfg && ev_base) {
                                ret = TRUE;
                        }
+                       else if (!cfg) {
+                               msg_err_task_check ("config is not passed");
+                       }
+                       else {
+                               msg_err_task_check ("ev_base is not set");
+                       }
                }
                else {
                        cfg = task->cfg;
@@ -687,15 +877,23 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
                lua_pop (L, 1); /* table */
 
                if (session && rspamd_session_is_destroying (session)) {
+                       msg_err_task_check ("Session is being destroying");
                        ret = FALSE;
                }
 
-
                if (ret && addr != NULL) {
                        ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
                        REF_INIT_RETAIN (ctx, lua_redis_dtor);
-                       ctx->flags |= flags | LUA_REDIS_ASYNC;
-                       ud = &ctx->d.async;
+                       if (is_async) {
+                               ctx->flags |= flags | LUA_REDIS_ASYNC;
+                               ud = &ctx->async;
+                       }
+                       else {
+                               ud = &ctx->async;
+                               ctx->replies = g_queue_new ();
+                               ctx->events_cleanup = g_queue_new ();
+
+                       }
                        ud->s = session;
                        ud->cfg = cfg;
                        ud->pool = cfg->redis_pool;
@@ -767,7 +965,7 @@ static int
 lua_redis_make_request (lua_State *L)
 {
        LUA_TRACE_POINT;
-       struct lua_redis_specific_userdata *sp_ud;
+       struct lua_redis_request_specific_userdata *sp_ud;
        struct lua_redis_userdata *ud;
        struct lua_redis_ctx *ctx, **pctx;
        const gchar *cmd = NULL;
@@ -776,10 +974,10 @@ lua_redis_make_request (lua_State *L)
        gint cbref = -1;
        gboolean ret = FALSE;
 
-       ctx = rspamd_lua_redis_prepare_connection (L, &cbref);
+       ctx = rspamd_lua_redis_prepare_connection (L, &cbref, TRUE);
 
        if (ctx) {
-               ud = &ctx->d.async;
+               ud = &ctx->async;
                sp_ud = g_malloc0 (sizeof (*sp_ud));
                sp_ud->cbref = cbref;
                sp_ud->c = ud;
@@ -1022,10 +1220,10 @@ lua_redis_connect (lua_State *L)
        struct lua_redis_ctx *ctx, **pctx;
        gdouble timeout = REDIS_DEFAULT_TIMEOUT;
 
-       ctx = rspamd_lua_redis_prepare_connection (L, NULL);
+       ctx = rspamd_lua_redis_prepare_connection (L, NULL, TRUE);
 
        if (ctx) {
-               ud = &ctx->d.async;
+               ud = &ctx->async;
 
                lua_pushstring (L, "timeout");
                lua_gettable (L, 1);
@@ -1061,33 +1259,14 @@ static int
 lua_redis_connect_sync (lua_State *L)
 {
        LUA_TRACE_POINT;
-       struct rspamd_lua_ip *addr = NULL;
        rspamd_inet_addr_t *ip = NULL;
-       const gchar *host;
-       struct timeval tv;
-       gboolean ret = FALSE;
-       guint flags = 0;
        gdouble timeout = REDIS_DEFAULT_TIMEOUT;
        struct lua_redis_ctx *ctx, **pctx;
 
-       if (lua_istable (L, 1)) {
-               lua_pushstring (L, "host");
-               lua_gettable (L, -2);
-               if (lua_type (L, -1) == LUA_TUSERDATA) {
-                       addr = lua_check_ip (L, -1);
-               }
-               else if (lua_type (L, -1) == LUA_TSTRING) {
-                       host = lua_tostring (L, -1);
-                       if (rspamd_parse_inet_address (&ip, host, strlen (host))) {
-                               addr = g_alloca (sizeof (*addr));
-                               addr->addr = ip;
+       ctx = rspamd_lua_redis_prepare_connection (L, NULL, FALSE);
 
-                               if (rspamd_inet_address_get_port (ip) == 0) {
-                                       rspamd_inet_address_set_port (ip, 6379);
-                               }
-                       }
-               }
-               lua_pop (L, 1);
+
+       if (ctx) {
 
                lua_pushstring (L, "timeout");
                lua_gettable (L, -2);
@@ -1096,52 +1275,7 @@ lua_redis_connect_sync (lua_State *L)
                }
                lua_pop (L, 1);
 
-               lua_pushstring (L, "opaque_data");
-               lua_gettable (L, -2);
-               if (!!lua_toboolean (L, -1)) {
-                       flags |= LUA_REDIS_TEXTDATA;
-               }
-               lua_pop (L, 1);
-
-               if (addr) {
-                       ret = TRUE;
-               }
-       }
-
-       if (ret) {
-               double_to_tv (timeout, &tv);
-               ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
-               REF_INIT_RETAIN (ctx, lua_redis_dtor);
-               ctx->flags = flags;
-
-               if (rspamd_inet_address_get_af (addr->addr) == AF_UNIX) {
-                       ctx->d.sync = redisConnectUnixWithTimeout (
-                                       rspamd_inet_address_to_string (addr->addr), tv);
-               }
-               else {
-                       ctx->d.sync = redisConnectWithTimeout (
-                                       rspamd_inet_address_to_string (addr->addr),
-                                       rspamd_inet_address_get_port (addr->addr), tv);
-               }
-
-               if (ip) {
-                       rspamd_inet_address_free (ip);
-               }
-
-               if (ctx->d.sync == NULL || ctx->d.sync->err) {
-                       lua_pushboolean (L, FALSE);
-
-                       if (ctx->d.sync) {
-                               lua_pushstring (L, ctx->d.sync->errstr);
-                       }
-                       else {
-                               lua_pushstring (L, "unknown error");
-                       }
-
-                       REDIS_RELEASE (ctx);
-
-                       return 2;
-               }
+               ctx->async.timeout = timeout;
 
                lua_pushboolean (L, TRUE);
                pctx = lua_newuserdata (L, sizeof (ctx));
@@ -1174,118 +1308,107 @@ lua_redis_add_cmd (lua_State *L)
 {
        LUA_TRACE_POINT;
        struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
-       struct lua_redis_specific_userdata *sp_ud;
+       struct lua_redis_request_specific_userdata *sp_ud;
        struct lua_redis_userdata *ud;
        const gchar *cmd = NULL;
        gint args_pos = 2;
-       gchar **args = NULL;
-       gsize *arglens = NULL;
-       guint nargs = 0;
        gint cbref = -1, ret;
        struct timeval tv;
 
        if (ctx) {
+               if (ctx->flags & LUA_REDIS_TERMINATED) {
+                       lua_pushboolean (L, FALSE);
+                       lua_pushstring (L, "Connection is terminated");
 
-               if (IS_ASYNC (ctx)) {
-                       ud = &ctx->d.async;
+                       return 2;
+               }
 
-                       /* Async version */
-                       if (lua_type (L, 2) == LUA_TSTRING) {
-                               /* No callback version */
-                               cmd = lua_tostring (L, 2);
-                               args_pos = 3;
-                       }
-                       else if (lua_type (L, 2) == LUA_TFUNCTION) {
-                               lua_pushvalue (L, 2);
-                               cbref = luaL_ref (L, LUA_REGISTRYINDEX);
-                               cmd = lua_tostring (L, 3);
-                               args_pos = 4;
-                       }
-                       else {
-                               return luaL_error (L, "invalid arguments");
-                       }
+               /* Async version */
+               if (lua_type (L, 2) == LUA_TSTRING) {
+                       /* No callback version */
+                       cmd = lua_tostring (L, 2);
+                       args_pos = 3;
+               }
+               else if (lua_type (L, 2) == LUA_TFUNCTION) {
+                       lua_pushvalue (L, 2);
+                       cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+                       cmd = lua_tostring (L, 3);
+                       args_pos = 4;
+               }
+               else {
+                       return luaL_error (L, "invalid arguments");
+               }
 
-                       sp_ud = g_malloc0 (sizeof (*sp_ud));
+               sp_ud = g_malloc0 (sizeof (*sp_ud));
+               if (IS_ASYNC (ctx)) {
+                       sp_ud->c = &ctx->async;
+                       ud = &ctx->async;
                        sp_ud->cbref = cbref;
-                       sp_ud->c = &ctx->d.async;
-                       sp_ud->ctx = ctx;
+               }
+               else {
+                       sp_ud->c = &ctx->async;
+                       ud = &ctx->async;
+               }
+               sp_ud->ctx = ctx;
 
-                       lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
-                                               &sp_ud->arglens, &sp_ud->nargs);
+               lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
+                                       &sp_ud->arglens, &sp_ud->nargs);
 
-                       LL_PREPEND (sp_ud->c->specific, sp_ud);
+               LL_PREPEND (sp_ud->c->specific, sp_ud);
 
-                       if (ud->s && rspamd_session_is_destroying (ud->s)) {
-                               lua_pushboolean (L, 0);
-                               lua_pushstring (L, "session is terminating");
+               if (ud->s && rspamd_session_is_destroying (ud->s)) {
+                       lua_pushboolean (L, 0);
+                       lua_pushstring (L, "session is terminating");
 
-                               return 2;
-                       }
+                       return 2;
+               }
 
+               if (IS_ASYNC (ctx)) {
                        ret = redisAsyncCommandArgv (sp_ud->c->ctx,
                                        lua_redis_callback,
                                        sp_ud,
                                        sp_ud->nargs,
                                        (const gchar **)sp_ud->args,
                                        sp_ud->arglens);
+               }
+               else {
+                       ret = redisAsyncCommandArgv (sp_ud->c->ctx,
+                                       lua_redis_callback_sync,
+                                       sp_ud,
+                                       sp_ud->nargs,
+                                       (const gchar **)sp_ud->args,
+                                       sp_ud->arglens);
+               }
 
-                       if (ret == REDIS_OK) {
-                               if (ud->s) {
-                                       rspamd_session_add_event (ud->s,
-                                                       lua_redis_fin,
-                                                       sp_ud,
-                                                       g_quark_from_static_string ("lua redis"));
-                                       sp_ud->w = rspamd_session_get_watcher (ud->s);
-                                       rspamd_session_watcher_push (ud->s);
-                               }
+               if (ret == REDIS_OK) {
+                       if (ud->s) {
+                               rspamd_session_add_event (ud->s,
+                                               lua_redis_fin,
+                                               sp_ud,
+                                               g_quark_from_static_string ("lua redis"));
+                               sp_ud->w = rspamd_session_get_watcher (ud->s);
+                               rspamd_session_watcher_push (ud->s);
+                       }
 
-                               double_to_tv (sp_ud->c->timeout, &tv);
+                       double_to_tv (sp_ud->c->timeout, &tv);
+                       if (IS_ASYNC (ctx)) {
                                event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
-                               event_base_set (ud->ev_base, &sp_ud->timeout);
-                               event_add (&sp_ud->timeout, &tv);
-                               REDIS_RETAIN (ctx);
-                               ctx->cmds_pending ++;
                        }
                        else {
-                               msg_info ("call to redis failed: %s",
-                                               sp_ud->c->ctx->errstr);
-                               lua_pushboolean (L, 0);
-                               lua_pushstring (L, sp_ud->c->ctx->errstr);
-
-                               return 2;
+                               event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout_sync, sp_ud);
                        }
+                       event_base_set (ud->ev_base, &sp_ud->timeout);
+                       event_add (&sp_ud->timeout, &tv);
+                       REDIS_RETAIN (ctx);
+                       ctx->cmds_pending ++;
                }
                else {
-                       /* Synchronous version */
-                       if (lua_type (L, 2) == LUA_TSTRING) {
-                               cmd = lua_tostring (L, 2);
-                               args_pos = 3;
-                       }
-                       else {
-                               return luaL_error (L, "invalid arguments");
-                       }
-
-                       if (ctx->d.sync) {
-                               lua_redis_parse_args (L, args_pos, cmd, &args, &arglens, &nargs);
-
-                               if (nargs > 0) {
-                                       if (redisAppendCommandArgv (ctx->d.sync, nargs,
-                                                       (const char **)args, arglens) == REDIS_OK) {
-                                               ctx->cmds_pending ++;
-                                       }
-
-                                       lua_redis_free_args (args, arglens, nargs);
-                               }
-                               else {
-                                       lua_pushstring (L, "cannot append commands when not connected");
-                                       return lua_error (L);
-                               }
+                       msg_info ("call to redis failed: %s",
+                                       sp_ud->c->ctx->errstr);
+                       lua_pushboolean (L, 0);
+                       lua_pushstring (L, sp_ud->c->ctx->errstr);
 
-                       }
-                       else {
-                               lua_pushstring (L, "cannot append commands when not connected");
-                               return lua_error (L);
-                       }
+                       return 2;
                }
        }
 
@@ -1304,9 +1427,6 @@ lua_redis_exec (lua_State *L)
 {
        LUA_TRACE_POINT;
        struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
-       redisReply *r;
-       gint ret;
-       guint i, nret = 0, pending;
 
        if (ctx == NULL) {
                lua_error (L);
@@ -1320,48 +1440,26 @@ lua_redis_exec (lua_State *L)
                return 0;
        }
        else {
-               if (!ctx->d.sync) {
+               if (false /* !ctx->d.sync */) {
                        lua_pushstring (L, "cannot exec commands when not connected");
                        lua_error (L);
                        return 0;
                }
                else {
-                       if (!lua_checkstack (L, (ctx->cmds_pending * 2) + 1)) {
-                               return luaL_error (L, "cannot resize stack to fit %d commands",
-                                       ctx->cmds_pending);
+                       if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) == 0) {
+                               lua_pushstring (L, "No pending commands to execute");
+                               lua_error (L);
                        }
-
-                       pending = ctx->cmds_pending;
-                       ctx->cmds_pending = 0;
-
-                       for (i = 0; i < pending; i ++) {
-                               ret = redisGetReply (ctx->d.sync, (void **)&r);
-
-                               if (ret == REDIS_OK) {
-                                       if (r->type != REDIS_REPLY_ERROR) {
-                                               lua_pushboolean (L, TRUE);
-                                               lua_redis_push_reply (L, r,
-                                                               ctx->flags & LUA_REDIS_TEXTDATA);
-                                       }
-                                       else {
-                                               lua_pushboolean (L, FALSE);
-                                               lua_pushlstring (L, r->str, r->len);
-                                       }
-
-                                       freeReplyObject (r);
-                               }
-                               else {
-                                       msg_info ("call to redis failed: %s", ctx->d.sync->errstr);
-                                       lua_pushboolean (L, FALSE);
-                                       lua_pushstring (L, ctx->d.sync->errstr);
-                               }
-
-                               nret += 2;
+                       if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) > 0) {
+                               gint results = lua_redis_push_results (ctx, L);
+                               return results;
+                       }
+                       else {
+                               ctx->thread = lua_thread_pool_get_running_entry (ctx->async.cfg->lua_thread_pool);
+                               return lua_thread_yield (ctx->thread, 0);
                        }
                }
        }
-
-       return nret;
 }
 #else
 static int