From de820f915d1393f5f7a2c080fd66f9109c489624 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 12 Jan 2017 15:05:35 +0000 Subject: [PATCH] [Fix] Backport redis pool fixes from master --- src/libserver/redis_pool.c | 61 ++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c index 578115571..4e1a788ad 100644 --- a/src/libserver/redis_pool.c +++ b/src/libserver/redis_pool.c @@ -102,11 +102,19 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) msg_debug_rpool ("active connection removed"); if (conn->ctx) { - g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx); - redisAsyncFree (conn->ctx); + if (!(conn->ctx->c.flags & REDIS_FREEING)) { + redisAsyncContext *ac = conn->ctx; + + conn->ctx = NULL; + g_hash_table_remove (conn->elt->pool->elts_by_ctx, conn->ctx); + ac->onDisconnect = NULL; + redisAsyncFree (ac); + } } - g_queue_unlink (conn->elt->active, conn->entry); + if (conn->entry) { + g_queue_unlink (conn->elt->active, conn->entry); + } } else { msg_debug_rpool ("inactive connection removed"); @@ -126,11 +134,16 @@ rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn) redisAsyncFree (ac); } - g_queue_unlink (conn->elt->inactive, conn->entry); + if (conn->entry) { + g_queue_unlink (conn->elt->inactive, conn->entry); + } } - g_list_free (conn->entry); + if (conn->entry) { + g_list_free (conn->entry); + } + g_slice_free1 (sizeof (*conn), conn); } @@ -143,11 +156,13 @@ rspamd_redis_pool_elt_dtor (gpointer p) for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) { c = cur->data; + c->entry = NULL; REF_RELEASE (c); } for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) { c = cur->data; + c->entry = NULL; REF_RELEASE (c); } @@ -225,7 +240,12 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, struct rspamd_redis_pool_connection *conn; struct redisAsyncContext *ctx; - ctx = redisAsyncConnect (ip, port); + if (*ip == '/' || *ip == '.') { + ctx = redisAsyncConnectUnix (ip); + } + else { + ctx = redisAsyncConnect (ip, port); + } if (ctx) { @@ -327,16 +347,18 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, conn_entry = g_queue_pop_head_link (elt->inactive); conn = conn_entry->data; - if (event_get_base (&conn->timeout)) { - event_del (&conn->timeout); - } - if (conn->ctx->err == REDIS_OK) { + if (event_get_base (&conn->timeout)) { + event_del (&conn->timeout); + } + conn->active = TRUE; g_queue_push_tail_link (elt->active, conn_entry); msg_debug_rpool ("reused existing connection to %s:%d", ip, port); } else { + g_list_free (conn->entry); + conn->entry = NULL; REF_RELEASE (conn); conn = rspamd_redis_pool_new_connection (pool, elt, db, password, ip, port); @@ -388,12 +410,19 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, REF_RELEASE (conn); } else { - /* Just move it to the inactive queue */ - g_queue_unlink (conn->elt->active, conn->entry); - g_queue_push_head_link (conn->elt->inactive, conn->entry); - conn->active = FALSE; - rspamd_redis_pool_schedule_timeout (conn); - msg_debug_rpool ("mark connection inactive"); + /* Ensure that there are no callbacks attached to this conn */ + if (ctx->replies.head == NULL) { + /* Just move it to the inactive queue */ + g_queue_unlink (conn->elt->active, conn->entry); + g_queue_push_head_link (conn->elt->inactive, conn->entry); + conn->active = FALSE; + rspamd_redis_pool_schedule_timeout (conn); + msg_debug_rpool ("mark connection inactive"); + } + else { + msg_debug_rpool ("closed connection due to callbacks leftover"); + REF_RELEASE (conn); + } } REF_RELEASE (conn); -- 2.39.5