aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/redis_pool.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/redis_pool.cxx')
-rw-r--r--src/libserver/redis_pool.cxx242
1 files changed, 136 insertions, 106 deletions
diff --git a/src/libserver/redis_pool.cxx b/src/libserver/redis_pool.cxx
index a059ea6fe..9053cc0e7 100644
--- a/src/libserver/redis_pool.cxx
+++ b/src/libserver/redis_pool.cxx
@@ -29,8 +29,8 @@
#include "libutil/cxx/local_shared_ptr.hxx"
namespace rspamd {
-struct redis_pool_elt;
-struct redis_pool;
+class redis_pool_elt;
+class redis_pool;
#define msg_err_rpool(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
"redis_pool", conn->tag, \
@@ -61,31 +61,34 @@ struct redis_pool_connection {
using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
using conn_iter_t = std::list<redis_pool_connection_ptr>::iterator;
struct redisAsyncContext *ctx;
- struct redis_pool_elt *elt;
- struct redis_pool *pool;
+ redis_pool_elt *elt;
+ redis_pool *pool;
conn_iter_t elt_pos;
ev_timer timeout;
enum rspamd_redis_pool_connection_state state;
gchar tag[MEMPOOL_UID_LEN];
- auto schedule_timeout () -> void;
+ auto schedule_timeout() -> void;
+
~redis_pool_connection();
- explicit redis_pool_connection(struct redis_pool *_pool,
- struct redis_pool_elt *_elt,
- const char *db,
- const char *password,
+ explicit redis_pool_connection(redis_pool *_pool,
+ redis_pool_elt *_elt,
+ const char *db,
+ const char *password,
struct redisAsyncContext *_ctx);
private:
static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void;
+
static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void;
+
static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto;
};
using redis_pool_key_t = std::uint64_t;
-struct redis_pool;
+class redis_pool;
class redis_pool_elt {
using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
@@ -106,15 +109,31 @@ public:
explicit redis_pool_elt(redis_pool *_pool,
const gchar *_db, const gchar *_password,
const char *_ip, int _port)
- : pool(_pool), ip(_ip), db(_db), port(_port), password(_password),
- key(redis_pool_elt::make_key(_db, _password, _ip, _port))
+ : pool(_pool), ip(_ip), db(_db), password(_password), port(_port),
+ key(redis_pool_elt::make_key(_db, _password, _ip, _port))
{
is_unix = ip[0] == '.' || ip[0] == '/';
}
auto new_connection() -> redisAsyncContext *;
+
+ auto release_active(const redis_pool_connection *conn) -> void
+ {
+ active.erase(conn->elt_pos);
+ }
+
+ auto release_inactive(const redis_pool_connection *conn) -> void
+ {
+ inactive.erase(conn->elt_pos);
+ }
+
+ auto move_to_inactive(const redis_pool_connection *conn) -> void
+ {
+ inactive.splice(std::end(inactive), active, conn->elt_pos);
+ }
+
inline static auto make_key(const gchar *db, const gchar *password,
- const char *ip, int port) -> redis_pool_key_t
+ const char *ip, int port) -> redis_pool_key_t
{
rspamd_cryptobox_fast_hash_state_t st;
@@ -132,8 +151,14 @@ public:
return rspamd_cryptobox_fast_hash_final(&st);
}
+
+ auto num_active() const -> auto
+ {
+ return active.size();
+ }
+
private:
- auto redis_async_new() -> redisAsyncContext*
+ auto redis_async_new() -> redisAsyncContext *
{
struct redisAsyncContext *ctx;
@@ -164,30 +189,40 @@ class redis_pool {
robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key;
robin_hood::unordered_flat_map<redisAsyncContext *,
redis_pool_connection *> conns_by_ctx;
+public:
double timeout = default_timeout;
unsigned max_conns = default_max_conns;
-public:
struct ev_loop *event_loop;
struct rspamd_config *cfg;
public:
- explicit redis_pool() : event_loop(nullptr), cfg(nullptr) {
+ explicit redis_pool() : event_loop(nullptr), cfg(nullptr)
+ {
conns_by_ctx.reserve(max_conns);
}
/* Legacy stuff */
- auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void {
+ auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void
+ {
event_loop = _loop;
cfg = _cfg;
}
auto new_connection(const gchar *db, const gchar *password,
const char *ip, int port) -> redisAsyncContext *;
- auto release_connection(redisAsyncContext *ctx) -> void;
- auto unregister_context(redisAsyncContext *ctx) -> void {
+ auto release_connection(redisAsyncContext *ctx,
+ enum rspamd_redis_pool_release_type how) -> void;
+
+ auto unregister_context(redisAsyncContext *ctx) -> void
+ {
conns_by_ctx.erase(ctx);
}
+
+ auto register_context(redisAsyncContext *ctx, redis_pool_connection *conn)
+ {
+ conns_by_ctx.emplace(ctx, conn);
+ }
};
@@ -230,7 +265,7 @@ auto
redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void
{
struct redis_pool_connection *conn =
- (struct redis_pool_connection *)priv;
+ (struct redis_pool_connection *) priv;
msg_debug_rpool("quit command reply for the connection %p",
conn->ctx);
@@ -255,7 +290,7 @@ redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv)
auto
redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void
{
- auto *conn = (struct redis_pool_connection *)w->data;
+ auto *conn = (struct redis_pool_connection *) w->data;
g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE);
@@ -273,7 +308,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) ->
conn->ctx);
/* Erasure of shared pointer will cause it to be removed */
- conn->elt->inactive.erase(conn->elt_pos);
+ conn->elt->release_inactive(conn);
}
}
@@ -281,7 +316,7 @@ redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) ->
auto
redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto
{
- auto *conn = (struct redis_pool_connection *)ac->data;
+ auto *conn = (struct redis_pool_connection *) ac->data;
/*
* Here, we know that redis itself will free this connection
@@ -295,7 +330,7 @@ redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, i
}
/* Erasure of shared pointer will cause it to be removed */
- conn->elt->inactive.erase(conn->elt_pos);
+ conn->elt->release_inactive(conn);
}
}
@@ -304,15 +339,15 @@ redis_pool_connection::schedule_timeout() -> void
{
const auto *conn = this; /* For debug */
double real_timeout;
- auto active_elts = elt->active.size();
+ auto active_elts = elt->num_active();
if (active_elts > pool->max_conns) {
real_timeout = pool->timeout / 2.0;
- real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 4.0);
+ real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 4.0);
}
else {
real_timeout = pool->timeout;
- real_timeout = rspamd_time_jitter (real_timeout, real_timeout / 2.0);
+ real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 2.0);
}
msg_debug_rpool("scheduled connection %p cleanup in %.1f seconds",
@@ -326,8 +361,8 @@ redis_pool_connection::schedule_timeout() -> void
}
-redis_pool_connection::redis_pool_connection(struct redis_pool *_pool,
- struct redis_pool_elt *_elt,
+redis_pool_connection::redis_pool_connection(redis_pool *_pool,
+ redis_pool_elt *_elt,
const char *db,
const char *password,
struct redisAsyncContext *_ctx)
@@ -336,9 +371,9 @@ redis_pool_connection::redis_pool_connection(struct redis_pool *_pool,
state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
- pool->conns_by_ctx.emplace(ctx, this);
+ pool->register_context(ctx, this);
ctx->data = this;
- rspamd_random_hex((guchar *)tag, sizeof(tag));
+ rspamd_random_hex((guchar *) tag, sizeof(tag));
redisLibevAttach(pool->event_loop, ctx);
redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
@@ -384,14 +419,16 @@ redis_pool_elt::new_connection() -> redisAsyncContext *
conn->state = RSPAMD_REDIS_POOL_CONN_ACTIVE;
msg_debug_rpool("reused existing connection to %s:%d: %p",
ip.c_str(), port, conn->ctx);
- active.emplace_back(std::move(conn));
+ active.emplace_front(std::move(conn));
+ active.front()->elt_pos = active.begin();
}
}
else {
auto *nctx = redis_async_new();
if (nctx) {
- active.emplace_back(std::make_unique<redis_pool_connection>(pool, this,
+ active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
db.c_str(), password.c_str(), nctx));
+ active.front()->elt_pos = active.begin();
}
return nctx;
@@ -400,17 +437,20 @@ redis_pool_elt::new_connection() -> redisAsyncContext *
else {
auto *nctx = redis_async_new();
if (nctx) {
- active.emplace_back(std::make_unique<redis_pool_connection>(pool, this,
+ active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
db.c_str(), password.c_str(), nctx));
+ active.front()->elt_pos = active.begin();
}
return nctx;
}
+
+ RSPAMD_UNREACHABLE;
}
auto
redis_pool::new_connection(const gchar *db, const gchar *password,
- const char *ip, int port) -> redisAsyncContext *
+ const char *ip, int port) -> redisAsyncContext *
{
auto key = redis_pool_elt::make_key(db, password, ip, port);
@@ -430,120 +470,110 @@ redis_pool::new_connection(const gchar *db, const gchar *password,
}
}
-}
-
-void *
-rspamd_redis_pool_init (void)
-{
- return new rspamd::redis_pool{};
-}
-
-void
-rspamd_redis_pool_config (void *p,
- struct rspamd_config *cfg,
- struct ev_loop *ev_base)
-{
- g_assert (p != NULL);
- auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p);
-
- pool->do_config(ev_base, cfg);
-}
-
-
-struct redisAsyncContext*
-rspamd_redis_pool_connect (void *p,
- const gchar *db, const gchar *password,
- const char *ip, int port)
-{
- g_assert (p != NULL);
- auto *pool = reinterpret_cast<struct rspamd::redis_pool *>(p);
-
- return pool->new_connection(db, password, ip, port);
-}
-
-
-void
-rspamd_redis_pool_release_connection (struct rspamd_redis_pool *pool,
- struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+auto redis_pool::release_connection(redisAsyncContext *ctx,
+ enum rspamd_redis_pool_release_type how) -> void
{
- struct rspamd_redis_pool_connection *conn;
-
- g_assert (pool != NULL);
- g_assert (ctx != NULL);
-
- conn = (struct rspamd_redis_pool_connection *)g_hash_table_lookup (pool->elts_by_ctx, ctx);
- if (conn != NULL) {
+ auto conn_it = conns_by_ctx.find(ctx);
+ if (conn_it != conns_by_ctx.end()) {
+ auto *conn = conn_it->second;
g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE);
if (ctx->err != REDIS_OK) {
/* We need to terminate connection forcefully */
msg_debug_rpool ("closed connection %p due to an error", conn->ctx);
- REF_RELEASE (conn);
+ conn->elt->release_active(conn);
}
else {
if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
/* Ensure that there are no callbacks attached to this conn */
- if (ctx->replies.head == NULL) {
+ if (ctx->replies.head == nullptr) {
/* Just move it to the inactive queue */
- g_queue_unlink (conn->elt->active, conn->entry);
- g_queue_push_head_link (conn->elt->inactive, conn->entry);
conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE;
- rspamd_redis_pool_schedule_timeout (conn);
- msg_debug_rpool ("mark connection %p inactive", conn->ctx);
+ conn->elt->move_to_inactive(conn);
+ conn->schedule_timeout();
+ msg_debug_rpool("mark connection %p inactive", conn->ctx);
}
else {
- msg_debug_rpool ("closed connection %p due to callbacks left",
+ msg_debug_rpool("closed connection %p due to callbacks left",
conn->ctx);
- REF_RELEASE (conn);
+ conn->elt->release_active(conn);
}
}
else {
if (how == RSPAMD_REDIS_RELEASE_FATAL) {
- msg_debug_rpool ("closed connection %p due to an fatal termination",
+ msg_debug_rpool("closed connection %p due to an fatal termination",
conn->ctx);
}
else {
- msg_debug_rpool ("closed connection %p due to explicit termination",
+ msg_debug_rpool("closed connection %p due to explicit termination",
conn->ctx);
}
- REF_RELEASE (conn);
+ conn->elt->release_active(conn);
}
}
- REF_RELEASE (conn);
+ conn->elt->release_active(conn);
}
else {
- g_assert_not_reached ();
+ RSPAMD_UNREACHABLE;
}
}
+}
+
+void *
+rspamd_redis_pool_init(void)
+{
+ return new rspamd::redis_pool{};
+}
void
-rspamd_redis_pool_destroy (struct rspamd_redis_pool *pool)
+rspamd_redis_pool_config(void *p,
+ struct rspamd_config *cfg,
+ struct ev_loop *ev_base)
{
- struct rspamd_redis_pool_elt *elt;
- GHashTableIter it;
- gpointer k, v;
+ g_assert (p != NULL);
+ auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
- g_assert (pool != NULL);
+ pool->do_config(ev_base, cfg);
+}
- g_hash_table_iter_init (&it, pool->elts_by_key);
- while (g_hash_table_iter_next (&it, &k, &v)) {
- elt = (struct rspamd_redis_pool_elt *)v;
- rspamd_redis_pool_elt_dtor (elt);
- g_hash_table_iter_steal (&it);
- }
+struct redisAsyncContext *
+rspamd_redis_pool_connect(void *p,
+ const gchar *db, const gchar *password,
+ const char *ip, int port)
+{
+ g_assert (p != NULL);
+ auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
+
+ return pool->new_connection(db, password, ip, port);
+}
+
- g_hash_table_unref (pool->elts_by_ctx);
- g_hash_table_unref (pool->elts_by_key);
+void
+rspamd_redis_pool_release_connection(void *p,
+ struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
+{
+ g_assert (p != NULL);
+ g_assert (ctx != NULL);
+ auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
+
+ pool->release_connection(ctx, how);
+}
+
+
+void
+rspamd_redis_pool_destroy(void *p)
+{
+ auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
- g_free (pool);
+ delete pool;
}
-const gchar*
-rspamd_redis_type_to_string (int type)
+const gchar *
+rspamd_redis_type_to_string(int type)
{
const gchar *ret = "unknown";