]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Initial support of subscribe command in lua_redis
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 19 May 2020 20:00:16 +0000 (21:00 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 19 May 2020 20:00:16 +0000 (21:00 +0100)
src/lua/lua_redis.c

index f9dbbdd1333e654b8d8be3ef396a87557767eeb7..492c639067140f0bca4c344ff59406800a9ccaf7 100644 (file)
@@ -122,6 +122,7 @@ INIT_LOG_MODULE(lua_redis)
 #define LUA_REDIS_TEXTDATA (1 << 1)
 #define LUA_REDIS_TERMINATED (1 << 2)
 #define LUA_REDIS_NO_POOL (1 << 3)
+#define LUA_REDIS_SUBSCRIBED (1 << 4)
 #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
 
 struct lua_redis_request_specific_userdata {
@@ -263,7 +264,9 @@ lua_redis_fin (void *arg)
        ctx = sp_ud->ctx;
        ud = sp_ud->c;
 
-       ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
+       if (ev_can_stop (&sp_ud->timeout_ev)) {
+               ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
+       }
 
        msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d",
                        sp_ud, ctx, ctx->ref.refcount);
@@ -383,7 +386,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
        struct lua_callback_state cbs;
        lua_State *L;
 
-       if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
+       if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) ||
+                       (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
                if (sp_ud->cbref != -1) {
                        lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
                        L = cbs.L;
@@ -409,17 +413,29 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
                        lua_thread_pool_restore_callback (&cbs);
                }
 
+               if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
+                       if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
+                               if (ev_can_stop (&sp_ud->timeout_ev)) {
+                                       ev_timer_stop (sp_ud->ctx->async.event_loop,
+                                                       &sp_ud->timeout_ev);
+                               }
+                       }
+               }
+
                sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
 
-               if (ud->s) {
-                       if (ud->item) {
-                               rspamd_symcache_item_async_dec_check (ud->task, ud->item, M);
-                       }
+               if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
+                       if (ud->s) {
+                               if (ud->item) {
+                                       rspamd_symcache_item_async_dec_check (ud->task,
+                                                       ud->item, M);
+                               }
 
-                       rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
-               }
-               else {
-                       lua_redis_fin (sp_ud);
+                               rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
+                       }
+                       else {
+                               lua_redis_fin (sp_ud);
+                       }
                }
        }
 }
@@ -453,7 +469,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
        REDIS_RETAIN (ctx);
 
        /* If session is finished, we cannot call lua callbacks */
-       if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+       if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
+                       (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
                if (c->err == 0) {
                        if (r != NULL) {
                                if (reply->type != REDIS_REPLY_ERROR) {
@@ -477,20 +494,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
                }
        }
 
-       ctx->cmds_pending --;
+       if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
+               ctx->cmds_pending--;
 
-       if (ctx->cmds_pending == 0 && !ud->terminated) {
-               /* Disconnect redis early as we don't need it anymore */
-               ud->terminated = 1;
-               ac = ud->ctx;
-               ud->ctx = NULL;
+               if (ctx->cmds_pending == 0 && !ud->terminated) {
+                       /* Disconnect redis early as we don't need it anymore */
+                       ud->terminated = 1;
+                       ac = ud->ctx;
+                       ud->ctx = NULL;
 
-               if (ac) {
-                       msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
-                                       ud, ctx, ctx->ref.refcount);
-                       rspamd_redis_pool_release_connection (ud->pool, ac,
-                                       (ctx->flags & LUA_REDIS_NO_POOL) ?
-                                       RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
+                       if (ac) {
+                               msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
+                                               ud, ctx, ctx->ref.refcount);
+                               rspamd_redis_pool_release_connection (ud->pool, ac,
+                                               (ctx->flags & LUA_REDIS_NO_POOL) ?
+                                               RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
+                       }
                }
        }
 
@@ -586,7 +605,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
                return;
        }
 
-       ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
+       if (ev_can_stop ( &sp_ud->timeout_ev)) {
+               ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
+       }
+
        msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
 
        struct lua_redis_result *result = g_malloc0 (sizeof *result);
@@ -617,6 +639,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv)
                        lua_pushstring (L, ac->errstr);
                }
        }
+
        /* if error happened, we should terminate the connection,
           and release it */
 
@@ -1125,10 +1148,17 @@ lua_redis_make_request (lua_State *L)
 
                        REDIS_RETAIN (ctx); /* Cleared by fin event */
                        ctx->cmds_pending ++;
+
+                       if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
+                               msg_debug_lua_redis ("subscribe command, never unref/timeout");
+                               sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
+                       }
+
                        sp_ud->timeout_ev.data = sp_ud;
                        ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop);
                        ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
                        ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
+
                        ret = TRUE;
                }
                else {