From 647d5d2a8f00aa97800a4b40fcab0f1bf02c3f9b Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 5 May 2016 16:36:58 +0100 Subject: [PATCH] [Fix] Another rework for lua_redis events handling Issue: #609 --- src/lua/lua_redis.c | 86 ++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index f6623db3d..920d8852a 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -110,10 +110,11 @@ struct lua_redis_specific_userdata { guint nargs; gchar **args; struct rspamd_async_watcher *w; - struct event timeout; struct lua_redis_userdata *c; struct lua_redis_ctx *ctx; struct lua_redis_specific_userdata *next; + struct event timeout; + gboolean replied; }; struct lua_redis_ctx { @@ -156,6 +157,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) gboolean is_connected = FALSE; if (ctx->async) { + msg_debug ("desctructing %p", ctx); ud = &ctx->d.async; if (ud->ctx) { @@ -213,11 +215,8 @@ lua_redis_fin (void *arg) ctx = sp_ud->ctx; event_del (&sp_ud->timeout); - - if (sp_ud->cbref != -1) { - luaL_unref (sp_ud->c->L, LUA_REGISTRYINDEX, sp_ud->cbref); - sp_ud->cbref = -1; - } + msg_debug ("finished redis query %p from session %p", sp_ud, ctx); + sp_ud->replied = TRUE; REDIS_RELEASE (ctx); } @@ -236,22 +235,25 @@ lua_redis_push_error (const gchar *err, struct rspamd_task **ptask; struct lua_redis_userdata *ud = sp_ud->c; - if (sp_ud->cbref != -1) { - /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); - ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (ud->L, "rspamd{task}", -1); - - *ptask = ud->task; - /* String of error */ - lua_pushstring (ud->L, err); - /* Data is nil */ - lua_pushnil (ud->L); - if (lua_pcall (ud->L, 3, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + if (!sp_ud->replied) { + if (sp_ud->cbref != -1) { + /* Push error */ + lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (ud->L, "rspamd{task}", -1); + + *ptask = ud->task; + /* String of error */ + lua_pushstring (ud->L, err); + /* Data is nil */ + lua_pushnil (ud->L); + if (lua_pcall (ud->L, 3, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); + lua_pop (ud->L, 1); + } } + sp_ud->replied = TRUE; if (connected) { rspamd_session_watcher_pop (ud->task->s, sp_ud->w); rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud); @@ -301,21 +303,24 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, struct rspamd_task **ptask; struct lua_redis_userdata *ud = sp_ud->c; - if (sp_ud->cbref != -1) { - /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); - ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (ud->L, "rspamd{task}", -1); - - *ptask = ud->task; - /* Error is nil */ - lua_pushnil (ud->L); - /* Data */ - lua_redis_push_reply (ud->L, r); - - if (lua_pcall (ud->L, 3, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + if (!sp_ud->replied) { + if (sp_ud->cbref != -1) { + /* Push error */ + lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (ud->L, "rspamd{task}", -1); + + *ptask = ud->task; + /* Error is nil */ + lua_pushnil (ud->L); + /* Data */ + lua_redis_push_reply (ud->L, r); + + if (lua_pcall (ud->L, 3, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); + lua_pop (ud->L, 1); + } + } rspamd_session_watcher_pop (ud->task->s, sp_ud->w); @@ -346,6 +351,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) return; } + msg_debug ("got reply from redis %p for query %p", ctx, sp_ud); + REDIS_RETAIN (ctx); ctx->cmds_pending --; @@ -395,7 +402,7 @@ lua_redis_timeout (int fd, short what, gpointer u) ctx = sp_ud->ctx; REDIS_RETAIN (ctx); - msg_info ("timeout while querying redis server"); + msg_debug ("timeout while querying redis server"); lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, FALSE); if (sp_ud->c->ctx) { @@ -406,7 +413,6 @@ lua_redis_timeout (int fd, short what, gpointer u) * This will call all callbacks pending so the entire context * will be destructed */ - sp_ud->c->terminated = 1; redisAsyncFree (ac); } REDIS_RELEASE (ctx); @@ -584,7 +590,7 @@ lua_redis_make_request (lua_State *L) ud->task = task; ud->L = L; - sp_ud = g_slice_alloc (sizeof (*sp_ud)); + sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); sp_ud->cbref = cbref; sp_ud->c = ud; @@ -632,7 +638,7 @@ lua_redis_make_request (lua_State *L) } - sp_ud = g_slice_alloc (sizeof (*sp_ud)); + sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); sp_ud->cbref = cbref; sp_ud->c = ud; cmd = luaL_checkstring (L, args_pos); @@ -1082,7 +1088,7 @@ lua_redis_add_cmd (lua_State *L) return luaL_error (L, "invalid arguments"); } - sp_ud = g_slice_alloc (sizeof (*sp_ud)); + sp_ud = g_slice_alloc0 (sizeof (*sp_ud)); sp_ud->cbref = cbref; sp_ud->c = &ctx->d.async; sp_ud->ctx = ctx; -- 2.39.5