|
|
@@ -69,20 +69,17 @@ struct redis_pool_connection { |
|
|
|
gchar tag[MEMPOOL_UID_LEN]; |
|
|
|
|
|
|
|
auto schedule_timeout() -> void; |
|
|
|
|
|
|
|
~redis_pool_connection(); |
|
|
|
|
|
|
|
explicit redis_pool_connection(redis_pool *_pool, |
|
|
|
redis_pool_elt *_elt, |
|
|
|
const char *db, |
|
|
|
const char *password, |
|
|
|
const std::string &db, |
|
|
|
const std::string &password, |
|
|
|
struct redisAsyncContext *_ctx); |
|
|
|
|
|
|
|
private: |
|
|
|
static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void; |
|
|
|
|
|
|
|
static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void; |
|
|
|
|
|
|
|
static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto; |
|
|
|
}; |
|
|
|
|
|
|
@@ -109,10 +106,17 @@ public: |
|
|
|
explicit redis_pool_elt(redis_pool *_pool, |
|
|
|
const gchar *_db, const gchar *_password, |
|
|
|
const char *_ip, int _port) |
|
|
|
: pool(_pool), ip(_ip), db(_db), password(_password), port(_port), |
|
|
|
: pool(_pool), ip(_ip), port(_port), |
|
|
|
key(redis_pool_elt::make_key(_db, _password, _ip, _port)) |
|
|
|
{ |
|
|
|
is_unix = ip[0] == '.' || ip[0] == '/'; |
|
|
|
|
|
|
|
if (_db) { |
|
|
|
db = _db; |
|
|
|
} |
|
|
|
if (_password) { |
|
|
|
password = _password; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
auto new_connection() -> redisAsyncContext *; |
|
|
@@ -157,6 +161,10 @@ public: |
|
|
|
return active.size(); |
|
|
|
} |
|
|
|
|
|
|
|
~redis_pool_elt() { |
|
|
|
rspamd_explicit_memzero(password.data(), password.size()); |
|
|
|
} |
|
|
|
|
|
|
|
private: |
|
|
|
auto redis_async_new() -> redisAsyncContext * |
|
|
|
{ |
|
|
@@ -186,9 +194,9 @@ class redis_pool { |
|
|
|
static constexpr const unsigned default_max_conns = 100; |
|
|
|
|
|
|
|
/* We want to have references integrity */ |
|
|
|
robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key; |
|
|
|
robin_hood::unordered_flat_map<redisAsyncContext *, |
|
|
|
redis_pool_connection *> conns_by_ctx; |
|
|
|
robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key; |
|
|
|
public: |
|
|
|
double timeout = default_timeout; |
|
|
|
unsigned max_conns = default_max_conns; |
|
|
@@ -363,8 +371,8 @@ redis_pool_connection::schedule_timeout() -> void |
|
|
|
|
|
|
|
redis_pool_connection::redis_pool_connection(redis_pool *_pool, |
|
|
|
redis_pool_elt *_elt, |
|
|
|
const char *db, |
|
|
|
const char *password, |
|
|
|
const std::string &db, |
|
|
|
const std::string &password, |
|
|
|
struct redisAsyncContext *_ctx) |
|
|
|
: ctx(_ctx), elt(_elt), pool(_pool) |
|
|
|
{ |
|
|
@@ -378,13 +386,13 @@ redis_pool_connection::redis_pool_connection(redis_pool *_pool, |
|
|
|
redisLibevAttach(pool->event_loop, ctx); |
|
|
|
redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect); |
|
|
|
|
|
|
|
if (password) { |
|
|
|
redisAsyncCommand(ctx, NULL, NULL, |
|
|
|
"AUTH %s", password); |
|
|
|
if (!password.empty()) { |
|
|
|
redisAsyncCommand(ctx, nullptr, nullptr, |
|
|
|
"AUTH %s", password.c_str()); |
|
|
|
} |
|
|
|
if (db) { |
|
|
|
redisAsyncCommand(ctx, NULL, NULL, |
|
|
|
"SELECT %s", db); |
|
|
|
if (!db.empty()) { |
|
|
|
redisAsyncCommand(ctx, nullptr, nullptr, |
|
|
|
"SELECT %s", db.c_str()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -392,7 +400,8 @@ auto |
|
|
|
redis_pool_elt::new_connection() -> redisAsyncContext * |
|
|
|
{ |
|
|
|
if (!inactive.empty()) { |
|
|
|
auto &&conn = std::move(inactive.back()); |
|
|
|
decltype(inactive)::value_type conn; |
|
|
|
conn.swap(inactive.back()); |
|
|
|
inactive.pop_back(); |
|
|
|
|
|
|
|
g_assert (conn->state != RSPAMD_REDIS_POOL_CONN_ACTIVE); |
|
|
@@ -421,6 +430,8 @@ redis_pool_elt::new_connection() -> redisAsyncContext * |
|
|
|
ip.c_str(), port, conn->ctx); |
|
|
|
active.emplace_front(std::move(conn)); |
|
|
|
active.front()->elt_pos = active.begin(); |
|
|
|
|
|
|
|
return active.front()->ctx; |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
@@ -463,8 +474,9 @@ redis_pool::new_connection(const gchar *db, const gchar *password, |
|
|
|
} |
|
|
|
else { |
|
|
|
/* Need to create a pool */ |
|
|
|
auto nelt = elts_by_key.emplace(key, |
|
|
|
redis_pool_elt{this, db, password, ip, port}); |
|
|
|
auto nelt = elts_by_key.emplace(std::piecewise_construct, |
|
|
|
std::forward_as_tuple(key), |
|
|
|
std::forward_as_tuple(this, db, password, ip, port)); |
|
|
|
|
|
|
|
return nelt.first->second.new_connection(); |
|
|
|
} |
|
|
@@ -481,7 +493,6 @@ auto redis_pool::release_connection(redisAsyncContext *ctx, |
|
|
|
if (ctx->err != REDIS_OK) { |
|
|
|
/* We need to terminate connection forcefully */ |
|
|
|
msg_debug_rpool ("closed connection %p due to an error", conn->ctx); |
|
|
|
conn->elt->release_active(conn); |
|
|
|
} |
|
|
|
else { |
|
|
|
if (how == RSPAMD_REDIS_RELEASE_DEFAULT) { |
|
|
@@ -492,11 +503,12 @@ auto redis_pool::release_connection(redisAsyncContext *ctx, |
|
|
|
conn->elt->move_to_inactive(conn); |
|
|
|
conn->schedule_timeout(); |
|
|
|
msg_debug_rpool("mark connection %p inactive", conn->ctx); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_debug_rpool("closed connection %p due to callbacks left", |
|
|
|
conn->ctx); |
|
|
|
conn->elt->release_active(conn); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
@@ -508,8 +520,6 @@ auto redis_pool::release_connection(redisAsyncContext *ctx, |
|
|
|
msg_debug_rpool("closed connection %p due to explicit termination", |
|
|
|
conn->ctx); |
|
|
|
} |
|
|
|
|
|
|
|
conn->elt->release_active(conn); |
|
|
|
} |
|
|
|
} |
|
|
|
|