]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Another rework for lua_redis events handling
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 5 May 2016 15:36:58 +0000 (16:36 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 5 May 2016 15:36:58 +0000 (16:36 +0100)
Issue: #609

src/lua/lua_redis.c

index f6623db3df542d1cc2208c6a14a594e957b644d4..920d8852a9aa683f12ae334363be9739b18dc948 100644 (file)
@@ -110,10 +110,11 @@ struct lua_redis_specific_userdata {
        guint nargs;
        gchar **args;
        struct rspamd_async_watcher *w;
-       struct event timeout;
        struct lua_redis_userdata *c;
        struct lua_redis_ctx *ctx;
        struct lua_redis_specific_userdata *next;
+       struct event timeout;
+       gboolean replied;
 };
 
 struct lua_redis_ctx {
@@ -156,6 +157,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
        gboolean is_connected = FALSE;
 
        if (ctx->async) {
+               msg_debug ("desctructing %p", ctx);
                ud = &ctx->d.async;
 
                if (ud->ctx) {
@@ -213,11 +215,8 @@ lua_redis_fin (void *arg)
 
        ctx = sp_ud->ctx;
        event_del (&sp_ud->timeout);
-
-       if (sp_ud->cbref != -1) {
-               luaL_unref (sp_ud->c->L, LUA_REGISTRYINDEX, sp_ud->cbref);
-               sp_ud->cbref = -1;
-       }
+       msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
+       sp_ud->replied = TRUE;
 
        REDIS_RELEASE (ctx);
 }
@@ -236,22 +235,25 @@ lua_redis_push_error (const gchar *err,
        struct rspamd_task **ptask;
        struct lua_redis_userdata *ud = sp_ud->c;
 
-       if (sp_ud->cbref != -1) {
-               /* Push error */
-               lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
-               ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
-               rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
-
-               *ptask = ud->task;
-               /* String of error */
-               lua_pushstring (ud->L, err);
-               /* Data is nil */
-               lua_pushnil (ud->L);
-               if (lua_pcall (ud->L, 3, 0, 0) != 0) {
-                       msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
-                       lua_pop (ud->L, 1);
+       if (!sp_ud->replied) {
+               if (sp_ud->cbref != -1) {
+                       /* Push error */
+                       lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+                       ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+                       rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+
+                       *ptask = ud->task;
+                       /* String of error */
+                       lua_pushstring (ud->L, err);
+                       /* Data is nil */
+                       lua_pushnil (ud->L);
+                       if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+                               msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+                               lua_pop (ud->L, 1);
+                       }
                }
 
+               sp_ud->replied = TRUE;
                if (connected) {
                        rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
                        rspamd_session_remove_event (ud->task->s, lua_redis_fin, sp_ud);
@@ -301,21 +303,24 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
        struct rspamd_task **ptask;
        struct lua_redis_userdata *ud = sp_ud->c;
 
-       if (sp_ud->cbref != -1) {
-               /* Push error */
-               lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
-               ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
-               rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
-
-               *ptask = ud->task;
-               /* Error is nil */
-               lua_pushnil (ud->L);
-               /* Data */
-               lua_redis_push_reply (ud->L, r);
-
-               if (lua_pcall (ud->L, 3, 0, 0) != 0) {
-                       msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
-                       lua_pop (ud->L, 1);
+       if (!sp_ud->replied) {
+               if (sp_ud->cbref != -1) {
+                       /* Push error */
+                       lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+                       ptask = lua_newuserdata (ud->L, sizeof (struct rspamd_task *));
+                       rspamd_lua_setclass (ud->L, "rspamd{task}", -1);
+
+                       *ptask = ud->task;
+                       /* Error is nil */
+                       lua_pushnil (ud->L);
+                       /* Data */
+                       lua_redis_push_reply (ud->L, r);
+
+                       if (lua_pcall (ud->L, 3, 0, 0) != 0) {
+                               msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
+                               lua_pop (ud->L, 1);
+                       }
+
                }
 
                rspamd_session_watcher_pop (ud->task->s, sp_ud->w);
@@ -346,6 +351,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
                return;
        }
 
+       msg_debug ("got reply from redis %p for query %p", ctx, sp_ud);
+
        REDIS_RETAIN (ctx);
        ctx->cmds_pending --;
 
@@ -395,7 +402,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
        ctx = sp_ud->ctx;
 
        REDIS_RETAIN (ctx);
-       msg_info ("timeout while querying redis server");
+       msg_debug ("timeout while querying redis server");
        lua_redis_push_error ("timeout while connecting the server", ctx, sp_ud, FALSE);
 
        if (sp_ud->c->ctx) {
@@ -406,7 +413,6 @@ lua_redis_timeout (int fd, short what, gpointer u)
                 * This will call all callbacks pending so the entire context
                 * will be destructed
                 */
-               sp_ud->c->terminated = 1;
                redisAsyncFree (ac);
        }
        REDIS_RELEASE (ctx);
@@ -584,7 +590,7 @@ lua_redis_make_request (lua_State *L)
                        ud->task = task;
                        ud->L = L;
 
-                       sp_ud = g_slice_alloc (sizeof (*sp_ud));
+                       sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
                        sp_ud->cbref = cbref;
                        sp_ud->c = ud;
 
@@ -632,7 +638,7 @@ lua_redis_make_request (lua_State *L)
                        }
 
 
-                       sp_ud = g_slice_alloc (sizeof (*sp_ud));
+                       sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
                        sp_ud->cbref = cbref;
                        sp_ud->c = ud;
                        cmd = luaL_checkstring (L, args_pos);
@@ -1082,7 +1088,7 @@ lua_redis_add_cmd (lua_State *L)
                                return luaL_error (L, "invalid arguments");
                        }
 
-                       sp_ud = g_slice_alloc (sizeof (*sp_ud));
+                       sp_ud = g_slice_alloc0 (sizeof (*sp_ud));
                        sp_ud->cbref = cbref;
                        sp_ud->c = &ctx->d.async;
                        sp_ud->ctx = ctx;