]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Try to fix sync Redis API by not resuming LUA_REDIS_SPECIFIC_FINISHED threads
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 27 Apr 2021 11:15:34 +0000 (12:15 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 27 Apr 2021 11:15:34 +0000 (12:15 +0100)
src/lua/lua_redis.c
src/lua/lua_thread_pool.c
src/lua/lua_thread_pool.h

index 8e9e11ddac33cff887e18985e19fc7c4ffeca3a1..b624fc43bf55b045562e451d9ef890f6682ebc4e 100644 (file)
@@ -612,75 +612,86 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
                ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
        }
 
-       msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
+       if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+               msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
 
-       struct lua_redis_result *result = g_malloc0 (sizeof *result);
+               struct lua_redis_result *result = g_malloc0 (sizeof *result);
 
-       /* If session is finished, we cannot call lua callbacks */
-       if (ac->err == 0) {
-               if (r != NULL) {
-                       if (reply->type != REDIS_REPLY_ERROR) {
-                               result->is_error = FALSE;
-                               lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+               if (ac->err == 0) {
+                       if (r != NULL) {
+                               if (reply->type != REDIS_REPLY_ERROR) {
+                                       result->is_error = FALSE;
+                                       lua_redis_push_reply (L, reply, ctx->flags & LUA_REDIS_TEXTDATA);
+                               }
+                               else {
+                                       result->is_error = TRUE;
+                                       lua_pushstring (L, reply->str);
+                               }
                        }
                        else {
                                result->is_error = TRUE;
-                               lua_pushstring (L, reply->str);
+                               lua_pushliteral (L, "received no data from server");
                        }
                }
                else {
                        result->is_error = TRUE;
-                       lua_pushliteral (L, "received no data from server");
-               }
-       }
-       else {
-               result->is_error = TRUE;
-               if (ac->err == REDIS_ERR_IO) {
-                       lua_pushstring (L, strerror (errno));
-               }
-               else {
-                       lua_pushstring (L, ac->errstr);
+                       if (ac->err == REDIS_ERR_IO) {
+                               lua_pushstring (L, strerror (errno));
+                       }
+                       else {
+                               lua_pushstring (L, ac->errstr);
+                       }
                }
-       }
 
-       /* if error happened, we should terminate the connection,
-          and release it */
+               /* if error happened, we should terminate the connection,
+                  and release it */
 
-       if (result->is_error && sp_ud->c->ctx) {
-               ac = sp_ud->c->ctx;
-               /* Set to NULL to avoid double free in dtor */
-               sp_ud->c->ctx = NULL;
-               ctx->flags |= LUA_REDIS_TERMINATED;
+               if (result->is_error && sp_ud->c->ctx) {
+                       ac = sp_ud->c->ctx;
+                       /* Set to NULL to avoid double free in dtor */
+                       sp_ud->c->ctx = NULL;
+                       ctx->flags |= LUA_REDIS_TERMINATED;
 
-               /*
-                * This will call all callbacks pending so the entire context
-                * will be destructed
-                */
-               rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
-                               RSPAMD_REDIS_RELEASE_FATAL);
-       }
+                       /*
+                        * This will call all callbacks pending so the entire context
+                        * will be destructed
+                        */
+                       rspamd_redis_pool_release_connection (sp_ud->c->pool, ac,
+                                       RSPAMD_REDIS_RELEASE_FATAL);
+               }
+
+               result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+               result->s = ud->s;
+               result->item = ud->item;
+               result->task = ud->task;
+               result->sp_ud = sp_ud;
 
-       result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX);
-       result->s = ud->s;
-       result->item = ud->item;
-       result->task = ud->task;
-       result->sp_ud = sp_ud;
+               g_queue_push_tail (ctx->replies, result);
 
-       g_queue_push_tail (ctx->replies, result);
+       }
 
        ctx->cmds_pending --;
 
        if (ctx->cmds_pending == 0) {
                if (ctx->thread) {
-                       /* somebody yielded and waits for results */
-                       thread = ctx->thread;
-                       ctx->thread = NULL;
-
-                       results = lua_redis_push_results (ctx, thread->lua_state);
-                       lua_thread_resume (thread, results);
-                       lua_redis_cleanup_events (ctx);
+                       if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+                               /* somebody yielded and waits for results */
+                               thread = ctx->thread;
+                               ctx->thread = NULL;
+
+                               results = lua_redis_push_results(ctx, thread->lua_state);
+                               lua_thread_resume (thread, results);
+                               lua_redis_cleanup_events(ctx);
+                       }
+                       else {
+                               /* We cannot resume the thread as the associated task has gone */
+                               lua_thread_pool_terminate_entry (ud->cfg->lua_thread_pool,
+                                               ctx->thread);
+                               ctx->thread = NULL;
+                       }
                }
        }
+
 }
 
 static void
@@ -692,6 +703,10 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents)
        struct lua_redis_userdata *ud;
        redisAsyncContext *ac;
 
+       if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
+               return;
+       }
+
        ud = sp_ud->c;
        ctx = sp_ud->ctx;
        msg_debug_lua_redis ("timeout while querying redis server: %p, redis: %p", sp_ud,
index 89a516a271218cafa731a21137090ee68cc55d32..01c55b4a9dcb9316064c02968a5dc9457498e81b 100644 (file)
@@ -157,8 +157,8 @@ lua_thread_pool_return_full (struct lua_thread_pool *pool,
        }
 }
 
-static void
-lua_thread_pool_terminate_entry (struct lua_thread_pool *pool,
+void
+lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool,
                struct thread_entry *thread_entry, const gchar *loc)
 {
        struct thread_entry *ent = NULL;
@@ -327,7 +327,7 @@ lua_resume_thread_internal_full (struct thread_entry *thread_entry,
                         * Maybe there is a way to recover here.
                         * For now, just remove faulty thread
                         */
-                       lua_thread_pool_terminate_entry (pool, thread_entry, loc);
+                       lua_thread_pool_terminate_entry_full (pool, thread_entry, loc);
                }
        }
 }
index be954271d8a7a150541bba7994e14bf5945fd488..66c8b991ce4bceccbe89e8a1533fb760dfab1d09 100644 (file)
@@ -183,6 +183,19 @@ lua_thread_resume_full (struct thread_entry *thread_entry,
 #define lua_thread_resume(thread_entry, narg) \
     lua_thread_resume_full (thread_entry, narg, G_STRLOC)
 
+/**
+ * Terminates thread pool entry and fill the pool with another thread entry if needed
+ * @param pool
+ * @param thread_entry
+ * @param loc
+ */
+void
+lua_thread_pool_terminate_entry_full (struct lua_thread_pool *pool,
+                                                                struct thread_entry *thread_entry,
+                                                                const gchar *loc);
+#define lua_thread_pool_terminate_entry(pool, thread_entry) \
+    lua_thread_pool_terminate_entry_full (pool, thread_entry, G_STRLOC)
+
 #ifdef  __cplusplus
 }
 #endif