diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-05-19 21:00:16 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-05-19 21:00:16 +0100 |
commit | 48036e40210693fbe552016bfe0deb35796438af (patch) | |
tree | bf8cf49fa46fefe3e3057d576fafe25fc61c6324 /src/lua/lua_redis.c | |
parent | 5d09c6fb3d8a413d8ead10ada657f6e459523216 (diff) | |
download | rspamd-48036e40210693fbe552016bfe0deb35796438af.tar.gz rspamd-48036e40210693fbe552016bfe0deb35796438af.zip |
[Feature] Initial support of subscribe command in lua_redis
Diffstat (limited to 'src/lua/lua_redis.c')
-rw-r--r-- | src/lua/lua_redis.c | 78 |
1 files changed, 54 insertions, 24 deletions
diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index f9dbbdd13..492c63906 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -122,6 +122,7 @@ INIT_LOG_MODULE(lua_redis) #define LUA_REDIS_TEXTDATA (1 << 1) #define LUA_REDIS_TERMINATED (1 << 2) #define LUA_REDIS_NO_POOL (1 << 3) +#define LUA_REDIS_SUBSCRIBED (1 << 4) #define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) struct lua_redis_request_specific_userdata { @@ -263,7 +264,9 @@ lua_redis_fin (void *arg) ctx = sp_ud->ctx; ud = sp_ud->c; - ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); + if (ev_can_stop (&sp_ud->timeout_ev)) { + ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); + } msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d", sp_ud, ctx, ctx->ref.refcount); @@ -383,7 +386,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, struct lua_callback_state cbs; lua_State *L; - if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { + if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) || + (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (sp_ud->cbref != -1) { lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); L = cbs.L; @@ -409,17 +413,29 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, lua_thread_pool_restore_callback (&cbs); } + if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) { + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) { + if (ev_can_stop (&sp_ud->timeout_ev)) { + ev_timer_stop (sp_ud->ctx->async.event_loop, + &sp_ud->timeout_ev); + } + } + } + sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; - if (ud->s) { - if (ud->item) { - rspamd_symcache_item_async_dec_check (ud->task, ud->item, M); - } + if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + if (ud->s) { + if (ud->item) { + rspamd_symcache_item_async_dec_check (ud->task, + ud->item, M); + } - rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); - } - else { - lua_redis_fin (sp_ud); + rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); + } + else { + lua_redis_fin (sp_ud); + } } } } @@ -453,7 +469,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) REDIS_RETAIN (ctx); /* If session is finished, we cannot call lua callbacks */ - if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { + if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) || + (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { if (c->err == 0) { if (r != NULL) { if (reply->type != REDIS_REPLY_ERROR) { @@ -477,20 +494,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) } } - ctx->cmds_pending --; + if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { + ctx->cmds_pending--; - if (ctx->cmds_pending == 0 && !ud->terminated) { - /* Disconnect redis early as we don't need it anymore */ - ud->terminated = 1; - ac = ud->ctx; - ud->ctx = NULL; + if (ctx->cmds_pending == 0 && !ud->terminated) { + /* Disconnect redis early as we don't need it anymore */ + ud->terminated = 1; + ac = ud->ctx; + ud->ctx = NULL; - if (ac) { - msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d", - ud, ctx, ctx->ref.refcount); - rspamd_redis_pool_release_connection (ud->pool, ac, - (ctx->flags & LUA_REDIS_NO_POOL) ? - RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); + if (ac) { + msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d", + ud, ctx, ctx->ref.refcount); + rspamd_redis_pool_release_connection (ud->pool, ac, + (ctx->flags & LUA_REDIS_NO_POOL) ? + RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); + } } } @@ -586,7 +605,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) return; } - ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); + if (ev_can_stop ( &sp_ud->timeout_ev)) { + ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); + } + msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud); struct lua_redis_result *result = g_malloc0 (sizeof *result); @@ -617,6 +639,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) lua_pushstring (L, ac->errstr); } } + /* if error happened, we should terminate the connection, and release it */ @@ -1125,10 +1148,17 @@ lua_redis_make_request (lua_State *L) REDIS_RETAIN (ctx); /* Cleared by fin event */ ctx->cmds_pending ++; + + if (ud->ctx->c.flags & REDIS_SUBSCRIBED) { + msg_debug_lua_redis ("subscribe command, never unref/timeout"); + sp_ud->flags |= LUA_REDIS_SUBSCRIBED; + } + sp_ud->timeout_ev.data = sp_ud; ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop); ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); ev_timer_start (ud->event_loop, &sp_ud->timeout_ev); + ret = TRUE; } else { |