diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-10-17 16:03:22 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-10-17 16:03:22 +0100 |
commit | 45a1c1e24ac3ac96b8207c9c83ba397114223c4f (patch) | |
tree | 15a68bbe37da4d30afc58b693a4a08dcc115ae89 /src/lua/lua_redis.c | |
parent | ed77fe58566345ed84bd4db9fc5313c246e5ff99 (diff) | |
download | rspamd-45a1c1e24ac3ac96b8207c9c83ba397114223c4f.tar.gz rspamd-45a1c1e24ac3ac96b8207c9c83ba397114223c4f.zip |
[Rework] Make lua_redis task agnostic
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r-- | src/lua/lua_redis.c | 405 |
1 files changed, 185 insertions, 220 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index 2466f63d4..8d027eb6f 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -94,7 +94,9 @@ struct lua_redis_specific_userdata; struct lua_redis_userdata { redisAsyncContext *ctx; lua_State *L; - struct rspamd_task *task; + struct rspamd_async_session *s; + struct event_base *ev_base; + struct rspamd_config *cfg; struct rspamd_redis_pool *pool; gchar *server; gchar *reqline; @@ -237,31 +239,31 @@ lua_redis_push_error (const gchar *err, struct lua_redis_specific_userdata *sp_ud, gboolean connected) { - struct rspamd_task **ptask; struct lua_redis_userdata *ud = sp_ud->c; if (!sp_ud->replied && !sp_ud->finished) { - if (sp_ud->cbref != -1 && ud->task) { + if (sp_ud->cbref != -1) { /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); - ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (ud->L, "rspamd{task}", -1); - *ptask = ud->task; /* String of error */ lua_pushstring (ud->L, err); /* Data is nil */ lua_pushnil (ud->L); - if (lua_pcall (ud->L, 3, 0, 0) != 0) { + + if (lua_pcall (ud->L, 2, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); lua_pop (ud->L, 1); } } sp_ud->replied = TRUE; - if (connected && ud->task) { - rspamd_session_watcher_pop (ud->task->s, sp_ud->w); - rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); + if (connected && ud->s) { + rspamd_session_watcher_pop (ud->s, sp_ud->w); + rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); + } + else { + lua_redis_fin (sp_ud); } } } @@ -305,23 +307,18 @@ static void lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, struct lua_redis_specific_userdata *sp_ud) { - struct rspamd_task **ptask; struct lua_redis_userdata *ud = sp_ud->c; if (!sp_ud->replied && !sp_ud->finished) { - if (sp_ud->cbref != -1 && ud->task) { + if (sp_ud->cbref != -1) { /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); - ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (ud->L, "rspamd{task}", -1); - - *ptask = ud->task; /* Error is nil */ lua_pushnil (ud->L); /* Data */ lua_redis_push_reply (ud->L, r); - if (lua_pcall (ud->L, 3, 0, 0) != 0) { + if (lua_pcall (ud->L, 2, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); lua_pop (ud->L, 1); } @@ -330,9 +327,12 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, sp_ud->replied = TRUE; - if (ud->task) { - rspamd_session_watcher_pop (ud->task->s, sp_ud->w); - rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); + if (ud->s) { + rspamd_session_watcher_pop (ud->s, sp_ud->w); + rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); + } + else { + lua_redis_fin (sp_ud); } } } @@ -511,58 +511,78 @@ lua_redis_parse_args (lua_State *L, gint idx, const gchar *cmd, *nargs = top; } - -/*** - * @function rspamd_redis.make_request({params}) - * Make request to redis server, params is a table of key=value arguments in any order - * @param {task} task worker task object - * @param {ip|string} host server address - * @param {function} callback callback to be called in form `function (task, err, data)` - * @param {string} cmd command to be sent to redis - * @param {table} args numeric array of strings used as redis arguments - * @param {number} timeout timeout in seconds for request (1.0 by default) - * @return {boolean} `true` if a request has been scheduled - */ -static int -lua_redis_make_request (lua_State *L) +static struct lua_redis_ctx * +rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) { - struct lua_redis_ctx *ctx, **pctx; + struct lua_redis_ctx *ctx; rspamd_inet_addr_t *ip = NULL; struct lua_redis_userdata *ud; - struct lua_redis_specific_userdata *sp_ud; struct rspamd_lua_ip *addr = NULL; struct rspamd_task *task = NULL; - const gchar *cmd = NULL, *host; + const gchar *host; const gchar *password = NULL, *dbname = NULL; - gint top, cbref = -1, args_pos; - struct timeval tv; + gint cbref = -1; + struct rspamd_config *cfg = NULL; + struct rspamd_async_session *session = NULL; + struct event_base *ev_base = NULL; gboolean ret = FALSE; - gdouble timeout = REDIS_DEFAULT_TIMEOUT; if (lua_istable (L, 1)) { /* Table version */ lua_pushstring (L, "task"); lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TUSERDATA) { - task = lua_check_task (L, -1); + task = lua_check_task_maybe (L, -1); } lua_pop (L, 1); - lua_pushstring (L, "callback"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TFUNCTION) { - /* This also pops function from the stack */ - cbref = luaL_ref (L, LUA_REGISTRYINDEX); + if (!task) { + /* We need to get ev_base, config and session separately */ + lua_pushstring (L, "config"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TUSERDATA) { + cfg = lua_check_config (L, -1); + } + lua_pop (L, 1); + + lua_pushstring (L, "session"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TUSERDATA) { + session = lua_check_session (L, -1); + } + lua_pop (L, 1); + + lua_pushstring (L, "ev_base"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TUSERDATA) { + ev_base = lua_check_ev_base (L, -1); + } + lua_pop (L, 1); + + if (cfg && ev_base) { + ret = TRUE; + } } else { - msg_err ("bad callback argument for lua redis"); - lua_pop (L, 1); + cfg = task->cfg; + session = task->s; + ev_base = task->ev_base; + ret = TRUE; } - lua_pushstring (L, "cmd"); - lua_gettable (L, -2); - cmd = lua_tostring (L, -1); - lua_pop (L, 1); + if (pcbref) { + lua_pushstring (L, "callback"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TFUNCTION) { + /* This also pops function from the stack */ + cbref = luaL_ref (L, LUA_REGISTRYINDEX); + *pcbref = cbref; + } + else { + *pcbref = -1; + lua_pop (L, 1); + } + } lua_pushstring (L, "host"); lua_gettable (L, -2); @@ -588,14 +608,6 @@ lua_redis_make_request (lua_State *L) } } } - - lua_pop (L, 1); - - lua_pushstring (L, "timeout"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TNUMBER) { - timeout = lua_tonumber (L, -1); - } lua_pop (L, 1); lua_pushstring (L, "password"); @@ -613,26 +625,17 @@ lua_redis_make_request (lua_State *L) lua_pop (L, 1); - if (task != NULL && addr != NULL && cmd != NULL) { + if (ret && addr != NULL) { ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); REF_INIT_RETAIN (ctx, lua_redis_dtor); ctx->async = TRUE; ud = &ctx->d.async; - ud->task = task; - ud->pool = task->cfg->redis_pool; + ud->s = session; + ud->cfg = cfg; + ud->pool = cfg->redis_pool; + ud->ev_base = ev_base; ud->L = L; - sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); - sp_ud->cbref = cbref; - sp_ud->c = ud; - - lua_pushstring (L, "args"); - lua_gettable (L, -2); - lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens, - &sp_ud->nargs); - lua_pop (L, 1); - LL_PREPEND (ud->specific, sp_ud); - ret = TRUE; } else { @@ -641,62 +644,12 @@ lua_redis_make_request (lua_State *L) } msg_err_task_check ("incorrect function invocation"); - } - } - else if ((task = lua_check_task (L, 1)) != NULL) { - addr = lua_check_ip (L, 2); - top = lua_gettop (L); - - /* Now get callback */ - if (addr != NULL && addr->addr && top >= 4) { - /* Create userdata */ - ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); - REF_INIT_RETAIN (ctx, lua_redis_dtor); - ctx->async = TRUE; - ud = &ctx->d.async; - ud->task = task; - ud->pool = task->cfg->redis_pool; - ud->L = L; - - args_pos = 3; - - if (lua_isfunction (L, 3)) { - /* Pop other arguments */ - lua_pushvalue (L, 3); - /* Get a reference */ - cbref = luaL_ref (L, LUA_REGISTRYINDEX); - args_pos = 4; - } - else { - cbref = -1; - } - - - sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); - sp_ud->cbref = cbref; - sp_ud->c = ud; - cmd = luaL_checkstring (L, args_pos); - if (top > 4) { - lua_redis_parse_args (L, args_pos + 1, cmd, &sp_ud->args, - &sp_ud->arglens, &sp_ud->nargs); - } - else { - lua_redis_parse_args (L, 0, cmd, &sp_ud->args, - &sp_ud->arglens, &sp_ud->nargs); - } - - LL_PREPEND (ud->specific, sp_ud); - - ret = TRUE; - } - else { - msg_err_task_check ("incorrect function invocation"); + ret = FALSE; } } if (ret) { ud->terminated = 0; - ud->timeout = timeout; ud->ctx = rspamd_redis_pool_connect (ud->pool, dbname, password, rspamd_inet_address_to_string (addr->addr), @@ -714,44 +667,109 @@ lua_redis_make_request (lua_State *L) } REDIS_RELEASE (ctx); - lua_pushboolean (L, FALSE); - lua_pushnil (L); - return 2; + return NULL; } + + return ctx; + } + + return NULL; +} + +/*** + * @function rspamd_redis.make_request({params}) + * Make request to redis server, params is a table of key=value arguments in any order + * @param {task} task worker task object + * @param {ip|string} host server address + * @param {function} callback callback to be called in form `function (task, err, data)` + * @param {string} cmd command to be sent to redis + * @param {table} args numeric array of strings used as redis arguments + * @param {number} timeout timeout in seconds for request (1.0 by default) + * @return {boolean} `true` if a request has been scheduled + */ +static int +lua_redis_make_request (lua_State *L) +{ + struct lua_redis_specific_userdata *sp_ud; + struct lua_redis_userdata *ud; + struct lua_redis_ctx *ctx, **pctx; + const gchar *cmd = NULL; + struct timeval tv; + gdouble timeout = REDIS_DEFAULT_TIMEOUT; + gint cbref = -1; + gboolean ret = FALSE; + + ctx = rspamd_lua_redis_prepare_connection (L, &cbref); + + if (ctx) { + ud = &ctx->d.async; + sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); + sp_ud->cbref = cbref; + sp_ud->c = ud; + + lua_pushstring (L, "cmd"); + lua_gettable (L, -2); + cmd = lua_tostring (L, -1); + lua_pop (L, 1); + + lua_pushstring (L, "timeout"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TNUMBER) { + timeout = lua_tonumber (L, -1); + } + lua_pop (L, 1); + ud->timeout = timeout; + + lua_pushstring (L, "args"); + lua_gettable (L, -2); + lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens, + &sp_ud->nargs); + lua_pop (L, 1); + LL_PREPEND (ud->specific, sp_ud); ret = redisAsyncCommandArgv (ud->ctx, - lua_redis_callback, - sp_ud, - sp_ud->nargs, - (const gchar **)sp_ud->args, - sp_ud->arglens); + lua_redis_callback, + sp_ud, + sp_ud->nargs, + (const gchar **)sp_ud->args, + sp_ud->arglens); if (ret == REDIS_OK) { - rspamd_session_add_event (ud->task->s, - lua_redis_fin, - sp_ud, - g_quark_from_static_string ("lua redis")); - sp_ud->w = rspamd_session_get_watcher (ud->task->s); - rspamd_session_watcher_push (ud->task->s); + if (ud->s) { + rspamd_session_add_event (ud->s, + lua_redis_fin, + sp_ud, + g_quark_from_static_string ("lua redis")); + sp_ud->w = rspamd_session_get_watcher (ud->s); + rspamd_session_watcher_push (ud->s); + } + else { + sp_ud->w = NULL; + } - sp_ud->ctx = ctx; REDIS_RETAIN (ctx); ctx->cmds_pending ++; double_to_tv (timeout, &tv); event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); - event_base_set (ud->task->ev_base, &sp_ud->timeout); + event_base_set (ud->ev_base, &sp_ud->timeout); event_add (&sp_ud->timeout, &tv); ret = TRUE; } else { - msg_info_task_check ("call to redis failed: %s", ud->ctx->errstr); + msg_info ("call to redis failed: %s", ud->ctx->errstr); rspamd_redis_pool_release_connection (ud->pool, ud->ctx, TRUE); ud->ctx = NULL; REDIS_RELEASE (ctx); ret = FALSE; } } + else { + lua_pushboolean (L, FALSE); + lua_pushnil (L); + + return 2; + } lua_pushboolean (L, ret); @@ -900,23 +918,15 @@ lua_redis_make_request_sync (lua_State *L) static int lua_redis_connect (lua_State *L) { - struct rspamd_lua_ip *addr = NULL; - rspamd_inet_addr_t *ip = NULL; - const gchar *host; - struct lua_redis_ctx *ctx = NULL, **pctx; struct lua_redis_userdata *ud; - struct rspamd_task *task = NULL; - gboolean ret = FALSE; + struct lua_redis_ctx *ctx, **pctx; gdouble timeout = REDIS_DEFAULT_TIMEOUT; + gboolean ret = FALSE; - if (lua_istable (L, 1)) { - /* Table version */ - lua_pushstring (L, "task"); - lua_gettable (L, -2); - if (lua_type (L, -1) == LUA_TUSERDATA) { - task = lua_check_task (L, -1); - } - lua_pop (L, 1); + ctx = rspamd_lua_redis_prepare_connection (L, NULL); + + if (ctx) { + ud = &ctx->d.async; lua_pushstring (L, "timeout"); lua_gettable (L, -2); @@ -924,63 +934,16 @@ lua_redis_connect (lua_State *L) timeout = lua_tonumber (L, -1); } lua_pop (L, 1); - - lua_pushstring (L, "host"); - lua_gettable (L, -2); - - if (lua_type (L, -1) == LUA_TUSERDATA) { - addr = lua_check_ip (L, -1); - } - else if (lua_type (L, -1) == LUA_TSTRING) { - host = lua_tostring (L, -1); - - if (rspamd_parse_inet_address (&ip, host, strlen (host))) { - addr = g_alloca (sizeof (*addr)); - addr->addr = ip; - - if (rspamd_inet_address_get_port (ip) == 0) { - rspamd_inet_address_set_port (ip, 6379); - } - - if (task) { - rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)rspamd_inet_address_destroy, - ip); - } - } - } - - lua_pop (L, 1); - - if (task != NULL && addr != NULL) { - ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); - REF_INIT_RETAIN (ctx, lua_redis_dtor); - ctx->async = TRUE; - ud = &ctx->d.async; - ud->task = task; - ud->pool = task->cfg->redis_pool; - ud->L = L; - ret = TRUE; - } - } - - if (ret && ctx) { - ud->terminated = 0; ud->timeout = timeout; - ud->ctx = rspamd_redis_pool_connect (ud->pool, - NULL, NULL, - rspamd_inet_address_to_string (addr->addr), - rspamd_inet_address_get_port (addr->addr)); - - if (ud->ctx == NULL || ud->ctx->err) { - msg_err_task_check ("cannot connect to redis: %s", - ud->ctx->errstr); - REDIS_RELEASE (ctx); - lua_pushboolean (L, FALSE); + } + else { + lua_pushboolean (L, FALSE); + lua_pushnil (L); - return 1; - } + return 2; + } + if (ret) { pctx = lua_newuserdata (L, sizeof (ctx)); *pctx = ctx; rspamd_lua_setclass (L, "rspamd{redis}", -1); @@ -1099,6 +1062,7 @@ lua_redis_add_cmd (lua_State *L) { struct lua_redis_ctx *ctx = lua_check_redis (L, 1); struct lua_redis_specific_userdata *sp_ud; + struct lua_redis_userdata *ud; const gchar *cmd = NULL; gint args_pos = 2; gchar **args = NULL; @@ -1106,12 +1070,11 @@ lua_redis_add_cmd (lua_State *L) guint nargs = 0; gint cbref = -1, ret; struct timeval tv; - struct rspamd_task *task; if (ctx) { if (ctx->async) { - task = ctx->d.async.task; + ud = &ctx->d.async; /* Async version */ if (lua_type (L, 2) == LUA_TSTRING) { @@ -1147,22 +1110,24 @@ lua_redis_add_cmd (lua_State *L) sp_ud->arglens); if (ret == REDIS_OK) { - rspamd_session_add_event (sp_ud->c->task->s, - lua_redis_fin, - sp_ud, - g_quark_from_static_string ("lua redis")); - sp_ud->w = rspamd_session_get_watcher (sp_ud->c->task->s); - rspamd_session_watcher_push (sp_ud->c->task->s); + if (ud->s) { + rspamd_session_add_event (ud->s, + lua_redis_fin, + sp_ud, + g_quark_from_static_string ("lua redis")); + sp_ud->w = rspamd_session_get_watcher (ud->s); + rspamd_session_watcher_push (ud->s); + } double_to_tv (sp_ud->c->timeout, &tv); event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud); - event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout); + event_base_set (ud->ev_base, &sp_ud->timeout); event_add (&sp_ud->timeout, &tv); REDIS_RETAIN (ctx); ctx->cmds_pending ++; } else { - msg_info_task_check ("call to redis failed: %s", + msg_info ("call to redis failed: %s", sp_ud->c->ctx->errstr); lua_pushboolean (L, 0); lua_pushstring (L, sp_ud->c->ctx->errstr); |