|
|
@@ -202,7 +202,7 @@ private: |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
class redis_pool { |
|
|
|
class redis_pool final { |
|
|
|
static constexpr const double default_timeout = 10.0; |
|
|
|
static constexpr const unsigned default_max_conns = 100; |
|
|
|
|
|
|
@@ -210,6 +210,7 @@ class redis_pool { |
|
|
|
robin_hood::unordered_flat_map<redisAsyncContext *, |
|
|
|
redis_pool_connection *> conns_by_ctx; |
|
|
|
robin_hood::unordered_node_map<redis_pool_key_t, redis_pool_elt> elts_by_key; |
|
|
|
bool wanna_die = false; /* Hiredis is 'clever' so we can call ourselves from destructor */ |
|
|
|
public: |
|
|
|
double timeout = default_timeout; |
|
|
|
unsigned max_conns = default_max_conns; |
|
|
@@ -244,6 +245,14 @@ public: |
|
|
|
{ |
|
|
|
conns_by_ctx.emplace(ctx, conn); |
|
|
|
} |
|
|
|
|
|
|
|
~redis_pool() { |
|
|
|
/* |
|
|
|
* XXX: this will prevent hiredis to unregister connections that |
|
|
|
* are already destroyed during redisAsyncFree... |
|
|
|
*/ |
|
|
|
wanna_die = true; |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
@@ -479,69 +488,75 @@ redis_pool::new_connection(const gchar *db, const gchar *password, |
|
|
|
const char *ip, int port) -> redisAsyncContext * |
|
|
|
{ |
|
|
|
|
|
|
|
auto key = redis_pool_elt::make_key(db, password, ip, port); |
|
|
|
auto found_elt = elts_by_key.find(key); |
|
|
|
if (!wanna_die) { |
|
|
|
auto key = redis_pool_elt::make_key(db, password, ip, port); |
|
|
|
auto found_elt = elts_by_key.find(key); |
|
|
|
|
|
|
|
if (found_elt != elts_by_key.end()) { |
|
|
|
auto &elt = found_elt->second; |
|
|
|
if (found_elt != elts_by_key.end()) { |
|
|
|
auto &elt = found_elt->second; |
|
|
|
|
|
|
|
return elt.new_connection(); |
|
|
|
} |
|
|
|
else { |
|
|
|
/* Need to create a pool */ |
|
|
|
auto nelt = elts_by_key.emplace(std::piecewise_construct, |
|
|
|
std::forward_as_tuple(key), |
|
|
|
std::forward_as_tuple(this, db, password, ip, port)); |
|
|
|
return elt.new_connection(); |
|
|
|
} |
|
|
|
else { |
|
|
|
/* Need to create a pool */ |
|
|
|
auto nelt = elts_by_key.emplace(std::piecewise_construct, |
|
|
|
std::forward_as_tuple(key), |
|
|
|
std::forward_as_tuple(this, db, password, ip, port)); |
|
|
|
|
|
|
|
return nelt.first->second.new_connection(); |
|
|
|
return nelt.first->second.new_connection(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
auto redis_pool::release_connection(redisAsyncContext *ctx, |
|
|
|
enum rspamd_redis_pool_release_type how) -> void |
|
|
|
{ |
|
|
|
auto conn_it = conns_by_ctx.find(ctx); |
|
|
|
if (conn_it != conns_by_ctx.end()) { |
|
|
|
auto *conn = conn_it->second; |
|
|
|
g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE); |
|
|
|
|
|
|
|
if (ctx->err != REDIS_OK) { |
|
|
|
/* We need to terminate connection forcefully */ |
|
|
|
msg_debug_rpool ("closed connection %p due to an error", conn->ctx); |
|
|
|
} |
|
|
|
else { |
|
|
|
if (how == RSPAMD_REDIS_RELEASE_DEFAULT) { |
|
|
|
/* Ensure that there are no callbacks attached to this conn */ |
|
|
|
if (ctx->replies.head == nullptr) { |
|
|
|
/* Just move it to the inactive queue */ |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE; |
|
|
|
conn->elt->move_to_inactive(conn); |
|
|
|
conn->schedule_timeout(); |
|
|
|
msg_debug_rpool("mark connection %p inactive", conn->ctx); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_debug_rpool("closed connection %p due to callbacks left", |
|
|
|
conn->ctx); |
|
|
|
} |
|
|
|
if (!wanna_die) { |
|
|
|
auto conn_it = conns_by_ctx.find(ctx); |
|
|
|
if (conn_it != conns_by_ctx.end()) { |
|
|
|
auto *conn = conn_it->second; |
|
|
|
g_assert (conn->state == RSPAMD_REDIS_POOL_CONN_ACTIVE); |
|
|
|
|
|
|
|
if (ctx->err != REDIS_OK) { |
|
|
|
/* We need to terminate connection forcefully */ |
|
|
|
msg_debug_rpool ("closed connection %p due to an error", conn->ctx); |
|
|
|
} |
|
|
|
else { |
|
|
|
if (how == RSPAMD_REDIS_RELEASE_FATAL) { |
|
|
|
msg_debug_rpool("closed connection %p due to an fatal termination", |
|
|
|
conn->ctx); |
|
|
|
if (how == RSPAMD_REDIS_RELEASE_DEFAULT) { |
|
|
|
/* Ensure that there are no callbacks attached to this conn */ |
|
|
|
if (ctx->replies.head == nullptr) { |
|
|
|
/* Just move it to the inactive queue */ |
|
|
|
conn->state = RSPAMD_REDIS_POOL_CONN_INACTIVE; |
|
|
|
conn->elt->move_to_inactive(conn); |
|
|
|
conn->schedule_timeout(); |
|
|
|
msg_debug_rpool("mark connection %p inactive", conn->ctx); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_debug_rpool("closed connection %p due to callbacks left", |
|
|
|
conn->ctx); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_debug_rpool("closed connection %p due to explicit termination", |
|
|
|
conn->ctx); |
|
|
|
if (how == RSPAMD_REDIS_RELEASE_FATAL) { |
|
|
|
msg_debug_rpool("closed connection %p due to an fatal termination", |
|
|
|
conn->ctx); |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_debug_rpool("closed connection %p due to explicit termination", |
|
|
|
conn->ctx); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
conn->elt->release_connection(conn); |
|
|
|
} |
|
|
|
else { |
|
|
|
RSPAMD_UNREACHABLE; |
|
|
|
conn->elt->release_connection(conn); |
|
|
|
} |
|
|
|
else { |
|
|
|
RSPAMD_UNREACHABLE; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|