]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Further rework of the redis pool
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 11 Sep 2021 19:53:29 +0000 (20:53 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 11 Sep 2021 19:53:29 +0000 (20:53 +0100)
src/libserver/redis_pool.cxx

index a059ea6fee72f570c6ffae9661aa441e8bb62cc3..9053cc0e79dfaa9c970454cb6c681fd7c303799d 100644 (file)
@@ -29,8 +29,8 @@
 #include "libutil/cxx/local_shared_ptr.hxx"
 
 namespace rspamd {
-struct redis_pool_elt;
-struct redis_pool;
+class redis_pool_elt;
+class redis_pool;
 
 #define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
         "redis_pool", conn->tag, \
@@ -61,31 +61,34 @@ struct redis_pool_connection {
        using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
        using conn_iter_t = std::list<redis_pool_connection_ptr>::iterator;
        struct redisAsyncContext *ctx;
-       struct redis_pool_elt *elt;
-       struct redis_pool *pool;
+       redis_pool_elt *elt;
+       redis_pool *pool;
        conn_iter_t elt_pos;
        ev_timer timeout;
        enum rspamd_redis_pool_connection_state state;
        gchar tag[MEMPOOL_UID_LEN];
 
-       auto schedule_timeout () -> void;
+       auto schedule_timeout() -> void;
+
        ~redis_pool_connection();
 
-       explicit redis_pool_connection(struct redis_pool *_pool,
-                                                                       struct redis_pool_elt *_elt,
-                                                                       const char *db,
-                                                                       const char *password,
+       explicit redis_pool_connection(redis_pool *_pool,
+                                                                  redis_pool_elt *_elt,
+                                                                  const char *db,
+                                                                  const char *password,
                                                                   struct redisAsyncContext *_ctx);
 
 private:
        static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void;
+
        static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void;
+
        static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto;
 };
 
 
 using redis_pool_key_t = std::uint64_t;
-struct redis_pool;
+class redis_pool;
 
 class redis_pool_elt {
        using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
@@ -106,15 +109,31 @@ public:
        explicit redis_pool_elt(redis_pool *_pool,
                                                        const gchar *_db, const gchar *_password,
                                                        const char *_ip, int _port)
-                       : pool(_pool), ip(_ip), db(_db), port(_port), password(_password),
-                       key(redis_pool_elt::make_key(_db, _password, _ip, _port))
+                       : pool(_pool), ip(_ip), db(_db), password(_password), port(_port),
+                         key(redis_pool_elt::make_key(_db, _password, _ip, _port))
        {
                is_unix = ip[0] == '.' || ip[0] == '/';
        }
 
        auto new_connection() -> redisAsyncContext *;
+
+       auto release_active(const redis_pool_connection *conn) -> void
+       {
+               active.erase(conn->elt_pos);
+       }
+
+       auto release_inactive(const redis_pool_connection *conn) -> void
+       {
+               inactive.erase(conn->elt_pos);
+       }
+
+       auto move_to_inactive(const redis_pool_connection *conn) -> void
+       {
+               inactive.splice(std::end(inactive), active, conn->elt_pos);
+       }
+
        inline static auto make_key(const gchar *db, const gchar *password,
-                                          const char *ip, int port) -> redis_pool_key_t
+                                                               const char *ip, int port) -> redis_pool_key_t
        {
                rspamd_cryptobox_fast_hash_state_t st;
 
@@ -132,8 +151,14 @@ public:
 
                return rspamd_cryptobox_fast_hash_final(&st);
        }
+
+       auto num_active() const -> auto
+       {
+               return active.size();
+       }
+
 private:
-       auto redis_async_new() -> redisAsyncContext*
+       auto redis_async_new() -> redisAsyncContext *
        {
                struct redisAsyncContext *ctx;
 
@@ -164,30 +189,40 @@ class redis_pool {
        robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key;
        robin_hood::unordered_flat_map<redisAsyncContext *,
                        redis_pool_connection *> conns_by_ctx;
+public:
        double timeout = default_timeout;
        unsigned max_conns = default_max_conns;
-public:
        struct ev_loop *event_loop;
        struct rspamd_config *cfg;
 
 public:
-       explicit redis_pool() : event_loop(nullptr), cfg(nullptr) {
+       explicit redis_pool() : event_loop(nullptr), cfg(nullptr)
+       {
                conns_by_ctx.reserve(max_conns);
        }
 
        /* Legacy stuff */
-       auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void {
+       auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void
+       {
                event_loop = _loop;
                cfg = _cfg;
        }
 
        auto new_connection(const gchar *db, const gchar *password,
                                                const char *ip, int port) -> redisAsyncContext *;
-       auto release_connection(redisAsyncContext *ctx) -> void;
 
-       auto unregister_context(redisAsyncContext *ctx) -> void {
+       auto release_connection(redisAsyncContext *ctx,
+                                                       enum rspamd_redis_pool_release_type how) -> void;
+
+       auto unregister_context(redisAsyncContext *ctx) -> void
+       {
                conns_by_ctx.erase(ctx);
        }
+
+       auto register_context(redisAsyncContext *ctx, redis_pool_connection *conn)
+       {
+               conns_by_ctx.emplace(ctx, conn);
+       }
 };
 
 
@@ -230,7 +265,7 @@ auto
 redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void
 {
        struct redis_pool_connection *conn =
-                       (struct redis_pool_connection *)priv;
+                       (struct redis_pool_connection *) priv;
 
        msg_debug_rpool("quit command reply for the connection %p",
                        conn->ctx);
@@ -255,7 +290,7 @@ redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv)
 auto
 redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void
 {
-       auto *conn = (struct redis_pool_connection *)w->data;
+       auto *conn = (struct redis_pool_connection *) w->data;
 
        g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
@@ -273,7 +308,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) ->
                                conn->ctx);
 
                /* Erasure of shared pointer will cause it to be removed */
-               conn->elt->inactive.erase(conn->elt_pos);
+               conn->elt->release_inactive(conn);
        }
 
 }
@@ -281,7 +316,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) ->
 auto
 redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto
 {
-       auto *conn = (struct redis_pool_connection *)ac->data;
+       auto *conn = (struct redis_pool_connection *) ac->data;
 
        /*
         * Here, we know that redis itself will free this connection
@@ -295,7 +330,7 @@ redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, i
                }
 
                /* Erasure of shared pointer will cause it to be removed */
-               conn->elt->inactive.erase(conn->elt_pos);
+               conn->elt->release_inactive(conn);
        }
 }
 
@@ -304,15 +339,15 @@ redis_pool_connection::schedule_timeout() -> void
 {
        const auto *conn = this; /* For debug */
        double real_timeout;
-       auto active_elts = elt->active.size();
+       auto active_elts = elt->num_active();
 
        if (active_elts > pool->max_conns) {
                real_timeout = pool->timeout / 2.0;
-               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
+               real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 4.0);
        }
        else {
                real_timeout = pool->timeout;
-               real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
+               real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 2.0);
        }
 
        msg_debug_rpool("scheduled connection %p cleanup in %.1f seconds",
@@ -326,8 +361,8 @@ redis_pool_connection::schedule_timeout() -> void
 }
 
 
-redis_pool_connection::redis_pool_connection(struct redis_pool *_pool,
-                                                                                        struct redis_pool_elt *_elt,
+redis_pool_connection::redis_pool_connection(redis_pool *_pool,
+                                                                                        redis_pool_elt *_elt,
                                                                                         const char *db,
                                                                                         const char *password,
                                                                                         struct redisAsyncContext *_ctx)
@@ -336,9 +371,9 @@ redis_pool_connection::redis_pool_connection(struct redis_pool *_pool,
 
        state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
 
-       pool->conns_by_ctx.emplace(ctx, this);
+       pool->register_context(ctx, this);
        ctx->data = this;
-       rspamd_random_hex((guchar *)tag, sizeof(tag));
+       rspamd_random_hex((guchar *) tag, sizeof(tag));
 
        redisLibevAttach(pool->event_loop, ctx);
        redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
@@ -384,14 +419,16 @@ redis_pool_elt::new_connection() -> redisAsyncContext *
                                conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
                                msg_debug_rpool("reused existing connection to %s:%d: %p",
                                                ip.c_str(), port, conn->ctx);
-                               active.emplace_back(std::move(conn));
+                               active.emplace_front(std::move(conn));
+                               active.front()->elt_pos = active.begin();
                        }
                }
                else {
                        auto *nctx = redis_async_new();
                        if (nctx) {
-                               active.emplace_back(std::make_unique<redis_pool_connection>(pool, this,
+                               active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
                                                db.c_str(), password.c_str(), nctx));
+                               active.front()->elt_pos = active.begin();
                        }
 
                        return nctx;
@@ -400,17 +437,20 @@ redis_pool_elt::new_connection() -> redisAsyncContext *
        else {
                auto *nctx = redis_async_new();
                if (nctx) {
-                       active.emplace_back(std::make_unique<redis_pool_connection>(pool, this,
+                       active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
                                        db.c_str(), password.c_str(), nctx));
+                       active.front()->elt_pos = active.begin();
                }
 
                return nctx;
        }
+
+       RSPAMD_UNREACHABLE;
 }
 
 auto
 redis_pool::new_connection(const gchar *db, const gchar *password,
-                                       const char *ip, int port) -> redisAsyncContext *
+                                                  const char *ip, int port) -> redisAsyncContext *
 {
 
        auto key = redis_pool_elt::make_key(db, password, ip, port);
@@ -430,120 +470,110 @@ redis_pool::new_connection(const gchar *db, const gchar *password,
        }
 }
 
