aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-05 16:36:58 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-05 16:36:58 +0100
commit647d5d2a8f00aa97800a4b40fcab0f1bf02c3f9b (patch)
treecbfe3c165c10cc91b3158cd120cbadd2fc7bbfcb /src
parentdedb8506630e0de3f384d19c781993dedd73a2d5 (diff)
downloadrspamd-647d5d2a8f00aa97800a4b40fcab0f1bf02c3f9b.tar.gz
rspamd-647d5d2a8f00aa97800a4b40fcab0f1bf02c3f9b.zip
[Fix] Another rework for lua_redis events handling
Issue: #609
Diffstat (limited to 'src')
-rw-r--r--src/lua/lua_redis.c86
1 files changed, 46 insertions, 40 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c
index f6623db3d..920d8852a 100644
--- a/src/lua/lua_redis.c
+++ b/src/lua/lua_redis.c
@@ -110,10 +110,11 @@ struct lua_redis_specific_userdata {
guint nargs;
gchar **args;
struct rspamd_async_watcher *w;
- struct event timeout;
struct lua_redis_userdata *c;
struct lua_redis_ctx *ctx;
struct lua_redis_specific_userdata *next;
+ struct event timeout;
+ gboolean replied;
};
struct lua_redis_ctx {
@@ -156,6 +157,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
gboolean is_connected = FALSE;
if (ctx->async) {
+ msg_debug ("desctructing %p", ctx);
ud = &ctx->d.async;
if (ud->ctx) {
@@ -213,11 +215,8 @@ lua_redis_fin (void *arg)
ctx = sp_ud->ctx;
event_del (&sp_ud->timeout);
-
- if (sp_ud->cbref != -1) {
- luaL_unref (sp_ud->c->L, LUA_REGISTRYINDEX, sp_ud->cbref);
- sp_ud->cbref = -1;
- }
+ msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
+ sp_ud->replied = TRUE;
REDIS_RELEASE (ctx);
}
@@ -236,22 +235,25 @@ lua_redis_push_error (const gchar *err,
struct rspamd_task **ptask;
struct lua_redis_userdata *ud = sp_ud->c;
- if (sp_ud->cbref != -1) {
- /* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
- ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
-
- *ptask = ud->task;
- /* String of error */
- lua_pushstring (ud->L, err);
- /* Data is nil */
- lua_pushnil (ud->L);
- if (lua_pcall (ud->L, 3, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ if (!sp_ud->replied) {
+ if (sp_ud->cbref != -1) {
+ /* Push error */
+ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+
+ *ptask = ud->task;
+ /* String of error */
+ lua_pushstring (ud->L, err);
+ /* Data is nil */
+ lua_pushnil (ud->L);
+ if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+ lua_pop (ud->L, 1);
+ }
}
+ sp_ud->replied = TRUE;
if (connected) {
rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
@@ -301,21 +303,24 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
struct rspamd_task **ptask;
struct lua_redis_userdata *ud = sp_ud->c;
- if (sp_ud->cbref != -1) {
- /* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
- ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
-
- *ptask = ud->task;
- /* Error is nil */
- lua_pushnil (ud->L);
- /* Data */
- lua_redis_push_reply (ud->L, r);
-
- if (lua_pcall (ud->L, 3, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ if (!sp_ud->replied) {
+ if (sp_ud->cbref != -1) {
+ /* Push error */
+ lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+
+ *ptask = ud->task;
+ /* Error is nil */
+ lua_pushnil (ud->L);
+ /* Data */
+ lua_redis_push_reply (ud->L, r);
+
+ if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+ lua_pop (ud->L, 1);
+ }
+
}
rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
@@ -346,6 +351,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
return;
}
+ msg_debug ("got reply from redis %p for query %p", ctx, sp_ud);
+
REDIS_RETAIN (ctx);
ctx->cmds_pending --;
@@ -395,7 +402,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
ctx = sp_ud->ctx;
REDIS_RETAIN (ctx);
- msg_info ("timeout while querying redis server");
+ msg_debug ("timeout while querying redis server");
lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, FALSE);
if (sp_ud->c->ctx) {
@@ -406,7 +413,6 @@ lua_redis_timeout (int fd, short what, gpointer u)
* This will call all callbacks pending so the entire context
* will be destructed
*/
- sp_ud->c->terminated = 1;
redisAsyncFree (ac);
}
REDIS_RELEASE (ctx);
@@ -584,7 +590,7 @@ lua_redis_make_request (lua_State *L)
ud->task = task;
ud->L = L;
- sp_ud = g_slice_alloc (sizeof (*sp_ud));
+ sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
sp_ud->cbref = cbref;
sp_ud->c = ud;
@@ -632,7 +638,7 @@ lua_redis_make_request (lua_State *L)
}
- sp_ud = g_slice_alloc (sizeof (*sp_ud));
+ sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
sp_ud->cbref = cbref;
sp_ud->c = ud;
cmd = luaL_checkstring (L, args_pos);
@@ -1082,7 +1088,7 @@ lua_redis_add_cmd (lua_State *L)
return luaL_error (L, "invalid arguments");
}
- sp_ud = g_slice_alloc (sizeof (*sp_ud));
+ sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
sp_ud->cbref = cbref;
sp_ud->c = &ctx->d.async;
sp_ud->ctx = ctx;