diff options
author | Mikhail Galanin <mgalanin@mimecast.com> | 2018-09-10 15:12:22 +0100 |
---|---|---|
committer | Mikhail Galanin <mgalanin@mimecast.com> | 2018-09-10 15:12:22 +0100 |
commit | 1fc19462b17aa958552f5aba0d3fbe9083b4bc19 (patch) | |
tree | 0c1bb5ca78ddac13b0b092103a56ab12dd40e8bd /src/lua/lua_redis.c | |
parent | 819de7eed4997268ebeb26809e2178f958f504f0 (diff) | |
download | rspamd-1fc19462b17aa958552f5aba0d3fbe9083b4bc19.tar.gz rspamd-1fc19462b17aa958552f5aba0d3fbe9083b4bc19.zip |
[Minor] Added coroutines to redis API
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r-- | src/lua/lua_redis.c | 564 |
1 files changed, 331 insertions, 233 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 0fc9c43b7..1c407b2fd 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -87,7 +87,7 @@ static const struct luaL_reg redislib_m[] = { #endif #ifdef WITH_HIREDIS -struct lua_redis_specific_userdata; +struct lua_redis_request_specific_userdata; /** * Struct for userdata representation */ @@ -100,19 +100,21 @@ struct lua_redis_userdata { struct rspamd_redis_pool *pool; gchar *server; gchar *reqline; - struct lua_redis_specific_userdata *specific; + struct lua_redis_request_specific_userdata *specific; gdouble timeout; guint16 port; guint16 terminated; }; #define LUA_REDIS_SPECIFIC_REPLIED (1 << 0) +/* session was finished */ #define LUA_REDIS_SPECIFIC_FINISHED (1 << 1) #define LUA_REDIS_ASYNC (1 << 0) #define LUA_REDIS_TEXTDATA (1 << 1) +#define LUA_REDIS_TERMINATED (1 << 2) #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) -struct lua_redis_specific_userdata { +struct lua_redis_request_specific_userdata { gint cbref; guint nargs; gchar **args; @@ -120,19 +122,27 @@ struct lua_redis_specific_userdata { struct rspamd_async_watcher *w; struct lua_redis_userdata *c; struct lua_redis_ctx *ctx; - struct lua_redis_specific_userdata *next; + struct lua_redis_request_specific_userdata *next; struct event timeout; guint flags; }; struct lua_redis_ctx { guint flags; - union { - struct lua_redis_userdata async; - redisContext *sync; - } d; + struct lua_redis_userdata async; guint cmds_pending; ref_entry_t ref; + GQueue *replies; /* for sync connection only */ + GQueue *events_cleanup; /* for sync connection only */ + struct thread_entry *thread; /* for sync mode, set only if there was yield */ +}; + +struct lua_redis_result { + gboolean is_error; + gint result_ref; + struct rspamd_async_watcher *w; + struct rspamd_async_session *s; + struct lua_redis_request_specific_userdata *sp_ud; }; static struct lua_redis_ctx * @@ -162,46 +172,50 @@ static void lua_redis_dtor (struct lua_redis_ctx *ctx) { struct lua_redis_userdata *ud; - struct lua_redis_specific_userdata *cur, *tmp; + struct lua_redis_request_specific_userdata *cur, *tmp; gboolean is_successful = TRUE; struct redisAsyncContext *ac; - if (IS_ASYNC (ctx)) { - msg_debug ("desctructing %p", ctx); - ud = &ctx->d.async; - - if (ud->ctx) { + ud = &ctx->async; + msg_debug ("desctructing %p", ctx); - LL_FOREACH_SAFE (ud->specific, cur, tmp) { - event_del (&cur->timeout); + if (ud->ctx) { - if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { - is_successful = FALSE; - } + LL_FOREACH_SAFE (ud->specific, cur, tmp) { + event_del (&cur->timeout); - cur->flags |= LUA_REDIS_SPECIFIC_FINISHED; + if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { + is_successful = FALSE; } - ud->terminated = 1; - ac = ud->ctx; - ud->ctx = NULL; - rspamd_redis_pool_release_connection (ud->pool, ac, is_successful); + cur->flags |= LUA_REDIS_SPECIFIC_FINISHED; } - LL_FOREACH_SAFE (ud->specific, cur, tmp) { - lua_redis_free_args (cur->args, cur->arglens, cur->nargs); + ctx->flags |= LUA_REDIS_TERMINATED; - if (cur->cbref != -1) { - luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); - } + ud->terminated = 1; + ac = ud->ctx; + ud->ctx = NULL; + rspamd_redis_pool_release_connection (ud->pool, ac, is_successful); + } + + LL_FOREACH_SAFE (ud->specific, cur, tmp) { + lua_redis_free_args (cur->args, cur->arglens, cur->nargs); - g_free (cur); + if (cur->cbref != -1) { + luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); } + + g_free (cur); } - else { - if (ctx->d.sync) { - redisFree (ctx->d.sync); - } + + if (ctx->events_cleanup) { + g_queue_free (ctx->events_cleanup); + ctx->events_cleanup = NULL; + } + if (ctx->replies) { + g_queue_free (ctx->replies); + ctx->replies = NULL; } g_free (ctx); @@ -222,7 +236,7 @@ lua_redis_gc (lua_State *L) static void lua_redis_fin (void *arg) { - struct lua_redis_specific_userdata *sp_ud = arg; + struct lua_redis_request_specific_userdata *sp_ud = arg; struct lua_redis_ctx *ctx; ctx = sp_ud->ctx; @@ -241,7 +255,7 @@ lua_redis_fin (void *arg) static void lua_redis_push_error (const gchar *err, struct lua_redis_ctx *ctx, - struct lua_redis_specific_userdata *sp_ud, + struct lua_redis_request_specific_userdata *sp_ud, gboolean connected) { struct lua_redis_userdata *ud = sp_ud->c; @@ -327,7 +341,7 @@ lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data) */ static void lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, - struct lua_redis_specific_userdata *sp_ud) + struct lua_redis_request_specific_userdata *sp_ud) { struct lua_redis_userdata *ud = sp_ud->c; struct lua_callback_state cbs; @@ -373,7 +387,7 @@ static void lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) { redisReply *reply = r; - struct lua_redis_specific_userdata *sp_ud = priv; + struct lua_redis_request_specific_userdata *sp_ud = priv; struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; redisAsyncContext *ac; @@ -431,10 +445,180 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) REDIS_RELEASE (ctx); } +static gint +lua_redis_push_results (struct lua_redis_ctx *ctx, lua_State *L) +{ + gint results = g_queue_get_length (ctx->replies); + gint i; + gboolean can_use_lua = TRUE; + + results = g_queue_get_length (ctx->replies); + + if (!lua_checkstack (L, (results * 2) + 1)) { + luaL_error (L, "cannot resize stack to fit %d commands", + ctx->cmds_pending); + + can_use_lua = FALSE; + } + + for (i = 0; i < results; i ++) { + struct lua_redis_result *result = g_queue_pop_head (ctx->replies); + + if (can_use_lua) { + lua_pushboolean (L, !result->is_error); + lua_rawgeti (L, LUA_REGISTRYINDEX, result->result_ref); + } + + luaL_unref (L, LUA_REGISTRYINDEX, result->result_ref); + + g_queue_push_tail (ctx->events_cleanup, result); + } + + return can_use_lua ? results * 2 : 0; +} + +static void +lua_redis_cleanup_events (struct lua_redis_ctx *ctx) +{ + while (!g_queue_is_empty (ctx->events_cleanup)) { + struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup); + + rspamd_session_watcher_pop (result->s, result->w); + rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud); + + g_free (result); + } +} + +/** + * Callback for redis replies + * @param c context of redis connection + * @param r redis reply + * @param priv userdata + */ +static void +lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) +{ + redisReply *reply = r; + + struct lua_redis_request_specific_userdata *sp_ud = priv; + struct lua_redis_ctx *ctx; + struct lua_redis_userdata *ud; + struct thread_entry* thread; + gint results; + + ctx = sp_ud->ctx; + ud = sp_ud->c; + lua_State *L = ctx->async.cfg->lua_state; + + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; + + if (ud->terminated) { + /* We are already at the termination stage, just go out */ + /* TODO: + if somebody is waiting for us (ctx->thread), return result, + otherwise, indeed, ignore + */ + return; + } + + event_del (&sp_ud->timeout); + + msg_debug ("got reply from redis %p for query %p", ctx, sp_ud); + + struct lua_redis_result *result = g_malloc0 (sizeof *result); + + /* If session is finished, we cannot call lua callbacks */ + if (ac->err == 0) { + if (r != NULL) { + if (reply->type != REDIS_REPLY_ERROR) { + result->is_error = FALSE; + lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA); + } + else { + result->is_error = TRUE; + lua_pushstring (L, reply->str); + } + } + else { + result->is_error = TRUE; + lua_pushliteral (L, "received no data from server"); + } + } + else { + result->is_error = TRUE; + if (ac->err == REDIS_ERR_IO) { + lua_pushstring (L, strerror (errno)); + } + else { + lua_pushstring (L, ac->errstr); + } + } + /* if error happened, we should terminate the connection, + and release it */ + + if (result->is_error) { + /* Set to NULL to avoid double free in dtor */ + sp_ud->c->ctx = NULL; + ctx->flags |= LUA_REDIS_TERMINATED; + + /* + * This will call all callbacks pending so the entire context + * will be destructed + */ + rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE); + } + + result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX); + result->s = ud->s; + result->w = sp_ud->w; + result->sp_ud = sp_ud; + + g_queue_push_tail (ctx->replies, result); + + ctx->cmds_pending --; + + if (ctx->cmds_pending == 0) { + if (ctx->thread) { + /* somebody yielded and waits for results */ + thread = ctx->thread; + ctx->thread = NULL; + + results = lua_redis_push_results (ctx, thread->lua_state); + + lua_thread_resume (thread, results); + + lua_redis_cleanup_events (ctx); + } + } +} + +static void +lua_redis_timeout_sync (int fd, short what, gpointer priv) +{ + struct lua_redis_request_specific_userdata *sp_ud = priv; + struct lua_redis_ctx *ctx = sp_ud->ctx; + redisAsyncContext *ac; + + ac = sp_ud->c->ctx; + + /* Set to NULL to avoid double free in dtor */ + sp_ud->c->ctx = NULL; + ac->err = REDIS_ERR_IO; + errno = ETIMEDOUT; + ctx->flags |= LUA_REDIS_TERMINATED; + + /* + * This will call all callbacks pending so the entire context + * will be destructed + */ + rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE); +} + static void lua_redis_timeout (int fd, short what, gpointer u) { - struct lua_redis_specific_userdata *sp_ud = u; + struct lua_redis_request_specific_userdata *sp_ud = u; struct lua_redis_ctx *ctx; redisAsyncContext *ac; @@ -562,7 +746,7 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, } static struct lua_redis_ctx * -rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) +rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async) { struct lua_redis_ctx *ctx; rspamd_inet_addr_t *ip = NULL; @@ -621,6 +805,12 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) if (cfg && ev_base) { ret = TRUE; } + else if (!cfg) { + msg_err_task_check ("config is not passed"); + } + else { + msg_err_task_check ("ev_base is not set"); + } } else { cfg = task->cfg; @@ -687,15 +877,23 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) lua_pop (L, 1); /* table */ if (session && rspamd_session_is_destroying (session)) { + msg_err_task_check ("Session is being destroying"); ret = FALSE; } - if (ret && addr != NULL) { ctx = g_malloc0 (sizeof (struct lua_redis_ctx)); REF_INIT_RETAIN (ctx, lua_redis_dtor); - ctx->flags |= flags | LUA_REDIS_ASYNC; - ud = &ctx->d.async; + if (is_async) { + ctx->flags |= flags | LUA_REDIS_ASYNC; + ud = &ctx->async; + } + else { + ud = &ctx->async; + ctx->replies = g_queue_new (); + ctx->events_cleanup = g_queue_new (); + + } ud->s = session; ud->cfg = cfg; ud->pool = cfg->redis_pool; @@ -767,7 +965,7 @@ static int lua_redis_make_request (lua_State *L) { LUA_TRACE_POINT; - struct lua_redis_specific_userdata *sp_ud; + struct lua_redis_request_specific_userdata *sp_ud; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx, **pctx; const gchar *cmd = NULL; @@ -776,10 +974,10 @@ lua_redis_make_request (lua_State *L) gint cbref = -1; gboolean ret = FALSE; - ctx = rspamd_lua_redis_prepare_connection (L, &cbref); + ctx = rspamd_lua_redis_prepare_connection (L, &cbref, TRUE); if (ctx) { - ud = &ctx->d.async; + ud = &ctx->async; sp_ud = g_malloc0 (sizeof (*sp_ud)); sp_ud->cbref = cbref; sp_ud->c = ud; @@ -1022,10 +1220,10 @@ lua_redis_connect (lua_State *L) struct lua_redis_ctx *ctx, **pctx; gdouble timeout = REDIS_DEFAULT_TIMEOUT; - ctx = rspamd_lua_redis_prepare_connection (L, NULL); + ctx = rspamd_lua_redis_prepare_connection (L, NULL, TRUE); if (ctx) { - ud = &ctx->d.async; + ud = &ctx->async; lua_pushstring (L, "timeout"); lua_gettable (L, 1); @@ -1061,33 +1259,14 @@ static int lua_redis_connect_sync (lua_State *L) { LUA_TRACE_POINT; - struct rspamd_lua_ip *addr = NULL; rspamd_inet_addr_t *ip = NULL; - const gchar *host; - struct timeval tv; - gboolean ret = FALSE; - guint flags = 0; gdouble timeout = REDIS_DEFAULT_TIMEOUT; struct lua_redis_ctx *ctx, **pctx; - if (lua_istable (L, 1)) { - lua_pushstring (L, "host"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - addr = lua_check_ip (L, -1); - } - else if (lua_type (L, -1) == LUA_TSTRING) { - host = lua_tostring (L, -1); - if (rspamd_parse_inet_address (&ip, host, strlen (host))) { - addr = g_alloca (sizeof (*addr)); - addr->addr = ip; + ctx = rspamd_lua_redis_prepare_connection (L, NULL, FALSE); - if (rspamd_inet_address_get_port (ip) == 0) { - rspamd_inet_address_set_port (ip, 6379); - } - } - } - lua_pop (L, 1); + + if (ctx) { lua_pushstring (L, "timeout"); lua_gettable (L, -2); @@ -1096,52 +1275,7 @@ lua_redis_connect_sync (lua_State *L) } lua_pop (L, 1); - lua_pushstring (L, "opaque_data"); - lua_gettable (L, -2); - if (!!lua_toboolean (L, -1)) { - flags |= LUA_REDIS_TEXTDATA; - } - lua_pop (L, 1); - - if (addr) { - ret = TRUE; - } - } - - if (ret) { - double_to_tv (timeout, &tv); - ctx = g_malloc0 (sizeof (struct lua_redis_ctx)); - REF_INIT_RETAIN (ctx, lua_redis_dtor); - ctx->flags = flags; - - if (rspamd_inet_address_get_af (addr->addr) == AF_UNIX) { - ctx->d.sync = redisConnectUnixWithTimeout ( - rspamd_inet_address_to_string (addr->addr), tv); - } - else { - ctx->d.sync = redisConnectWithTimeout ( - rspamd_inet_address_to_string (addr->addr), - rspamd_inet_address_get_port (addr->addr), tv); - } - - if (ip) { - rspamd_inet_address_free (ip); - } - - if (ctx->d.sync == NULL || ctx->d.sync->err) { - lua_pushboolean (L, FALSE); - - if (ctx->d.sync) { - lua_pushstring (L, ctx->d.sync->errstr); - } - else { - lua_pushstring (L, "unknown error"); - } - - REDIS_RELEASE (ctx); - - return 2; - } + ctx->async.timeout = timeout; lua_pushboolean (L, TRUE); pctx = lua_newuserdata (L, sizeof (ctx)); @@ -1174,118 +1308,107 @@ lua_redis_add_cmd (lua_State *L) { LUA_TRACE_POINT; struct lua_redis_ctx *ctx = lua_check_redis (L, 1); - struct lua_redis_specific_userdata *sp_ud; + struct lua_redis_request_specific_userdata *sp_ud; struct lua_redis_userdata *ud; const gchar *cmd = NULL; gint args_pos = 2; - gchar **args = NULL; - gsize *arglens = NULL; - guint nargs = 0; gint cbref = -1, ret; struct timeval tv; if (ctx) { + if (ctx->flags & LUA_REDIS_TERMINATED) { + lua_pushboolean (L, FALSE); + lua_pushstring (L, "Connection is terminated"); - if (IS_ASYNC (ctx)) { - ud = &ctx->d.async; + return 2; + } - /* Async version */ - if (lua_type (L, 2) == LUA_TSTRING) { - /* No callback version */ - cmd = lua_tostring (L, 2); - args_pos = 3; - } - else if (lua_type (L, 2) == LUA_TFUNCTION) { - lua_pushvalue (L, 2); - cbref = luaL_ref (L, LUA_REGISTRYINDEX); - cmd = lua_tostring (L, 3); - args_pos = 4; - } - else { - return luaL_error (L, "invalid arguments"); - } + /* Async version */ + if (lua_type (L, 2) == LUA_TSTRING) { + /* No callback version */ + cmd = lua_tostring (L, 2); + args_pos = 3; + } + else if (lua_type (L, 2) == LUA_TFUNCTION) { + lua_pushvalue (L, 2); + cbref = luaL_ref (L, LUA_REGISTRYINDEX); + cmd = lua_tostring (L, 3); + args_pos = 4; + } + else { + return luaL_error (L, "invalid arguments"); + } - sp_ud = g_malloc0 (sizeof (*sp_ud)); + sp_ud = g_malloc0 (sizeof (*sp_ud)); + if (IS_ASYNC (ctx)) { + sp_ud->c = &ctx->async; + ud = &ctx->async; sp_ud->cbref = cbref; - sp_ud->c = &ctx->d.async; - sp_ud->ctx = ctx; + } + else { + sp_ud->c = &ctx->async; + ud = &ctx->async; + } + sp_ud->ctx = ctx; - lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args, - &sp_ud->arglens, &sp_ud->nargs); + lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args, + &sp_ud->arglens, &sp_ud->nargs); - LL_PREPEND (sp_ud->c->specific, sp_ud); + LL_PREPEND (sp_ud->c->specific, sp_ud); - if (ud->s && rspamd_session_is_destroying (ud->s)) { - lua_pushboolean (L, 0); - lua_pushstring (L, "session is terminating"); + if (ud->s && rspamd_session_is_destroying (ud->s)) { + lua_pushboolean (L, 0); + lua_pushstring (L, "session is terminating"); - return 2; - } + return 2; + } + if (IS_ASYNC (ctx)) { ret = redisAsyncCommandArgv (sp_ud->c->ctx, lua_redis_callback, sp_ud, sp_ud->nargs, (const gchar **)sp_ud->args, sp_ud->arglens); + } + else { + ret = redisAsyncCommandArgv (sp_ud->c->ctx, + lua_redis_callback_sync, + sp_ud, + sp_ud->nargs, + (const gchar **)sp_ud->args, + sp_ud->arglens); + } - if (ret == REDIS_OK) { - if (ud->s) { - rspamd_session_add_event (ud->s, - lua_redis_fin, - sp_ud, - g_quark_from_static_string ("lua redis")); - sp_ud->w = rspamd_session_get_watcher (ud->s); - rspamd_session_watcher_push (ud->s); - } + if (ret == REDIS_OK) { + if (ud->s) { + rspamd_session_add_event (ud->s, + lua_redis_fin, + sp_ud, + g_quark_from_static_string ("lua redis")); + sp_ud->w = rspamd_session_get_watcher (ud->s); + rspamd_session_watcher_push (ud->s); + } - double_to_tv (sp_ud->c->timeout, &tv); + double_to_tv (sp_ud->c->timeout, &tv); + if (IS_ASYNC (ctx)) { event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); - event_base_set (ud->ev_base, &sp_ud->timeout); - event_add (&sp_ud->timeout, &tv); - REDIS_RETAIN (ctx); - ctx->cmds_pending ++; } else { - msg_info ("call to redis failed: %s", - sp_ud->c->ctx->errstr); - lua_pushboolean (L, 0); - lua_pushstring (L, sp_ud->c->ctx->errstr); - - return 2; + event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout_sync, sp_ud); } + event_base_set (ud->ev_base, &sp_ud->timeout); + event_add (&sp_ud->timeout, &tv); + REDIS_RETAIN (ctx); + ctx->cmds_pending ++; } else { - /* Synchronous version */ - if (lua_type (L, 2) == LUA_TSTRING) { - cmd = lua_tostring (L, 2); - args_pos = 3; - } - else { - return luaL_error (L, "invalid arguments"); - } - - if (ctx->d.sync) { - lua_redis_parse_args (L, args_pos, cmd, &args, &arglens, &nargs); - - if (nargs > 0) { - if (redisAppendCommandArgv (ctx->d.sync, nargs, - (const char **)args, arglens) == REDIS_OK) { - ctx->cmds_pending ++; - } - - lua_redis_free_args (args, arglens, nargs); - } - else { - lua_pushstring (L, "cannot append commands when not connected"); - return lua_error (L); - } + msg_info ("call to redis failed: %s", + sp_ud->c->ctx->errstr); + lua_pushboolean (L, 0); + lua_pushstring (L, sp_ud->c->ctx->errstr); - } - else { - lua_pushstring (L, "cannot append commands when not connected"); - return lua_error (L); - } + return 2; } } @@ -1304,9 +1427,6 @@ lua_redis_exec (lua_State *L) { LUA_TRACE_POINT; struct lua_redis_ctx *ctx = lua_check_redis (L, 1); - redisReply *r; - gint ret; - guint i, nret = 0, pending; if (ctx == NULL) { lua_error (L); @@ -1320,48 +1440,26 @@ lua_redis_exec (lua_State *L) return 0; } else { - if (!ctx->d.sync) { + if (false /* !ctx->d.sync */) { lua_pushstring (L, "cannot exec commands when not connected"); lua_error (L); return 0; } else { - if (!lua_checkstack (L, (ctx->cmds_pending * 2) + 1)) { - return luaL_error (L, "cannot resize stack to fit %d commands", - ctx->cmds_pending); + if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) == 0) { + lua_pushstring (L, "No pending commands to execute"); + lua_error (L); } - - pending = ctx->cmds_pending; - ctx->cmds_pending = 0; - - for (i = 0; i < pending; i ++) { - ret = redisGetReply (ctx->d.sync, (void **)&r); - - if (ret == REDIS_OK) { - if (r->type != REDIS_REPLY_ERROR) { - lua_pushboolean (L, TRUE); - lua_redis_push_reply (L, r, - ctx->flags & LUA_REDIS_TEXTDATA); - } - else { - lua_pushboolean (L, FALSE); - lua_pushlstring (L, r->str, r->len); - } - - freeReplyObject (r); - } - else { - msg_info ("call to redis failed: %s", ctx->d.sync->errstr); - lua_pushboolean (L, FALSE); - lua_pushstring (L, ctx->d.sync->errstr); - } - - nret += 2; + if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) > 0) { + gint results = lua_redis_push_results (ctx, L); + return results; + } + else { + ctx->thread = lua_thread_pool_get_running_entry (ctx->async.cfg->lua_thread_pool); + return lua_thread_yield (ctx->thread, 0); } } } - - return nret; } #else static int |