/*- * Copyright 2016 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "lua_common.h" #include "lua_thread_pool.h" #include "utlist.h" #include "contrib/hiredis/hiredis.h" #include "contrib/hiredis/async.h" #define REDIS_DEFAULT_TIMEOUT 1.0 static const gchar *M = "rspamd lua redis"; static void *redis_null; /*** * @module rspamd_redis * This module implements redis asynchronous client for rspamd LUA API. * Here is an example of using of this module: * @example local rspamd_redis = require "rspamd_redis" local rspamd_logger = require "rspamd_logger" local function symbol_callback(task) local redis_key = 'some_key' local function redis_cb(err, data) if not err then rspamd_logger.infox('redis returned %1=%2', redis_key, data) end end rspamd_redis.make_request(task, "127.0.0.1:6379", redis_cb, 'GET', {redis_key}) -- or in table form: -- rspamd_redis.make_request({task=task, host="127.0.0.1:6379, -- callback=redis_cb, timeout=2.0, cmd='GET', args={redis_key}}) end */ LUA_FUNCTION_DEF(redis, make_request); LUA_FUNCTION_DEF(redis, make_request_sync); LUA_FUNCTION_DEF(redis, connect); LUA_FUNCTION_DEF(redis, connect_sync); LUA_FUNCTION_DEF(redis, add_cmd); LUA_FUNCTION_DEF(redis, exec); LUA_FUNCTION_DEF(redis, gc); static const struct luaL_reg redislib_f[] = { LUA_INTERFACE_DEF(redis, make_request), LUA_INTERFACE_DEF(redis, make_request_sync), LUA_INTERFACE_DEF(redis, connect), LUA_INTERFACE_DEF(redis, connect_sync), {NULL, NULL}}; static const struct luaL_reg redislib_m[] = { LUA_INTERFACE_DEF(redis, add_cmd), LUA_INTERFACE_DEF(redis, exec), {"__gc", lua_redis_gc}, {"__tostring", rspamd_lua_class_tostring}, {NULL, NULL}}; #undef REDIS_DEBUG_REFS #ifdef REDIS_DEBUG_REFS #define REDIS_RETAIN(x) \ do { \ msg_err("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \ REF_RETAIN(x); \ } while (0) #define REDIS_RELEASE(x) \ do { \ msg_err("release ref %p, refcount: %d", (x), (x)->ref.refcount); \ REF_RELEASE(x); \ } while (0) #else #define REDIS_RETAIN REF_RETAIN #define REDIS_RELEASE REF_RELEASE #endif struct lua_redis_request_specific_userdata; /** * Struct for userdata representation */ struct lua_redis_userdata { redisAsyncContext *ctx; struct rspamd_task *task; struct rspamd_symcache_dynamic_item *item; struct rspamd_async_session *s; struct ev_loop *event_loop; struct rspamd_config *cfg; struct rspamd_redis_pool *pool; gchar *server; gchar log_tag[RSPAMD_LOG_ID_LEN + 1]; struct lua_redis_request_specific_userdata *specific; gdouble timeout; guint16 port; guint16 terminated; }; #define msg_debug_lua_redis(...) rspamd_conditional_debug_fast(NULL, NULL, \ rspamd_lua_redis_log_id, "lua_redis", ud->log_tag, \ G_STRFUNC, \ __VA_ARGS__) INIT_LOG_MODULE(lua_redis) #define LUA_REDIS_SPECIFIC_REPLIED (1 << 0) /* session was finished */ #define LUA_REDIS_SPECIFIC_FINISHED (1 << 1) #define LUA_REDIS_ASYNC (1 << 0) #define LUA_REDIS_TEXTDATA (1 << 1) #define LUA_REDIS_TERMINATED (1 << 2) #define LUA_REDIS_NO_POOL (1 << 3) #define LUA_REDIS_SUBSCRIBED (1 << 4) #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) struct lua_redis_request_specific_userdata { gint cbref; guint nargs; gchar **args; gsize *arglens; struct lua_redis_userdata *c; struct lua_redis_ctx *ctx; struct lua_redis_request_specific_userdata *next; ev_timer timeout_ev; guint flags; }; struct lua_redis_ctx { guint flags; struct lua_redis_userdata async; guint cmds_pending; ref_entry_t ref; GQueue *replies; /* for sync connection only */ GQueue *events_cleanup; /* for sync connection only */ struct thread_entry *thread; /* for sync mode, set only if there was yield */ }; struct lua_redis_result { gboolean is_error; gint result_ref; struct rspamd_symcache_dynamic_item *item; struct rspamd_async_session *s; struct rspamd_task *task; struct lua_redis_request_specific_userdata *sp_ud; }; static struct lua_redis_ctx * lua_check_redis(lua_State *L, gint pos) { void *ud = rspamd_lua_check_udata(L, pos, "rspamd{redis}"); luaL_argcheck(L, ud != NULL, pos, "'redis' expected"); return ud ? *((struct lua_redis_ctx **) ud) : NULL; } static void lua_redis_free_args(char **args, gsize *arglens, guint nargs) { guint i; if (args) { for (i = 0; i < nargs; i++) { g_free(args[i]); } g_free(args); g_free(arglens); } } static void lua_redis_dtor(struct lua_redis_ctx *ctx) { struct lua_redis_userdata *ud; struct lua_redis_request_specific_userdata *cur, *tmp; gboolean is_successful = TRUE; struct redisAsyncContext *ac; ud = &ctx->async; msg_debug_lua_redis("destructing %p", ctx); if (ud->ctx) { LL_FOREACH_SAFE(ud->specific, cur, tmp) { ev_timer_stop(ud->event_loop, &cur->timeout_ev); if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { is_successful = FALSE; } cur->flags |= LUA_REDIS_SPECIFIC_FINISHED; } ctx->flags |= LUA_REDIS_TERMINATED; ud->terminated = 1; ac = ud->ctx; ud->ctx = NULL; if (!is_successful) { rspamd_redis_pool_release_connection(ud->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } else { rspamd_redis_pool_release_connection(ud->pool, ac, (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); } } LL_FOREACH_SAFE(ud->specific, cur, tmp) { lua_redis_free_args(cur->args, cur->arglens, cur->nargs); if (cur->cbref != -1) { luaL_unref(ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); } g_free(cur); } if (ctx->events_cleanup) { g_queue_free(ctx->events_cleanup); ctx->events_cleanup = NULL; } if (ctx->replies) { g_queue_free(ctx->replies); ctx->replies = NULL; } g_free(ctx); } static gint lua_redis_gc(lua_State *L) { struct lua_redis_ctx *ctx = lua_check_redis(L, 1); if (ctx) { REDIS_RELEASE(ctx); } return 0; } static void lua_redis_fin(void *arg) { struct lua_redis_request_specific_userdata *sp_ud = arg; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx; ctx = sp_ud->ctx; ud = sp_ud->c; if (ev_can_stop(&sp_ud->timeout_ev)) { ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); } msg_debug_lua_redis("finished redis query %p from session %p; refcount=%d", sp_ud, ctx, ctx->ref.refcount); sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED; REDIS_RELEASE(ctx); } /** * Push error of redis request to lua callback * @param code * @param ud */ static void lua_redis_push_error(const gchar *err, struct lua_redis_ctx *ctx, struct lua_redis_request_specific_userdata *sp_ud, gboolean connected) { struct lua_redis_userdata *ud = sp_ud->c; struct lua_callback_state cbs; lua_State *L; if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs); L = cbs.L; lua_pushcfunction(L, &rspamd_lua_traceback); int err_idx = lua_gettop(L); /* Push error */ lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* String of error */ lua_pushstring(cbs.L, err); /* Data is nil */ lua_pushnil(cbs.L); if (ud->item) { rspamd_symcache_set_cur_item(ud->task, ud->item); } if (lua_pcall(cbs.L, 2, 0, err_idx) != 0) { msg_info("call to callback failed: %s", lua_tostring(cbs.L, -1)); } lua_settop(L, err_idx - 1); lua_thread_pool_restore_callback(&cbs); } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (connected && ud->s) { if (ud->item) { rspamd_symcache_item_async_dec_check(ud->task, ud->item, M); } rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud); } else { lua_redis_fin(sp_ud); } } } static void lua_redis_push_reply(lua_State *L, const redisReply *r, gboolean text_data) { guint i; struct rspamd_lua_text *t; switch (r->type) { case REDIS_REPLY_INTEGER: lua_pushinteger(L, r->integer); break; case REDIS_REPLY_NIL: lua_getfield(L, LUA_REGISTRYINDEX, "redis.null"); break; case REDIS_REPLY_STRING: case REDIS_REPLY_STATUS: if (text_data) { t = lua_newuserdata(L, sizeof(*t)); rspamd_lua_setclass(L, "rspamd{text}", -1); t->flags = 0; t->start = r->str; t->len = r->len; } else { lua_pushlstring(L, r->str, r->len); } break; case REDIS_REPLY_ARRAY: lua_createtable(L, r->elements, 0); for (i = 0; i < r->elements; ++i) { lua_redis_push_reply(L, r->element[i], text_data); lua_rawseti(L, -2, i + 1); /* Store sub-reply */ } break; default: /* should not happen */ msg_info("unknown reply type: %d", r->type); break; } } /** * Push data of redis request to lua callback * @param r redis reply data * @param ud */ static void lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx, struct lua_redis_request_specific_userdata *sp_ud) { struct lua_redis_userdata *ud = sp_ud->c; struct lua_callback_state cbs; lua_State *L; if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED | LUA_REDIS_SPECIFIC_FINISHED)) || (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (sp_ud->cbref != -1) { lua_thread_pool_prepare_callback(ud->cfg->lua_thread_pool, &cbs); L = cbs.L; lua_pushcfunction(L, &rspamd_lua_traceback); int err_idx = lua_gettop(L); /* Push error */ lua_rawgeti(cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* Error is nil */ lua_pushnil(cbs.L); /* Data */ lua_redis_push_reply(cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA); if (ud->item) { rspamd_symcache_set_cur_item(ud->task, ud->item); } gint ret = lua_pcall(cbs.L, 2, 0, err_idx); if (ret != 0) { msg_info("call to lua_redis callback failed (%d): %s", ret, lua_tostring(cbs.L, -1)); } lua_settop(L, err_idx - 1); lua_thread_pool_restore_callback(&cbs); } if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) { if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) { if (ev_can_stop(&sp_ud->timeout_ev)) { ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); } } } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (ud->s) { if (ud->item) { rspamd_symcache_item_async_dec_check(ud->task, ud->item, M); } rspamd_session_remove_event(ud->s, lua_redis_fin, sp_ud); } else { lua_redis_fin(sp_ud); } } } } /** * Callback for redis replies * @param c context of redis connection * @param r redis reply * @param priv userdata */ static void lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv) { redisReply *reply = r; struct lua_redis_request_specific_userdata *sp_ud = priv; struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; redisAsyncContext *ac; ctx = sp_ud->ctx; ud = sp_ud->c; if (ud->terminated || !rspamd_lua_is_initialised()) { /* We are already at the termination stage, just go out */ return; } msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx, sp_ud); REDIS_RETAIN(ctx); /* If session is finished, we cannot call lua callbacks */ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) || (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (c->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { lua_redis_push_data(reply, ctx, sp_ud); } else { lua_redis_push_error(reply->str, ctx, sp_ud, TRUE); } } else { lua_redis_push_error("received no data from server", ctx, sp_ud, TRUE); } } else { if (c->err == REDIS_ERR_IO) { lua_redis_push_error(strerror(errno), ctx, sp_ud, TRUE); } else { lua_redis_push_error(c->errstr, ctx, sp_ud, TRUE); } } } if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { ctx->cmds_pending--; if (ctx->cmds_pending == 0 && !ud->terminated) { /* Disconnect redis early as we don't need it anymore */ ud->terminated = 1; ac = ud->ctx; ud->ctx = NULL; if (ac) { msg_debug_lua_redis("release redis connection ud=%p; ctx=%p; refcount=%d", ud, ctx, ctx->ref.refcount); rspamd_redis_pool_release_connection(ud->pool, ac, (ctx->flags & LUA_REDIS_NO_POOL) ? RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); } } } REDIS_RELEASE(ctx); } static gint lua_redis_push_results(struct lua_redis_ctx *ctx, lua_State *L) { gint results = g_queue_get_length(ctx->replies); gint i; gboolean can_use_lua = TRUE; results = g_queue_get_length(ctx->replies); if (!lua_checkstack(L, (results * 2) + 1)) { luaL_error(L, "cannot resize stack to fit %d commands", ctx->cmds_pending); can_use_lua = FALSE; } for (i = 0; i < results; i++) { struct lua_redis_result *result = g_queue_pop_head(ctx->replies); if (can_use_lua) { lua_pushboolean(L, !result->is_error); lua_rawgeti(L, LUA_REGISTRYINDEX, result->result_ref); } luaL_unref(L, LUA_REGISTRYINDEX, result->result_ref); g_queue_push_tail(ctx->events_cleanup, result); } return can_use_lua ? results * 2 : 0; } static void lua_redis_cleanup_events(struct lua_redis_ctx *ctx) { REDIS_RETAIN(ctx); /* To avoid preliminary destruction */ while (!g_queue_is_empty(ctx->events_cleanup)) { struct lua_redis_result *result = g_queue_pop_head(ctx->events_cleanup); if (result->item) { rspamd_symcache_item_async_dec_check(result->task, result->item, M); } if (result->s) { rspamd_session_remove_event(result->s, lua_redis_fin, result->sp_ud); } else { lua_redis_fin(result->sp_ud); } g_free(result); } REDIS_RELEASE(ctx); } /** * Callback for redis replies * @param c context of redis connection * @param r redis reply * @param priv userdata */ static void lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) { redisReply *reply = r; struct lua_redis_request_specific_userdata *sp_ud = priv; struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; struct thread_entry *thread; gint results; ctx = sp_ud->ctx; ud = sp_ud->c; lua_State *L = ctx->async.cfg->lua_state; sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (ud->terminated) { /* We are already at the termination stage, just go out */ /* TODO: if somebody is waiting for us (ctx->thread), return result, otherwise, indeed, ignore */ return; } if (ev_can_stop(&sp_ud->timeout_ev)) { ev_timer_stop(ud->event_loop, &sp_ud->timeout_ev); } if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud); struct lua_redis_result *result = g_malloc0(sizeof *result); if (ac->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { result->is_error = FALSE; lua_redis_push_reply(L, reply, ctx->flags & LUA_REDIS_TEXTDATA); } else { result->is_error = TRUE; lua_pushstring(L, reply->str); } } else { result->is_error = TRUE; lua_pushliteral(L, "received no data from server"); } } else { result->is_error = TRUE; if (ac->err == REDIS_ERR_IO) { lua_pushstring(L, strerror(errno)); } else { lua_pushstring(L, ac->errstr); } } /* if error happened, we should terminate the connection, and release it */ if (result->is_error && sp_ud->c->ctx) { ac = sp_ud->c->ctx; /* Set to NULL to avoid double free in dtor */ sp_ud->c->ctx = NULL; ctx->flags |= LUA_REDIS_TERMINATED; /* * This will call all callbacks pending so the entire context * will be destructed */ rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } result->result_ref = luaL_ref(L, LUA_REGISTRYINDEX); result->s = ud->s; result->item = ud->item; result->task = ud->task; result->sp_ud = sp_ud; g_queue_push_tail(ctx->replies, result); } ctx->cmds_pending--; if (ctx->cmds_pending == 0) { if (ctx->thread) { if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { /* somebody yielded and waits for results */ thread = ctx->thread; ctx->thread = NULL; results = lua_redis_push_results(ctx, thread->lua_state); if (ud->item) { rspamd_symcache_set_cur_item(ud->task, ud->item); } lua_thread_resume(thread, results); lua_redis_cleanup_events(ctx); } else { /* We cannot resume the thread as the associated task has gone */ lua_thread_pool_terminate_entry_full(ud->cfg->lua_thread_pool, ctx->thread, G_STRLOC, true); ctx->thread = NULL; } } } } static void lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) { struct lua_redis_request_specific_userdata *sp_ud = (struct lua_redis_request_specific_userdata *) w->data; struct lua_redis_ctx *ctx; struct lua_redis_userdata *ud; redisAsyncContext *ac; if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) { return; } ud = sp_ud->c; ctx = sp_ud->ctx; msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, sp_ud->c->ctx); if (sp_ud->c->ctx) { ac = sp_ud->c->ctx; /* Set to NULL to avoid double free in dtor */ sp_ud->c->ctx = NULL; ac->err = REDIS_ERR_IO; errno = ETIMEDOUT; ctx->flags |= LUA_REDIS_TERMINATED; /* * This will call all callbacks pending so the entire context * will be destructed */ rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } } static void lua_redis_timeout(EV_P_ ev_timer *w, int revents) { struct lua_redis_request_specific_userdata *sp_ud = (struct lua_redis_request_specific_userdata *) w->data; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx; redisAsyncContext *ac; if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) { return; } ctx = sp_ud->ctx; ud = sp_ud->c; REDIS_RETAIN(ctx); msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, sp_ud->c->ctx); lua_redis_push_error("timeout while connecting the server", ctx, sp_ud, TRUE); if (sp_ud->c->ctx) { ac = sp_ud->c->ctx; /* Set to NULL to avoid double free in dtor */ sp_ud->c->ctx = NULL; ac->err = REDIS_ERR_IO; errno = ETIMEDOUT; /* * This will call all callbacks pending so the entire context * will be destructed */ rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, RSPAMD_REDIS_RELEASE_FATAL); } REDIS_RELEASE(ctx); } static void lua_redis_parse_args(lua_State *L, gint idx, const gchar *cmd, gchar ***pargs, gsize **parglens, guint *nargs) { gchar **args = NULL; gsize *arglens; gint top; if (idx != 0 && lua_type(L, idx) == LUA_TTABLE) { /* Get all arguments */ lua_pushvalue(L, idx); lua_pushnil(L); top = 0; while (lua_next(L, -2) != 0) { gint type = lua_type(L, -1); if (type == LUA_TNUMBER || type == LUA_TSTRING || type == LUA_TUSERDATA) { top++; } lua_pop(L, 1); } args = g_malloc((top + 1) * sizeof(gchar *)); arglens = g_malloc((top + 1) * sizeof(gsize)); arglens[0] = strlen(cmd); args[0] = g_malloc(arglens[0]); memcpy(args[0], cmd, arglens[0]); top = 1; lua_pushnil(L); while (lua_next(L, -2) != 0) { gint type = lua_type(L, -1); if (type == LUA_TSTRING) { const gchar *s; s = lua_tolstring(L, -1, &arglens[top]); args[top] = g_malloc(arglens[top]); memcpy(args[top], s, arglens[top]); top++; } else if (type == LUA_TUSERDATA) { struct rspamd_lua_text *t; t = lua_check_text(L, -1); if (t && t->start) { arglens[top] = t->len; args[top] = g_malloc(arglens[top]); memcpy(args[top], t->start, arglens[top]); top++; } } else if (type == LUA_TNUMBER) { gdouble val = lua_tonumber(L, -1); gint r; gchar numbuf[64]; if (val == (gdouble) ((gint64) val)) { r = rspamd_snprintf(numbuf, sizeof(numbuf), "%L", (gint64) val); } else { r = rspamd_snprintf(numbuf, sizeof(numbuf), "%f", val); } arglens[top] = r; args[top] = g_malloc(arglens[top]); memcpy(args[top], numbuf, arglens[top]); top++; } lua_pop(L, 1); } lua_pop(L, 1); } else { /* Use merely cmd */ args = g_malloc(sizeof(gchar *)); arglens = g_malloc(sizeof(gsize)); arglens[0] = strlen(cmd); args[0] = g_malloc(arglens[0]); memcpy(args[0], cmd, arglens[0]); top = 1; } *pargs = args; *parglens = arglens; *nargs = top; } static struct lua_redis_ctx * rspamd_lua_redis_prepare_connection(lua_State *L, gint *pcbref, gboolean is_async) { struct lua_redis_ctx *ctx = NULL; rspamd_inet_addr_t *ip = NULL; struct lua_redis_userdata *ud = NULL; struct rspamd_lua_ip *addr = NULL; struct rspamd_task *task = NULL; const gchar *host = NULL; const gchar *username = NULL, *password = NULL, *dbname = NULL, *log_tag = NULL; gint cbref = -1; struct rspamd_config *cfg = NULL; struct rspamd_async_session *session = NULL; struct ev_loop *ev_base = NULL; gboolean ret = FALSE; guint flags = 0; if (lua_istable(L, 1)) { /* Table version */ lua_pushvalue(L, 1); lua_pushstring(L, "task"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { task = lua_check_task_maybe(L, -1); } lua_pop(L, 1); if (!task) { /* We need to get ev_base, config and session separately */ lua_pushstring(L, "config"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { cfg = lua_check_config(L, -1); } lua_pop(L, 1); lua_pushstring(L, "session"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { session = lua_check_session(L, -1); } lua_pop(L, 1); lua_pushstring(L, "ev_base"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { ev_base = lua_check_ev_base(L, -1); } lua_pop(L, 1); if (cfg && ev_base) { ret = TRUE; } else if (!cfg) { msg_err_task_check("config is not passed"); } else { msg_err_task_check("ev_base is not set"); } } else { cfg = task->cfg; session = task->s; ev_base = task->event_loop; log_tag = task->task_pool->tag.uid; ret = TRUE; } if (pcbref) { lua_pushstring(L, "callback"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TFUNCTION) { /* This also pops function from the stack */ cbref = luaL_ref(L, LUA_REGISTRYINDEX); *pcbref = cbref; } else { *pcbref = -1; lua_pop(L, 1); } } lua_pushstring(L, "host"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { addr = lua_check_ip(L, -1); host = rspamd_inet_address_to_string_pretty(addr->addr); } else if (lua_type(L, -1) == LUA_TSTRING) { host = lua_tostring(L, -1); if (rspamd_parse_inet_address(&ip, host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { addr = g_alloca(sizeof(*addr)); addr->addr = ip; if (rspamd_inet_address_get_port(ip) == 0) { rspamd_inet_address_set_port(ip, 6379); } } } lua_pop(L, 1); lua_pushstring(L, "username"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { username = lua_tostring(L, -1); } lua_pop(L, 1); lua_pushstring(L, "password"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { password = lua_tostring(L, -1); } lua_pop(L, 1); lua_pushstring(L, "dbname"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TSTRING) { dbname = lua_tostring(L, -1); } lua_pop(L, 1); lua_pushstring(L, "opaque_data"); lua_gettable(L, -2); if (!!lua_toboolean(L, -1)) { flags |= LUA_REDIS_TEXTDATA; } lua_pop(L, 1); lua_pushstring(L, "no_pool"); lua_gettable(L, -2); if (!!lua_toboolean(L, -1)) { flags |= LUA_REDIS_NO_POOL; } lua_pop(L, 1); lua_pop(L, 1); /* table */ if (session && rspamd_session_blocked(session)) { msg_err_task_check("Session is being destroying"); ret = FALSE; } if (ret && addr != NULL) { ctx = g_malloc0(sizeof(struct lua_redis_ctx)); REF_INIT_RETAIN(ctx, lua_redis_dtor); if (is_async) { ctx->flags |= flags | LUA_REDIS_ASYNC; ud = &ctx->async; } else { ud = &ctx->async; ctx->replies = g_queue_new(); ctx->events_cleanup = g_queue_new(); } ud->s = session; ud->cfg = cfg; ud->pool = cfg->redis_pool; ud->event_loop = ev_base; ud->task = task; if (log_tag) { rspamd_strlcpy(ud->log_tag, log_tag, sizeof(ud->log_tag)); } else { /* Use pointer itself as a tag */ rspamd_snprintf(ud->log_tag, sizeof(ud->log_tag), "%ud", (int) rspamd_cryptobox_fast_hash(&ud, sizeof(ud), 0)); } if (task) { ud->item = rspamd_symcache_get_cur_item(task); } ret = TRUE; } else { if (cbref != -1) { luaL_unref(L, LUA_REGISTRYINDEX, cbref); } msg_err_task_check("incorrect function invocation"); ret = FALSE; } } if (ret) { ud->terminated = 0; ud->ctx = rspamd_redis_pool_connect(ud->pool, dbname, username, password, rspamd_inet_address_to_string(addr->addr), rspamd_inet_address_get_port(addr->addr)); if (ip) { rspamd_inet_address_free(ip); } if (ud->ctx == NULL || ud->ctx->err) { if (ud->ctx) { msg_err_task_check("cannot connect to redis: %s", ud->ctx->errstr); rspamd_redis_pool_release_connection(ud->pool, ud->ctx, RSPAMD_REDIS_RELEASE_FATAL); ud->ctx = NULL; } else { msg_err_task_check("cannot connect to redis (OS error): %s", strerror(errno)); } REDIS_RELEASE(ctx); return NULL; } msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p", host, ctx, ud); return ctx; } if (ip) { rspamd_inet_address_free(ip); } return NULL; } /*** * @function rspamd_redis.make_request({params}) * Make request to redis server, params is a table of key=value arguments in any order * @param {task} task worker task object * @param {ip|string} host server address * @param {function} callback callback to be called in form `function (task, err, data)` * @param {string} cmd command to be sent to redis * @param {table} args numeric array of strings used as redis arguments * @param {number} timeout timeout in seconds for request (1.0 by default) * @return {boolean} `true` if a request has been scheduled */ static int lua_redis_make_request(lua_State *L) { LUA_TRACE_POINT; struct lua_redis_request_specific_userdata *sp_ud; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx, **pctx; const gchar *cmd = NULL; gdouble timeout = REDIS_DEFAULT_TIMEOUT; gint cbref = -1; gboolean ret = FALSE; ctx = rspamd_lua_redis_prepare_connection(L, &cbref, TRUE); if (ctx) { ud = &ctx->async; sp_ud = g_malloc0(sizeof(*sp_ud)); sp_ud->cbref = cbref; sp_ud->c = ud; sp_ud->ctx = ctx; lua_pushstring(L, "cmd"); lua_gettable(L, -2); cmd = lua_tostring(L, -1); lua_pop(L, 1); lua_pushstring(L, "timeout"); lua_gettable(L, 1); if (lua_type(L, -1) == LUA_TNUMBER) { timeout = lua_tonumber(L, -1); } lua_pop(L, 1); ud->timeout = timeout; lua_pushstring(L, "args"); lua_gettable(L, 1); lua_redis_parse_args(L, -1, cmd, &sp_ud->args, &sp_ud->arglens, &sp_ud->nargs); lua_pop(L, 1); LL_PREPEND(ud->specific, sp_ud); ret = redisAsyncCommandArgv(ud->ctx, lua_redis_callback, sp_ud, sp_ud->nargs, (const gchar **) sp_ud->args, sp_ud->arglens); if (ret == REDIS_OK) { if (ud->s) { rspamd_session_add_event(ud->s, lua_redis_fin, sp_ud, M); if (ud->item) { rspamd_symcache_item_async_inc(ud->task, ud->item, M); } } REDIS_RETAIN(ctx); /* Cleared by fin event */ ctx->cmds_pending++; if (ud->ctx->c.flags & REDIS_SUBSCRIBED) { msg_debug_lua_redis("subscribe command, never unref/timeout"); sp_ud->flags |= LUA_REDIS_SUBSCRIBED; } sp_ud->timeout_ev.data = sp_ud; ev_now_update_if_cheap((struct ev_loop *) ud->event_loop); ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); ret = TRUE; } else { msg_info("call to redis failed: %s", ud->ctx->errstr); rspamd_redis_pool_release_connection(ud->pool, ud->ctx, RSPAMD_REDIS_RELEASE_FATAL); ud->ctx = NULL; REDIS_RELEASE(ctx); ret = FALSE; } } else { lua_pushboolean(L, FALSE); lua_pushnil(L); return 2; } lua_pushboolean(L, ret); if (ret) { pctx = lua_newuserdata(L, sizeof(ctx)); *pctx = ctx; rspamd_lua_setclass(L, "rspamd{redis}", -1); } else { lua_pushnil(L); } return 2; } /*** * @function rspamd_redis.make_request_sync({params}) * Make blocking request to redis server, params is a table of key=value arguments in any order * @param {ip|string} host server address * @param {string} cmd command to be sent to redis * @param {table} args numeric array of strings used as redis arguments * @param {number} timeout timeout in seconds for request (1.0 by default) * @return {boolean + result} `true` and a result if a request has been successful */ static int lua_redis_make_request_sync(lua_State *L) { LUA_TRACE_POINT; struct rspamd_lua_ip *addr = NULL; rspamd_inet_addr_t *ip = NULL; const gchar *cmd = NULL, *host; struct timeval tv; gboolean ret = FALSE; gdouble timeout = REDIS_DEFAULT_TIMEOUT; gchar **args = NULL; gsize *arglens = NULL; guint nargs = 0, flags = 0; redisContext *ctx; redisReply *r; if (lua_istable(L, 1)) { lua_pushvalue(L, 1); lua_pushstring(L, "cmd"); lua_gettable(L, -2); cmd = lua_tostring(L, -1); lua_pop(L, 1); lua_pushstring(L, "host"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TUSERDATA) { addr = lua_check_ip(L, -1); } else if (lua_type(L, -1) == LUA_TSTRING) { host = lua_tostring(L, -1); if (rspamd_parse_inet_address(&ip, host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) { addr = g_alloca(sizeof(*addr)); addr->addr = ip; if (rspamd_inet_address_get_port(ip) == 0) { rspamd_inet_address_set_port(ip, 6379); } } } lua_pop(L, 1); lua_pushstring(L, "timeout"); lua_gettable(L, -2); if (lua_type(L, -1) == LUA_TNUMBER) { timeout = lua_tonumber(L, -1); } lua_pop(L, 1); lua_pushstring(L, "opaque_data"); lua_gettable(L, -2); if (!!lua_toboolean(L, -1)) { flags |= LUA_REDIS_TEXTDATA; } lua_pop(L, 1); if (cmd) { lua_pushstring(L, "args"); lua_gettable(L, -2); lua_redis_parse_args(L, -1, cmd, &args, &arglens, &nargs); lua_pop(L, 1); } lua_pop(L, 1); if (addr && cmd) { ret = TRUE; } } if (ret) { double_to_tv(timeout, &tv); if (rspamd_inet_address_get_af(addr->addr) == AF_UNIX) { ctx = redisConnectUnixWithTimeout( rspamd_inet_address_to_string(addr->addr), tv); } else { ctx = redisConnectWithTimeout( rspamd_inet_address_to_string(addr->addr), rspamd_inet_address_get_port(addr->addr), tv); } if (ip) { rspamd_inet_address_free(ip); } if (ctx == NULL || ctx->err) { redisFree(ctx); lua_redis_free_args(args, arglens, nargs); lua_pushboolean(L, FALSE); return 1; } r = redisCommandArgv(ctx, nargs, (const gchar **) args, arglens); if (r != NULL) { if (r->type != REDIS_REPLY_ERROR) { lua_pushboolean(L, TRUE); lua_redis_push_reply(L, r, flags & LUA_REDIS_TEXTDATA); } else { lua_pushboolean(L, FALSE); lua_pushstring(L, r->str); } freeReplyObject(r); redisFree(ctx); lua_redis_free_args(args, arglens, nargs); return 2; } else { msg_info("call to redis failed: %s", ctx->errstr); redisFree(ctx); lua_redis_free_args(args, arglens, nargs); lua_pushboolean(L, FALSE); } } else { if (ip) { rspamd_inet_address_free(ip); } msg_err("bad arguments for redis request"); lua_redis_free_args(args, arglens, nargs); lua_pushboolean(L, FALSE); } return 1; } /*** * @function rspamd_redis.connect({params}) * Make request to redis server, params is a table of key=value arguments in any order * @param {task} task worker task object * @param {ip|string} host server address * @param {number} timeout timeout in seconds for request (1.0 by default) * @return {boolean,redis} new connection object or nil if connection failed */ static int lua_redis_connect(lua_State *L) { LUA_TRACE_POINT; struct lua_redis_userdata *ud; struct lua_redis_ctx *ctx, **pctx; gdouble timeout = REDIS_DEFAULT_TIMEOUT; ctx = rspamd_lua_redis_prepare_connection(L, NULL, TRUE); if (ctx) { ud = &ctx->async; lua_pushstring(L, "timeout"); lua_gettable(L, 1); if (lua_type(L, -1) == LUA_TNUMBER) { timeout = lua_tonumber(L, -1); } lua_pop(L, 1); ud->timeout = timeout; } else { lua_pushboolean(L, FALSE); lua_pushnil(L); return 2; } lua_pushboolean(L, TRUE); pctx = lua_newuserdata(L, sizeof(ctx)); *pctx = ctx; rspamd_lua_setclass(L, "rspamd{redis}", -1); return 2; } /*** * @function rspamd_redis.connect_sync({params}) * Make blocking request to redis server, params is a table of key=value arguments in any order * @param {ip|string} host server address * @param {number} timeout timeout in seconds for request (1.0 by default) * @return {redis} redis object if a request has been successful */ static int lua_redis_connect_sync(lua_State *L) { LUA_TRACE_POINT; gdouble timeout = REDIS_DEFAULT_TIMEOUT; struct lua_redis_ctx *ctx, **pctx; ctx = rspamd_lua_redis_prepare_connection(L, NULL, FALSE); if (ctx) { if (lua_istable(L, 1)) { lua_pushstring(L, "timeout"); lua_gettable(L, 1); if (lua_type(L, -1) == LUA_TNUMBER) { timeout = lua_tonumber(L, -1); } lua_pop(L, 1); } ctx->async.timeout = timeout; lua_pushboolean(L, TRUE); pctx = lua_newuserdata(L, sizeof(ctx)); *pctx = ctx; rspamd_lua_setclass(L, "rspamd{redis}", -1); } else { lua_pushboolean(L, FALSE); lua_pushstring(L, "bad arguments for redis request"); return 2; } return 2; } /*** * @method rspamd_redis:add_cmd(cmd, {args}) * Append new cmd to redis pipeline * @param {string} cmd command to be sent to redis * @param {table} args array of strings used as redis arguments * @return {boolean} `true` if a request has been successful */ static int lua_redis_add_cmd(lua_State *L) { LUA_TRACE_POINT; struct lua_redis_ctx *ctx = lua_check_redis(L, 1); struct lua_redis_request_specific_userdata *sp_ud; struct lua_redis_userdata *ud; const gchar *cmd = NULL; gint args_pos = 2; gint cbref = -1, ret; if (ctx) { if (ctx->flags & LUA_REDIS_TERMINATED) { lua_pushboolean(L, FALSE); lua_pushstring(L, "Connection is terminated"); return 2; } /* Async version */ if (lua_type(L, 2) == LUA_TSTRING) { /* No callback version */ cmd = lua_tostring(L, 2); args_pos = 3; } else if (lua_type(L, 2) == LUA_TFUNCTION) { lua_pushvalue(L, 2); cbref = luaL_ref(L, LUA_REGISTRYINDEX); cmd = lua_tostring(L, 3); args_pos = 4; } else { return luaL_error(L, "invalid arguments"); } sp_ud = g_malloc0(sizeof(*sp_ud)); if (IS_ASYNC(ctx)) { sp_ud->c = &ctx->async; ud = &ctx->async; sp_ud->cbref = cbref; } else { sp_ud->c = &ctx->async; ud = &ctx->async; } sp_ud->ctx = ctx; lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args, &sp_ud->arglens, &sp_ud->nargs); LL_PREPEND(sp_ud->c->specific, sp_ud); if (ud->s && rspamd_session_blocked(ud->s)) { lua_pushboolean(L, 0); lua_pushstring(L, "session is terminating"); return 2; } if (IS_ASYNC(ctx)) { ret = redisAsyncCommandArgv(sp_ud->c->ctx, lua_redis_callback, sp_ud, sp_ud->nargs, (const gchar **) sp_ud->args, sp_ud->arglens); } else { ret = redisAsyncCommandArgv(sp_ud->c->ctx, lua_redis_callback_sync, sp_ud, sp_ud->nargs, (const gchar **) sp_ud->args, sp_ud->arglens); } if (ret == REDIS_OK) { if (ud->s) { rspamd_session_add_event(ud->s, lua_redis_fin, sp_ud, M); if (ud->item) { rspamd_symcache_item_async_inc(ud->task, ud->item, M); } } sp_ud->timeout_ev.data = sp_ud; if (IS_ASYNC(ctx)) { ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, sp_ud->c->timeout, 0.0); } else { ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync, sp_ud->c->timeout, 0.0); } ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); REDIS_RETAIN(ctx); ctx->cmds_pending++; } else { msg_info("call to redis failed: %s", sp_ud->c->ctx->errstr); lua_pushboolean(L, 0); lua_pushstring(L, sp_ud->c->ctx->errstr); return 2; } } lua_pushboolean(L, true); return 1; } /*** * @method rspamd_redis:exec() * Executes pending commands (suitable for blocking IO only for now) * @return {boolean}, {table}, ...: pairs in format [bool, result] for each request pending */ static int lua_redis_exec(lua_State *L) { LUA_TRACE_POINT; struct lua_redis_ctx *ctx = lua_check_redis(L, 1); if (ctx == NULL) { lua_error(L); return 1; } if (IS_ASYNC(ctx)) { lua_pushstring(L, "Async redis pipelining is not implemented"); lua_error(L); return 0; } else { if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) { lua_pushstring(L, "No pending commands to execute"); lua_error(L); } if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) { gint results = lua_redis_push_results(ctx, L); return results; } else { ctx->thread = lua_thread_pool_get_running_entry(ctx->async.cfg->lua_thread_pool); return lua_thread_yield(ctx->thread, 0); } } } static gint lua_load_redis(lua_State *L) { lua_newtable(L); luaL_register(L, NULL, redislib_f); return 1; } static gint lua_redis_null_idx(lua_State *L) { lua_pushnil(L); return 1; } static void lua_redis_null_mt(lua_State *L) { luaL_newmetatable(L, "redis{null}"); lua_pushcfunction(L, lua_redis_null_idx); lua_setfield(L, -2, "__index"); lua_pushcfunction(L, lua_redis_null_idx); lua_setfield(L, -2, "__tostring"); lua_pop(L, 1); } /** * Open redis library * @param L lua stack * @return */ void luaopen_redis(lua_State *L) { rspamd_lua_new_class(L, "rspamd{redis}", redislib_m); lua_pop(L, 1); rspamd_lua_add_preload(L, "rspamd_redis", lua_load_redis); /* Set null element */ lua_redis_null_mt(L); redis_null = lua_newuserdata(L, 0); luaL_getmetatable(L, "redis{null}"); lua_setmetatable(L, -2); lua_setfield(L, LUA_REGISTRYINDEX, "redis.null"); }