#define LUA_REDIS_TEXTDATA (1 << 1)
#define LUA_REDIS_TERMINATED (1 << 2)
#define LUA_REDIS_NO_POOL (1 << 3)
+#define LUA_REDIS_SUBSCRIBED (1 << 4)
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
struct lua_redis_request_specific_userdata {
ctx = sp_ud->ctx;
ud = sp_ud->c;
- ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
+ if (ev_can_stop (&sp_ud->timeout_ev)) {
+ ev_timer_stop (sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev);
+ }
msg_debug_lua_redis ("finished redis query %p from session %p; refcount=%d",
sp_ud, ctx, ctx->ref.refcount);
struct lua_callback_state cbs;
lua_State *L;
- if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
+ if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED)) ||
+ (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
if (sp_ud->cbref != -1) {
lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
L = cbs.L;
lua_thread_pool_restore_callback (&cbs);
}
+ if (sp_ud->flags & LUA_REDIS_SUBSCRIBED) {
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
+ if (ev_can_stop (&sp_ud->timeout_ev)) {
+ ev_timer_stop (sp_ud->ctx->async.event_loop,
+ &sp_ud->timeout_ev);
+ }
+ }
+ }
+
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
- if (ud->s) {
- if (ud->item) {
- rspamd_symcache_item_async_dec_check (ud->task, ud->item, M);
- }
+ if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
+ if (ud->s) {
+ if (ud->item) {
+ rspamd_symcache_item_async_dec_check (ud->task,
+ ud->item, M);
+ }
- rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
- }
- else {
- lua_redis_fin (sp_ud);
+ rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
+ }
+ else {
+ lua_redis_fin (sp_ud);
+ }
}
}
}
REDIS_RETAIN (ctx);
/* If session is finished, we cannot call lua callbacks */
- if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) ||
+ (sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
if (c->err == 0) {
if (r != NULL) {
if (reply->type != REDIS_REPLY_ERROR) {
}
}
- ctx->cmds_pending --;
+ if (!(sp_ud->flags & LUA_REDIS_SUBSCRIBED)) {
+ ctx->cmds_pending--;
- if (ctx->cmds_pending == 0 && !ud->terminated) {
- /* Disconnect redis early as we don't need it anymore */
- ud->terminated = 1;
- ac = ud->ctx;
- ud->ctx = NULL;
+ if (ctx->cmds_pending == 0 && !ud->terminated) {
+ /* Disconnect redis early as we don't need it anymore */
+ ud->terminated = 1;
+ ac = ud->ctx;
+ ud->ctx = NULL;
- if (ac) {
- msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
- ud, ctx, ctx->ref.refcount);
- rspamd_redis_pool_release_connection (ud->pool, ac,
- (ctx->flags & LUA_REDIS_NO_POOL) ?
- RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
+ if (ac) {
+ msg_debug_lua_redis ("release redis connection ud=%p; ctx=%p; refcount=%d",
+ ud, ctx, ctx->ref.refcount);
+ rspamd_redis_pool_release_connection (ud->pool, ac,
+ (ctx->flags & LUA_REDIS_NO_POOL) ?
+ RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT);
+ }
}
}
return;
}
- ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
+ if (ev_can_stop ( &sp_ud->timeout_ev)) {
+ ev_timer_stop (ud->event_loop, &sp_ud->timeout_ev);
+ }
+
msg_debug_lua_redis ("got reply from redis: %p for query %p", ac, sp_ud);
struct lua_redis_result *result = g_malloc0 (sizeof *result);
lua_pushstring (L, ac->errstr);
}
}
+
/* if error happened, we should terminate the connection,
and release it */
REDIS_RETAIN (ctx); /* Cleared by fin event */
ctx->cmds_pending ++;
+
+ if (ud->ctx->c.flags & REDIS_SUBSCRIBED) {
+ msg_debug_lua_redis ("subscribe command, never unref/timeout");
+ sp_ud->flags |= LUA_REDIS_SUBSCRIBED;
+ }
+
sp_ud->timeout_ev.data = sp_ud;
ev_now_update_if_cheap ((struct ev_loop *)ud->event_loop);
ev_timer_init (&sp_ud->timeout_ev, lua_redis_timeout, timeout, 0.0);
ev_timer_start (ud->event_loop, &sp_ud->timeout_ev);
+
ret = TRUE;
}
else {