diff options
Diffstat (limited to 'src/libserver')
-rw-r--r-- | src/libserver/redis_pool.c | 59 |
1 files changed, 37 insertions, 22 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c index cfc2757be..8ef0f9ad5 100644 --- a/src/libserver/redis_pool.c +++ b/src/libserver/redis_pool.c @@ -20,6 +20,7 @@ #include "cfg_file.h" #include "hiredis/hiredis.h" #include "hiredis/async.h" +#include "hiredis/adapters/libevent.h" #include "cryptobox.h" #include "ref.h" @@ -78,14 +79,19 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password, static void rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c) { - if (c->active && c->ctx != NULL) { - g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx); - redisAsyncFree (c->ctx); + if (c->active) { + if (c->ctx) { + g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx); + redisAsyncFree (c->ctx); + } + g_queue_unlink (c->elt->active, c->entry); } + else { + if (event_get_base (&c->timeout)) { + event_del (&c->timeout); + } - if (!c->active && event_get_base (&c->timeout)) { - event_del (&c->timeout); g_queue_unlink (c->elt->inactive, c->entry); } @@ -162,20 +168,30 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool, ctx = redisAsyncConnect (ip, port); if (ctx) { - conn = g_slice_alloc0 (sizeof (conn)); - conn->entry = g_list_prepend (NULL, conn); - conn->elt = elt; - conn->active = TRUE; - g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); - g_queue_push_head_link (elt->active, conn->entry); - conn->ctx = ctx; - REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor); - - if (password) { - redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); + + if (ctx->err != REDIS_OK) { + redisAsyncFree (ctx); + + return NULL; } - if (db) { - redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); + else { + conn = g_slice_alloc0 (sizeof (*conn)); + conn->entry = g_list_prepend (NULL, conn); + conn->elt = elt; + conn->active = TRUE; + g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn); + g_queue_push_head_link (elt->active, conn->entry); + conn->ctx = ctx; + REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor); + + redisLibeventAttach (ctx, pool->ev_base); + + if (password) { + redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password); + } + if (db) { + redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db); + } } return conn; @@ -252,7 +268,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, conn->active = TRUE; g_queue_push_tail_link (elt->active, conn_entry); - REF_RETAIN (conn); } else { @@ -260,8 +275,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, conn = rspamd_redis_pool_new_connection (pool, elt, db, password, ip, port); } - - return conn->ctx; } else { elt = rspamd_redis_pool_new_elt (pool); @@ -272,6 +285,8 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool, db, password, ip, port); } + REF_RETAIN (conn); + return conn->ctx; } @@ -289,7 +304,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, if (conn != NULL) { REF_RELEASE (conn); - if (is_fatal) { + if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) { /* We need to terminate connection forcefully */ REF_RELEASE (conn); } |