123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663 |
- /*
- * Copyright 2023 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- #include "config.h"
- #include "contrib/libev/ev.h"
- #include "redis_pool.h"
- #include "cfg_file.h"
- #include "contrib/hiredis/hiredis.h"
- #include "contrib/hiredis/async.h"
- #include "contrib/hiredis/adapters/libev.h"
- #include "cryptobox.h"
- #include "logger.h"
- #include "contrib/ankerl/unordered_dense.h"
-
- #include <list>
- #include <unordered_map>
-
- namespace rspamd {
- class redis_pool_elt;
- class redis_pool;
-
- #define msg_debug_rpool(...) rspamd_conditional_debug_fast(NULL, NULL, \
- rspamd_redis_pool_log_id, "redis_pool", conn->tag, \
- __FUNCTION__, \
- __VA_ARGS__)
-
- INIT_LOG_MODULE(redis_pool)
-
- enum class rspamd_redis_pool_connection_state : std::uint8_t {
- RSPAMD_REDIS_POOL_CONN_INACTIVE = 0,
- RSPAMD_REDIS_POOL_CONN_ACTIVE,
- RSPAMD_REDIS_POOL_CONN_FINALISING
- };
-
- struct redis_pool_connection {
- using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
- using conn_iter_t = std::list<redis_pool_connection_ptr>::iterator;
- struct redisAsyncContext *ctx;
- redis_pool_elt *elt;
- redis_pool *pool;
- conn_iter_t elt_pos;
- ev_timer timeout;
- char tag[MEMPOOL_UID_LEN];
- rspamd_redis_pool_connection_state state;
-
- auto schedule_timeout() -> void;
- ~redis_pool_connection();
-
- explicit redis_pool_connection(redis_pool *_pool,
- redis_pool_elt *_elt,
- const std::string &db,
- const std::string &username,
- const std::string &password,
- struct redisAsyncContext *_ctx);
-
- private:
- static auto redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void;
- static auto redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void;
- static auto redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto;
- };
-
-
- using redis_pool_key_t = std::uint64_t;
- class redis_pool;
-
- class redis_pool_elt {
- using redis_pool_connection_ptr = std::unique_ptr<redis_pool_connection>;
- redis_pool *pool;
- /*
- * These lists owns connections, so if an element is removed from both
- * lists, it is destructed
- */
- std::list<redis_pool_connection_ptr> active;
- std::list<redis_pool_connection_ptr> inactive;
- std::list<redis_pool_connection_ptr> terminating;
- std::string ip;
- std::string db;
- std::string username;
- std::string password;
- int port;
- redis_pool_key_t key;
- bool is_unix;
-
- public:
- /* Disable copy */
- redis_pool_elt() = delete;
- redis_pool_elt(const redis_pool_elt &) = delete;
- /* Enable move */
- redis_pool_elt(redis_pool_elt &&other) = default;
-
- explicit redis_pool_elt(redis_pool *_pool,
- const char *_db, const char *_username,
- const char *_password,
- const char *_ip, int _port)
- : pool(_pool), ip(_ip), port(_port),
- key(redis_pool_elt::make_key(_db, _username, _password, _ip, _port))
- {
- is_unix = ip[0] == '.' || ip[0] == '/';
-
- if (_db) {
- db = _db;
- }
- if (_username) {
- username = _username;
- }
- if (_password) {
- password = _password;
- }
- }
-
- auto new_connection() -> redisAsyncContext *;
-
- auto release_connection(const redis_pool_connection *conn) -> void
- {
- switch (conn->state) {
- case rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE:
- active.erase(conn->elt_pos);
- break;
- case rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_INACTIVE:
- inactive.erase(conn->elt_pos);
- break;
- case rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_FINALISING:
- terminating.erase(conn->elt_pos);
- break;
- }
- }
-
- auto move_to_inactive(redis_pool_connection *conn) -> void
- {
- inactive.splice(std::end(inactive), active, conn->elt_pos);
- conn->elt_pos = std::prev(std::end(inactive));
- }
-
- auto move_to_terminating(redis_pool_connection *conn) -> void
- {
- terminating.splice(std::end(terminating), inactive, conn->elt_pos);
- conn->elt_pos = std::prev(std::end(terminating));
- }
-
- inline static auto make_key(const char *db, const char *username,
- const char *password, const char *ip, int port) -> redis_pool_key_t
- {
- rspamd_cryptobox_fast_hash_state_t st;
-
- rspamd_cryptobox_fast_hash_init(&st, rspamd_hash_seed());
-
- if (db) {
- rspamd_cryptobox_fast_hash_update(&st, db, strlen(db));
- }
- if (username) {
- rspamd_cryptobox_fast_hash_update(&st, username, strlen(username));
- }
- if (password) {
- rspamd_cryptobox_fast_hash_update(&st, password, strlen(password));
- }
-
- rspamd_cryptobox_fast_hash_update(&st, ip, strlen(ip));
- rspamd_cryptobox_fast_hash_update(&st, &port, sizeof(port));
-
- return rspamd_cryptobox_fast_hash_final(&st);
- }
-
- auto num_active() const -> auto
- {
- return active.size();
- }
-
- ~redis_pool_elt()
- {
- rspamd_explicit_memzero(password.data(), password.size());
- }
-
- private:
- auto redis_async_new() -> redisAsyncContext *
- {
- struct redisAsyncContext *ctx;
-
- if (is_unix) {
- ctx = redisAsyncConnectUnix(ip.c_str());
- }
- else {
- ctx = redisAsyncConnect(ip.c_str(), port);
- }
-
- if (ctx && ctx->err != REDIS_OK) {
- msg_err("cannot connect to redis %s (port %d): %s", ip.c_str(), port,
- ctx->errstr);
- redisAsyncFree(ctx);
-
- return nullptr;
- }
-
- return ctx;
- }
- };
-
- class redis_pool final {
- static constexpr const double default_timeout = 10.0;
- static constexpr const unsigned default_max_conns = 100;
-
- /* We want to have references integrity */
- ankerl::unordered_dense::map<redisAsyncContext *,
- redis_pool_connection *>
- conns_by_ctx;
- /*
- * We store a pointer to the element in each connection, so this has to be
- * a buckets map with pointers/references stability guarantees.
- */
- std::unordered_map<redis_pool_key_t, redis_pool_elt> elts_by_key;
- bool wanna_die = false; /* Hiredis is 'clever' so we can call ourselves from destructor */
- public:
- double timeout = default_timeout;
- unsigned max_conns = default_max_conns;
- struct ev_loop *event_loop;
- struct rspamd_config *cfg;
-
- public:
- explicit redis_pool()
- : event_loop(nullptr), cfg(nullptr)
- {
- conns_by_ctx.reserve(max_conns);
- }
-
- /* Legacy stuff */
- auto do_config(struct ev_loop *_loop, struct rspamd_config *_cfg) -> void
- {
- event_loop = _loop;
- cfg = _cfg;
- }
-
- auto new_connection(const char *db, const char *username,
- const char *password, const char *ip, int port) -> redisAsyncContext *;
-
- auto release_connection(redisAsyncContext *ctx,
- enum rspamd_redis_pool_release_type how) -> void;
-
- auto unregister_context(redisAsyncContext *ctx) -> void
- {
- conns_by_ctx.erase(ctx);
- }
-
- auto register_context(redisAsyncContext *ctx, redis_pool_connection *conn)
- {
- conns_by_ctx.emplace(ctx, conn);
- }
-
- /* Hack to prevent Redis callbacks to be executed */
- auto prepare_to_die() -> void
- {
- wanna_die = true;
- }
-
- ~redis_pool()
- {
- }
- };
-
-
- redis_pool_connection::~redis_pool_connection()
- {
- const auto *conn = this; /* For debug */
-
- if (state == rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE) {
- msg_debug_rpool("active connection destructed: %p", ctx);
-
- if (ctx) {
- pool->unregister_context(ctx);
-
- if (!(ctx->c.flags & REDIS_FREEING)) {
- auto *ac = ctx;
- ctx = nullptr;
- ac->onDisconnect = nullptr;
- redisAsyncFree(ac);
- }
- }
- }
- else {
- msg_debug_rpool("inactive connection destructed: %p", ctx);
-
- ev_timer_stop(pool->event_loop, &timeout);
- if (ctx) {
- pool->unregister_context(ctx);
-
- if (!(ctx->c.flags & REDIS_FREEING)) {
- auto *ac = ctx;
- /* To prevent on_disconnect here */
- ctx = nullptr;
- ac->onDisconnect = nullptr;
- redisAsyncFree(ac);
- }
- }
- }
- }
-
- auto redis_pool_connection::redis_quit_cb(redisAsyncContext *c, void *r, void *priv) -> void
- {
- struct redis_pool_connection *conn =
- (struct redis_pool_connection *) priv;
-
- msg_debug_rpool("quit command reply for the connection %p",
- conn->ctx);
- /*
- * The connection will be freed by hiredis itself as we are here merely after
- * quit command has succeeded and we have timer being set already.
- * The problem is that when this callback is called, our connection is likely
- * dead, so probably even on_disconnect callback has been already called...
- *
- * Hence, the connection might already be freed, so even (conn) pointer may be
- * inaccessible.
- *
- * TODO: Use refcounts to prevent this stuff to happen, the problem is how
- * to handle Redis timeout on `quit` command in fact... The good thing is that
- * it will not likely happen.
- */
- }
-
- /*
- * Called for inactive connections that due to be removed
- */
- auto redis_pool_connection::redis_conn_timeout_cb(EV_P_ ev_timer *w, int revents) -> void
- {
- auto *conn = (struct redis_pool_connection *) w->data;
-
- g_assert(conn->state != rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE);
-
- if (conn->state == rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_INACTIVE) {
- msg_debug_rpool("scheduled soft removal of connection %p",
- conn->ctx);
- conn->state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_FINALISING;
- ev_timer_again(EV_A_ w);
- redisAsyncCommand(conn->ctx, redis_pool_connection::redis_quit_cb, conn, "QUIT");
- conn->elt->move_to_terminating(conn);
- }
- else {
- /* Finalising by timeout */
- ev_timer_stop(EV_A_ w);
- msg_debug_rpool("final removal of connection %p, refcount: %d",
- conn->ctx);
-
- /* Erasure of shared pointer will cause it to be removed */
- conn->elt->release_connection(conn);
- }
- }
-
- auto redis_pool_connection::redis_on_disconnect(const struct redisAsyncContext *ac, int status) -> auto
- {
- auto *conn = (struct redis_pool_connection *) ac->data;
-
- /*
- * Here, we know that redis itself will free this connection
- * so, we need to do something very clever about it
- */
- if (conn->state != rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE) {
- /* Do nothing for active connections as it is already handled somewhere */
- if (conn->ctx) {
- msg_debug_rpool("inactive connection terminated: %s",
- conn->ctx->errstr);
- }
-
- /* Erasure of shared pointer will cause it to be removed */
- conn->elt->release_connection(conn);
- }
- }
-
- auto redis_pool_connection::schedule_timeout() -> void
- {
- const auto *conn = this; /* For debug */
- double real_timeout;
- auto active_elts = elt->num_active();
-
- if (active_elts > pool->max_conns) {
- real_timeout = pool->timeout / 2.0;
- real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 4.0);
- }
- else {
- real_timeout = pool->timeout;
- real_timeout = rspamd_time_jitter(real_timeout, real_timeout / 2.0);
- }
-
- msg_debug_rpool("scheduled connection %p cleanup in %.1f seconds",
- ctx, real_timeout);
-
- timeout.data = this;
- /* Restore in case if these fields have been modified externally */
- ctx->data = this;
- redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
- ev_timer_init(&timeout,
- redis_pool_connection::redis_conn_timeout_cb,
- real_timeout, real_timeout / 2.0);
- ev_timer_start(pool->event_loop, &timeout);
- }
-
-
- redis_pool_connection::redis_pool_connection(redis_pool *_pool,
- redis_pool_elt *_elt,
- const std::string &db,
- const std::string &username,
- const std::string &password,
- struct redisAsyncContext *_ctx)
- : ctx(_ctx), elt(_elt), pool(_pool)
- {
-
- state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE;
-
- pool->register_context(ctx, this);
- ctx->data = this;
- memset(tag, 0, sizeof(tag));
- rspamd_random_hex(tag, sizeof(tag) - 1);
-
- redisLibevAttach(pool->event_loop, ctx);
- redisAsyncSetDisconnectCallback(ctx, redis_pool_connection::redis_on_disconnect);
-
- if (!username.empty()) {
- if (!password.empty()) {
- redisAsyncCommand(ctx, nullptr, nullptr,
- "AUTH %s %s", username.c_str(), password.c_str());
- }
- else {
- msg_warn("Redis requires a password when username is supplied");
- }
- }
- else if (!password.empty()) {
- redisAsyncCommand(ctx, nullptr, nullptr,
- "AUTH %s", password.c_str());
- }
- if (!db.empty()) {
- redisAsyncCommand(ctx, nullptr, nullptr,
- "SELECT %s", db.c_str());
- }
- }
-
- auto redis_pool_elt::new_connection() -> redisAsyncContext *
- {
- if (!inactive.empty()) {
- decltype(inactive)::value_type conn;
- conn.swap(inactive.back());
- inactive.pop_back();
-
- g_assert(conn->state != rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE);
- if (conn->ctx->err == REDIS_OK) {
- /* Also check SO_ERROR */
- int err;
- socklen_t len = sizeof(int);
-
- if (getsockopt(conn->ctx->c.fd, SOL_SOCKET, SO_ERROR,
- (void *) &err, &len) == -1) {
- err = errno;
- }
-
- if (err != 0) {
- /*
- * We cannot reuse connection, so we just recursively call
- * this function one more time
- */
- return new_connection();
- }
- else {
- /* Reuse connection */
- ev_timer_stop(pool->event_loop, &conn->timeout);
- conn->state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE;
- msg_debug_rpool("reused existing connection to %s:%d: %p",
- ip.c_str(), port, conn->ctx);
- active.emplace_front(std::move(conn));
- active.front()->elt_pos = active.begin();
-
- return active.front()->ctx;
- }
- }
- else {
- auto *nctx = redis_async_new();
- if (nctx) {
- active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
- db.c_str(), username.c_str(), password.c_str(), nctx));
- active.front()->elt_pos = active.begin();
- }
-
- return nctx;
- }
- }
- else {
- auto *nctx = redis_async_new();
- if (nctx) {
- active.emplace_front(std::make_unique<redis_pool_connection>(pool, this,
- db.c_str(), username.c_str(), password.c_str(), nctx));
- active.front()->elt_pos = active.begin();
- }
-
- return nctx;
- }
-
- RSPAMD_UNREACHABLE;
- }
-
- auto redis_pool::new_connection(const char *db, const char *username,
- const char *password, const char *ip, int port) -> redisAsyncContext *
- {
-
- if (!wanna_die) {
- auto key = redis_pool_elt::make_key(db, username, password, ip, port);
- auto found_elt = elts_by_key.find(key);
-
- if (found_elt != elts_by_key.end()) {
- auto &elt = found_elt->second;
-
- return elt.new_connection();
- }
- else {
- /* Need to create a pool */
- auto nelt = elts_by_key.try_emplace(key,
- this, db, username, password, ip, port);
-
- return nelt.first->second.new_connection();
- }
- }
-
- return nullptr;
- }
-
- auto redis_pool::release_connection(redisAsyncContext *ctx,
- enum rspamd_redis_pool_release_type how) -> void
- {
- if (!wanna_die) {
- auto conn_it = conns_by_ctx.find(ctx);
- if (conn_it != conns_by_ctx.end()) {
- auto *conn = conn_it->second;
- g_assert(conn->state == rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_ACTIVE);
-
- if (ctx->err != REDIS_OK) {
- /* We need to terminate connection forcefully */
- msg_debug_rpool("closed connection %p due to an error", conn->ctx);
- }
- else {
- if (how == RSPAMD_REDIS_RELEASE_DEFAULT) {
- /* Ensure that there are no callbacks attached to this conn */
- if (ctx->replies.head == nullptr && (ctx->c.flags & REDIS_CONNECTED)) {
- /* Just move it to the inactive queue */
- conn->state = rspamd_redis_pool_connection_state::RSPAMD_REDIS_POOL_CONN_INACTIVE;
- conn->elt->move_to_inactive(conn);
- conn->schedule_timeout();
- msg_debug_rpool("mark connection %p inactive", conn->ctx);
-
- return;
- }
- else {
- msg_debug_rpool("closed connection %p due to callbacks left",
- conn->ctx);
- }
- }
- else {
- if (how == RSPAMD_REDIS_RELEASE_FATAL) {
- msg_debug_rpool("closed connection %p due to an fatal termination",
- conn->ctx);
- }
- else {
- msg_debug_rpool("closed connection %p due to explicit termination",
- conn->ctx);
- }
- }
- }
-
- conn->elt->release_connection(conn);
- }
- else {
- msg_err("fatal internal error, connection with ctx %p is not found in the Redis pool",
- ctx);
- RSPAMD_UNREACHABLE;
- }
- }
- }
-
- }// namespace rspamd
-
- void *
- rspamd_redis_pool_init(void)
- {
- return new rspamd::redis_pool{};
- }
-
- void rspamd_redis_pool_config(void *p,
- struct rspamd_config *cfg,
- struct ev_loop *ev_base)
- {
- g_assert(p != NULL);
- auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
-
- pool->do_config(ev_base, cfg);
- }
-
-
- struct redisAsyncContext *
- rspamd_redis_pool_connect(void *p,
- const char *db, const char *username,
- const char *password, const char *ip, int port)
- {
- g_assert(p != NULL);
- auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
-
- return pool->new_connection(db, username, password, ip, port);
- }
-
-
- void rspamd_redis_pool_release_connection(void *p,
- struct redisAsyncContext *ctx, enum rspamd_redis_pool_release_type how)
- {
- g_assert(p != NULL);
- g_assert(ctx != NULL);
- auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
-
- pool->release_connection(ctx, how);
- }
-
-
- void rspamd_redis_pool_destroy(void *p)
- {
- auto *pool = reinterpret_cast<class rspamd::redis_pool *>(p);
-
- pool->prepare_to_die();
- delete pool;
- }
-
- const char *
- rspamd_redis_type_to_string(int type)
- {
- const char *ret = "unknown";
-
- switch (type) {
- case REDIS_REPLY_STRING:
- ret = "string";
- break;
- case REDIS_REPLY_ARRAY:
- ret = "array";
- break;
- case REDIS_REPLY_INTEGER:
- ret = "int";
- break;
- case REDIS_REPLY_STATUS:
- ret = "status";
- break;
- case REDIS_REPLY_NIL:
- ret = "nil";
- break;
- case REDIS_REPLY_ERROR:
- ret = "error";
- break;
- default:
- break;
- }
-
- return ret;
- }
|