aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lua/lua_redis.c67
1 files changed, 48 insertions, 19 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 675a20d3c..f6623db3d 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -72,6 +72,22 @@ static const struct luaL_reg redislib_m[] = {
{NULL, NULL}
};
+#undef REDIS_DEBUG_REFS
+#ifdef REDIS_DEBUG_REFS
+#define REDIS_RETAIN(x) do { \
+ msg_err ("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \
+ REF_RETAIN(x); \
+} while (0)
+
+#define REDIS_RELEASE(x) do { \
+ msg_err ("release ref %p, refcount: %d", (x), (x)->ref.refcount); \
+ REF_RELEASE(x); \
+} while (0)
+#else
+#define REDIS_RETAIN REF_RETAIN
+#define REDIS_RELEASE REF_RELEASE
+#endif
+
#ifdef WITH_HIREDIS
struct lua_redis_specific_userdata;
/**
@@ -93,6 +109,7 @@ struct lua_redis_specific_userdata {
gint cbref;
guint nargs;
gchar **args;
+ struct rspamd_async_watcher *w;
struct event timeout;
struct lua_redis_userdata *c;
struct lua_redis_ctx *ctx;
@@ -112,7 +129,7 @@ struct lua_redis_ctx {
static struct lua_redis_ctx *
lua_check_redis (lua_State * L, gint pos)
{
- void *ud = luaL_checkudata (L, pos, "rspamd{redis}");
+ void *ud = rspamd_lua_check_udata (L, pos, "rspamd{redis}");
luaL_argcheck (L, ud != NULL, pos, "'redis' expected");
return ud ? *((struct lua_redis_ctx **)ud) : NULL;
}
@@ -182,7 +199,7 @@ lua_redis_gc (lua_State *L)
struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
if (ctx) {
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
}
return 0;
@@ -195,7 +212,14 @@ lua_redis_fin (void *arg)
struct lua_redis_ctx *ctx;
ctx = sp_ud->ctx;
- REF_RELEASE (ctx);
+ event_del (&sp_ud->timeout);
+
+ if (sp_ud->cbref != -1) {
+ luaL_unref (sp_ud->c->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ sp_ud->cbref = -1;
+ }
+
+ REDIS_RELEASE (ctx);
}
/**
@@ -227,10 +251,11 @@ lua_redis_push_error (const gchar *err,
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_pop (ud->L, 1);
}
- }
- if (connected) {
- rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+ if (connected) {
+ rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
+ rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+ }
}
}
@@ -292,9 +317,10 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_pop (ud->L, 1);
}
- }
- rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+ rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
+ rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+ }
}
/**
@@ -320,8 +346,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
return;
}
- REF_RETAIN (ctx);
- event_del (&sp_ud->timeout);
+ REDIS_RETAIN (ctx);
ctx->cmds_pending --;
if (c->err == 0) {
@@ -357,7 +382,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
}
}
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
}
static void
@@ -369,7 +394,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
ctx = sp_ud->ctx;
- REF_RETAIN (ctx);
+ REDIS_RETAIN (ctx);
msg_info ("timeout while querying redis server");
lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, FALSE);
@@ -384,7 +409,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
sp_ud->c->terminated = 1;
redisAsyncFree (ac);
}
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
}
@@ -640,7 +665,7 @@ lua_redis_make_request (lua_State *L)
ud->ctx = NULL;
}
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
lua_pushboolean (L, FALSE);
lua_pushnil (L);
@@ -669,9 +694,11 @@ lua_redis_make_request (lua_State *L)
lua_redis_fin,
sp_ud,
g_quark_from_static_string ("lua redis"));
+ sp_ud->w = rspamd_session_get_watcher (ud->task->s);
+ rspamd_session_watcher_push (ud->task->s);
sp_ud->ctx = ctx;
- REF_RETAIN (ctx);
+ REDIS_RETAIN (ctx);
ctx->cmds_pending ++;
double_to_tv (timeout, &tv);
event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
@@ -683,7 +710,7 @@ lua_redis_make_request (lua_State *L)
msg_info ("call to redis failed: %s", ud->ctx->errstr);
redisAsyncFree (ud->ctx);
ud->ctx = NULL;
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
ret = FALSE;
}
}
@@ -903,7 +930,7 @@ lua_redis_connect (lua_State *L)
rspamd_inet_address_get_port (addr->addr));
if (ud->ctx == NULL || ud->ctx->err) {
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
lua_pushboolean (L, FALSE);
return 1;
@@ -994,7 +1021,7 @@ lua_redis_connect_sync (lua_State *L)
lua_pushstring (L, "unknown error");
}
- REF_RELEASE (ctx);
+ REDIS_RELEASE (ctx);
return 2;
}
@@ -1077,12 +1104,14 @@ lua_redis_add_cmd (lua_State *L)
lua_redis_fin,
sp_ud,
g_quark_from_static_string ("lua redis"));
+ sp_ud->w = rspamd_session_get_watcher (sp_ud->c->task->s);
+ rspamd_session_watcher_push (sp_ud->c->task->s);
double_to_tv (sp_ud->c->timeout, &tv);
event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout);
event_add (&sp_ud->timeout, &tv);
- REF_RETAIN (ctx);
+ REDIS_RETAIN (ctx);
ctx->cmds_pending ++;
}
else {