From a4c9f20fa7f30124253bbcc807eaec9cce65ee48 Mon Sep 17 00:00:00 2001
From: Vsevolod Stakhov <vsevolod@highsecure.ru>
Date: Thu, 5 May 2016 14:26:02 +0100
Subject: [Fix] Improve lua redis handling

---
 src/lua/lua_redis.c | 67 ++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 48 insertions(+), 19 deletions(-)

(limited to 'src')

diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index 675a20d3c..f6623db3d 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -72,6 +72,22 @@ static const struct luaL_reg redislib_m[] = {
 	{NULL, NULL}
 };
 
+#undef REDIS_DEBUG_REFS
+#ifdef REDIS_DEBUG_REFS
+#define REDIS_RETAIN(x) do { \
+	msg_err ("retain ref %p, refcount: %d", (x), (x)->ref.refcount); \
+	REF_RETAIN(x);	\
+} while (0)
+
+#define REDIS_RELEASE(x) do { \
+	msg_err ("release ref %p, refcount: %d", (x), (x)->ref.refcount); \
+	REF_RELEASE(x);	\
+} while (0)
+#else
+#define REDIS_RETAIN REF_RETAIN
+#define REDIS_RELEASE REF_RELEASE
+#endif
+
 #ifdef WITH_HIREDIS
 struct lua_redis_specific_userdata;
 /**
@@ -93,6 +109,7 @@ struct lua_redis_specific_userdata {
 	gint cbref;
 	guint nargs;
 	gchar **args;
+	struct rspamd_async_watcher *w;
 	struct event timeout;
 	struct lua_redis_userdata *c;
 	struct lua_redis_ctx *ctx;
@@ -112,7 +129,7 @@ struct lua_redis_ctx {
 static struct lua_redis_ctx *
 lua_check_redis (lua_State * L, gint pos)
 {
-	void *ud = luaL_checkudata (L, pos, "rspamd{redis}");
+	void *ud = rspamd_lua_check_udata (L, pos, "rspamd{redis}");
 	luaL_argcheck (L, ud != NULL, pos, "'redis' expected");
 	return ud ? *((struct lua_redis_ctx **)ud) : NULL;
 }
@@ -182,7 +199,7 @@ lua_redis_gc (lua_State *L)
 	struct lua_redis_ctx *ctx = lua_check_redis (L, 1);
 
 	if (ctx) {
-		REF_RELEASE (ctx);
+		REDIS_RELEASE (ctx);
 	}
 
 	return 0;
@@ -195,7 +212,14 @@ lua_redis_fin (void *arg)
 	struct lua_redis_ctx *ctx;
 
 	ctx = sp_ud->ctx;
-	REF_RELEASE (ctx);
+	event_del (&sp_ud->timeout);
+
+	if (sp_ud->cbref != -1) {
+		luaL_unref (sp_ud->c->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+		sp_ud->cbref = -1;
+	}
+
+	REDIS_RELEASE (ctx);
 }
 
 /**
@@ -227,10 +251,11 @@ lua_redis_push_error (const gchar *err,
 			msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
 			lua_pop (ud->L, 1);
 		}
-	}
 
-	if (connected) {
-		rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+		if (connected) {
+			rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
+			rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+		}
 	}
 }
 
@@ -292,9 +317,10 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
 			msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
 			lua_pop (ud->L, 1);
 		}
-	}
 
-	rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+		rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
+		rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
+	}
 }
 
 /**
@@ -320,8 +346,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 		return;
 	}
 
-	REF_RETAIN (ctx);
-	event_del (&sp_ud->timeout);
+	REDIS_RETAIN (ctx);
 	ctx->cmds_pending --;
 
 	if (c->err == 0) {
@@ -357,7 +382,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
 		}
 	}
 
-	REF_RELEASE (ctx);
+	REDIS_RELEASE (ctx);
 }
 
 static void
@@ -369,7 +394,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
 
 	ctx = sp_ud->ctx;
 
-	REF_RETAIN (ctx);
+	REDIS_RETAIN (ctx);
 	msg_info ("timeout while querying redis server");
 	lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, FALSE);
 
@@ -384,7 +409,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
 		sp_ud->c->terminated = 1;
 		redisAsyncFree (ac);
 	}
-	REF_RELEASE (ctx);
+	REDIS_RELEASE (ctx);
 }
 
 
@@ -640,7 +665,7 @@ lua_redis_make_request (lua_State *L)
 				ud->ctx = NULL;
 			}
 
-			REF_RELEASE (ctx);
+			REDIS_RELEASE (ctx);
 			lua_pushboolean (L, FALSE);
 			lua_pushnil (L);
 
@@ -669,9 +694,11 @@ lua_redis_make_request (lua_State *L)
 					lua_redis_fin,
 					sp_ud,
 					g_quark_from_static_string ("lua redis"));
+			sp_ud->w = rspamd_session_get_watcher (ud->task->s);
+			rspamd_session_watcher_push (ud->task->s);
 
 			sp_ud->ctx = ctx;
-			REF_RETAIN (ctx);
+			REDIS_RETAIN (ctx);
 			ctx->cmds_pending ++;
 			double_to_tv (timeout, &tv);
 			event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
@@ -683,7 +710,7 @@ lua_redis_make_request (lua_State *L)
 			msg_info ("call to redis failed: %s", ud->ctx->errstr);
 			redisAsyncFree (ud->ctx);
 			ud->ctx = NULL;
-			REF_RELEASE (ctx);
+			REDIS_RELEASE (ctx);
 			ret = FALSE;
 		}
 	}
@@ -903,7 +930,7 @@ lua_redis_connect (lua_State *L)
 				rspamd_inet_address_get_port (addr->addr));
 
 		if (ud->ctx == NULL || ud->ctx->err) {
-			REF_RELEASE (ctx);
+			REDIS_RELEASE (ctx);
 			lua_pushboolean (L, FALSE);
 
 			return 1;
@@ -994,7 +1021,7 @@ lua_redis_connect_sync (lua_State *L)
 				lua_pushstring (L, "unknown error");
 			}
 
-			REF_RELEASE (ctx);
+			REDIS_RELEASE (ctx);
 
 			return 2;
 		}
@@ -1077,12 +1104,14 @@ lua_redis_add_cmd (lua_State *L)
 						lua_redis_fin,
 						sp_ud,
 						g_quark_from_static_string ("lua redis"));
+				sp_ud->w = rspamd_session_get_watcher (sp_ud->c->task->s);
+				rspamd_session_watcher_push (sp_ud->c->task->s);
 
 				double_to_tv (sp_ud->c->timeout, &tv);
 				event_set (&sp_ud->timeout, -1, EV_TIMEOUT, lua_redis_timeout, sp_ud);
 				event_base_set (sp_ud->c->task->ev_base, &sp_ud->timeout);
 				event_add (&sp_ud->timeout, &tv);
-				REF_RETAIN (ctx);
+				REDIS_RETAIN (ctx);
 				ctx->cmds_pending ++;
 			}
 			else {
-- 
cgit v1.2.3