diff options
-rw-r--r-- | src/libserver/redis_pool.c | 42 |
1 files changed, 39 insertions, 3 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c index dd5adf221..ba39bb2f8 100644 --- a/src/libserver/redis_pool.c +++ b/src/libserver/redis_pool.c @@ -115,6 +115,10 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) event_del (&conn->timeout); } + if (conn->ctx) { + g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx); + } + g_queue_unlink (conn->elt->inactive, conn->entry); } @@ -180,6 +184,28 @@ rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn) event_add (&conn->timeout, &tv); } +static void +rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status, + void *ud) +{ + struct rspamd_redis_pool_connection *conn = ud; + + /* + * Here, we know that redis itself will free this connection + * so, we need to do something very clever about it + */ + + if (!conn->active) { + /* Do nothing for active connections as it is already handled somewhere */ + if (conn->ctx) { + msg_info_rpool ("inactive connection terminated: %s", + conn->ctx->errstr); + } + + REF_RELEASE (conn); + } +} + static struct rspamd_redis_pool_connection * rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, struct rspamd_redis_pool_elt *elt, @@ -214,6 +240,8 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, msg_debug_rpool ("created new connection to %s:%d", ip, port); redisLibeventAttach (ctx, pool->ev_base); + redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect, + conn); if (password) { redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); @@ -295,9 +323,16 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, event_del (&conn->timeout); } - conn->active = TRUE; - g_queue_push_tail_link (elt->active, conn_entry); - msg_debug_rpool ("reused existing connection to %s:%d", ip, port); + if (conn->ctx->err == REDIS_OK) { + conn->active = TRUE; + g_queue_push_tail_link (elt->active, conn_entry); + msg_debug_rpool ("reused existing connection to %s:%d", ip, port); + } + else { + REF_RELEASE (conn); + conn = rspamd_redis_pool_new_connection (pool, elt, + db, password, ip, port); + } } else { @@ -307,6 +342,7 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, } } else { + /* Need to create a pool */ elt = rspamd_redis_pool_new_elt (pool); elt->key = key; g_hash_table_insert (pool->elts_by_key, &elt->key, elt); |