path: root/src/lua/lua_redis.c
diff options
authorVsevolod Stakhov <>2016-04-26 14:49:28 +0100
committerVsevolod Stakhov <>2016-04-26 14:49:47 +0100
commit816bc6f6eafb1afd7d750cc202ca2cf774abfd65 (patch)
treef45413d6d73a05fadbd28f42f83a90c8b84f6aa8 /src/lua/lua_redis.c
parent3643f35e685778c3c2f53e56c182d804938de2f9 (diff)
[Feature] Implement pipelining for redis async interface
Diffstat (limited to 'src/lua/lua_redis.c')
1 files changed, 203 insertions, 83 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 952bfb883..3341b0a7b 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -15,6 +15,7 @@
#include "lua_common.h"
#include "dns.h"
+#include "utlist.h"
#include "hiredis.h"
@@ -71,6 +72,7 @@ static const struct luaL_reg redislib_m[] = {
+struct lua_redis_specific_userdata;
* Struct for userdata representation
@@ -78,16 +80,24 @@ struct lua_redis_userdata {
redisAsyncContext *ctx;
lua_State *L;
struct rspamd_task *task;
- struct event timeout;
gchar *server;
gchar *reqline;
- gchar **args;
- gint cbref;
- guint nargs;
+ struct lua_redis_specific_userdata *specific;
+ gdouble timeout;
guint16 port;
guint16 terminated;
+struct lua_redis_specific_userdata {
+ gint cbref;
+ guint nargs;
+ gchar **args;
+ struct event timeout;
+ struct lua_redis_userdata *c;
+ struct lua_redis_ctx *ctx;
+ struct lua_redis_specific_userdata *next;
struct lua_redis_ctx {
gboolean async;
union {
@@ -124,6 +134,7 @@ static void
lua_redis_dtor (struct lua_redis_ctx *ctx)
struct lua_redis_userdata *ud;
+ struct lua_redis_specific_userdata *cur, *tmp;
if (ctx->async) {
ud = &ctx->d.async;
@@ -138,9 +149,17 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
ctx->ref.refcount = 100500;
redisAsyncFree (ud->ctx);
ctx->ref.refcount = 0;
- lua_redis_free_args (ud->args, ud->nargs);
- event_del (&ud->timeout);
- luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref);
+ LL_FOREACH_SAFE (ud->specific, cur, tmp) {
+ lua_redis_free_args (cur->args, cur->nargs);
+ event_del (&cur->timeout);
+ if (cur->cbref != -1) {
+ luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref);
+ }
+ g_slice_free1 (sizeof (*cur), cur);
+ }
else {
@@ -167,8 +186,10 @@ lua_redis_gc (lua_State *L)
static void
lua_redis_fin (void *arg)
- struct lua_redis_ctx *ctx = arg;
+ struct lua_redis_specific_userdata *sp_ud = arg;
+ struct lua_redis_ctx *ctx;
+ ctx = sp_ud->ctx;
@@ -180,28 +201,31 @@ lua_redis_fin (void *arg)
static void
lua_redis_push_error (const gchar *err,
struct lua_redis_ctx *ctx,
+ struct lua_redis_specific_userdata *sp_ud,
gboolean connected)
struct rspamd_task **ptask;
- struct lua_redis_userdata *ud = &ctx->d.async;
- /* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
- ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
- *ptask = ud->task;
- /* String of error */
- lua_pushstring (ud->L, err);
- /* Data is nil */
- lua_pushnil (ud->L);
- if (lua_pcall (ud->L, 3, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ struct lua_redis_userdata *ud = sp_ud->c;
+ if (sp_ud->cbref != -1) {
+ /* Push error */
+ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+ *ptask = ud->task;
+ /* String of error */
+ lua_pushstring (ud->L, err);
+ /* Data is nil */
+ lua_pushnil (ud->L);
+ if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+ lua_pop (ud->L, 1);
+ }
if (connected) {
- rspamd_session_remove_event (ud->task->s, lua_redis_fin, ctx);
+ rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
@@ -241,28 +265,31 @@ lua_redis_push_reply (lua_State *L, const redisReply *r)
* @param ud
static void
-lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx)
+lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
+ struct lua_redis_specific_userdata *sp_ud)
struct rspamd_task **ptask;
- struct lua_redis_userdata *ud = &ctx->d.async;
- /* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref);
- ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
- *ptask = ud->task;
- /* Error is nil */
- lua_pushnil (ud->L);
- /* Data */
- lua_redis_push_reply (ud->L, r);
- if (lua_pcall (ud->L, 3, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ struct lua_redis_userdata *ud = sp_ud->c;
+ if (sp_ud->cbref != -1) {
+ /* Push error */
+ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+ *ptask = ud->task;
+ /* Error is nil */
+ lua_pushnil (ud->L);
+ /* Data */
+ lua_redis_push_reply (ud->L, r);
+ if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+ lua_pop (ud->L, 1);
+ }
- rspamd_session_remove_event (ud->task->s, lua_redis_fin, ctx);
+ rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
@@ -275,13 +302,14 @@ static void
lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
redisReply *reply = r;
- struct lua_redis_ctx *ctx = priv;
+ struct lua_redis_specific_userdata *sp_ud = priv;
+ struct lua_redis_ctx *ctx;
struct lua_redis_userdata *ud;
+ ctx = sp_ud->ctx;
+ ud = sp_ud->c;
- ud = &ctx->d.async;
if (ud->terminated) {
/* We are already at the termination stage, just go out */
@@ -291,22 +319,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
if (c->err == 0) {
if (r != NULL) {
if (reply->type != REDIS_REPLY_ERROR) {
- lua_redis_push_data (reply, ctx);
+ lua_redis_push_data (reply, ctx, sp_ud);
else {
- lua_redis_push_error (reply->str, ctx, TRUE);
+ lua_redis_push_error (reply->str, ctx, sp_ud, TRUE);
else {
- lua_redis_push_error ("received no data from server", ctx, TRUE);
+ lua_redis_push_error ("received no data from server", ctx, sp_ud, TRUE);
else {
if (c->err == REDIS_ERR_IO) {
- lua_redis_push_error (strerror (errno), ctx, TRUE);
+ lua_redis_push_error (strerror (errno), ctx, sp_ud, TRUE);
else {
- lua_redis_push_error (c->errstr, ctx, TRUE);
+ lua_redis_push_error (c->errstr, ctx, sp_ud, TRUE);
@@ -316,11 +344,13 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
static void
lua_redis_timeout (int fd, short what, gpointer u)
- struct lua_redis_ctx *ctx = u;
+ struct lua_redis_specific_userdata *sp_ud = u;
+ struct lua_redis_ctx *ctx;
+ ctx = sp_ud->ctx;
msg_info ("timeout while querying redis server");
- lua_redis_push_error ("timeout while connecting the server", ctx, TRUE);
+ lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, TRUE);
@@ -404,11 +434,12 @@ lua_redis_make_request (lua_State *L)
struct lua_redis_ctx *ctx;
rspamd_inet_addr_t *ip = NULL;
struct lua_redis_userdata *ud;
+ struct lua_redis_specific_userdata *sp_ud;
struct rspamd_lua_ip *addr = NULL;
struct rspamd_task *task = NULL;
const gchar *cmd = NULL, *host;
const gchar *password = NULL, *dbname = NULL;
- gint top, cbref = -1;
+ gint top, cbref = -1, args_pos;
struct timeval tv;
gboolean ret = FALSE;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
@@ -487,18 +518,24 @@ lua_redis_make_request (lua_State *L)
lua_pop (L, 1);
- if (task != NULL && addr != NULL && cbref != -1 && cmd != NULL) {
+ if (task != NULL && addr != NULL && cmd != NULL) {
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
ctx->async = TRUE;
ud = &ctx->d.async;
ud->task = task;
ud->L = L;
- ud->cbref = cbref;
+ sp_ud = g_slice_alloc (sizeof (*sp_ud));
+ sp_ud->cbref = cbref;
+ sp_ud->c = ud;
lua_pushstring (L, "args");
lua_gettable (L, -2);
- lua_redis_parse_args (L, -1, cmd, &ud->args, &ud->nargs);
+ lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->nargs);
lua_pop (L, 1);
+ LL_PREPEND (ud->specific, sp_ud);
ret = TRUE;
else {
@@ -514,7 +551,7 @@ lua_redis_make_request (lua_State *L)
top = lua_gettop (L);
/* Now get callback */
- if (lua_isfunction (L, 3) && addr != NULL && addr->addr && top >= 4) {
+ if (addr != NULL && addr->addr && top >= 4) {
/* Create userdata */
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
@@ -523,19 +560,34 @@ lua_redis_make_request (lua_State *L)
ud->task = task;
ud->L = L;
- /* Pop other arguments */
- lua_pushvalue (L, 3);
- /* Get a reference */
- ud->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ args_pos = 3;
+ if (lua_isfunction (L, 3)) {
+ /* Pop other arguments */
+ lua_pushvalue (L, 3);
+ /* Get a reference */
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ args_pos = 4;
+ }
+ else {
+ cbref = -1;
+ }
- cmd = luaL_checkstring (L, 4);
+ sp_ud = g_slice_alloc (sizeof (*sp_ud));
+ sp_ud->cbref = cbref;
+ sp_ud->c = ud;
+ cmd = luaL_checkstring (L, args_pos);
if (top > 4) {
- lua_redis_parse_args (L, 5, cmd, &ud->args, &ud->nargs);
+ lua_redis_parse_args (L, args_pos + 1, cmd, &sp_ud->args,
+ &sp_ud->nargs);
else {
- lua_redis_parse_args (L, 0, cmd, &ud->args, &ud->nargs);
+ lua_redis_parse_args (L, 0, cmd, &sp_ud->args, &sp_ud->nargs);
+ LL_PREPEND (ud->specific, sp_ud);
ret = TRUE;
else {
@@ -545,6 +597,7 @@ lua_redis_make_request (lua_State *L)
if (ret) {
ud->terminated = 0;
+ ud->timeout = timeout;
ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));
@@ -572,21 +625,22 @@ lua_redis_make_request (lua_State *L)
ret = redisAsyncCommandArgv (ud->ctx,
- ctx,
- ud->nargs,
- (const gchar **)ud->args,
+ sp_ud,
+ sp_ud->nargs,
+ (const gchar **)sp_ud->args,
if (ret == REDIS_OK) {
rspamd_session_add_event (ud->task->s,
- ctx,
+ sp_ud,
g_quark_from_static_string ("lua redis"));
+ sp_ud->ctx = ctx;
double_to_tv (timeout, &tv);
- event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ctx);
- event_base_set (ud->task->ev_base, &ud->timeout);
- event_add (&ud->timeout, &tv);
+ event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
+ event_base_set (ud->task->ev_base, &sp_ud->timeout);
+ event_add (&sp_ud->timeout, &tv);
else {
msg_info ("call to redis failed: %s", ud->ctx->errstr);
@@ -735,9 +789,11 @@ lua_redis_connect (lua_State *L)
rspamd_inet_addr_t *ip = NULL;
const gchar *host;
struct lua_redis_ctx *ctx = NULL, **pctx;
+ struct lua_redis_specific_userdata *sp_ud;
struct lua_redis_userdata *ud;
struct rspamd_task *task = NULL;
gboolean ret = FALSE;
+ gdouble timeout = REDIS_DEFAULT_TIMEOUT;
if (lua_istable (L, 1)) {
/* Table version */
@@ -748,6 +804,12 @@ lua_redis_connect (lua_State *L)
lua_pop (L, 1);
+ lua_pushstring (L, "timeout");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TNUMBER) {
+ timeout = lua_tonumber (L, -1);
+ }
+ lua_pop (L, 1);
lua_pushstring (L, "host");
lua_gettable (L, -2);
@@ -783,13 +845,17 @@ lua_redis_connect (lua_State *L)
ud = &ctx->d.async;
ud->task = task;
ud->L = L;
- ud->cbref = -1;
+ sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
+ sp_ud->cbref = -1;
+ sp_ud->c = ud;
+ LL_PREPEND (ud->specific, sp_ud);
ret = TRUE;
if (ret && ctx) {
ud->terminated = 0;
+ ud->timeout = timeout;
ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr));
@@ -919,23 +985,79 @@ static int
lua_redis_add_cmd (lua_State *L)
struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
+ struct lua_redis_specific_userdata *sp_ud;
const gchar *cmd = NULL;
gint args_pos = 2;
gchar **args = NULL;
guint nargs = 0;
+ gint cbref = -1, ret;
+ struct timeval tv;
if (ctx) {
- if (lua_type (L, 2) == LUA_TSTRING) {
- cmd = lua_tostring (L, 2);
- args_pos = 3;
- }
if (ctx->async) {
- lua_pushstring (L, "Async redis pipelining is not implemented");
- lua_error (L);
- return 0;
+ /* Async version */
+ if (lua_type (L, 2) == LUA_TSTRING) {
+ /* No callback version */
+ cmd = lua_tostring (L, 2);
+ args_pos = 3;
+ }
+ else if (lua_type (L, 2) == LUA_TFUNCTION) {
+ lua_pushvalue (L, 2);
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ cmd = lua_tostring (L, 3);
+ args_pos = 4;
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
+ sp_ud = g_slice_alloc (sizeof (*sp_ud));
+ sp_ud->cbref = cbref;
+ sp_ud->c = &ctx->d.async;
+ sp_ud->ctx = ctx;
+ lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args,
+ &sp_ud->nargs);
+ LL_PREPEND (sp_ud->c->specific, sp_ud);
+ ret = redisAsyncCommandArgv (sp_ud->c->ctx,
+ lua_redis_callback,
+ sp_ud,
+ sp_ud->nargs,
+ (const gchar **)sp_ud->args,
+ NULL);
+ if (ret == REDIS_OK) {
+ rspamd_session_add_event (sp_ud->c->task->s,
+ lua_redis_fin,
+ sp_ud,
+ g_quark_from_static_string ("lua redis"));
+ double_to_tv (sp_ud->c->timeout, &tv);
+ event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
+ event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout);
+ event_add (&sp_ud->timeout, &tv);
+ REF_RETAIN (ctx);
+ }
+ else {
+ msg_info ("call to redis failed: %s", sp_ud->c->ctx->errstr);
+ lua_pushboolean (L, 0);
+ lua_pushstring (L, sp_ud->c->ctx->errstr);
+ return 2;
+ }
else {
+ /* Synchronous version */
+ if (lua_type (L, 2) == LUA_TSTRING) {
+ cmd = lua_tostring (L, 2);
+ args_pos = 3;
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
if (ctx->d.sync) {
lua_redis_parse_args (L, args_pos, cmd, &args, &nargs);
@@ -947,15 +1069,13 @@ lua_redis_add_cmd (lua_State *L)
else {
lua_pushstring (L, "cannot append commands when not connected");
- lua_error (L);
- return 0;
+ return lua_error (L);
else {
lua_pushstring (L, "cannot append commands when not connected");
- lua_error (L);
- return 0;
+ return lua_error (L);