aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_redis.c
diff options
context:
space:
mode:
authorMikhail Galanin <mgalanin@mimecast.com>2018-09-10 15:12:22 +0100
committerMikhail Galanin <mgalanin@mimecast.com>2018-09-10 15:12:22 +0100
commit1fc19462b17aa958552f5aba0d3fbe9083b4bc19 (patch)
tree0c1bb5ca78ddac13b0b092103a56ab12dd40e8bd /src/lua/lua_redis.c
parent819de7eed4997268ebeb26809e2178f958f504f0 (diff)
downloadrspamd-1fc19462b17aa958552f5aba0d3fbe9083b4bc19.tar.gz
rspamd-1fc19462b17aa958552f5aba0d3fbe9083b4bc19.zip
[Minor] Added coroutines to redis API
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r--src/lua/lua_redis.c564
1 files changed, 331 insertions, 233 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 0fc9c43b7..1c407b2fd 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -87,7 +87,7 @@ static const struct luaL_reg redislib_m[] = {
#endif
#ifdef WITH_HIREDIS
-struct lua_redis_specific_userdata;
+struct lua_redis_request_specific_userdata;
/**
* Struct for userdata representation
*/
@@ -100,19 +100,21 @@ struct lua_redis_userdata {
struct rspamd_redis_pool *pool;
gchar *server;
gchar *reqline;
- struct lua_redis_specific_userdata *specific;
+ struct lua_redis_request_specific_userdata *specific;
gdouble timeout;
guint16 port;
guint16 terminated;
};
#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
+/* session was finished */
#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
#define LUA_REDIS_ASYNC (1 << 0)
#define LUA_REDIS_TEXTDATA (1 << 1)
+#define LUA_REDIS_TERMINATED (1 << 2)
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
-struct lua_redis_specific_userdata {
+struct lua_redis_request_specific_userdata {
gint cbref;
guint nargs;
gchar **args;
@@ -120,19 +122,27 @@ struct lua_redis_specific_userdata {
struct rspamd_async_watcher *w;
struct lua_redis_userdata *c;
struct lua_redis_ctx *ctx;
- struct lua_redis_specific_userdata *next;
+ struct lua_redis_request_specific_userdata *next;
struct event timeout;
guint flags;
};
struct lua_redis_ctx {
guint flags;
- union {
- struct lua_redis_userdata async;
- redisContext *sync;
- } d;
+ struct lua_redis_userdata async;
guint cmds_pending;
ref_entry_t ref;
+ GQueue *replies; /* for sync connection only */
+ GQueue *events_cleanup; /* for sync connection only */
+ struct thread_entry *thread; /* for sync mode, set only if there was yield */
+};
+
+struct lua_redis_result {
+ gboolean is_error;
+ gint result_ref;
+ struct rspamd_async_watcher *w;
+ struct rspamd_async_session *s;
+ struct lua_redis_request_specific_userdata *sp_ud;
};
static struct lua_redis_ctx *
@@ -162,46 +172,50 @@ static void
lua_redis_dtor (struct lua_redis_ctx *ctx)
{
struct lua_redis_userdata *ud;
- struct lua_redis_specific_userdata *cur, *tmp;
+ struct lua_redis_request_specific_userdata *cur, *tmp;
gboolean is_successful = TRUE;
struct redisAsyncContext *ac;
- if (IS_ASYNC (ctx)) {
- msg_debug ("desctructing %p", ctx);
- ud = &ctx->d.async;
-
- if (ud->ctx) {
+ ud = &ctx->async;
+ msg_debug ("desctructing %p", ctx);
- LL_FOREACH_SAFE (ud->specific, cur, tmp) {
- event_del (&cur->timeout);
+ if (ud->ctx) {
- if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
- is_successful = FALSE;
- }
+ LL_FOREACH_SAFE (ud->specific, cur, tmp) {
+ event_del (&cur->timeout);
- cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
+ if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
+ is_successful = FALSE;
}
- ud->terminated = 1;
- ac = ud->ctx;
- ud->ctx = NULL;
- rspamd_redis_pool_release_connection (ud->pool, ac, is_successful);
+ cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
}
- LL_FOREACH_SAFE (ud->specific, cur, tmp) {
- lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
+ ctx->flags |= LUA_REDIS_TERMINATED;
- if (cur->cbref != -1) {
- luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
- }
+ ud->terminated = 1;
+ ac = ud->ctx;
+ ud->ctx = NULL;
+ rspamd_redis_pool_release_connection (ud->pool, ac, is_successful);
+ }
+
+ LL_FOREACH_SAFE (ud->specific, cur, tmp) {
+ lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
- g_free (cur);
+ if (cur->cbref != -1) {
+ luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
}
+
+ g_free (cur);
}
- else {
- if (ctx->d.sync) {
- redisFree (ctx->d.sync);
- }
+
+ if (ctx->events_cleanup) {
+ g_queue_free (ctx->events_cleanup);
+ ctx->events_cleanup = NULL;
+ }
+ if (ctx->replies) {
+ g_queue_free (ctx->replies);
+ ctx->replies = NULL;
}
g_free (ctx);
@@ -222,7 +236,7 @@ lua_redis_gc (lua_State *L)
static void
lua_redis_fin (void *arg)
{
- struct lua_redis_specific_userdata *sp_ud = arg;
+ struct lua_redis_request_specific_userdata *sp_ud = arg;
struct lua_redis_ctx *ctx;
ctx = sp_ud->ctx;
@@ -241,7 +255,7 @@ lua_redis_fin (void *arg)
static void
lua_redis_push_error (const gchar *err,
struct lua_redis_ctx *ctx,
- struct lua_redis_specific_userdata *sp_ud,
+ struct lua_redis_request_specific_userdata *sp_ud,
gboolean connected)
{
struct lua_redis_userdata *ud = sp_ud->c;
@@ -327,7 +341,7 @@ lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
*/
static void
lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
- struct lua_redis_specific_userdata *sp_ud)
+ struct lua_redis_request_specific_userdata *sp_ud)
{
struct lua_redis_userdata *ud = sp_ud->c;
struct lua_callback_state cbs;
@@ -373,7 +387,7 @@ static void
lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
{
redisReply *reply = r;
- struct lua_redis_specific_userdata *sp_ud = priv;
+ struct lua_redis_request_specific_userdata *sp_ud = priv;
struct lua_redis_ctx *ctx;
struct lua_redis_userdata *ud;
redisAsyncContext *ac;
@@ -431,10 +445,180 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
REDIS_RELEASE (ctx);
}
+static gint
+lua_redis_push_results (struct lua_redis_ctx *ctx, lua_State *L)
+{
+ gint results = g_queue_get_length (ctx->replies);
+ gint i;
+ gboolean can_use_lua = TRUE;
+
+ results = g_queue_get_length (ctx->replies);
+
+ if (!lua_checkstack (L, (results * 2) + 1)) {
+ luaL_error (L, "cannot resize stack to fit %d commands",
+ ctx->cmds_pending);
+
+ can_use_lua = FALSE;
+ }
+
+ for (i = 0; i < results; i ++) {
+ struct lua_redis_result *result = g_queue_pop_head (ctx->replies);
+
+ if (can_use_lua) {
+ lua_pushboolean (L, !result->is_error);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, result->result_ref);
+ }
+
+ luaL_unref (L, LUA_REGISTRYINDEX, result->result_ref);
+
+ g_queue_push_tail (ctx->events_cleanup, result);
+ }
+
+ return can_use_lua ? results * 2 : 0;
+}
+
+static void
+lua_redis_cleanup_events (struct lua_redis_ctx *ctx)
+{
+ while (!g_queue_is_empty (ctx->events_cleanup)) {
+ struct lua_redis_result *result = g_queue_pop_head (ctx->events_cleanup);
+
+ rspamd_session_watcher_pop (result->s, result->w);
+ rspamd_session_remove_event (result->s, lua_redis_fin, result->sp_ud);
+
+ g_free (result);
+ }
+}
+
+/**
+ * Callback for redis replies
+ * @param c context of redis connection
+ * @param r redis reply
+ * @param priv userdata
+ */
+static void
+lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
+{
+ redisReply *reply = r;
+
+ struct lua_redis_request_specific_userdata *sp_ud = priv;
+ struct lua_redis_ctx *ctx;
+ struct lua_redis_userdata *ud;
+ struct thread_entry* thread;
+ gint results;
+
+ ctx = sp_ud->ctx;
+ ud = sp_ud->c;
+ lua_State *L = ctx->async.cfg->lua_state;
+
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
+
+ if (ud->terminated) {
+ /* We are already at the termination stage, just go out */
+ /* TODO:
+ if somebody is waiting for us (ctx->thread), return result,
+ otherwise, indeed, ignore
+ */
+ return;
+ }
+
+ event_del (&sp_ud->timeout);
+
+ msg_debug ("got reply from redis %p for query %p", ctx, sp_ud);
+
+ struct lua_redis_result *result = g_malloc0 (sizeof *result);
+
+ /* If session is finished, we cannot call lua callbacks */
+ if (ac->err == 0) {
+ if (r != NULL) {
+ if (reply->type != REDIS_REPLY_ERROR) {
+ result->is_error = FALSE;
+ lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+ }
+ else {
+ result->is_error = TRUE;
+ lua_pushstring (L, reply->str);
+ }
+ }
+ else {
+ result->is_error = TRUE;
+ lua_pushliteral (L, "received no data from server");
+ }
+ }
+ else {
+ result->is_error = TRUE;
+ if (ac->err == REDIS_ERR_IO) {
+ lua_pushstring (L, strerror (errno));
+ }
+ else {
+ lua_pushstring (L, ac->errstr);
+ }
+ }
+ /* if error happened, we should terminate the connection,
+ and release it */
+
+ if (result->is_error) {
+ /* Set to NULL to avoid double free in dtor */
+ sp_ud->c->ctx = NULL;
+ ctx->flags |= LUA_REDIS_TERMINATED;
+
+ /*
+ * This will call all callbacks pending so the entire context
+ * will be destructed
+ */
+ rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE);
+ }
+
+ result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ result->s = ud->s;
+ result->w = sp_ud->w;
+ result->sp_ud = sp_ud;
+
+ g_queue_push_tail (ctx->replies, result);
+
+ ctx->cmds_pending --;
+
+ if (ctx->cmds_pending == 0) {
+ if (ctx->thread) {
+ /* somebody yielded and waits for results */
+ thread = ctx->thread;
+ ctx->thread = NULL;
+
+ results = lua_redis_push_results (ctx, thread->lua_state);
+
+ lua_thread_resume (thread, results);
+
+ lua_redis_cleanup_events (ctx);
+ }
+ }
+}
+
+static void
+lua_redis_timeout_sync (int fd, short what, gpointer priv)
+{
+ struct lua_redis_request_specific_userdata *sp_ud = priv;
+ struct lua_redis_ctx *ctx = sp_ud->ctx;
+ redisAsyncContext *ac;
+
+ ac = sp_ud->c->ctx;
+
+ /* Set to NULL to avoid double free in dtor */
+ sp_ud->c->ctx = NULL;
+ ac->err = REDIS_ERR_IO;
+ errno = ETIMEDOUT;
+ ctx->flags |= LUA_REDIS_TERMINATED;
+
+ /*
+ * This will call all callbacks pending so the entire context
+ * will be destructed
+ */
+ rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE);
+}
+
static void
lua_redis_timeout (int fd, short what, gpointer u)
{
- struct lua_redis_specific_userdata *sp_ud = u;
+ struct lua_redis_request_specific_userdata *sp_ud = u;
struct lua_redis_ctx *ctx;
redisAsyncContext *ac;
@@ -562,7 +746,7 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
}
static struct lua_redis_ctx *
-rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
+rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_async)
{
struct lua_redis_ctx *ctx;
rspamd_inet_addr_t *ip = NULL;
@@ -621,6 +805,12 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
if (cfg && ev_base) {
ret = TRUE;
}
+ else if (!cfg) {
+ msg_err_task_check ("config is not passed");
+ }
+ else {
+ msg_err_task_check ("ev_base is not set");
+ }
}
else {
cfg = task->cfg;
@@ -687,15 +877,23 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
lua_pop (L, 1); /* table */
if (session && rspamd_session_is_destroying (session)) {
+ msg_err_task_check ("Session is being destroying");
ret = FALSE;
}
-
if (ret && addr != NULL) {
ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->flags |= flags | LUA_REDIS_ASYNC;
- ud = &ctx->d.async;
+ if (is_async) {
+ ctx->flags |= flags | LUA_REDIS_ASYNC;
+ ud = &ctx->async;
+ }
+ else {
+ ud = &ctx->async;
+ ctx->replies = g_queue_new ();
+ ctx->events_cleanup = g_queue_new ();
+
+ }
ud->s = session;
ud->cfg = cfg;
ud->pool = cfg->redis_pool;
@@ -767,7 +965,7 @@ static int
lua_redis_make_request (lua_State *L)
{
LUA_TRACE_POINT;
- struct lua_redis_specific_userdata *sp_ud;
+ struct lua_redis_request_specific_userdata *sp_ud;
struct lua_redis_userdata *ud;
struct lua_redis_ctx *ctx, **pctx;
const gchar *cmd = NULL;
@@ -776,10 +974,10 @@ lua_redis_make_request (lua_State *L)
gint cbref = -1;
gboolean ret = FALSE;
- ctx = rspamd_lua_redis_prepare_connection (L, &cbref);
+ ctx = rspamd_lua_redis_prepare_connection (L, &cbref, TRUE);
if (ctx) {
- ud = &ctx->d.async;
+ ud = &ctx->async;
sp_ud = g_malloc0 (sizeof (*sp_ud));
sp_ud->cbref = cbref;
sp_ud->c = ud;
@@ -1022,10 +1220,10 @@ lua_redis_connect (lua_State *L)
struct lua_redis_ctx *ctx, **pctx;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
- ctx = rspamd_lua_redis_prepare_connection (L, NULL);
+ ctx = rspamd_lua_redis_prepare_connection (L, NULL, TRUE);
if (ctx) {
- ud = &ctx->d.async;
+ ud = &ctx->async;
lua_pushstring (L, "timeout");
lua_gettable (L, 1);
@@ -1061,33 +1259,14 @@ static int
lua_redis_connect_sync (lua_State *L)
{
LUA_TRACE_POINT;
- struct rspamd_lua_ip *addr = NULL;
rspamd_inet_addr_t *ip = NULL;
- const gchar *host;
- struct timeval tv;
- gboolean ret = FALSE;
- guint flags = 0;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
struct lua_redis_ctx *ctx, **pctx;
- if (lua_istable (L, 1)) {
- lua_pushstring (L, "host");
- lua_gettable (L, -2);
- if (lua_type (L, -1) == LUA_TUSERDATA) {
- addr = lua_check_ip (L, -1);
- }
- else if (lua_type (L, -1) == LUA_TSTRING) {
- host = lua_tostring (L, -1);
- if (rspamd_parse_inet_address (&ip, host, strlen (host))) {
- addr = g_alloca (sizeof (*addr));
- addr->addr = ip;
+ ctx = rspamd_lua_redis_prepare_connection (L, NULL, FALSE);
- if (rspamd_inet_address_get_port (ip) == 0) {
- rspamd_inet_address_set_port (ip, 6379);
- }
- }
- }
- lua_pop (L, 1);
+
+ if (ctx) {
lua_pushstring (L, "timeout");
lua_gettable (L, -2);
@@ -1096,52 +1275,7 @@ lua_redis_connect_sync (lua_State *L)
}
lua_pop (L, 1);
- lua_pushstring (L, "opaque_data");
- lua_gettable (L, -2);
- if (!!lua_toboolean (L, -1)) {
- flags |= LUA_REDIS_TEXTDATA;
- }
- lua_pop (L, 1);
-
- if (addr) {
- ret = TRUE;
- }
- }
-
- if (ret) {
- double_to_tv (timeout, &tv);
- ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
- REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->flags = flags;
-
- if (rspamd_inet_address_get_af (addr->addr) == AF_UNIX) {
- ctx->d.sync = redisConnectUnixWithTimeout (
- rspamd_inet_address_to_string (addr->addr), tv);
- }
- else {
- ctx->d.sync = redisConnectWithTimeout (
- rspamd_inet_address_to_string (addr->addr),
- rspamd_inet_address_get_port (addr->addr), tv);
- }
-
- if (ip) {
- rspamd_inet_address_free (ip);
- }
-
- if (ctx->d.sync == NULL || ctx->d.sync->err) {
- lua_pushboolean (L, FALSE);
-
- if (ctx->d.sync) {
- lua_pushstring (L, ctx->d.sync->errstr);
- }
- else {
- lua_pushstring (L, "unknown error");
- }
-
- REDIS_RELEASE (ctx);
-
- return 2;
- }
+ ctx->async.timeout = timeout;
lua_pushboolean (L, TRUE);
pctx = lua_newuserdata (L, sizeof (ctx));
@@ -1174,118 +1308,107 @@ lua_redis_add_cmd (lua_State *L)
{
LUA_TRACE_POINT;
struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
- struct lua_redis_specific_userdata *sp_ud;
+ struct lua_redis_request_specific_userdata *sp_ud;
struct lua_redis_userdata *ud;
const gchar *cmd = NULL;
gint args_pos = 2;
- gchar **args = NULL;
- gsize *arglens = NULL;
- guint nargs = 0;
gint cbref = -1, ret;
struct timeval tv;
if (ctx) {
+ if (ctx->flags & LUA_REDIS_TERMINATED) {
+ lua_pushboolean (L, FALSE);
+ lua_pushstring (L, "Connection is terminated");
- if (IS_ASYNC (ctx)) {
- ud = &ctx->d.async;
+ return 2;
+ }
- /* Async version */
- if (lua_type (L, 2) == LUA_TSTRING) {
- /* No callback version */
- cmd = lua_tostring (L, 2);
- args_pos = 3;
- }
- else if (lua_type (L, 2) == LUA_TFUNCTION) {
- lua_pushvalue (L, 2);
- cbref = luaL_ref (L, LUA_REGISTRYINDEX);
- cmd = lua_tostring (L, 3);
- args_pos = 4;
- }
- else {
- return luaL_error (L, "invalid arguments");
- }
+ /* Async version */
+ if (lua_type (L, 2) == LUA_TSTRING) {
+ /* No callback version */
+ cmd = lua_tostring (L, 2);
+ args_pos = 3;
+ }
+ else if (lua_type (L, 2) == LUA_TFUNCTION) {
+ lua_pushvalue (L, 2);
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ cmd = lua_tostring (L, 3);
+ args_pos = 4;
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
- sp_ud = g_malloc0 (sizeof (*sp_ud));
+ sp_ud = g_malloc0 (sizeof (*sp_ud));
+ if (IS_ASYNC (ctx)) {
+ sp_ud->c = &ctx->async;
+ ud = &ctx->async;
sp_ud->cbref = cbref;
- sp_ud->c = &ctx->d.async;
- sp_ud->ctx = ctx;
+ }
+ else {
+ sp_ud->c = &ctx->async;
+ ud = &ctx->async;
+ }
+ sp_ud->ctx = ctx;
- lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
- &sp_ud->arglens, &sp_ud->nargs);
+ lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
+ &sp_ud->arglens, &sp_ud->nargs);
- LL_PREPEND (sp_ud->c->specific, sp_ud);
+ LL_PREPEND (sp_ud->c->specific, sp_ud);
- if (ud->s && rspamd_session_is_destroying (ud->s)) {
- lua_pushboolean (L, 0);
- lua_pushstring (L, "session is terminating");
+ if (ud->s && rspamd_session_is_destroying (ud->s)) {
+ lua_pushboolean (L, 0);
+ lua_pushstring (L, "session is terminating");
- return 2;
- }
+ return 2;
+ }
+ if (IS_ASYNC (ctx)) {
ret = redisAsyncCommandArgv (sp_ud->c->ctx,
lua_redis_callback,
sp_ud,
sp_ud->nargs,
(const gchar **)sp_ud->args,
sp_ud->arglens);
+ }
+ else {
+ ret = redisAsyncCommandArgv (sp_ud->c->ctx,
+ lua_redis_callback_sync,
+ sp_ud,
+ sp_ud->nargs,
+ (const gchar **)sp_ud->args,
+ sp_ud->arglens);
+ }
- if (ret == REDIS_OK) {
- if (ud->s) {
- rspamd_session_add_event (ud->s,
- lua_redis_fin,
- sp_ud,
- g_quark_from_static_string ("lua redis"));
- sp_ud->w = rspamd_session_get_watcher (ud->s);
- rspamd_session_watcher_push (ud->s);
- }
+ if (ret == REDIS_OK) {
+ if (ud->s) {
+ rspamd_session_add_event (ud->s,
+ lua_redis_fin,
+ sp_ud,
+ g_quark_from_static_string ("lua redis"));
+ sp_ud->w = rspamd_session_get_watcher (ud->s);
+ rspamd_session_watcher_push (ud->s);
+ }
- double_to_tv (sp_ud->c->timeout, &tv);
+ double_to_tv (sp_ud->c->timeout, &tv);
+ if (IS_ASYNC (ctx)) {
event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
- event_base_set (ud->ev_base, &sp_ud->timeout);
- event_add (&sp_ud->timeout, &tv);
- REDIS_RETAIN (ctx);
- ctx->cmds_pending ++;
}
else {
- msg_info ("call to redis failed: %s",
- sp_ud->c->ctx->errstr);
- lua_pushboolean (L, 0);
- lua_pushstring (L, sp_ud->c->ctx->errstr);
-
- return 2;
+ event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout_sync, sp_ud);
}
+ event_base_set (ud->ev_base, &sp_ud->timeout);
+ event_add (&sp_ud->timeout, &tv);
+ REDIS_RETAIN (ctx);
+ ctx->cmds_pending ++;
}
else {
- /* Synchronous version */
- if (lua_type (L, 2) == LUA_TSTRING) {
- cmd = lua_tostring (L, 2);
- args_pos = 3;
- }
- else {
- return luaL_error (L, "invalid arguments");
- }
-
- if (ctx->d.sync) {
- lua_redis_parse_args (L, args_pos, cmd, &args, &arglens, &nargs);
-
- if (nargs > 0) {
- if (redisAppendCommandArgv (ctx->d.sync, nargs,
- (const char **)args, arglens) == REDIS_OK) {
- ctx->cmds_pending ++;
- }
-
- lua_redis_free_args (args, arglens, nargs);
- }
- else {
- lua_pushstring (L, "cannot append commands when not connected");
- return lua_error (L);
- }
+ msg_info ("call to redis failed: %s",
+ sp_ud->c->ctx->errstr);
+ lua_pushboolean (L, 0);
+ lua_pushstring (L, sp_ud->c->ctx->errstr);
- }
- else {
- lua_pushstring (L, "cannot append commands when not connected");
- return lua_error (L);
- }
+ return 2;
}
}
@@ -1304,9 +1427,6 @@ lua_redis_exec (lua_State *L)
{
LUA_TRACE_POINT;
struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
- redisReply *r;
- gint ret;
- guint i, nret = 0, pending;
if (ctx == NULL) {
lua_error (L);
@@ -1320,48 +1440,26 @@ lua_redis_exec (lua_State *L)
return 0;
}
else {
- if (!ctx->d.sync) {
+ if (false /* !ctx->d.sync */) {
lua_pushstring (L, "cannot exec commands when not connected");
lua_error (L);
return 0;
}
else {
- if (!lua_checkstack (L, (ctx->cmds_pending * 2) + 1)) {
- return luaL_error (L, "cannot resize stack to fit %d commands",
- ctx->cmds_pending);
+ if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) == 0) {
+ lua_pushstring (L, "No pending commands to execute");
+ lua_error (L);
}
-
- pending = ctx->cmds_pending;
- ctx->cmds_pending = 0;
-
- for (i = 0; i < pending; i ++) {
- ret = redisGetReply (ctx->d.sync, (void **)&r);
-
- if (ret == REDIS_OK) {
- if (r->type != REDIS_REPLY_ERROR) {
- lua_pushboolean (L, TRUE);
- lua_redis_push_reply (L, r,
- ctx->flags & LUA_REDIS_TEXTDATA);
- }
- else {
- lua_pushboolean (L, FALSE);
- lua_pushlstring (L, r->str, r->len);
- }
-
- freeReplyObject (r);
- }
- else {
- msg_info ("call to redis failed: %s", ctx->d.sync->errstr);
- lua_pushboolean (L, FALSE);
- lua_pushstring (L, ctx->d.sync->errstr);
- }
-
- nret += 2;
+ if (ctx->cmds_pending == 0 && g_queue_get_length (ctx->replies) > 0) {
+ gint results = lua_redis_push_results (ctx, L);
+ return results;
+ }
+ else {
+ ctx->thread = lua_thread_pool_get_running_entry (ctx->async.cfg->lua_thread_pool);
+ return lua_thread_yield (ctx->thread, 0);
}
}
}
-
- return nret;
}
#else
static int