]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Start conversion of the redis pool code to c++
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 22 Jul 2021 12:05:00 +0000 (13:05 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 22 Jul 2021 12:05:00 +0000 (13:05 +0100)
No functional changes.

src/libserver/CMakeLists.txt
src/libserver/redis_pool.c [deleted file]
src/libserver/redis_pool.cxx [new file with mode: 0644]

index 1dc8d9006f234951c996e02b44ef86bf1547f5ae..a4fdbbfcbdc2f86973597f1c87f06246d2cc5860 100644 (file)
@@ -16,7 +16,7 @@ SET(LIBRSPAMDSERVERSRC
                                ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
-                               ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.c
+               ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
                                ${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/spf.c
                                ${CMAKE_CURRENT_SOURCE_DIR}/ssl_util.c
diff --git a/src/libserver/redis_pool.c b/src/libserver/redis_pool.c
deleted file mode 100644 (file)
index da97c60..0000000
+++ /dev/null
@@ -1,572 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "config.h"
-#include "contrib/libev/ev.h"
-#include "redis_pool.h"
-#include "cfg_file.h"
-#include "contrib/hiredis/hiredis.h"
-#include "contrib/hiredis/async.h"
-#include "contrib/hiredis/adapters/libev.h"
-#include "cryptobox.h"
-#include "logger.h"
-
-struct rspamd_redis_pool_elt;
-
-enum rspamd_redis_pool_connection_state {
-       RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
-       RSPAMD_REDIS_POOL_CONN_ACTIVE,
-       RSPAMD_REDIS_POOL_CONN_FINALISING
-};
-
-struct rspamd_redis_pool_connection {
-       struct redisAsyncContext *ctx;
-       struct rspamd_redis_pool_elt *elt;
-       GList *entry;
-       ev_timer timeout;
-       enum rspamd_redis_pool_connection_state state;
-       gchar tag[MEMPOOL_UID_LEN];
-       ref_entry_t ref;
-};
-
-struct rspamd_redis_pool_elt {
-       struct rspamd_redis_pool *pool;
-       guint64 key;
-       GQueue *active;
-       GQueue *inactive;
-};
-
-struct rspamd_redis_pool {
-       struct ev_loop *event_loop;
-       struct rspamd_config *cfg;
-       GHashTable *elts_by_key;
-       GHashTable *elts_by_ctx;
-       gdouble timeout;
-       guint max_conns;
-};
-
-static const gdouble default_timeout = 10.0;
-static const guint default_max_conns = 100;
-
-#define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
-               "redis_pool", conn->tag, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-#define msg_warn_rpool(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
-               "redis_pool", conn->tag, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-#define msg_info_rpool(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
-               "redis_pool", conn->tag, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-#define msg_debug_rpool(...)  rspamd_conditional_debug_fast (NULL, NULL, \
-        rspamd_redis_pool_log_id, "redis_pool", conn->tag, \
-        G_STRFUNC, \
-        __VA_ARGS__)
-
-INIT_LOG_MODULE(redis_pool)
-
-static inline guint64
-rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
-               const char *ip, int port)
-{
-       rspamd_cryptobox_fast_hash_state_t st;
-
-       rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
-
-       if (db) {
-               rspamd_cryptobox_fast_hash_update (&st, db, strlen (db));
-       }
-       if (password) {
-               rspamd_cryptobox_fast_hash_update (&st, password, strlen (password));
-       }
-
-       rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip));
-       rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
-
-       return rspamd_cryptobox_fast_hash_final (&st);
-}
-
-
-static void
-rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
-{
-       if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) {
-               msg_debug_rpool ("active connection removed");
-
-               if (conn->ctx) {
-                       if (!(conn->ctx->c.flags & REDIS_FREEING)) {
-                               redisAsyncContext *ac = conn->ctx;
-
-                               conn->ctx = NULL;
-                               g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
-                               ac->onDisconnect = NULL;
-                               redisAsyncFree (ac);
-                       }
-               }
-
-               if (conn->entry) {
-                       g_queue_unlink (conn->elt->active, conn->entry);
-               }
-       }
-       else {
-               msg_debug_rpool ("inactive connection removed");
-
-               ev_timer_stop (conn->elt->pool->event_loop, &conn->timeout);
-
-               if (conn->ctx && !(conn->ctx->c.flags & REDIS_FREEING)) {
-                       redisAsyncContext *ac = conn->ctx;
-
-                       /* To prevent on_disconnect here */
-                       conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
-                       g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
-                       conn->ctx = NULL;
-                       ac->onDisconnect = NULL;
-                       redisAsyncFree (ac);
-               }
-
-               if (conn->entry) {
-                       g_queue_unlink (conn->elt->inactive, conn->entry);
-               }
-       }
-
-
-       if (conn->entry) {
-               g_list_free (conn->entry);
-       }
-
-       g_free (conn);
-}
-
-static void
-rspamd_redis_pool_elt_dtor (gpointer p)
-{
-       GList *cur;
-       struct rspamd_redis_pool_elt *elt = p;
-       struct rspamd_redis_pool_connection *c;
-
-       for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
-               c = cur->data;
-               c->entry = NULL;
-               REF_RELEASE (c);
-       }
-
-       for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
-               c = cur->data;
-               c->entry = NULL;
-               REF_RELEASE (c);
-       }
-
-       g_queue_free (elt->active);
-       g_queue_free (elt->inactive);
-       g_free (elt);
-}
-
-static void
-rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv)
-{
-       struct rspamd_redis_pool_connection *conn =
-                       (struct rspamd_redis_pool_connection *)priv;
-
-       msg_debug_rpool ("quit command reply for the connection %p, refcount: %d",
-                       conn->ctx, conn->ref.refcount);
-       /*
-        * The connection will be freed by hiredis itself as we are here merely after
-        * quit command has succeeded and we have timer being set already.
-        * The problem is that when this callback is called, our connection is likely
-        * dead, so probably even on_disconnect callback has been already called...
-        *
-        * Hence, the connection might already be freed, so even (conn) pointer may be
-        * inaccessible.
-        *
-        * TODO: Use refcounts to prevent this stuff to happen, the problem is how
-        * to handle Redis timeout on `quit` command in fact... The good thing is that
-        * it will not likely happen.
-        */
-}
-
-static void
-rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents)
-{
-       struct rspamd_redis_pool_connection *conn =
-                       (struct rspamd_redis_pool_connection *)w->data;
-
-       g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
-
-       if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) {
-               msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d",
-                               conn->ctx, conn->ref.refcount);
-               /* Prevent reusing */
-               if (conn->entry) {
-                       g_queue_delete_link (conn->elt->inactive, conn->entry);
-                       conn->entry = NULL;
-               }
-
-               conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
-               ev_timer_again (EV_A_ w);
-               redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT");
-       }
-       else {
-               /* Finalising by timeout */
-               ev_timer_stop (EV_A_ w);
-               msg_debug_rpool ("final removal of connection %p, refcount: %d",
-                               conn->ctx, conn->ref.refcount);
-               REF_RELEASE (conn);
-       }
-
-}
-
-static void
-rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
-{
-       gdouble real_timeout;
-       guint active_elts;
-
-       active_elts = g_queue_get_length (conn->elt->active);
-
-       if (active_elts > conn->elt->pool->max_conns) {
-               real_timeout = conn->elt->pool->timeout / 2.0;
-               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
-       }
-       else {
-               real_timeout = conn->elt->pool->timeout;
-               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
-       }
-
-       msg_debug_rpool ("scheduled connection %p cleanup in %.1f seconds",
-                       conn->ctx, real_timeout);
-
-       conn->timeout.data = conn;
-       ev_timer_init (&conn->timeout,
-                       rspamd_redis_conn_timeout,
-                       real_timeout, real_timeout / 2.0);
-       ev_timer_start (conn->elt->pool->event_loop, &conn->timeout);
-}
-
-static void
-rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status)
-{
-       struct rspamd_redis_pool_connection *conn = ac->data;
-
-       /*
-        * Here, we know that redis itself will free this connection
-        * so, we need to do something very clever about it
-        */
-       if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) {
-               /* Do nothing for active connections as it is already handled somewhere */
-               if (conn->ctx) {
-                       msg_debug_rpool ("inactive connection terminated: %s, refs: %d",
-                               conn->ctx->errstr, conn->ref.refcount);
-               }
-
-               REF_RELEASE (conn);
-       }
-}
-
-static struct rspamd_redis_pool_connection *
-rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
-               struct rspamd_redis_pool_elt *elt,
-               const char *db,
-               const char *password,
-               const char *ip,
-               gint port)
-{
-       struct rspamd_redis_pool_connection *conn;
-       struct redisAsyncContext *ctx;
-
-       if (*ip == '/' || *ip == '.') {
-               ctx = redisAsyncConnectUnix (ip);
-       }
-       else {
-               ctx = redisAsyncConnect (ip, port);
-       }
-
-       if (ctx) {
-
-               if (ctx->err != REDIS_OK) {
-                       msg_err ("cannot connect to redis %s (port %d): %s", ip, port, ctx->errstr);
-                       redisAsyncFree (ctx);
-
-                       return NULL;
-               }
-               else {
-                       conn = g_malloc0 (sizeof (*conn));
-                       conn->entry = g_list_prepend (NULL, conn);
-                       conn->elt = elt;
-                       conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
-
-                       g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
-                       g_queue_push_head_link (elt->active, conn->entry);
-                       conn->ctx = ctx;
-                       ctx->data = conn;
-                       rspamd_random_hex (conn->tag, sizeof (conn->tag));
-                       REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
-                       msg_debug_rpool ("created new connection to %s:%d: %p", ip, port, ctx);
-
-                       redisLibevAttach (pool->event_loop, ctx);
-                       redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect);
-
-                       if (password) {
-                               redisAsyncCommand (ctx, NULL, NULL,
-                                               "AUTH %s", password);
-                       }
-                       if (db) {
-                               redisAsyncCommand (ctx, NULL, NULL,
-                                               "SELECT %s", db);
-                       }
-               }
-
-               return conn;
-       }
-
-       return NULL;
-}
-
-static struct rspamd_redis_pool_elt *
-rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool)
-{
-       struct rspamd_redis_pool_elt *elt;
-
-       elt = g_malloc0 (sizeof (*elt));
-       elt->active = g_queue_new ();
-       elt->inactive = g_queue_new ();
-       elt->pool = pool;
-
-       return elt;
-}
-
-struct rspamd_redis_pool *
-rspamd_redis_pool_init (void)
-{
-       struct rspamd_redis_pool *pool;
-
-       pool = g_malloc0 (sizeof (*pool));
-       pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal,
-                       NULL, rspamd_redis_pool_elt_dtor);
-       pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
-
-       return pool;
-}
-
-void
-rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
-               struct rspamd_config *cfg,
-               struct ev_loop *ev_base)
-{
-       g_assert (pool != NULL);
-
-       pool->event_loop = ev_base;
-       pool->cfg = cfg;
-       pool->timeout = default_timeout;
-       pool->max_conns = default_max_conns;
-}
-
-
-struct redisAsyncContext*
-rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
-               const gchar *db, const gchar *password,
-               const char *ip, int port)
-{
-       guint64 key;
-       struct rspamd_redis_pool_elt *elt;
-       GList *conn_entry;
-       struct rspamd_redis_pool_connection *conn;
-
-       g_assert (pool != NULL);
-       g_assert (pool->event_loop != NULL);
-       g_assert (ip != NULL);
-
-       key = rspamd_redis_pool_get_key (db, password, ip, port);
-       elt = g_hash_table_lookup (pool->elts_by_key, &key);
-
-       if (elt) {
-               if (g_queue_get_length (elt->inactive) > 0) {
-                       conn_entry = g_queue_pop_head_link (elt->inactive);
-                       conn = conn_entry->data;
-                       g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
-
-                       if (conn->ctx->err == REDIS_OK) {
-                               /* Also check SO_ERROR */
-                               gint err;
-                               socklen_t len = sizeof (gint);
-
-                               if (getsockopt (conn->ctx->c.fd, SOL_SOCKET, SO_ERROR,
-                                               (void *) &err, &len) == -1) {
-                                       err = errno;
-                               }
-
-                               if (err != 0) {
-                                       g_list_free (conn->entry);
-                                       conn->entry = NULL;
-                                       REF_RELEASE (conn);
-                                       conn = rspamd_redis_pool_new_connection (pool, elt,
-                                                       db, password, ip, port);
-                               }
-                               else {
-
-                                       ev_timer_stop (elt->pool->event_loop, &conn->timeout);
-                                       conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
-                                       g_queue_push_tail_link (elt->active, conn_entry);
-                                       msg_debug_rpool ("reused existing connection to %s:%d: %p",
-                                                       ip, port, conn->ctx);
-                               }
-                       }
-                       else {
-                               g_list_free (conn->entry);
-                               conn->entry = NULL;
-                               REF_RELEASE (conn);
-                               conn = rspamd_redis_pool_new_connection (pool, elt,
-                                               db, password, ip, port);
-                       }
-
-               }
-               else {
-                       /* Need to create connection */
-                       conn = rspamd_redis_pool_new_connection (pool, elt,
-                                       db, password, ip, port);
-               }
-       }
-       else {
-               /* Need to create a pool */
-               elt = rspamd_redis_pool_new_elt (pool);
-               elt->key = key;
-               g_hash_table_insert (pool->elts_by_key, &elt->key, elt);
-
-               conn = rspamd_redis_pool_new_connection (pool, elt,
-                               db, password, ip, port);
-       }
-
-       if (!conn) {
-               return NULL;
-       }
-
-       REF_RETAIN (conn);
-
-       return conn->ctx;
-}
-
-
-void
-rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
-               struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
-{
-       struct rspamd_redis_pool_connection *conn;
-
-       g_assert (pool != NULL);
-       g_assert (ctx != NULL);
-
-       conn = g_hash_table_lookup (pool->elts_by_ctx, ctx);
-       if (conn != NULL) {
-               g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
-
-               if (ctx->err != REDIS_OK) {
-                       /* We need to terminate connection forcefully */
-                       msg_debug_rpool ("closed connection %p due to an error", conn->ctx);
-                       REF_RELEASE (conn);
-               }
-               else {
-                       if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
-                               /* Ensure that there are no callbacks attached to this conn */
-                               if (ctx->replies.head == NULL) {
-                                       /* Just move it to the inactive queue */
-                                       g_queue_unlink (conn->elt->active, conn->entry);
-                                       g_queue_push_head_link (conn->elt->inactive, conn->entry);
-                                       conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
-                                       rspamd_redis_pool_schedule_timeout (conn);
-                                       msg_debug_rpool ("mark connection %p inactive", conn->ctx);
-                               }
-                               else {
-                                       msg_debug_rpool ("closed connection %p due to callbacks left",
-                                                       conn->ctx);
-                                       REF_RELEASE (conn);
-                               }
-                       }
-                       else {
-                               if (how == RSPAMD_REDIS_RELEASE_FATAL) {
-                                       msg_debug_rpool ("closed connection %p due to an fatal termination",
-                                                       conn->ctx);
-                               }
-                               else {
-                                       msg_debug_rpool ("closed connection %p due to explicit termination",
-                                                       conn->ctx);
-                               }
-
-                               REF_RELEASE (conn);
-                       }
-               }
-
-               REF_RELEASE (conn);
-       }
-       else {
-               g_assert_not_reached ();
-       }
-}
-
-
-void
-rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
-{
-       struct rspamd_redis_pool_elt *elt;
-       GHashTableIter it;
-       gpointer k, v;
-
-       g_assert (pool != NULL);
-
-       g_hash_table_iter_init (&it, pool->elts_by_key);
-
-       while (g_hash_table_iter_next (&it, &k, &v)) {
-               elt = v;
-               rspamd_redis_pool_elt_dtor (elt);
-               g_hash_table_iter_steal (&it);
-       }
-
-       g_hash_table_unref (pool->elts_by_ctx);
-       g_hash_table_unref (pool->elts_by_key);
-
-       g_free (pool);
-}
-
-const gchar*
-rspamd_redis_type_to_string (int type)
-{
-       const gchar *ret = "unknown";
-
-       switch (type) {
-       case REDIS_REPLY_STRING:
-               ret = "string";
-               break;
-       case REDIS_REPLY_ARRAY:
-               ret = "array";
-               break;
-       case REDIS_REPLY_INTEGER:
-               ret = "int";
-               break;
-       case REDIS_REPLY_STATUS:
-               ret = "status";
-               break;
-       case REDIS_REPLY_NIL:
-               ret = "nil";
-               break;
-       case REDIS_REPLY_ERROR:
-               ret = "error";
-               break;
-       default:
-               break;
-       }
-
-       return ret;
-}
diff --git a/src/libserver/redis_pool.cxx b/src/libserver/redis_pool.cxx
new file mode 100644 (file)
index 0000000..a81039d
--- /dev/null
@@ -0,0 +1,572 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "contrib/libev/ev.h"
+#include "redis_pool.h"
+#include "cfg_file.h"
+#include "contrib/hiredis/hiredis.h"
+#include "contrib/hiredis/async.h"
+#include "contrib/hiredis/adapters/libev.h"
+#include "cryptobox.h"
+#include "logger.h"
+
+struct rspamd_redis_pool_elt;
+
+enum rspamd_redis_pool_connection_state {
+       RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
+       RSPAMD_REDIS_POOL_CONN_ACTIVE,
+       RSPAMD_REDIS_POOL_CONN_FINALISING
+};
+
+struct rspamd_redis_pool_connection {
+       struct redisAsyncContext *ctx;
+       struct rspamd_redis_pool_elt *elt;
+       GList *entry;
+       ev_timer timeout;
+       enum rspamd_redis_pool_connection_state state;
+       gchar tag[MEMPOOL_UID_LEN];
+       ref_entry_t ref;
+};
+
+struct rspamd_redis_pool_elt {
+       struct rspamd_redis_pool *pool;
+       guint64 key;
+       GQueue *active;
+       GQueue *inactive;
+};
+
+struct rspamd_redis_pool {
+       struct ev_loop *event_loop;
+       struct rspamd_config *cfg;
+       GHashTable *elts_by_key;
+       GHashTable *elts_by_ctx;
+       gdouble timeout;
+       guint max_conns;
+};
+
+static const gdouble default_timeout = 10.0;
+static const guint default_max_conns = 100;
+
+#define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+               "redis_pool", conn->tag, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_warn_rpool(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+               "redis_pool", conn->tag, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_info_rpool(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+               "redis_pool", conn->tag, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_debug_rpool(...)  rspamd_conditional_debug_fast (NULL, NULL, \
+        rspamd_redis_pool_log_id, "redis_pool", conn->tag, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+
+INIT_LOG_MODULE(redis_pool)
+
+static inline guint64
+rspamd_redis_pool_get_key (const gchar *db, const gchar *password,
+               const char *ip, int port)
+{
+       rspamd_cryptobox_fast_hash_state_t st;
+
+       rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
+
+       if (db) {
+               rspamd_cryptobox_fast_hash_update (&st, db, strlen (db));
+       }
+       if (password) {
+               rspamd_cryptobox_fast_hash_update (&st, password, strlen (password));
+       }
+
+       rspamd_cryptobox_fast_hash_update (&st, ip, strlen (ip));
+       rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
+
+       return rspamd_cryptobox_fast_hash_final (&st);
+}
+
+
+static void
+rspamd_redis_pool_conn_dtor (struct rspamd_redis_pool_connection *conn)
+{
+       if (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE) {
+               msg_debug_rpool ("active connection removed");
+
+               if (conn->ctx) {
+                       if (!(conn->ctx->c.flags & REDIS_FREEING)) {
+                               redisAsyncContext *ac = conn->ctx;
+
+                               conn->ctx = NULL;
+                               g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
+                               ac->onDisconnect = NULL;
+                               redisAsyncFree (ac);
+                       }
+               }
+
+               if (conn->entry) {
+                       g_queue_unlink (conn->elt->active, conn->entry);
+               }
+       }
+       else {
+               msg_debug_rpool ("inactive connection removed");
+
+               ev_timer_stop (conn->elt->pool->event_loop, &conn->timeout);
+
+               if (conn->ctx && !(conn->ctx->c.flags & REDIS_FREEING)) {
+                       redisAsyncContext *ac = conn->ctx;
+
+                       /* To prevent on_disconnect here */
+                       conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
+                       g_hash_table_remove (conn->elt->pool->elts_by_ctx, ac);
+                       conn->ctx = NULL;
+                       ac->onDisconnect = NULL;
+                       redisAsyncFree (ac);
+               }
+
+               if (conn->entry) {
+                       g_queue_unlink (conn->elt->inactive, conn->entry);
+               }
+       }
+
+
+       if (conn->entry) {
+               g_list_free (conn->entry);
+       }
+
+       g_free (conn);
+}
+
+static void
+rspamd_redis_pool_elt_dtor (gpointer p)
+{
+       GList *cur;
+       struct rspamd_redis_pool_elt *elt = (struct rspamd_redis_pool_elt *)p;
+       struct rspamd_redis_pool_connection *c;
+
+       for (cur = elt->active->head; cur != NULL; cur = g_list_next (cur)) {
+               c = (struct rspamd_redis_pool_connection *)cur->data;
+               c->entry = NULL;
+               REF_RELEASE (c);
+       }
+
+       for (cur = elt->inactive->head; cur != NULL; cur = g_list_next (cur)) {
+               c = (struct rspamd_redis_pool_connection *)cur->data;
+               c->entry = NULL;
+               REF_RELEASE (c);
+       }
+
+       g_queue_free (elt->active);
+       g_queue_free (elt->inactive);
+       g_free (elt);
+}
+
+static void
+rspamd_redis_on_quit (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_pool_connection *conn =
+                       (struct rspamd_redis_pool_connection *)priv;
+
+       msg_debug_rpool ("quit command reply for the connection %p, refcount: %d",
+                       conn->ctx, conn->ref.refcount);
+       /*
+        * The connection will be freed by hiredis itself as we are here merely after
+        * quit command has succeeded and we have timer being set already.
+        * The problem is that when this callback is called, our connection is likely
+        * dead, so probably even on_disconnect callback has been already called...
+        *
+        * Hence, the connection might already be freed, so even (conn) pointer may be
+        * inaccessible.
+        *
+        * TODO: Use refcounts to prevent this stuff to happen, the problem is how
+        * to handle Redis timeout on `quit` command in fact... The good thing is that
+        * it will not likely happen.
+        */
+}
+
+static void
+rspamd_redis_conn_timeout (EV_P_ ev_timer *w, int revents)
+{
+       struct rspamd_redis_pool_connection *conn =
+                       (struct rspamd_redis_pool_connection *)w->data;
+
+       g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
+
+       if (conn->state == RSPAMD_REDIS_POOL_CONN_INACTIVE) {
+               msg_debug_rpool ("scheduled soft removal of connection %p, refcount: %d",
+                               conn->ctx, conn->ref.refcount);
+               /* Prevent reusing */
+               if (conn->entry) {
+                       g_queue_delete_link (conn->elt->inactive, conn->entry);
+                       conn->entry = NULL;
+               }
+
+               conn->state = RSPAMD_REDIS_POOL_CONN_FINALISING;
+               ev_timer_again (EV_A_ w);
+               redisAsyncCommand (conn->ctx, rspamd_redis_on_quit, conn, "QUIT");
+       }
+       else {
+               /* Finalising by timeout */
+               ev_timer_stop (EV_A_ w);
+               msg_debug_rpool ("final removal of connection %p, refcount: %d",
+                               conn->ctx, conn->ref.refcount);
+               REF_RELEASE (conn);
+       }
+
+}
+
+static void
+rspamd_redis_pool_schedule_timeout (struct rspamd_redis_pool_connection *conn)
+{
+       gdouble real_timeout;
+       guint active_elts;
+
+       active_elts = g_queue_get_length (conn->elt->active);
+
+       if (active_elts > conn->elt->pool->max_conns) {
+               real_timeout = conn->elt->pool->timeout / 2.0;
+               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
+       }
+       else {
+               real_timeout = conn->elt->pool->timeout;
+               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
+       }
+
+       msg_debug_rpool ("scheduled connection %p cleanup in %.1f seconds",
+                       conn->ctx, real_timeout);
+
+       conn->timeout.data = conn;
+       ev_timer_init (&conn->timeout,
+                       rspamd_redis_conn_timeout,
+                       real_timeout, real_timeout / 2.0);
+       ev_timer_start (conn->elt->pool->event_loop, &conn->timeout);
+}
+
+static void
+rspamd_redis_pool_on_disconnect (const struct redisAsyncContext *ac, int status)
+{
+       struct rspamd_redis_pool_connection *conn = (struct rspamd_redis_pool_connection *)ac->data;
+
+       /*
+        * Here, we know that redis itself will free this connection
+        * so, we need to do something very clever about it
+        */
+       if (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE) {
+               /* Do nothing for active connections as it is already handled somewhere */
+               if (conn->ctx) {
+                       msg_debug_rpool ("inactive connection terminated: %s, refs: %d",
+                               conn->ctx->errstr, conn->ref.refcount);
+               }
+
+               REF_RELEASE (conn);
+       }
+}
+
+static struct rspamd_redis_pool_connection *
+rspamd_redis_pool_new_connection (struct rspamd_redis_pool *pool,
+               struct rspamd_redis_pool_elt *elt,
+               const char *db,
+               const char *password,
+               const char *ip,
+               gint port)
+{
+       struct rspamd_redis_pool_connection *conn;
+       struct redisAsyncContext *ctx;
+
+       if (*ip == '/' || *ip == '.') {
+               ctx = redisAsyncConnectUnix (ip);
+       }
+       else {
+               ctx = redisAsyncConnect (ip, port);
+       }
+
+       if (ctx) {
+
+               if (ctx->err != REDIS_OK) {
+                       msg_err ("cannot connect to redis %s (port %d): %s", ip, port, ctx->errstr);
+                       redisAsyncFree (ctx);
+
+                       return NULL;
+               }
+               else {
+                       conn = (struct rspamd_redis_pool_connection *)g_malloc0 (sizeof (*conn));
+                       conn->entry = g_list_prepend (NULL, conn);
+                       conn->elt = elt;
+                       conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
+
+                       g_hash_table_insert (elt->pool->elts_by_ctx, ctx, conn);
+                       g_queue_push_head_link (elt->active, conn->entry);
+                       conn->ctx = ctx;
+                       ctx->data = conn;
+                       rspamd_random_hex ((guchar *)conn->tag, sizeof (conn->tag));
+                       REF_INIT_RETAIN (conn, rspamd_redis_pool_conn_dtor);
+                       msg_debug_rpool ("created new connection to %s:%d: %p", ip, port, ctx);
+
+                       redisLibevAttach (pool->event_loop, ctx);
+                       redisAsyncSetDisconnectCallback (ctx, rspamd_redis_pool_on_disconnect);
+
+                       if (password) {
+                               redisAsyncCommand (ctx, NULL, NULL,
+                                               "AUTH %s", password);
+                       }
+                       if (db) {
+                               redisAsyncCommand (ctx, NULL, NULL,
+                                               "SELECT %s", db);
+                       }
+               }
+
+               return conn;
+       }
+
+       return NULL;
+}
+
+static struct rspamd_redis_pool_elt *
+rspamd_redis_pool_new_elt (struct rspamd_redis_pool *pool)
+{
+       struct rspamd_redis_pool_elt *elt;
+
+       elt = (struct rspamd_redis_pool_elt *)g_malloc0 (sizeof (*elt));
+       elt->active = g_queue_new ();
+       elt->inactive = g_queue_new ();
+       elt->pool = pool;
+
+       return elt;
+}
+
+struct rspamd_redis_pool *
+rspamd_redis_pool_init (void)
+{
+       struct rspamd_redis_pool *pool;
+
+       pool = (struct rspamd_redis_pool *)g_malloc0 (sizeof (*pool));
+       pool->elts_by_key = g_hash_table_new_full (g_int64_hash, g_int64_equal,
+                       NULL, rspamd_redis_pool_elt_dtor);
+       pool->elts_by_ctx = g_hash_table_new (g_direct_hash, g_direct_equal);
+
+       return pool;
+}
+
+void
+rspamd_redis_pool_config (struct rspamd_redis_pool *pool,
+               struct rspamd_config *cfg,
+               struct ev_loop *ev_base)
+{
+       g_assert (pool != NULL);
+
+       pool->event_loop = ev_base;
+       pool->cfg = cfg;
+       pool->timeout = default_timeout;
+       pool->max_conns = default_max_conns;
+}
+
+
+struct redisAsyncContext*
+rspamd_redis_pool_connect (struct rspamd_redis_pool *pool,
+               const gchar *db, const gchar *password,
+               const char *ip, int port)
+{
+       guint64 key;
+       struct rspamd_redis_pool_elt *elt;
+       GList *conn_entry;
+       struct rspamd_redis_pool_connection *conn;
+
+       g_assert (pool != NULL);
+       g_assert (pool->event_loop != NULL);
+       g_assert (ip != NULL);
+
+       key = rspamd_redis_pool_get_key (db, password, ip, port);
+       elt = (struct rspamd_redis_pool_elt *)g_hash_table_lookup (pool->elts_by_key, &key);
+
+       if (elt) {
+               if (g_queue_get_length (elt->inactive) > 0) {
+                       conn_entry = g_queue_pop_head_link (elt->inactive);
+                       conn = (struct rspamd_redis_pool_connection *)conn_entry->data;
+                       g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
+
+                       if (conn->ctx->err == REDIS_OK) {
+                               /* Also check SO_ERROR */
+                               gint err;
+                               socklen_t len = sizeof (gint);
+
+                               if (getsockopt (conn->ctx->c.fd, SOL_SOCKET, SO_ERROR,
+                                               (void *) &err, &len) == -1) {
+                                       err = errno;
+                               }
+
+                               if (err != 0) {
+                                       g_list_free (conn->entry);
+                                       conn->entry = NULL;
+                                       REF_RELEASE (conn);
+                                       conn = rspamd_redis_pool_new_connection (pool, elt,
+                                                       db, password, ip, port);
+                               }
+                               else {
+
+                                       ev_timer_stop (elt->pool->event_loop, &conn->timeout);
+                                       conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
+                                       g_queue_push_tail_link (elt->active, conn_entry);
+                                       msg_debug_rpool ("reused existing connection to %s:%d: %p",
+                                                       ip, port, conn->ctx);
+                               }
+                       }
+                       else {
+                               g_list_free (conn->entry);
+                               conn->entry = NULL;
+                               REF_RELEASE (conn);
+                               conn = rspamd_redis_pool_new_connection (pool, elt,
+                                               db, password, ip, port);
+                       }
+
+               }
+               else {
+                       /* Need to create connection */
+                       conn = rspamd_redis_pool_new_connection (pool, elt,
+                                       db, password, ip, port);
+               }
+       }
+       else {
+               /* Need to create a pool */
+               elt = rspamd_redis_pool_new_elt (pool);
+               elt->key = key;
+               g_hash_table_insert (pool->elts_by_key, &elt->key, elt);
+
+               conn = rspamd_redis_pool_new_connection (pool, elt,
+                               db, password, ip, port);
+       }
+
+       if (!conn) {
+               return NULL;
+       }
+
+       REF_RETAIN (conn);
+
+       return conn->ctx;
+}
+
+
+void
+rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
+               struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+{
+       struct rspamd_redis_pool_connection *conn;
+
+       g_assert (pool != NULL);
+       g_assert (ctx != NULL);
+
+       conn = (struct rspamd_redis_pool_connection *)g_hash_table_lookup (pool->elts_by_ctx, ctx);
+       if (conn != NULL) {
+               g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
+
+               if (ctx->err != REDIS_OK) {
+                       /* We need to terminate connection forcefully */
+                       msg_debug_rpool ("closed connection %p due to an error", conn->ctx);
+                       REF_RELEASE (conn);
+               }
+               else {
+                       if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
+                               /* Ensure that there are no callbacks attached to this conn */
+                               if (ctx->replies.head == NULL) {
+                                       /* Just move it to the inactive queue */
+                                       g_queue_unlink (conn->elt->active, conn->entry);
+                                       g_queue_push_head_link (conn->elt->inactive, conn->entry);
+                                       conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
+                                       rspamd_redis_pool_schedule_timeout (conn);
+                                       msg_debug_rpool ("mark connection %p inactive", conn->ctx);
+                               }
+                               else {
+                                       msg_debug_rpool ("closed connection %p due to callbacks left",
+                                                       conn->ctx);
+                                       REF_RELEASE (conn);
+                               }
+                       }
+                       else {
+                               if (how == RSPAMD_REDIS_RELEASE_FATAL) {
+                                       msg_debug_rpool ("closed connection %p due to an fatal termination",
+                                                       conn->ctx);
+                               }
+                               else {
+                                       msg_debug_rpool ("closed connection %p due to explicit termination",
+                                                       conn->ctx);
+                               }
+
+                               REF_RELEASE (conn);
+                       }
+               }
+
+               REF_RELEASE (conn);
+       }
+       else {
+               g_assert_not_reached ();
+       }
+}
+
+
+void
+rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
+{
+       struct rspamd_redis_pool_elt *elt;
+       GHashTableIter it;
+       gpointer k, v;
+
+       g_assert (pool != NULL);
+
+       g_hash_table_iter_init (&it, pool->elts_by_key);
+
+       while (g_hash_table_iter_next (&it, &k, &v)) {
+               elt = (struct rspamd_redis_pool_elt *)v;
+               rspamd_redis_pool_elt_dtor (elt);
+               g_hash_table_iter_steal (&it);
+       }
+
+       g_hash_table_unref (pool->elts_by_ctx);
+       g_hash_table_unref (pool->elts_by_key);
+
+       g_free (pool);
+}
+
+const gchar*
+rspamd_redis_type_to_string (int type)
+{
+       const gchar *ret = "unknown";
+
+       switch (type) {
+       case REDIS_REPLY_STRING:
+               ret = "string";
+               break;
+       case REDIS_REPLY_ARRAY:
+               ret = "array";
+               break;
+       case REDIS_REPLY_INTEGER:
+               ret = "int";
+               break;
+       case REDIS_REPLY_STATUS:
+               ret = "status";
+               break;
+       case REDIS_REPLY_NIL:
+               ret = "nil";
+               break;
+       case REDIS_REPLY_ERROR:
+               ret = "error";
+               break;
+       default:
+               break;
+       }
+
+       return ret;
+}