|
|
@@ -122,6 +122,7 @@ INIT_LOG_MODULE(lua_redis) |
|
|
|
#define LUA_REDIS_TEXTDATA (1 << 1) |
|
|
|
#define LUA_REDIS_TERMINATED (1 << 2) |
|
|
|
#define LUA_REDIS_NO_POOL (1 << 3) |
|
|
|
#define LUA_REDIS_SUBSCRIBED (1 << 4) |
|
|
|
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) |
|
|
|
|
|
|
|
struct lua_redis_request_specific_userdata { |
|
|
@@ -263,7 +264,9 @@ lua_redis_fin (void *arg) |
|
|
|
ctx = sp_ud->ctx; |
|
|
|
ud = sp_ud->c; |
|
|
|
|
|
|
|
ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); |
|
|
|
if (ev_can_stop (&sp_ud->timeout_ev)) { |
|
|
|
ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d", |
|
|
|
sp_ud, ctx, ctx->ref.refcount); |
|
|
@@ -383,7 +386,8 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, |
|
|
|
struct lua_callback_state cbs; |
|
|
|
lua_State *L; |
|
|
|
|
|
|
|
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { |
|
|
|
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) || |
|
|
|
(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { |
|
|
|
if (sp_ud->cbref != -1) { |
|
|
|
lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); |
|
|
|
L = cbs.L; |
|
|
@@ -409,17 +413,29 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, |
|
|
|
lua_thread_pool_restore_callback (&cbs); |
|
|
|
} |
|
|
|
|
|
|
|
if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) { |
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) { |
|
|
|
if (ev_can_stop (&sp_ud->timeout_ev)) { |
|
|
|
ev_timer_stop (sp_ud->ctx->async.event_loop, |
|
|
|
&sp_ud->timeout_ev); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; |
|
|
|
|
|
|
|
if (ud->s) { |
|
|
|
if (ud->item) { |
|
|
|
rspamd_symcache_item_async_dec_check (ud->task, ud->item, M); |
|
|
|
} |
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { |
|
|
|
if (ud->s) { |
|
|
|
if (ud->item) { |
|
|
|
rspamd_symcache_item_async_dec_check (ud->task, |
|
|
|
ud->item, M); |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); |
|
|
|
} |
|
|
|
else { |
|
|
|
lua_redis_fin (sp_ud); |
|
|
|
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud); |
|
|
|
} |
|
|
|
else { |
|
|
|
lua_redis_fin (sp_ud); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@@ -453,7 +469,8 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
REDIS_RETAIN (ctx); |
|
|
|
|
|
|
|
/* If session is finished, we cannot call lua callbacks */ |
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { |
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) || |
|
|
|
(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { |
|
|
|
if (c->err == 0) { |
|
|
|
if (r != NULL) { |
|
|
|
if (reply->type != REDIS_REPLY_ERROR) { |
|
|
@@ -477,20 +494,22 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
ctx->cmds_pending --; |
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) { |
|
|
|
ctx->cmds_pending--; |
|
|
|
|
|
|
|
if (ctx->cmds_pending == 0 && !ud->terminated) { |
|
|
|
/* Disconnect redis early as we don't need it anymore */ |
|
|
|
ud->terminated = 1; |
|
|
|
ac = ud->ctx; |
|
|
|
ud->ctx = NULL; |
|
|
|
if (ctx->cmds_pending == 0 && !ud->terminated) { |
|
|
|
/* Disconnect redis early as we don't need it anymore */ |
|
|
|
ud->terminated = 1; |
|
|
|
ac = ud->ctx; |
|
|
|
ud->ctx = NULL; |
|
|
|
|
|
|
|
if (ac) { |
|
|
|
msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d", |
|
|
|
ud, ctx, ctx->ref.refcount); |
|
|
|
rspamd_redis_pool_release_connection (ud->pool, ac, |
|
|
|
(ctx->flags & LUA_REDIS_NO_POOL) ? |
|
|
|
RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); |
|
|
|
if (ac) { |
|
|
|
msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d", |
|
|
|
ud, ctx, ctx->ref.refcount); |
|
|
|
rspamd_redis_pool_release_connection (ud->pool, ac, |
|
|
|
(ctx->flags & LUA_REDIS_NO_POOL) ? |
|
|
|
RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -586,7 +605,10 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); |
|
|
|
if (ev_can_stop ( &sp_ud->timeout_ev)) { |
|
|
|
ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev); |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud); |
|
|
|
|
|
|
|
struct lua_redis_result *result = g_malloc0 (sizeof *result); |
|
|
@@ -617,6 +639,7 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) |
|
|
|
lua_pushstring (L, ac->errstr); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* if error happened, we should terminate the connection, |
|
|
|
and release it */ |
|
|
|
|
|
|
@@ -1125,10 +1148,17 @@ lua_redis_make_request (lua_State *L) |
|
|
|
|
|
|
|
REDIS_RETAIN (ctx); /* Cleared by fin event */ |
|
|
|
ctx->cmds_pending ++; |
|
|
|
|
|
|
|
if (ud->ctx->c.flags & REDIS_SUBSCRIBED) { |
|
|
|
msg_debug_lua_redis ("subscribe command, never unref/timeout"); |
|
|
|
sp_ud->flags |= LUA_REDIS_SUBSCRIBED; |
|
|
|
} |
|
|
|
|
|
|
|
sp_ud->timeout_ev.data = sp_ud; |
|
|
|
ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop); |
|
|
|
ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0); |
|
|
|
ev_timer_start (ud->event_loop, &sp_ud->timeout_ev); |
|
|
|
|
|
|
|
ret = TRUE; |
|
|
|
} |
|
|
|
else { |