diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2017-03-18 13:21:02 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2017-03-18 13:21:02 +0000 |
commit | b582a1219d8b2b1e817359cf64fafd3b32ff72a9 (patch) | |
tree | e35b4209c41c6e8fb0a2271eb175ce27cb25c204 /src | |
parent | 5a75d743c04eb6d97023cba7daf47514b7b33b1f (diff) | |
download | rspamd-b582a1219d8b2b1e817359cf64fafd3b32ff72a9.tar.gz rspamd-b582a1219d8b2b1e817359cf64fafd3b32ff72a9.zip |
[Minor] Allow to pass data transparently to lua from redis
Diffstat (limited to 'src')
-rw-r--r-- | src/lua/lua_redis.c | 98 |
1 files changed, 74 insertions, 24 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index acb355faa..4a0a1cfd9 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -106,6 +106,12 @@ struct lua_redis_userdata { guint16 terminated; }; +#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0) +#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1) +#define LUA_REDIS_ASYNC (1 << 0) +#define LUA_REDIS_TEXTDATA (1 << 1) +#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) + struct lua_redis_specific_userdata { gint cbref; guint nargs; @@ -116,12 +122,11 @@ struct lua_redis_specific_userdata { struct lua_redis_ctx *ctx; struct lua_redis_specific_userdata *next; struct event timeout; - gboolean replied; - gboolean finished; + guint flags; }; struct lua_redis_ctx { - gboolean async; + guint flags; union { struct lua_redis_userdata async; redisContext *sync; @@ -161,7 +166,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) gboolean is_successfull = TRUE; struct redisAsyncContext *ac; - if (ctx->async) { + if (IS_ASYNC (ctx)) { msg_debug ("desctructing %p", ctx); ud = &ctx->d.async; @@ -170,11 +175,11 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) LL_FOREACH_SAFE (ud->specific, cur, tmp) { event_del (&cur->timeout); - if (!cur->replied) { + if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) { is_successfull = FALSE; } - cur->finished = TRUE; + cur->flags |= LUA_REDIS_SPECIFIC_FINISHED; } ud->terminated = 1; @@ -223,7 +228,7 @@ lua_redis_fin (void *arg) ctx = sp_ud->ctx; event_del (&sp_ud->timeout); msg_debug ("finished redis query %p from session %p", sp_ud, ctx); - sp_ud->finished = TRUE; + sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED; REDIS_RELEASE (ctx); } @@ -241,7 +246,7 @@ lua_redis_push_error (const gchar *err, { struct lua_redis_userdata *ud = sp_ud->c; - if (!sp_ud->replied && !sp_ud->finished) { + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); @@ -257,7 +262,8 @@ lua_redis_push_error (const gchar *err, } } - sp_ud->replied = TRUE; + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; + if (connected && ud->s) { rspamd_session_watcher_pop (ud->s, sp_ud->w); rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); @@ -269,9 +275,10 @@ lua_redis_push_error (const gchar *err, } static void -lua_redis_push_reply (lua_State *L, const redisReply *r) +lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data) { guint i; + struct rspamd_lua_text *t; switch (r->type) { case REDIS_REPLY_INTEGER: @@ -283,12 +290,21 @@ lua_redis_push_reply (lua_State *L, const redisReply *r) break; case REDIS_REPLY_STRING: case REDIS_REPLY_STATUS: - lua_pushlstring (L, r->str, r->len); + if (text_data) { + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); + t->flags = 0; + t->start = r->str; + t->len = r->len; + } + else { + lua_pushlstring (L, r->str, r->len); + } break; case REDIS_REPLY_ARRAY: lua_createtable (L, r->elements, 0); for (i = 0; i < r->elements; ++i) { - lua_redis_push_reply (L, r->element[i]); + lua_redis_push_reply (L, r->element[i], text_data); lua_rawseti (L, -2, i + 1); /* Store sub-reply */ } break; @@ -309,14 +325,14 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, { struct lua_redis_userdata *ud = sp_ud->c; - if (!sp_ud->replied && !sp_ud->finished) { + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { /* Push error */ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); /* Error is nil */ lua_pushnil (ud->L); /* Data */ - lua_redis_push_reply (ud->L, r); + lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA); if (lua_pcall (ud->L, 2, 0, 0) != 0) { msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); @@ -325,7 +341,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, } - sp_ud->replied = TRUE; + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; if (ud->s) { rspamd_session_watcher_pop (ud->s, sp_ud->w); @@ -365,7 +381,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) REDIS_RETAIN (ctx); /* If session is finished, we cannot call lua callbacks */ - if (!sp_ud->finished) { + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { if (c->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { @@ -412,7 +428,7 @@ lua_redis_timeout (int fd, short what, gpointer u) struct lua_redis_ctx *ctx; redisAsyncContext *ac; - if (sp_ud->finished) { + if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) { return; } @@ -530,6 +546,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) struct rspamd_async_session *session = NULL; struct event_base *ev_base = NULL; gboolean ret = FALSE; + guint flags = 0; if (lua_istable (L, 1)) { /* Table version */ @@ -564,6 +581,13 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) } lua_pop (L, 1); + lua_pushstring (L, "ev_base"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TUSERDATA) { + ev_base = lua_check_ev_base (L, -1); + } + lua_pop (L, 1); + if (cfg && ev_base) { ret = TRUE; } @@ -622,13 +646,21 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) dbname = lua_tostring (L, -1); } lua_pop (L, 1); + + lua_pushstring (L, "opaque_data"); + lua_gettable (L, -2); + if (!!lua_toboolean (L, -1)) { + flags |= LUA_REDIS_TEXTDATA; + } + lua_pop (L, 1); + lua_pop (L, 1); /* table */ if (ret && addr != NULL) { ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); REF_INIT_RETAIN (ctx, lua_redis_dtor); - ctx->async = TRUE; + ctx->flags |= flags | LUA_REDIS_ASYNC; ud = &ctx->d.async; ud->s = session; ud->cfg = cfg; @@ -731,6 +763,7 @@ lua_redis_make_request (lua_State *L) lua_pop (L, 1); ud->timeout = timeout; + lua_pushstring (L, "args"); lua_gettable (L, -2); lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens, @@ -814,7 +847,7 @@ lua_redis_make_request_sync (lua_State *L) gdouble timeout = REDIS_DEFAULT_TIMEOUT; gchar **args = NULL; gsize *arglens = NULL; - guint nargs = 0; + guint nargs = 0, flags = 0; redisContext *ctx; redisReply *r; @@ -851,6 +884,14 @@ lua_redis_make_request_sync (lua_State *L) } lua_pop (L, 1); + lua_pushstring (L, "opaque_data"); + lua_gettable (L, -2); + if (!!lua_toboolean (L, -1)) { + flags |= LUA_REDIS_TEXTDATA; + } + lua_pop (L, 1); + + if (cmd) { lua_pushstring (L, "args"); lua_gettable (L, -2); @@ -890,7 +931,7 @@ lua_redis_make_request_sync (lua_State *L) if (r != NULL) { if (r->type != REDIS_REPLY_ERROR) { lua_pushboolean (L, TRUE); - lua_redis_push_reply (L, r); + lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA); } else { lua_pushboolean (L, FALSE); @@ -979,6 +1020,7 @@ lua_redis_connect_sync (lua_State *L) const gchar *host; struct timeval tv; gboolean ret = FALSE; + guint flags = 0; gdouble timeout = REDIS_DEFAULT_TIMEOUT; struct lua_redis_ctx *ctx, **pctx; @@ -1008,6 +1050,13 @@ lua_redis_connect_sync (lua_State *L) } lua_pop (L, 1); + lua_pushstring (L, "opaque_data"); + lua_gettable (L, -2); + if (!!lua_toboolean (L, -1)) { + flags |= LUA_REDIS_TEXTDATA; + } + lua_pop (L, 1); + if (addr) { ret = TRUE; } @@ -1017,7 +1066,7 @@ lua_redis_connect_sync (lua_State *L) double_to_tv (timeout, &tv); ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx)); REF_INIT_RETAIN (ctx, lua_redis_dtor); - ctx->async = FALSE; + ctx->flags = flags; ctx->d.sync = redisConnectWithTimeout ( rspamd_inet_address_to_string (addr->addr), rspamd_inet_address_get_port (addr->addr), tv); @@ -1082,7 +1131,7 @@ lua_redis_add_cmd (lua_State *L) if (ctx) { - if (ctx->async) { + if (IS_ASYNC (ctx)) { ud = &ctx->d.async; /* Async version */ @@ -1202,7 +1251,7 @@ lua_redis_exec (lua_State *L) return 1; } - if (ctx->async) { + if (IS_ASYNC (ctx)) { lua_pushstring (L, "Async redis pipelining is not implemented"); lua_error (L); return 0; @@ -1220,7 +1269,8 @@ lua_redis_exec (lua_State *L) if (ret == REDIS_OK) { if (r->type != REDIS_REPLY_ERROR) { lua_pushboolean (L, TRUE); - lua_redis_push_reply (L, r); + lua_redis_push_reply (L, r, + ctx->flags & LUA_REDIS_TEXTDATA); } else { lua_pushboolean (L, FALSE); |