aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_redis.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-03-18 13:21:02 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-03-18 13:21:02 +0000
commitb582a1219d8b2b1e817359cf64fafd3b32ff72a9 (patch)
treee35b4209c41c6e8fb0a2271eb175ce27cb25c204 /src/lua/lua_redis.c
parent5a75d743c04eb6d97023cba7daf47514b7b33b1f (diff)
downloadrspamd-b582a1219d8b2b1e817359cf64fafd3b32ff72a9.tar.gz
rspamd-b582a1219d8b2b1e817359cf64fafd3b32ff72a9.zip
[Minor] Allow to pass data transparently to lua from redis
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r--src/lua/lua_redis.c98
1 files changed, 74 insertions, 24 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index acb355faa..4a0a1cfd9 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -106,6 +106,12 @@ struct lua_redis_userdata {
guint16 terminated;
};
+#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
+#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
+#define LUA_REDIS_ASYNC (1 << 0)
+#define LUA_REDIS_TEXTDATA (1 << 1)
+#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
+
struct lua_redis_specific_userdata {
gint cbref;
guint nargs;
@@ -116,12 +122,11 @@ struct lua_redis_specific_userdata {
struct lua_redis_ctx *ctx;
struct lua_redis_specific_userdata *next;
struct event timeout;
- gboolean replied;
- gboolean finished;
+ guint flags;
};
struct lua_redis_ctx {
- gboolean async;
+ guint flags;
union {
struct lua_redis_userdata async;
redisContext *sync;
@@ -161,7 +166,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
gboolean is_successfull = TRUE;
struct redisAsyncContext *ac;
- if (ctx->async) {
+ if (IS_ASYNC (ctx)) {
msg_debug ("desctructing %p", ctx);
ud = &ctx->d.async;
@@ -170,11 +175,11 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
LL_FOREACH_SAFE (ud->specific, cur, tmp) {
event_del (&cur->timeout);
- if (!cur->replied) {
+ if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
is_successfull = FALSE;
}
- cur->finished = TRUE;
+ cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
}
ud->terminated = 1;
@@ -223,7 +228,7 @@ lua_redis_fin (void *arg)
ctx = sp_ud->ctx;
event_del (&sp_ud->timeout);
msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
- sp_ud->finished = TRUE;
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
REDIS_RELEASE (ctx);
}
@@ -241,7 +246,7 @@ lua_redis_push_error (const gchar *err,
{
struct lua_redis_userdata *ud = sp_ud->c;
- if (!sp_ud->replied && !sp_ud->finished) {
+ if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
@@ -257,7 +262,8 @@ lua_redis_push_error (const gchar *err,
}
}
- sp_ud->replied = TRUE;
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
+
if (connected && ud->s) {
rspamd_session_watcher_pop (ud->s, sp_ud->w);
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
@@ -269,9 +275,10 @@ lua_redis_push_error (const gchar *err,
}
static void
-lua_redis_push_reply (lua_State *L, const redisReply *r)
+lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
{
guint i;
+ struct rspamd_lua_text *t;
switch (r->type) {
case REDIS_REPLY_INTEGER:
@@ -283,12 +290,21 @@ lua_redis_push_reply (lua_State *L, const redisReply *r)
break;
case REDIS_REPLY_STRING:
case REDIS_REPLY_STATUS:
- lua_pushlstring (L, r->str, r->len);
+ if (text_data) {
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
+ t->flags = 0;
+ t->start = r->str;
+ t->len = r->len;
+ }
+ else {
+ lua_pushlstring (L, r->str, r->len);
+ }
break;
case REDIS_REPLY_ARRAY:
lua_createtable (L, r->elements, 0);
for (i = 0; i < r->elements; ++i) {
- lua_redis_push_reply (L, r->element[i]);
+ lua_redis_push_reply (L, r->element[i], text_data);
lua_rawseti (L, -2, i + 1); /* Store sub-reply */
}
break;
@@ -309,14 +325,14 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
{
struct lua_redis_userdata *ud = sp_ud->c;
- if (!sp_ud->replied && !sp_ud->finished) {
+ if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* Error is nil */
lua_pushnil (ud->L);
/* Data */
- lua_redis_push_reply (ud->L, r);
+ lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
if (lua_pcall (ud->L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -325,7 +341,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
}
- sp_ud->replied = TRUE;
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
if (ud->s) {
rspamd_session_watcher_pop (ud->s, sp_ud->w);
@@ -365,7 +381,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
REDIS_RETAIN (ctx);
/* If session is finished, we cannot call lua callbacks */
- if (!sp_ud->finished) {
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
if (c->err == 0) {
if (r != NULL) {
if (reply->type != REDIS_REPLY_ERROR) {
@@ -412,7 +428,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
struct lua_redis_ctx *ctx;
redisAsyncContext *ac;
- if (sp_ud->finished) {
+ if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
return;
}
@@ -530,6 +546,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
struct rspamd_async_session *session = NULL;
struct event_base *ev_base = NULL;
gboolean ret = FALSE;
+ guint flags = 0;
if (lua_istable (L, 1)) {
/* Table version */
@@ -564,6 +581,13 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
}
lua_pop (L, 1);
+ lua_pushstring (L, "ev_base");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ ev_base = lua_check_ev_base (L, -1);
+ }
+ lua_pop (L, 1);
+
if (cfg && ev_base) {
ret = TRUE;
}
@@ -622,13 +646,21 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
dbname = lua_tostring (L, -1);
}
lua_pop (L, 1);
+
+ lua_pushstring (L, "opaque_data");
+ lua_gettable (L, -2);
+ if (!!lua_toboolean (L, -1)) {
+ flags |= LUA_REDIS_TEXTDATA;
+ }
+ lua_pop (L, 1);
+
lua_pop (L, 1); /* table */
if (ret && addr != NULL) {
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->async = TRUE;
+ ctx->flags |= flags | LUA_REDIS_ASYNC;
ud = &ctx->d.async;
ud->s = session;
ud->cfg = cfg;
@@ -731,6 +763,7 @@ lua_redis_make_request (lua_State *L)
lua_pop (L, 1);
ud->timeout = timeout;
+
lua_pushstring (L, "args");
lua_gettable (L, -2);
lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
@@ -814,7 +847,7 @@ lua_redis_make_request_sync (lua_State *L)
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
gchar **args = NULL;
gsize *arglens = NULL;
- guint nargs = 0;
+ guint nargs = 0, flags = 0;
redisContext *ctx;
redisReply *r;
@@ -851,6 +884,14 @@ lua_redis_make_request_sync (lua_State *L)
}
lua_pop (L, 1);
+ lua_pushstring (L, "opaque_data");
+ lua_gettable (L, -2);
+ if (!!lua_toboolean (L, -1)) {
+ flags |= LUA_REDIS_TEXTDATA;
+ }
+ lua_pop (L, 1);
+
+
if (cmd) {
lua_pushstring (L, "args");
lua_gettable (L, -2);
@@ -890,7 +931,7 @@ lua_redis_make_request_sync (lua_State *L)
if (r != NULL) {
if (r->type != REDIS_REPLY_ERROR) {
lua_pushboolean (L, TRUE);
- lua_redis_push_reply (L, r);
+ lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA);
}
else {
lua_pushboolean (L, FALSE);
@@ -979,6 +1020,7 @@ lua_redis_connect_sync (lua_State *L)
const gchar *host;
struct timeval tv;
gboolean ret = FALSE;
+ guint flags = 0;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
struct lua_redis_ctx *ctx, **pctx;
@@ -1008,6 +1050,13 @@ lua_redis_connect_sync (lua_State *L)
}
lua_pop (L, 1);
+ lua_pushstring (L, "opaque_data");
+ lua_gettable (L, -2);
+ if (!!lua_toboolean (L, -1)) {
+ flags |= LUA_REDIS_TEXTDATA;
+ }
+ lua_pop (L, 1);
+
if (addr) {
ret = TRUE;
}
@@ -1017,7 +1066,7 @@ lua_redis_connect_sync (lua_State *L)
double_to_tv (timeout, &tv);
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->async = FALSE;
+ ctx->flags = flags;
ctx->d.sync = redisConnectWithTimeout (
rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr), tv);
@@ -1082,7 +1131,7 @@ lua_redis_add_cmd (lua_State *L)
if (ctx) {
- if (ctx->async) {
+ if (IS_ASYNC (ctx)) {
ud = &ctx->d.async;
/* Async version */
@@ -1202,7 +1251,7 @@ lua_redis_exec (lua_State *L)
return 1;
}
- if (ctx->async) {
+ if (IS_ASYNC (ctx)) {
lua_pushstring (L, "Async redis pipelining is not implemented");
lua_error (L);
return 0;
@@ -1220,7 +1269,8 @@ lua_redis_exec (lua_State *L)
if (ret == REDIS_OK) {
if (r->type != REDIS_REPLY_ERROR) {
lua_pushboolean (L, TRUE);
- lua_redis_push_reply (L, r);
+ lua_redis_push_reply (L, r,
+ ctx->flags & LUA_REDIS_TEXTDATA);
}
else {
lua_pushboolean (L, FALSE);