]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Fuzzy_storage: add preliminary support of rate limits
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 8 Dec 2018 14:44:52 +0000 (14:44 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 8 Dec 2018 14:44:52 +0000 (14:44 +0000)
src/fuzzy_storage.c

index a3486635f0da3ddcb48dd381d238704b3cb9bdd9..7fa821e9b6f8728e7a90158f2b0d09b6eb8905e6 100644 (file)
@@ -17,8 +17,8 @@
  * Rspamd fuzzy storage server
  */
 
-#include <src/libserver/fuzzy_wire.h>
 #include "config.h"
+#include "libserver/fuzzy_wire.h"
 #include "util.h"
 #include "rspamd.h"
 #include "map.h"
 #include "libutil/map_private.h"
 #include "libutil/hash.h"
 #include "libutil/http_private.h"
+#include "libutil/hash.h"
 #include "unix-std.h"
 
+#include <math.h>
+
 /* Resync value in seconds */
 #define DEFAULT_SYNC_TIMEOUT 60.0
 #define DEFAULT_KEYPAIR_CACHE_SIZE 512
 #define DEFAULT_MASTER_TIMEOUT 10.0
 #define DEFAULT_UPDATES_MAXFAIL 3
 #define COOKIE_SIZE 128
+#define DEFAULT_MAX_BUCKETS 2000
+#define DEFAULT_BUCKET_TTL 3600
+#define DEFAULT_BUCKET_MASK 24
 
 static const gchar *local_db_name = "local";
 
@@ -115,6 +121,12 @@ struct rspamd_fuzzy_mirror {
        struct rspamd_cryptobox_pubkey *key;
 };
 
+struct rspamd_leaky_bucket_elt {
+       rspamd_inet_addr_t *addr;
+       gdouble last;
+       gdouble cur;
+};
+
 static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
 
 struct rspamd_fuzzy_storage_ctx {
@@ -132,6 +144,7 @@ struct rspamd_fuzzy_storage_ctx {
        struct rspamd_radix_map_helper *update_ips;
        struct rspamd_radix_map_helper *master_ips;
        struct rspamd_radix_map_helper *blocked_ips;
+       struct rspamd_radix_map_helper *ratelimit_whitelist;
 
        struct rspamd_cryptobox_keypair *sync_keypair;
        struct rspamd_cryptobox_pubkey *master_key;
@@ -141,6 +154,7 @@ struct rspamd_fuzzy_storage_ctx {
        const ucl_object_t *update_map;
        const ucl_object_t *masters_map;
        const ucl_object_t *blocked_map;
+       const ucl_object_t *ratelimit_whitelist_map;
 
        GHashTable *master_flags;
        guint keypair_cache_size;
@@ -148,6 +162,7 @@ struct rspamd_fuzzy_storage_ctx {
        struct event peer_ev;
        struct event stat_ev;
        struct timeval stat_tv;
+
        /* Local keypair */
        struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
        struct fuzzy_key *default_key;
@@ -160,11 +175,21 @@ struct rspamd_fuzzy_storage_ctx {
        gchar *collection_id_file;
        struct rspamd_keypair_cache *keypair_cache;
        rspamd_lru_hash_t *errors_ips;
+       rspamd_lru_hash_t *ratelimit_buckets;
        struct rspamd_fuzzy_backend *backend;
        GArray *updates_pending;
        guint updates_failed;
        guint updates_maxfail;
        guint32 collection_id;
+
+       /* Ratelimits */
+       guint leaky_bucket_ttl;
+       guint leaky_bucket_mask;
+       guint max_buckets;
+       gboolean ratelimit_log_only;
+       gdouble leaky_bucket_burst;
+       gdouble leaky_bucket_rate;
+
        struct rspamd_worker *worker;
        struct rspamd_http_connection_router *collection_rt;
        const ucl_object_t *skip_map;
@@ -230,6 +255,105 @@ struct fuzzy_master_update_session {
 
 static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
 
+static gboolean
+rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
+{
+       rspamd_inet_addr_t *masked;
+       struct rspamd_leaky_bucket_elt *elt;
+       struct timeval tv;
+       gdouble now;
+
+       if (session->ctx->ratelimit_whitelist != NULL) {
+               if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
+                               session->addr) != NULL) {
+                       return TRUE;
+               }
+       }
+
+       /*
+       if (rspamd_inet_address_is_local (session->addr, TRUE)) {
+               return TRUE;
+       }
+       */
+
+       masked = rspamd_inet_address_copy (session->addr);
+
+       if (rspamd_inet_address_get_af (masked) == AF_INET) {
+               rspamd_inet_address_apply_mask (masked,
+                               MIN (session->ctx->leaky_bucket_mask, 32));
+       }
+       else {
+               /* Must be at least /64 */
+               rspamd_inet_address_apply_mask (masked,
+                               MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
+       }
+
+#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
+       event_base_gettimeofday_cached (session->ctx->ev_base, &tv);
+#else
+       gettimeofday (&tv, NULL);
+#endif
+
+       now = tv_to_double (&tv);
+       elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
+                       tv.tv_sec);
+
+       if (elt) {
+               gboolean ratelimited = FALSE;
+
+               if (isnan (elt->cur)) {
+                       /* Ratelimit exceeded, preserve it for the whole ttl */
+                       ratelimited = TRUE;
+               }
+               else {
+                       /* Update bucket */
+                       if (elt->last < now) {
+                               elt->cur -= session->ctx->leaky_bucket_rate * (now - elt->last);
+                               elt->last = now;
+
+                               if (elt->cur < 0) {
+                                       elt->cur = 0;
+                               }
+                       }
+                       else {
+                               elt->last = now;
+                       }
+
+                       /* Check bucket */
+                       if (elt->cur >= session->ctx->leaky_bucket_burst) {
+
+                               msg_info ("ratelimiting %s (%s), %.1f max elts",
+                                               rspamd_inet_address_to_string (session->addr),
+                                               rspamd_inet_address_to_string (masked),
+                                               session->ctx->leaky_bucket_burst);
+                               elt->cur = NAN;
+                       }
+                       else {
+                               elt->cur ++; /* Allow one more request */
+                       }
+               }
+
+               rspamd_inet_address_free (masked);
+
+               return !ratelimited;
+       }
+       else {
+               /* New bucket */
+               elt = g_malloc (sizeof (*elt));
+               elt->addr = masked; /* transfer ownership */
+               elt->cur = 1;
+               elt->last = now;
+
+               rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
+                               masked,
+                               elt,
+                               tv.tv_sec,
+                               session->ctx->leaky_bucket_ttl);
+       }
+
+       return TRUE;
+}
+
 static gboolean
 rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write)
 {
@@ -259,6 +383,15 @@ rspamd_fuzzy_check_client (struct fuzzy_session *session, gboolean is_write)
        }
 
        /* Non write */
+       if (session->ctx->ratelimit_buckets) {
+               if (session->ctx->ratelimit_log_only) {
+                       (void)rspamd_fuzzy_check_ratelimit (session); /* Check but ignore */
+               }
+               else {
+                       return rspamd_fuzzy_check_ratelimit (session);
+               }
+       }
+
        return TRUE;
 }
 
@@ -299,6 +432,15 @@ struct fuzzy_slave_connection {
        gint sock;
 };
 
+static void
+fuzzy_rl_bucket_free (gpointer p)
+{
+       struct rspamd_leaky_bucket_elt *elt = (struct rspamd_leaky_bucket_elt *)p;
+
+       rspamd_inet_address_free (elt->addr);
+       g_free (elt);
+}
+
 static void
 fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
 {
@@ -2492,6 +2634,11 @@ init_fuzzy (struct rspamd_config *cfg)
                        (rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors);
        ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
        ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id";
+       ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK;
+       ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL;
+       ctx->max_buckets = DEFAULT_MAX_BUCKETS;
+       ctx->leaky_bucket_burst = NAN;
+       ctx->leaky_bucket_rate = NAN;
 
        rspamd_rcl_register_worker_option (cfg,
                        type,
@@ -2682,6 +2829,65 @@ init_fuzzy (struct rspamd_config *cfg)
                        0,
                        "Skip specific hashes from the map");
 
+       /* Ratelimits */
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_whitelist",
+                       rspamd_rcl_parse_struct_ucl,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_whitelist_map),
+                       0,
+                       "Skip specific addresses from rate limiting");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_max_buckets",
+                       rspamd_rcl_parse_struct_integer,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_buckets),
+                       RSPAMD_CL_FLAG_UINT,
+                       "Maximum number of leaky buckets (default: " G_STRINGIFY(DEFAULT_MAX_BUCKETS) ")");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_network_mask",
+                       rspamd_rcl_parse_struct_integer,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_mask),
+                       RSPAMD_CL_FLAG_UINT,
+                       "Network mask to apply for IPv4 rate addresses (default: " G_STRINGIFY(DEFAULT_BUCKET_MASK) ")");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_bucket_ttl",
+                       rspamd_rcl_parse_struct_time,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_ttl),
+                       RSPAMD_CL_FLAG_TIME_INTEGER,
+                       "Time to live for ratelimit element (default: " G_STRINGIFY(DEFAULT_BUCKET_TTL) ")");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_rate",
+                       rspamd_rcl_parse_struct_double,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_rate),
+                       0,
+                       "Leak rate in requests per second");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_burst",
+                       rspamd_rcl_parse_struct_double,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, leaky_bucket_burst),
+                       0,
+                       "Peak value for ratelimit bucket");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "ratelimit_log_only",
+                       rspamd_rcl_parse_struct_boolean,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, ratelimit_log_only),
+                       0,
+                       "Don't really ban on ratelimit reaching, just log");
+
+
        return ctx;
 }
 
@@ -2959,6 +3165,20 @@ start_fuzzy (struct rspamd_worker *worker)
                                &ctx->blocked_ips, NULL);
        }
 
+       /* Create radix trees */
+       if (ctx->ratelimit_whitelist_map != NULL) {
+               rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->ratelimit_whitelist_map,
+                               "Skip ratelimits from specific ip addresses/networks",
+                               &ctx->ratelimit_whitelist, NULL);
+       }
+
+       /* Ratelimits */
+       if (!isnan (ctx->leaky_bucket_rate) && !isnan (ctx->leaky_bucket_burst)) {
+               ctx->ratelimit_buckets = rspamd_lru_hash_new_full (ctx->max_buckets,
+                               NULL, fuzzy_rl_bucket_free,
+                               rspamd_inet_address_hash, rspamd_inet_address_equal);
+       }
+
        /* Maps events */
        ctx->resolver = dns_resolver_init (worker->srv->logger,
                                ctx->ev_base,