path: root/src/lua/lua_redis.c
diff options
authorVsevolod Stakhov <>2016-10-17 16:03:22 +0100
committerVsevolod Stakhov <>2016-10-17 16:03:22 +0100
commit45a1c1e24ac3ac96b8207c9c83ba397114223c4f (patch)
tree15a68bbe37da4d30afc58b693a4a08dcc115ae89 /src/lua/lua_redis.c
parented77fe58566345ed84bd4db9fc5313c246e5ff99 (diff)
[Rework] Make lua_redis task agnostic
Diffstat (limited to 'src/lua/lua_redis.c')
1 files changed, 185 insertions, 220 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 2466f63d4..8d027eb6f 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -94,7 +94,9 @@ struct lua_redis_specific_userdata;
struct lua_redis_userdata {
redisAsyncContext *ctx;
lua_State *L;
- struct rspamd_task *task;
+ struct rspamd_async_session *s;
+ struct event_base *ev_base;
+ struct rspamd_config *cfg;
struct rspamd_redis_pool *pool;
gchar *server;
gchar *reqline;
@@ -237,31 +239,31 @@ lua_redis_push_error (const gchar *err,
struct lua_redis_specific_userdata *sp_ud,
gboolean connected)
- struct rspamd_task **ptask;
struct lua_redis_userdata *ud = sp_ud->c;
if (!sp_ud->replied && !sp_ud->finished) {
- if (sp_ud->cbref != -1 && ud->task) {
+ if (sp_ud->cbref != -1) {
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
- ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
- *ptask = ud->task;
/* String of error */
lua_pushstring (ud->L, err);
/* Data is nil */
lua_pushnil (ud->L);
- if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+ if (lua_pcall (ud->L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_pop (ud->L, 1);
sp_ud->replied = TRUE;
- if (connected && ud->task) {
- rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
- rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+ if (connected && ud->s) {
+ rspamd_session_watcher_pop (ud->s, sp_ud->w);
+ rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
+ }
+ else {
+ lua_redis_fin (sp_ud);
@@ -305,23 +307,18 @@ static void
lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
struct lua_redis_specific_userdata *sp_ud)
- struct rspamd_task **ptask;
struct lua_redis_userdata *ud = sp_ud->c;
if (!sp_ud->replied && !sp_ud->finished) {
- if (sp_ud->cbref != -1 && ud->task) {
+ if (sp_ud->cbref != -1) {
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
- ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
- *ptask = ud->task;
/* Error is nil */
lua_pushnil (ud->L);
/* Data */
lua_redis_push_reply (ud->L, r);
- if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+ if (lua_pcall (ud->L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_pop (ud->L, 1);
@@ -330,9 +327,12 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
sp_ud->replied = TRUE;
- if (ud->task) {
- rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
- rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+ if (ud->s) {
+ rspamd_session_watcher_pop (ud->s, sp_ud->w);
+ rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
+ }
+ else {
+ lua_redis_fin (sp_ud);
@@ -511,58 +511,78 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd,
*nargs = top;
- * @function rspamd_redis.make_request({params})
- * Make request to redis server, params is a table of key=value arguments in any order
- * @param {task} task worker task object
- * @param {ip|string} host server address
- * @param {function} callback callback to be called in form `function (task, err, data)`
- * @param {string} cmd command to be sent to redis
- * @param {table} args numeric array of strings used as redis arguments
- * @param {number} timeout timeout in seconds for request (1.0 by default)
- * @return {boolean} `true` if a request has been scheduled
- */
-static int
-lua_redis_make_request (lua_State *L)
+static struct lua_redis_ctx *
+rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
- struct lua_redis_ctx *ctx, **pctx;
+ struct lua_redis_ctx *ctx;
rspamd_inet_addr_t *ip = NULL;
struct lua_redis_userdata *ud;
- struct lua_redis_specific_userdata *sp_ud;
struct rspamd_lua_ip *addr = NULL;
struct rspamd_task *task = NULL;
- const gchar *cmd = NULL, *host;
+ const gchar *host;
const gchar *password = NULL, *dbname = NULL;
- gint top, cbref = -1, args_pos;
- struct timeval tv;
+ gint cbref = -1;
+ struct rspamd_config *cfg = NULL;
+ struct rspamd_async_session *session = NULL;
+ struct event_base *ev_base = NULL;
gboolean ret = FALSE;
- gdouble timeout = REDIS_DEFAULT_TIMEOUT;
if (lua_istable (L, 1)) {
/* Table version */
lua_pushstring (L, "task");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TUSERDATA) {
- task = lua_check_task (L, -1);
+ task = lua_check_task_maybe (L, -1);
lua_pop (L, 1);
- lua_pushstring (L, "callback");
- lua_gettable (L, -2);
- if (lua_type (L, -1) == LUA_TFUNCTION) {
- /* This also pops function from the stack */
- cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ if (!task) {
+ /* We need to get ev_base, config and session separately */
+ lua_pushstring (L, "config");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ cfg = lua_check_config (L, -1);
+ }
+ lua_pop (L, 1);
+ lua_pushstring (L, "session");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ session = lua_check_session (L, -1);
+ }
+ lua_pop (L, 1);
+ lua_pushstring (L, "ev_base");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ ev_base = lua_check_ev_base (L, -1);
+ }
+ lua_pop (L, 1);
+ if (cfg && ev_base) {
+ ret = TRUE;
+ }
else {
- msg_err ("bad callback argument for lua redis");
- lua_pop (L, 1);
+ cfg = task->cfg;
+ session = task->s;
+ ev_base = task->ev_base;
+ ret = TRUE;
- lua_pushstring (L, "cmd");
- lua_gettable (L, -2);
- cmd = lua_tostring (L, -1);
- lua_pop (L, 1);
+ if (pcbref) {
+ lua_pushstring (L, "callback");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TFUNCTION) {
+ /* This also pops function from the stack */
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ *pcbref = cbref;
+ }
+ else {
+ *pcbref = -1;
+ lua_pop (L, 1);
+ }
+ }
lua_pushstring (L, "host");
lua_gettable (L, -2);
@@ -588,14 +608,6 @@ lua_redis_make_request (lua_State *L)
- lua_pop (L, 1);
- lua_pushstring (L, "timeout");
- lua_gettable (L, -2);
- if (lua_type (L, -1) == LUA_TNUMBER) {
- timeout = lua_tonumber (L, -1);
- }
lua_pop (L, 1);
lua_pushstring (L, "password");
@@ -613,26 +625,17 @@ lua_redis_make_request (lua_State *L)
lua_pop (L, 1);
- if (task != NULL && addr != NULL && cmd != NULL) {
+ if (ret && addr != NULL) {
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
ctx->async = TRUE;
ud = &ctx->d.async;
- ud->task = task;
- ud->pool = task->cfg->redis_pool;
+ ud->s = session;
+ ud->cfg = cfg;
+ ud->pool = cfg->redis_pool;
+ ud->ev_base = ev_base;
ud->L = L;
- sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
- sp_ud->cbref = cbref;
- sp_ud->c = ud;
- lua_pushstring (L, "args");
- lua_gettable (L, -2);
- lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
- &sp_ud->nargs);
- lua_pop (L, 1);
- LL_PREPEND (ud->specific, sp_ud);
ret = TRUE;
else {
@@ -641,62 +644,12 @@ lua_redis_make_request (lua_State *L)
msg_err_task_check ("incorrect function invocation");
- }
- }
- else if ((task = lua_check_task (L, 1)) != NULL) {
- addr = lua_check_ip (L, 2);
- top = lua_gettop (L);
- /* Now get callback */
- if (addr != NULL && addr->addr && top >= 4) {
- /* Create userdata */
- ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
- REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->async = TRUE;
- ud = &ctx->d.async;
- ud->task = task;
- ud->pool = task->cfg->redis_pool;
- ud->L = L;
- args_pos = 3;
- if (lua_isfunction (L, 3)) {
- /* Pop other arguments */
- lua_pushvalue (L, 3);
- /* Get a reference */
- cbref = luaL_ref (L, LUA_REGISTRYINDEX);
- args_pos = 4;
- }
- else {
- cbref = -1;
- }
- sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
- sp_ud->cbref = cbref;
- sp_ud->c = ud;
- cmd = luaL_checkstring (L, args_pos);
- if (top > 4) {
- lua_redis_parse_args (L, args_pos + 1, cmd, &sp_ud->args,
- &sp_ud->arglens, &sp_ud->nargs);
- }
- else {
- lua_redis_parse_args (L, 0, cmd, &sp_ud->args,
- &sp_ud->arglens, &sp_ud->nargs);
- }
- LL_PREPEND (ud->specific, sp_ud);
- ret = TRUE;
- }
- else {
- msg_err_task_check ("incorrect function invocation");
+ ret = FALSE;
if (ret) {
ud->terminated = 0;
- ud->timeout = timeout;
ud->ctx = rspamd_redis_pool_connect (ud->pool,
dbname, password,
rspamd_inet_address_to_string (addr->addr),
@@ -714,44 +667,109 @@ lua_redis_make_request (lua_State *L)
- lua_pushboolean (L, FALSE);
- lua_pushnil (L);
- return 2;
+ return NULL;
+ return ctx;
+ }
+ return NULL;
+ * @function rspamd_redis.make_request({params})
+ * Make request to redis server, params is a table of key=value arguments in any order
+ * @param {task} task worker task object
+ * @param {ip|string} host server address
+ * @param {function} callback callback to be called in form `function (task, err, data)`
+ * @param {string} cmd command to be sent to redis
+ * @param {table} args numeric array of strings used as redis arguments
+ * @param {number} timeout timeout in seconds for request (1.0 by default)
+ * @return {boolean} `true` if a request has been scheduled
+ */
+static int
+lua_redis_make_request (lua_State *L)
+ struct lua_redis_specific_userdata *sp_ud;
+ struct lua_redis_userdata *ud;
+ struct lua_redis_ctx *ctx, **pctx;
+ const gchar *cmd = NULL;
+ struct timeval tv;
+ gdouble timeout = REDIS_DEFAULT_TIMEOUT;
+ gint cbref = -1;
+ gboolean ret = FALSE;
+ ctx = rspamd_lua_redis_prepare_connection (L, &cbref);
+ if (ctx) {
+ ud = &ctx->d.async;
+ sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
+ sp_ud->cbref = cbref;
+ sp_ud->c = ud;
+ lua_pushstring (L, "cmd");
+ lua_gettable (L, -2);
+ cmd = lua_tostring (L, -1);
+ lua_pop (L, 1);
+ lua_pushstring (L, "timeout");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TNUMBER) {
+ timeout = lua_tonumber (L, -1);
+ }
+ lua_pop (L, 1);
+ ud->timeout = timeout;
+ lua_pushstring (L, "args");
+ lua_gettable (L, -2);
+ lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
+ &sp_ud->nargs);
+ lua_pop (L, 1);
+ LL_PREPEND (ud->specific, sp_ud);
ret = redisAsyncCommandArgv (ud->ctx,
- lua_redis_callback,
- sp_ud,
- sp_ud->nargs,
- (const gchar **)sp_ud->args,
- sp_ud->arglens);
+ lua_redis_callback,
+ sp_ud,
+ sp_ud->nargs,
+ (const gchar **)sp_ud->args,
+ sp_ud->arglens);
if (ret == REDIS_OK) {
- rspamd_session_add_event (ud->task->s,
- lua_redis_fin,
- sp_ud,
- g_quark_from_static_string ("lua redis"));
- sp_ud->w = rspamd_session_get_watcher (ud->task->s);
- rspamd_session_watcher_push (ud->task->s);
+ if (ud->s) {
+ rspamd_session_add_event (ud->s,
+ lua_redis_fin,
+ sp_ud,
+ g_quark_from_static_string ("lua redis"));
+ sp_ud->w = rspamd_session_get_watcher (ud->s);
+ rspamd_session_watcher_push (ud->s);
+ }
+ else {
+ sp_ud->w = NULL;
+ }
- sp_ud->ctx = ctx;
ctx->cmds_pending ++;
double_to_tv (timeout, &tv);
event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
- event_base_set (ud->task->ev_base, &sp_ud->timeout);
+ event_base_set (ud->ev_base, &sp_ud->timeout);
event_add (&sp_ud->timeout, &tv);
ret = TRUE;
else {
- msg_info_task_check ("call to redis failed: %s", ud->ctx->errstr);
+ msg_info ("call to redis failed: %s", ud->ctx->errstr);
rspamd_redis_pool_release_connection (ud->pool, ud->ctx, TRUE);
ud->ctx = NULL;
ret = FALSE;
+ else {
+ lua_pushboolean (L, FALSE);
+ lua_pushnil (L);
+ return 2;
+ }
lua_pushboolean (L, ret);
@@ -900,23 +918,15 @@ lua_redis_make_request_sync (lua_State *L)
static int
lua_redis_connect (lua_State *L)
- struct rspamd_lua_ip *addr = NULL;
- rspamd_inet_addr_t *ip = NULL;
- const gchar *host;
- struct lua_redis_ctx *ctx = NULL, **pctx;
struct lua_redis_userdata *ud;
- struct rspamd_task *task = NULL;
- gboolean ret = FALSE;
+ struct lua_redis_ctx *ctx, **pctx;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
+ gboolean ret = FALSE;
- if (lua_istable (L, 1)) {
- /* Table version */
- lua_pushstring (L, "task");
- lua_gettable (L, -2);
- if (lua_type (L, -1) == LUA_TUSERDATA) {
- task = lua_check_task (L, -1);
- }
- lua_pop (L, 1);
+ ctx = rspamd_lua_redis_prepare_connection (L, NULL);
+ if (ctx) {
+ ud = &ctx->d.async;
lua_pushstring (L, "timeout");
lua_gettable (L, -2);
@@ -924,63 +934,16 @@ lua_redis_connect (lua_State *L)
timeout = lua_tonumber (L, -1);
lua_pop (L, 1);
- lua_pushstring (L, "host");
- lua_gettable (L, -2);
- if (lua_type (L, -1) == LUA_TUSERDATA) {
- addr = lua_check_ip (L, -1);
- }
- else if (lua_type (L, -1) == LUA_TSTRING) {
- host = lua_tostring (L, -1);
- if (rspamd_parse_inet_address (&ip, host, strlen (host))) {
- addr = g_alloca (sizeof (*addr));
- addr->addr = ip;
- if (rspamd_inet_address_get_port (ip) == 0) {
- rspamd_inet_address_set_port (ip, 6379);
- }
- if (task) {
- rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)rspamd_inet_address_destroy,
- ip);
- }
- }
- }
- lua_pop (L, 1);
- if (task != NULL && addr != NULL) {
- ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
- REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->async = TRUE;
- ud = &ctx->d.async;
- ud->task = task;
- ud->pool = task->cfg->redis_pool;
- ud->L = L;
- ret = TRUE;
- }
- }
- if (ret && ctx) {
- ud->terminated = 0;
ud->timeout = timeout;
- ud->ctx = rspamd_redis_pool_connect (ud->pool,
- rspamd_inet_address_to_string (addr->addr),
- rspamd_inet_address_get_port (addr->addr));
- if (ud->ctx == NULL || ud->ctx->err) {
- msg_err_task_check ("cannot connect to redis: %s",
- ud->ctx->errstr);
- lua_pushboolean (L, FALSE);
+ }
+ else {
+ lua_pushboolean (L, FALSE);
+ lua_pushnil (L);
- return 1;
- }
+ return 2;
+ }
+ if (ret) {
pctx = lua_newuserdata (L, sizeof (ctx));
*pctx = ctx;
rspamd_lua_setclass (L, "rspamd{redis}", -1);
@@ -1099,6 +1062,7 @@ lua_redis_add_cmd (lua_State *L)
struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
struct lua_redis_specific_userdata *sp_ud;
+ struct lua_redis_userdata *ud;
const gchar *cmd = NULL;
gint args_pos = 2;
gchar **args = NULL;
@@ -1106,12 +1070,11 @@ lua_redis_add_cmd (lua_State *L)
guint nargs = 0;
gint cbref = -1, ret;
struct timeval tv;
- struct rspamd_task *task;
if (ctx) {
if (ctx->async) {
- task = ctx->d.async.task;
+ ud = &ctx->d.async;
/* Async version */
if (lua_type (L, 2) == LUA_TSTRING) {
@@ -1147,22 +1110,24 @@ lua_redis_add_cmd (lua_State *L)
if (ret == REDIS_OK) {
- rspamd_session_add_event (sp_ud->c->task->s,
- lua_redis_fin,
- sp_ud,
- g_quark_from_static_string ("lua redis"));
- sp_ud->w = rspamd_session_get_watcher (sp_ud->c->task->s);
- rspamd_session_watcher_push (sp_ud->c->task->s);
+ if (ud->s) {
+ rspamd_session_add_event (ud->s,
+ lua_redis_fin,
+ sp_ud,
+ g_quark_from_static_string ("lua redis"));
+ sp_ud->w = rspamd_session_get_watcher (ud->s);
+ rspamd_session_watcher_push (ud->s);
+ }
double_to_tv (sp_ud->c->timeout, &tv);
event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
- event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout);
+ event_base_set (ud->ev_base, &sp_ud->timeout);
event_add (&sp_ud->timeout, &tv);
ctx->cmds_pending ++;
else {
- msg_info_task_check ("call to redis failed: %s",
+ msg_info ("call to redis failed: %s",
lua_pushboolean (L, 0);
lua_pushstring (L, sp_ud->c->ctx->errstr);