diff options
Diffstat (limited to 'src/libserver/redis_pool.cxx')
-rw-r--r-- | src/libserver/redis_pool.cxx | 242 |
1 files changed, 136 insertions, 106 deletions
diff --git a/src/libserver/redis_pool.cxx b/src/libserver/redis_pool.cxx index a059ea6fe..9053cc0e7 100644 --- a/src/libserver/redis_pool.cxx +++ b/src/libserver/redis_pool.cxx @@ -29,8 +29,8 @@ #include "libutil/cxx/local_shared_ptr.hxx" namespace rspamd { -struct redis_pool_elt; -struct redis_pool; +class redis_pool_elt; +class redis_pool; #define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ "redis_pool", conn->tag, \ @@ -61,31 +61,34 @@ struct redis_pool_connection { using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>; using conn_iter_t = std::list<redis_pool_connection_ptr>::iterator; struct redisAsyncContext *ctx; - struct redis_pool_elt *elt; - struct redis_pool *pool; + redis_pool_elt *elt; + redis_pool *pool; conn_iter_t elt_pos; ev_timer timeout; enum rspamd_redis_pool_connection_state state; gchar tag[MEMPOOL_UID_LEN]; - auto schedule_timeout () -> void; + auto schedule_timeout() -> void; + ~redis_pool_connection(); - explicit redis_pool_connection(struct redis_pool *_pool, - struct redis_pool_elt *_elt, - const char *db, - const char *password, + explicit redis_pool_connection(redis_pool *_pool, + redis_pool_elt *_elt, + const char *db, + const char *password, struct redisAsyncContext *_ctx); private: static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void; + static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void; + static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto; }; using redis_pool_key_t = std::uint64_t; -struct redis_pool; +class redis_pool; class redis_pool_elt { using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>; @@ -106,15 +109,31 @@ public: explicit redis_pool_elt(redis_pool *_pool, const gchar *_db, const gchar *_password, const char *_ip, int _port) - : pool(_pool), ip(_ip), db(_db), port(_port), password(_password), - key(redis_pool_elt::make_key(_db, _password, _ip, _port)) + : pool(_pool), ip(_ip), db(_db), password(_password), port(_port), + key(redis_pool_elt::make_key(_db, _password, _ip, _port)) { is_unix = ip[0] == '.' || ip[0] == '/'; } auto new_connection() -> redisAsyncContext *; + + auto release_active(const redis_pool_connection *conn) -> void + { + active.erase(conn->elt_pos); + } + + auto release_inactive(const redis_pool_connection *conn) -> void + { + inactive.erase(conn->elt_pos); + } + + auto move_to_inactive(const redis_pool_connection *conn) -> void + { + inactive.splice(std::end(inactive), active, conn->elt_pos); + } + inline static auto make_key(const gchar *db, const gchar *password, - const char *ip, int port) -> redis_pool_key_t + const char *ip, int port) -> redis_pool_key_t { rspamd_cryptobox_fast_hash_state_t st; @@ -132,8 +151,14 @@ public: return rspamd_cryptobox_fast_hash_final(&st); } + + auto num_active() const -> auto + { + return active.size(); + } + private: - auto redis_async_new() -> redisAsyncContext* + auto redis_async_new() -> redisAsyncContext * { struct redisAsyncContext *ctx; @@ -164,30 +189,40 @@ class redis_pool { robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key; robin_hood::unordered_flat_map<redisAsyncContext *, redis_pool_connection *> conns_by_ctx; +public: double timeout = default_timeout; unsigned max_conns = default_max_conns; -public: struct ev_loop *event_loop; struct rspamd_config *cfg; public: - explicit redis_pool() : event_loop(nullptr), cfg(nullptr) { + explicit redis_pool() : event_loop(nullptr), cfg(nullptr) + { conns_by_ctx.reserve(max_conns); } /* Legacy stuff */ - auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void { + auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void + { event_loop = _loop; cfg = _cfg; } auto new_connection(const gchar *db, const gchar *password, const char *ip, int port) -> redisAsyncContext *; - auto release_connection(redisAsyncContext *ctx) -> void; - auto unregister_context(redisAsyncContext *ctx) -> void { + auto release_connection(redisAsyncContext *ctx, + enum rspamd_redis_pool_release_type how) -> void; + + auto unregister_context(redisAsyncContext *ctx) -> void + { conns_by_ctx.erase(ctx); } + + auto register_context(redisAsyncContext *ctx, redis_pool_connection *conn) + { + conns_by_ctx.emplace(ctx, conn); + } }; @@ -230,7 +265,7 @@ auto redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void { struct redis_pool_connection *conn = - (struct redis_pool_connection *)priv; + (struct redis_pool_connection *) priv; msg_debug_rpool("quit command reply for the connection %p", conn->ctx); @@ -255,7 +290,7 @@ redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) auto redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void { - auto *conn = (struct redis_pool_connection *)w->data; + auto *conn = (struct redis_pool_connection *) w->data; g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE); @@ -273,7 +308,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> conn->ctx); /* Erasure of shared pointer will cause it to be removed */ - conn->elt->inactive.erase(conn->elt_pos); + conn->elt->release_inactive(conn); } } @@ -281,7 +316,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> auto redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto { - auto *conn = (struct redis_pool_connection *)ac->data; + auto *conn = (struct redis_pool_connection *) ac->data; /* * Here, we know that redis itself will free this connection @@ -295,7 +330,7 @@ redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, i } /* Erasure of shared pointer will cause it to be removed */ - conn->elt->inactive.erase(conn->elt_pos); + conn->elt->release_inactive(conn); } } @@ -304,15 +339,15 @@ redis_pool_connection::schedule_timeout() -> void { const auto *conn = this; /* For debug */ double real_timeout; - auto active_elts = elt->active.size(); + auto active_elts = elt->num_active(); if (active_elts > pool->max_conns) { real_timeout = pool->timeout / 2.0; - real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0); + real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 4.0); } else { real_timeout = pool->timeout; - real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0); + real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 2.0); } msg_debug_rpool("scheduled connection %p cleanup in %.1f seconds", @@ -326,8 +361,8 @@ redis_pool_connection::schedule_timeout() -> void } -redis_pool_connection::redis_pool_connection(struct redis_pool *_pool, - struct redis_pool_elt *_elt, +redis_pool_connection::redis_pool_connection(redis_pool *_pool, + redis_pool_elt *_elt, const char *db, const char *password, struct redisAsyncContext *_ctx) @@ -336,9 +371,9 @@ redis_pool_connection::redis_pool_connection(struct redis_pool *_pool, state = RSPAMD_REDIS_POOL_CONN_ACTIVE; - pool->conns_by_ctx.emplace(ctx, this); + pool->register_context(ctx, this); ctx->data = this; - rspamd_random_hex((guchar *)tag, sizeof(tag)); + rspamd_random_hex((guchar *) tag, sizeof(tag)); redisLibevAttach(pool->event_loop, ctx); redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect); @@ -384,14 +419,16 @@ redis_pool_elt::new_connection() -> redisAsyncContext * conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE; msg_debug_rpool("reused existing connection to %s:%d: %p", ip.c_str(), port, conn->ctx); - active.emplace_back(std::move(conn)); + active.emplace_front(std::move(conn)); + active.front()->elt_pos = active.begin(); } } else { auto *nctx = redis_async_new(); if (nctx) { - active.emplace_back(std::make_unique<redis_pool_connection>(pool, this, + active.emplace_front(std::make_unique<redis_pool_connection>(pool, this, db.c_str(), password.c_str(), nctx)); + active.front()->elt_pos = active.begin(); } return nctx; @@ -400,17 +437,20 @@ redis_pool_elt::new_connection() -> redisAsyncContext * else { auto *nctx = redis_async_new(); if (nctx) { - active.emplace_back(std::make_unique<redis_pool_connection>(pool, this, + active.emplace_front(std::make_unique<redis_pool_connection>(pool, this, db.c_str(), password.c_str(), nctx)); + active.front()->elt_pos = active.begin(); } return nctx; } + + RSPAMD_UNREACHABLE; } auto redis_pool::new_connection(const gchar *db, const gchar *password, - const char *ip, int port) -> redisAsyncContext * + const char *ip, int port) -> redisAsyncContext * { auto key = redis_pool_elt::make_key(db, password, ip, port); @@ -430,120 +470,110 @@ redis_pool::new_connection(const gchar *db, const gchar *password, } } -} - -void * -rspamd_redis_pool_init (void) -{ - return new rspamd::redis_pool{}; -} - -void -rspamd_redis_pool_config (void *p, - struct rspamd_config *cfg, - struct ev_loop *ev_base) -{ - g_assert (p != NULL); - auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p); - - pool->do_config(ev_base, cfg); -} - - -struct redisAsyncContext* -rspamd_redis_pool_connect (void *p, - const gchar *db, const gchar *password, - const char *ip, int port) -{ - g_assert (p != NULL); - auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p); - - return pool->new_connection(db, password, ip, port); -} - - -void -rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool, - struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how) +auto redis_pool::release_connection(redisAsyncContext *ctx, + enum rspamd_redis_pool_release_type how) -> void { - struct rspamd_redis_pool_connection *conn; - - g_assert (pool != NULL); - g_assert (ctx != NULL); - - conn = (struct rspamd_redis_pool_connection *)g_hash_table_lookup (pool->elts_by_ctx, ctx); - if (conn != NULL) { + auto conn_it = conns_by_ctx.find(ctx); + if (conn_it != conns_by_ctx.end()) { + auto *conn = conn_it->second; g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE); if (ctx->err != REDIS_OK) { /* We need to terminate connection forcefully */ msg_debug_rpool ("closed connection %p due to an error", conn->ctx); - REF_RELEASE (conn); + conn->elt->release_active(conn); } else { if (how == RSPAMD_REDIS_RELEASE_DEFAULT) { /* Ensure that there are no callbacks attached to this conn */ - if (ctx->replies.head == NULL) { + if (ctx->replies.head == nullptr) { /* Just move it to the inactive queue */ - g_queue_unlink (conn->elt->active, conn->entry); - g_queue_push_head_link (conn->elt->inactive, conn->entry); conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE; - rspamd_redis_pool_schedule_timeout (conn); - msg_debug_rpool ("mark connection %p inactive", conn->ctx); + conn->elt->move_to_inactive(conn); + conn->schedule_timeout(); + msg_debug_rpool("mark connection %p inactive", conn->ctx); } else { - msg_debug_rpool ("closed connection %p due to callbacks left", + msg_debug_rpool("closed connection %p due to callbacks left", conn->ctx); - REF_RELEASE (conn); + conn->elt->release_active(conn); } } else { if (how == RSPAMD_REDIS_RELEASE_FATAL) { - msg_debug_rpool ("closed connection %p due to an fatal termination", + msg_debug_rpool("closed connection %p due to an fatal termination", conn->ctx); } else { - msg_debug_rpool ("closed connection %p due to explicit termination", + msg_debug_rpool("closed connection %p due to explicit termination", conn->ctx); } - REF_RELEASE (conn); + conn->elt->release_active(conn); } } - REF_RELEASE (conn); + conn->elt->release_active(conn); } else { - g_assert_not_reached (); + RSPAMD_UNREACHABLE; } } +} + +void * +rspamd_redis_pool_init(void) +{ + return new rspamd::redis_pool{}; +} void -rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool) +rspamd_redis_pool_config(void *p, + struct rspamd_config *cfg, + struct ev_loop *ev_base) { - struct rspamd_redis_pool_elt *elt; - GHashTableIter it; - gpointer k, v; + g_assert (p != NULL); + auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p); - g_assert (pool != NULL); + pool->do_config(ev_base, cfg); +} - g_hash_table_iter_init (&it, pool->elts_by_key); - while (g_hash_table_iter_next (&it, &k, &v)) { - elt = (struct rspamd_redis_pool_elt *)v; - rspamd_redis_pool_elt_dtor (elt); - g_hash_table_iter_steal (&it); - } +struct redisAsyncContext * +rspamd_redis_pool_connect(void *p, + const gchar *db, const gchar *password, + const char *ip, int port) +{ + g_assert (p != NULL); + auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p); + + return pool->new_connection(db, password, ip, port); +} + - g_hash_table_unref (pool->elts_by_ctx); - g_hash_table_unref (pool->elts_by_key); +void +rspamd_redis_pool_release_connection(void *p, + struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how) +{ + g_assert (p != NULL); + g_assert (ctx != NULL); + auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p); + + pool->release_connection(ctx, how); +} + + +void +rspamd_redis_pool_destroy(void *p) +{ + auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p); - g_free (pool); + delete pool; } -const gchar* -rspamd_redis_type_to_string (int type) +const gchar * +rspamd_redis_type_to_string(int type) { const gchar *ret = "unknown"; |