-}
-
-void *
-rspamd_redis_pool_init (void)
-{
-       return new rspamd::redis_pool{};
-}
-
-void
-rspamd_redis_pool_config (void *p,
-               struct rspamd_config *cfg,
-               struct ev_loop *ev_base)
-{
-       g_assert (p != NULL);
-       auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p);
-
-       pool->do_config(ev_base, cfg);
-}
-
-
-struct redisAsyncContext*
-rspamd_redis_pool_connect (void *p,
-               const gchar *db, const gchar *password,
-               const char *ip, int port)
-{
-       g_assert (p != NULL);
-       auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p);
-
-       return pool->new_connection(db, password, ip, port);
-}
-
-
-void
-rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
-               struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+auto redis_pool::release_connection(redisAsyncContext *ctx,
+                                                                       enum rspamd_redis_pool_release_type how) -> void
 {
-       struct rspamd_redis_pool_connection *conn;
-
-       g_assert (pool != NULL);
-       g_assert (ctx != NULL);
-
-       conn = (struct rspamd_redis_pool_connection *)g_hash_table_lookup (pool->elts_by_ctx, ctx);
-       if (conn != NULL) {
+       auto conn_it = conns_by_ctx.find(ctx);
+       if (conn_it != conns_by_ctx.end()) {
+               auto *conn = conn_it->second;
                g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
 
                if (ctx->err != REDIS_OK) {
                        /* We need to terminate connection forcefully */
                        msg_debug_rpool ("closed connection %p due to an error", conn->ctx);
-                       REF_RELEASE (conn);
+                       conn->elt->release_active(conn);
                }
                else {
                        if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
                                /* Ensure that there are no callbacks attached to this conn */
-                               if (ctx->replies.head == NULL) {
+                               if (ctx->replies.head == nullptr) {
                                        /* Just move it to the inactive queue */
-                                       g_queue_unlink (conn->elt->active, conn->entry);
-                                       g_queue_push_head_link (conn->elt->inactive, conn->entry);
                                        conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
-                                       rspamd_redis_pool_schedule_timeout (conn);
-                                       msg_debug_rpool ("mark connection %p inactive", conn->ctx);
+                                       conn->elt->move_to_inactive(conn);
+                                       conn->schedule_timeout();
+                                       msg_debug_rpool("mark connection %p inactive", conn->ctx);
                                }
                                else {
-                                       msg_debug_rpool ("closed connection %p due to callbacks left",
+                                       msg_debug_rpool("closed connection %p due to callbacks left",
                                                        conn->ctx);
-                                       REF_RELEASE (conn);
+                                       conn->elt->release_active(conn);
                                }
                        }
                        else {
                                if (how == RSPAMD_REDIS_RELEASE_FATAL) {
-                                       msg_debug_rpool ("closed connection %p due to an fatal termination",
+                                       msg_debug_rpool("closed connection %p due to an fatal termination",
                                                        conn->ctx);
                                }
                                else {
-                                       msg_debug_rpool ("closed connection %p due to explicit termination",
+                                       msg_debug_rpool("closed connection %p due to explicit termination",
                                                        conn->ctx);
                                }
 
-                               REF_RELEASE (conn);
+                               conn->elt->release_active(conn);
                        }
                }
 
-               REF_RELEASE (conn);
+               conn->elt->release_active(conn);
        }
        else {
-               g_assert_not_reached ();
+               RSPAMD_UNREACHABLE;
        }
 }
 
+}
+
+void *
+rspamd_redis_pool_init(void)
+{
+       return new rspamd::redis_pool{};
+}
 
 void
-rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
+rspamd_redis_pool_config(void *p,
+                                                struct rspamd_config *cfg,
+                                                struct ev_loop *ev_base)
 {
-       struct rspamd_redis_pool_elt *elt;
-       GHashTableIter it;
-       gpointer k, v;
+       g_assert (p != NULL);
+       auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
 
-       g_assert (pool != NULL);
+       pool->do_config(ev_base, cfg);
+}
 
-       g_hash_table_iter_init (&it, pool->elts_by_key);
 
-       while (g_hash_table_iter_next (&it, &k, &v)) {
-               elt = (struct rspamd_redis_pool_elt *)v;
-               rspamd_redis_pool_elt_dtor (elt);
-               g_hash_table_iter_steal (&it);
-       }
+struct redisAsyncContext *
+rspamd_redis_pool_connect(void *p,
+                                                 const gchar *db, const gchar *password,
+                                                 const char *ip, int port)
+{
+       g_assert (p != NULL);
+       auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
+
+       return pool->new_connection(db, password, ip, port);
+}
+
 
-       g_hash_table_unref (pool->elts_by_ctx);
-       g_hash_table_unref (pool->elts_by_key);
+void
+rspamd_redis_pool_release_connection(void *p,
+                                                                        struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+{
+       g_assert (p != NULL);
+       g_assert (ctx != NULL);
+       auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
+
+       pool->release_connection(ctx, how);
+}
+
+
+void
+rspamd_redis_pool_destroy(void *p)
+{
+       auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
 
-       g_free (pool);
+       delete pool;
 }
 
-const gchar*
-rspamd_redis_type_to_string (int type)
+const gchar *
+rspamd_redis_type_to_string(int type)
 {
        const gchar *ret = "unknown";