diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-04-26 14:49:28 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-04-26 14:49:47 +0100 |
commit | 816bc6f6eafb1afd7d750cc202ca2cf774abfd65 (patch) | |
tree | f45413d6d73a05fadbd28f42f83a90c8b84f6aa8 /src | |
parent | 3643f35e685778c3c2f53e56c182d804938de2f9 (diff) | |
download | rspamd-816bc6f6eafb1afd7d750cc202ca2cf774abfd65.tar.gz rspamd-816bc6f6eafb1afd7d750cc202ca2cf774abfd65.zip |
[Feature] Implement pipelining for redis async interface
Diffstat (limited to 'src')
-rw-r--r-- | src/lua/lua_redis.c | 286 |
1 files changed, 203 insertions, 83 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 952bfb883..3341b0a7b 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -15,6 +15,7 @@ */ #include "lua_common.h" #include "dns.h" +#include "utlist.h" #ifdef WITH_HIREDIS #include "hiredis.h" @@ -71,6 +72,7 @@ static const struct luaL_reg redislib_m[] = { }; #ifdef WITH_HIREDIS +struct lua_redis_specific_userdata; /** * Struct for userdata representation */ @@ -78,16 +80,24 @@ struct lua_redis_userdata { redisAsyncContext *ctx; lua_State *L; struct rspamd_task *task; - struct event timeout; gchar *server; gchar *reqline; - gchar **args; - gint cbref; - guint nargs; + struct lua_redis_specific_userdata *specific; + gdouble timeout; guint16 port; guint16 terminated; }; +struct lua_redis_specific_userdata { + gint cbref; + guint nargs; + gchar **args; + struct event timeout; + struct lua_redis_userdata *c; + struct lua_redis_ctx *ctx; + struct lua_redis_specific_userdata *next; +}; + struct lua_redis_ctx { gboolean async; union { @@ -124,6 +134,7 @@ static void lua_redis_dtor (struct lua_redis_ctx *ctx) { struct lua_redis_userdata *ud; + struct lua_redis_specific_userdata *cur, *tmp; if (ctx->async) { ud = &ctx->d.async; @@ -138,9 +149,17 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) ctx->ref.refcount = 100500; redisAsyncFree (ud->ctx); ctx->ref.refcount = 0; - lua_redis_free_args (ud->args, ud->nargs); - event_del (&ud->timeout); - luaL_unref (ud->L, LUA_REGISTRYINDEX, ud->cbref); + + LL_FOREACH_SAFE (ud->specific, cur, tmp) { + lua_redis_free_args (cur->args, cur->nargs); + event_del (&cur->timeout); + + if (cur->cbref != -1) { + luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref); + } + + g_slice_free1 (sizeof (*cur), cur); + } } } else { @@ -167,8 +186,10 @@ lua_redis_gc (lua_State *L) static void lua_redis_fin (void *arg) { - struct lua_redis_ctx *ctx = arg; + struct lua_redis_specific_userdata *sp_ud = arg; + struct lua_redis_ctx *ctx; + ctx = sp_ud->ctx; REF_RELEASE (ctx); } @@ -180,28 +201,31 @@ lua_redis_fin (void *arg) static void lua_redis_push_error (const gchar *err, struct lua_redis_ctx *ctx, + struct lua_redis_specific_userdata *sp_ud, gboolean connected) { struct rspamd_task **ptask; - struct lua_redis_userdata *ud = &ctx->d.async; - - /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); - ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (ud->L, "rspamd{task}", -1); - - *ptask = ud->task; - /* String of error */ - lua_pushstring (ud->L, err); - /* Data is nil */ - lua_pushnil (ud->L); - if (lua_pcall (ud->L, 3, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + struct lua_redis_userdata *ud = sp_ud->c; + + if (sp_ud->cbref != -1) { + /* Push error */ + lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (ud->L, "rspamd{task}", -1); + + *ptask = ud->task; + /* String of error */ + lua_pushstring (ud->L, err); + /* Data is nil */ + lua_pushnil (ud->L); + if (lua_pcall (ud->L, 3, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); + lua_pop (ud->L, 1); + } } if (connected) { - rspamd_session_remove_event (ud->task->s, lua_redis_fin, ctx); + rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); } } @@ -241,28 +265,31 @@ lua_redis_push_reply (lua_State *L, const redisReply *r) * @param ud */ static void -lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx) +lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, + struct lua_redis_specific_userdata *sp_ud) { struct rspamd_task **ptask; - struct lua_redis_userdata *ud = &ctx->d.async; - - /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, ud->cbref); - ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (ud->L, "rspamd{task}", -1); - - *ptask = ud->task; - /* Error is nil */ - lua_pushnil (ud->L); - /* Data */ - lua_redis_push_reply (ud->L, r); - - if (lua_pcall (ud->L, 3, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + struct lua_redis_userdata *ud = sp_ud->c; + + if (sp_ud->cbref != -1) { + /* Push error */ + lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (ud->L, "rspamd{task}", -1); + + *ptask = ud->task; + /* Error is nil */ + lua_pushnil (ud->L); + /* Data */ + lua_redis_push_reply (ud->L, r); + + if (lua_pcall (ud->L, 3, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); + lua_pop (ud->L, 1); + } } - rspamd_session_remove_event (ud->task->s, lua_redis_fin, ctx); + rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); } /** @@ -275,13 +302,14 @@ static void lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) { redisReply *reply = r; - struct lua_redis_ctx *ctx = priv; + struct lua_redis_specific_userdata *sp_ud = priv; + struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; + ctx = sp_ud->ctx; + ud = sp_ud->c; REF_RETAIN (ctx); - ud = &ctx->d.async; - if (ud->terminated) { /* We are already at the termination stage, just go out */ REF_RELEASE (ctx); @@ -291,22 +319,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) if (c->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { - lua_redis_push_data (reply, ctx); + lua_redis_push_data (reply, ctx, sp_ud); } else { - lua_redis_push_error (reply->str, ctx, TRUE); + lua_redis_push_error (reply->str, ctx, sp_ud, TRUE); } } else { - lua_redis_push_error ("received no data from server", ctx, TRUE); + lua_redis_push_error ("received no data from server", ctx, sp_ud, TRUE); } } else { if (c->err == REDIS_ERR_IO) { - lua_redis_push_error (strerror (errno), ctx, TRUE); + lua_redis_push_error (strerror (errno), ctx, sp_ud, TRUE); } else { - lua_redis_push_error (c->errstr, ctx, TRUE); + lua_redis_push_error (c->errstr, ctx, sp_ud, TRUE); } } @@ -316,11 +344,13 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) static void lua_redis_timeout (int fd, short what, gpointer u) { - struct lua_redis_ctx *ctx = u; + struct lua_redis_specific_userdata *sp_ud = u; + struct lua_redis_ctx *ctx; + ctx = sp_ud->ctx; REF_RETAIN (ctx); msg_info ("timeout while querying redis server"); - lua_redis_push_error ("timeout while connecting the server", ctx, TRUE); + lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, TRUE); REF_RELEASE (ctx); } @@ -404,11 +434,12 @@ lua_redis_make_request (lua_State *L) struct lua_redis_ctx *ctx; rspamd_inet_addr_t *ip = NULL; struct lua_redis_userdata *ud; + struct lua_redis_specific_userdata *sp_ud; struct rspamd_lua_ip *addr = NULL; struct rspamd_task *task = NULL; const gchar *cmd = NULL, *host; const gchar *password = NULL, *dbname = NULL; - gint top, cbref = -1; + gint top, cbref = -1, args_pos; struct timeval tv; gboolean ret = FALSE; gdouble timeout = REDIS_DEFAULT_TIMEOUT; @@ -487,18 +518,24 @@ lua_redis_make_request (lua_State *L) lua_pop (L, 1); - if (task != NULL && addr != NULL && cbref != -1 && cmd != NULL) { + if (task != NULL && addr != NULL && cmd != NULL) { ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); REF_INIT_RETAIN (ctx, lua_redis_dtor); ctx->async = TRUE; ud = &ctx->d.async; ud->task = task; ud->L = L; - ud->cbref = cbref; + + sp_ud = g_slice_alloc (sizeof (*sp_ud)); + sp_ud->cbref = cbref; + sp_ud->c = ud; + lua_pushstring (L, "args"); lua_gettable (L, -2); - lua_redis_parse_args (L, -1, cmd, &ud->args, &ud->nargs); + lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->nargs); lua_pop (L, 1); + LL_PREPEND (ud->specific, sp_ud); + ret = TRUE; } else { @@ -514,7 +551,7 @@ lua_redis_make_request (lua_State *L) top = lua_gettop (L); /* Now get callback */ - if (lua_isfunction (L, 3) && addr != NULL && addr->addr && top >= 4) { + if (addr != NULL && addr->addr && top >= 4) { /* Create userdata */ ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); REF_INIT_RETAIN (ctx, lua_redis_dtor); @@ -523,19 +560,34 @@ lua_redis_make_request (lua_State *L) ud->task = task; ud->L = L; - /* Pop other arguments */ - lua_pushvalue (L, 3); - /* Get a reference */ - ud->cbref = luaL_ref (L, LUA_REGISTRYINDEX); + args_pos = 3; + + if (lua_isfunction (L, 3)) { + /* Pop other arguments */ + lua_pushvalue (L, 3); + /* Get a reference */ + cbref = luaL_ref (L, LUA_REGISTRYINDEX); + args_pos = 4; + } + else { + cbref = -1; + } + - cmd = luaL_checkstring (L, 4); + sp_ud = g_slice_alloc (sizeof (*sp_ud)); + sp_ud->cbref = cbref; + sp_ud->c = ud; + cmd = luaL_checkstring (L, args_pos); if (top > 4) { - lua_redis_parse_args (L, 5, cmd, &ud->args, &ud->nargs); + lua_redis_parse_args (L, args_pos + 1, cmd, &sp_ud->args, + &sp_ud->nargs); } else { - lua_redis_parse_args (L, 0, cmd, &ud->args, &ud->nargs); + lua_redis_parse_args (L, 0, cmd, &sp_ud->args, &sp_ud->nargs); } + LL_PREPEND (ud->specific, sp_ud); + ret = TRUE; } else { @@ -545,6 +597,7 @@ lua_redis_make_request (lua_State *L) if (ret) { ud->terminated = 0; + ud->timeout = timeout; ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr), rspamd_inet_address_get_port (addr->addr)); @@ -572,21 +625,22 @@ lua_redis_make_request (lua_State *L) ret = redisAsyncCommandArgv (ud->ctx, lua_redis_callback, - ctx, - ud->nargs, - (const gchar **)ud->args, + sp_ud, + sp_ud->nargs, + (const gchar **)sp_ud->args, NULL); if (ret == REDIS_OK) { rspamd_session_add_event (ud->task->s, lua_redis_fin, - ctx, + sp_ud, g_quark_from_static_string ("lua redis")); + sp_ud->ctx = ctx; double_to_tv (timeout, &tv); - event_set (&ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, ctx); - event_base_set (ud->task->ev_base, &ud->timeout); - event_add (&ud->timeout, &tv); + event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); + event_base_set (ud->task->ev_base, &sp_ud->timeout); + event_add (&sp_ud->timeout, &tv); } else { msg_info ("call to redis failed: %s", ud->ctx->errstr); @@ -735,9 +789,11 @@ lua_redis_connect (lua_State *L) rspamd_inet_addr_t *ip = NULL; const gchar *host; struct lua_redis_ctx *ctx = NULL, **pctx; + struct lua_redis_specific_userdata *sp_ud; struct lua_redis_userdata *ud; struct rspamd_task *task = NULL; gboolean ret = FALSE; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; if (lua_istable (L, 1)) { /* Table version */ @@ -748,6 +804,12 @@ lua_redis_connect (lua_State *L) } lua_pop (L, 1); + lua_pushstring (L, "timeout"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber (L, -1); + } + lua_pop (L, 1); lua_pushstring (L, "host"); lua_gettable (L, -2); @@ -783,13 +845,17 @@ lua_redis_connect (lua_State *L) ud = &ctx->d.async; ud->task = task; ud->L = L; - ud->cbref = -1; + sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); + sp_ud->cbref = -1; + sp_ud->c = ud; + LL_PREPEND (ud->specific, sp_ud); ret = TRUE; } } if (ret && ctx) { ud->terminated = 0; + ud->timeout = timeout; ud->ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr->addr), rspamd_inet_address_get_port (addr->addr)); @@ -919,23 +985,79 @@ static int lua_redis_add_cmd (lua_State *L) { struct lua_redis_ctx *ctx = lua_check_redis (L, 1); + struct lua_redis_specific_userdata *sp_ud; const gchar *cmd = NULL; gint args_pos = 2; gchar **args = NULL; guint nargs = 0; + gint cbref = -1, ret; + struct timeval tv; if (ctx) { - if (lua_type (L, 2) == LUA_TSTRING) { - cmd = lua_tostring (L, 2); - args_pos = 3; - } if (ctx->async) { - lua_pushstring (L, "Async redis pipelining is not implemented"); - lua_error (L); - return 0; + /* Async version */ + if (lua_type (L, 2) == LUA_TSTRING) { + /* No callback version */ + cmd = lua_tostring (L, 2); + args_pos = 3; + } + else if (lua_type (L, 2) == LUA_TFUNCTION) { + lua_pushvalue (L, 2); + cbref = luaL_ref (L, LUA_REGISTRYINDEX); + cmd = lua_tostring (L, 3); + args_pos = 4; + } + else { + return luaL_error (L, "invalid arguments"); + } + + sp_ud = g_slice_alloc (sizeof (*sp_ud)); + sp_ud->cbref = cbref; + sp_ud->c = &ctx->d.async; + sp_ud->ctx = ctx; + + lua_redis_parse_args (L, args_pos, cmd, &sp_ud->args, + &sp_ud->nargs); + + LL_PREPEND (sp_ud->c->specific, sp_ud); + + ret = redisAsyncCommandArgv (sp_ud->c->ctx, + lua_redis_callback, + sp_ud, + sp_ud->nargs, + (const gchar **)sp_ud->args, + NULL); + + if (ret == REDIS_OK) { + rspamd_session_add_event (sp_ud->c->task->s, + lua_redis_fin, + sp_ud, + g_quark_from_static_string ("lua redis")); + + double_to_tv (sp_ud->c->timeout, &tv); + event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); + event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout); + event_add (&sp_ud->timeout, &tv); + REF_RETAIN (ctx); + } + else { + msg_info ("call to redis failed: %s", sp_ud->c->ctx->errstr); + lua_pushboolean (L, 0); + lua_pushstring (L, sp_ud->c->ctx->errstr); + return 2; + } } else { + /* Synchronous version */ + if (lua_type (L, 2) == LUA_TSTRING) { + cmd = lua_tostring (L, 2); + args_pos = 3; + } + else { + return luaL_error (L, "invalid arguments"); + } + if (ctx->d.sync) { lua_redis_parse_args (L, args_pos, cmd, &args, &nargs); @@ -947,15 +1069,13 @@ lua_redis_add_cmd (lua_State *L) } else { lua_pushstring (L, "cannot append commands when not connected"); - lua_error (L); - return 0; + return lua_error (L); } } else { lua_pushstring (L, "cannot append commands when not connected"); - lua_error (L); - return 0; + return lua_error (L); } } } |