aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libserver/redis_pool.c42
1 files changed, 39 insertions, 3 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
index dd5adf221..ba39bb2f8 100644
--- a/src/libserver/redis_pool.c
+++ b/src/libserver/redis_pool.c
@@ -115,6 +115,10 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
event_del (&conn->timeout);
}
+ if (conn->ctx) {
+ g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx);
+ }
+
g_queue_unlink (conn->elt->inactive, conn->entry);
}
@@ -180,6 +184,28 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
event_add (&conn->timeout, &tv);
}
+static void
+rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status,
+ void *ud)
+{
+ struct rspamd_redis_pool_connection *conn = ud;
+
+ /*
+ * Here, we know that redis itself will free this connection
+ * so, we need to do something very clever about it
+ */
+
+ if (!conn->active) {
+ /* Do nothing for active connections as it is already handled somewhere */
+ if (conn->ctx) {
+ msg_info_rpool ("inactive connection terminated: %s",
+ conn->ctx->errstr);
+ }
+
+ REF_RELEASE (conn);
+ }
+}
+
static struct rspamd_redis_pool_connection *
rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
struct rspamd_redis_pool_elt *elt,
@@ -214,6 +240,8 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
msg_debug_rpool ("created new connection to %s:%d", ip, port);
redisLibeventAttach (ctx, pool->ev_base);
+ redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect,
+ conn);
if (password) {
redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
@@ -295,9 +323,16 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
event_del (&conn->timeout);
}
- conn->active = TRUE;
- g_queue_push_tail_link (elt->active, conn_entry);
- msg_debug_rpool ("reused existing connection to %s:%d", ip, port);
+ if (conn->ctx->err == REDIS_OK) {
+ conn->active = TRUE;
+ g_queue_push_tail_link (elt->active, conn_entry);
+ msg_debug_rpool ("reused existing connection to %s:%d", ip, port);
+ }
+ else {
+ REF_RELEASE (conn);
+ conn = rspamd_redis_pool_new_connection (pool, elt,
+ db, password, ip, port);
+ }
}
else {
@@ -307,6 +342,7 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
}
}
else {
+ /* Need to create a pool */
elt = rspamd_redis_pool_new_elt (pool);
elt->key = key;
g_hash_table_insert (pool->elts_by_key, &elt->key, elt);