@@ -71,6 +71,7 @@ local function redis_query_sentinel(ev_base, params, initialised) | |||
timeout = params.timeout, | |||
config = rspamd_config, | |||
ev_base = ev_base, | |||
no_pool = true, | |||
}) | |||
if not is_ok then |
@@ -140,7 +140,8 @@ rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session, | |||
ac = session->ctx; | |||
session->ctx = NULL; | |||
rspamd_redis_pool_release_connection (session->backend->pool, | |||
ac, is_fatal); | |||
ac, | |||
is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); | |||
} | |||
ev_timer_stop (session->event_loop, &session->timeout); | |||
@@ -290,7 +291,7 @@ rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents) | |||
/* This will cause session closing */ | |||
rspamd_redis_pool_release_connection (session->backend->pool, | |||
ac, TRUE); | |||
ac, RSPAMD_REDIS_RELEASE_FATAL); | |||
} | |||
} | |||
@@ -395,7 +395,7 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, | |||
void | |||
rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, | |||
struct redisAsyncContext *ctx, gboolean is_fatal) | |||
struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how) | |||
{ | |||
struct rspamd_redis_pool_connection *conn; | |||
@@ -406,24 +406,38 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, | |||
if (conn != NULL) { | |||
g_assert (conn->active); | |||
if (is_fatal || ctx->err != REDIS_OK) { | |||
if (ctx->err != REDIS_OK) { | |||
/* We need to terminate connection forcefully */ | |||
msg_debug_rpool ("closed connection %p forcefully", conn->ctx); | |||
msg_debug_rpool ("closed connection %p due to an error", conn->ctx); | |||
REF_RELEASE (conn); | |||
} | |||
else { | |||
/* Ensure that there are no callbacks attached to this conn */ | |||
if (ctx->replies.head == NULL) { | |||
/* Just move it to the inactive queue */ | |||
g_queue_unlink (conn->elt->active, conn->entry); | |||
g_queue_push_head_link (conn->elt->inactive, conn->entry); | |||
conn->active = FALSE; | |||
rspamd_redis_pool_schedule_timeout (conn); | |||
msg_debug_rpool ("mark connection %p inactive", conn->ctx); | |||
if (how == RSPAMD_REDIS_RELEASE_DEFAULT) { | |||
/* Ensure that there are no callbacks attached to this conn */ | |||
if (ctx->replies.head == NULL) { | |||
/* Just move it to the inactive queue */ | |||
g_queue_unlink (conn->elt->active, conn->entry); | |||
g_queue_push_head_link (conn->elt->inactive, conn->entry); | |||
conn->active = FALSE; | |||
rspamd_redis_pool_schedule_timeout (conn); | |||
msg_debug_rpool ("mark connection %p inactive", conn->ctx); | |||
} | |||
else { | |||
msg_debug_rpool ("closed connection %p due to callbacks left", | |||
conn->ctx); | |||
REF_RELEASE (conn); | |||
} | |||
} | |||
else { | |||
msg_debug_rpool ("closed connection %p due to callbacks left", | |||
conn->ctx); | |||
if (how == RSPAMD_REDIS_RELEASE_FATAL) { | |||
msg_debug_rpool ("closed connection %p due to an fatal termination", | |||
conn->ctx); | |||
} | |||
else { | |||
msg_debug_rpool ("closed connection %p due to explicit termination", | |||
conn->ctx); | |||
} | |||
REF_RELEASE (conn); | |||
} | |||
} |
@@ -53,13 +53,19 @@ struct redisAsyncContext* rspamd_redis_pool_connect ( | |||
const gchar *db, const gchar *password, | |||
const char *ip, int port); | |||
enum rspamd_redis_pool_release_type { | |||
RSPAMD_REDIS_RELEASE_DEFAULT = 0, | |||
RSPAMD_REDIS_RELEASE_FATAL = 1, | |||
RSPAMD_REDIS_RELEASE_ENFORCE | |||
}; | |||
/** | |||
* Release a connection to the pool | |||
* @param pool | |||
* @param ctx | |||
*/ | |||
void rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, | |||
struct redisAsyncContext *ctx, gboolean is_fatal); | |||
struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how); | |||
/** | |||
* Stops redis pool and destroys it |
@@ -114,6 +114,7 @@ struct lua_redis_userdata { | |||
#define LUA_REDIS_ASYNC (1 << 0) | |||
#define LUA_REDIS_TEXTDATA (1 << 1) | |||
#define LUA_REDIS_TERMINATED (1 << 2) | |||
#define LUA_REDIS_NO_POOL (1 << 3) | |||
#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC) | |||
struct lua_redis_request_specific_userdata { | |||
@@ -198,7 +199,17 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) | |||
ud->terminated = 1; | |||
ac = ud->ctx; | |||
ud->ctx = NULL; | |||
rspamd_redis_pool_release_connection (ud->pool, ac, !is_successful); | |||
if (!is_successful) { | |||
rspamd_redis_pool_release_connection (ud->pool, ac, | |||
RSPAMD_REDIS_RELEASE_FATAL); | |||
} | |||
else { | |||
rspamd_redis_pool_release_connection (ud->pool, ac, | |||
(ctx->flags & LUA_REDIS_NO_POOL) ? | |||
RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); | |||
} | |||
} | |||
LL_FOREACH_SAFE (ud->specific, cur, tmp) { | |||
@@ -456,7 +467,9 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv) | |||
ud->ctx = NULL; | |||
if (ac) { | |||
rspamd_redis_pool_release_connection (ud->pool, ac, FALSE); | |||
rspamd_redis_pool_release_connection (ud->pool, ac, | |||
(ctx->flags & LUA_REDIS_NO_POOL) ? | |||
RSPAMD_REDIS_RELEASE_ENFORCE : RSPAMD_REDIS_RELEASE_DEFAULT); | |||
} | |||
} | |||
@@ -596,7 +609,8 @@ lua_redis_callback_sync (redisAsyncContext *ac, gpointer r, gpointer priv) | |||
* This will call all callbacks pending so the entire context | |||
* will be destructed | |||
*/ | |||
rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE); | |||
rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, | |||
RSPAMD_REDIS_RELEASE_FATAL); | |||
} | |||
result->result_ref = luaL_ref (L, LUA_REGISTRYINDEX); | |||
@@ -646,7 +660,8 @@ lua_redis_timeout_sync (EV_P_ ev_timer *w, int revents) | |||
* This will call all callbacks pending so the entire context | |||
* will be destructed | |||
*/ | |||
rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE); | |||
rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, | |||
RSPAMD_REDIS_RELEASE_FATAL); | |||
} | |||
} | |||
@@ -679,7 +694,8 @@ lua_redis_timeout (EV_P_ ev_timer *w, int revents) | |||
* This will call all callbacks pending so the entire context | |||
* will be destructed | |||
*/ | |||
rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, TRUE); | |||
rspamd_redis_pool_release_connection (sp_ud->c->pool, ac, | |||
RSPAMD_REDIS_RELEASE_FATAL); | |||
} | |||
REDIS_RELEASE (ctx); | |||
@@ -904,6 +920,13 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy | |||
} | |||
lua_pop (L, 1); | |||
lua_pushstring (L, "no_pool"); | |||
lua_gettable (L, -2); | |||
if (!!lua_toboolean (L, -1)) { | |||
flags |= LUA_REDIS_NO_POOL; | |||
} | |||
lua_pop (L, 1); | |||
lua_pop (L, 1); /* table */ | |||
if (session && rspamd_session_blocked (session)) { | |||
@@ -962,7 +985,8 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref, gboolean is_asy | |||
if (ud->ctx) { | |||
msg_err_task_check ("cannot connect to redis: %s", | |||
ud->ctx->errstr); | |||
rspamd_redis_pool_release_connection (ud->pool, ud->ctx, TRUE); | |||
rspamd_redis_pool_release_connection (ud->pool, ud->ctx, | |||
RSPAMD_REDIS_RELEASE_FATAL); | |||
ud->ctx = NULL; | |||
} | |||
else { | |||
@@ -1065,7 +1089,8 @@ lua_redis_make_request (lua_State *L) | |||
} | |||
else { | |||
msg_info ("call to redis failed: %s", ud->ctx->errstr); | |||
rspamd_redis_pool_release_connection (ud->pool, ud->ctx, TRUE); | |||
rspamd_redis_pool_release_connection (ud->pool, ud->ctx, | |||
RSPAMD_REDIS_RELEASE_FATAL); | |||
ud->ctx = NULL; | |||
REDIS_RELEASE (ctx); | |||
ret = FALSE; |