aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver')
-rw-r--r--src/libserver/redis_pool.c59
1 files changed, 37 insertions, 22 deletions
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
index cfc2757be..8ef0f9ad5 100644
--- a/src/libserver/redis_pool.c
+++ b/src/libserver/redis_pool.c
@@ -20,6 +20,7 @@
#include "cfg_file.h"
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
+#include "hiredis/adapters/libevent.h"
#include "cryptobox.h"
#include "ref.h"
@@ -78,14 +79,19 @@ rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
static void
rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *c)
{
- if (c->active && c->ctx != NULL) {
- g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
- redisAsyncFree (c->ctx);
+ if (c->active) {
+ if (c->ctx) {
+ g_hash_table_remove (c->elt->pool->elts_by_ctx, c->ctx);
+ redisAsyncFree (c->ctx);
+ }
+
g_queue_unlink (c->elt->active, c->entry);
}
+ else {
+ if (event_get_base (&c->timeout)) {
+ event_del (&c->timeout);
+ }
- if (!c->active && event_get_base (&c->timeout)) {
- event_del (&c->timeout);
g_queue_unlink (c->elt->inactive, c->entry);
}
@@ -162,20 +168,30 @@ rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
ctx = redisAsyncConnect (ip, port);
if (ctx) {
- conn = g_slice_alloc0 (sizeof (conn));
- conn->entry = g_list_prepend (NULL, conn);
- conn->elt = elt;
- conn->active = TRUE;
- g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
- g_queue_push_head_link (elt->active, conn->entry);
- conn->ctx = ctx;
- REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
-
- if (password) {
- redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+
+ if (ctx->err != REDIS_OK) {
+ redisAsyncFree (ctx);
+
+ return NULL;
}
- if (db) {
- redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ else {
+ conn = g_slice_alloc0 (sizeof (*conn));
+ conn->entry = g_list_prepend (NULL, conn);
+ conn->elt = elt;
+ conn->active = TRUE;
+ g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+ g_queue_push_head_link (elt->active, conn->entry);
+ conn->ctx = ctx;
+ REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+
+ redisLibeventAttach (ctx, pool->ev_base);
+
+ if (password) {
+ redisAsyncCommand (ctx, NULL, NULL, "AUTH %s", password);
+ }
+ if (db) {
+ redisAsyncCommand (ctx, NULL, NULL, "SELECT %s", db);
+ }
}
return conn;
@@ -252,7 +268,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
conn->active = TRUE;
g_queue_push_tail_link (elt->active, conn_entry);
- REF_RETAIN (conn);
}
else {
@@ -260,8 +275,6 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
conn = rspamd_redis_pool_new_connection (pool, elt,
db, password, ip, port);
}
-
- return conn->ctx;
}
else {
elt = rspamd_redis_pool_new_elt (pool);
@@ -272,6 +285,8 @@ rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
db, password, ip, port);
}
+ REF_RETAIN (conn);
+
return conn->ctx;
}
@@ -289,7 +304,7 @@ rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
if (conn != NULL) {
REF_RELEASE (conn);
- if (is_fatal) {
+ if (is_fatal || ctx->err == REDIS_ERR_IO || ctx->err == REDIS_ERR_EOF) {
/* We need to terminate connection forcefully */
REF_RELEASE (conn);
}