diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-07-01 13:32:22 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-07-01 13:32:22 +0100 |
commit | 845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5 (patch) | |
tree | 64cada48b4680b0efe30251cb414733fa784bff9 /src/fuzzy_storage.c | |
parent | 0e4932734ca0d7f5e1e5e90e554b186e271a0a4f (diff) | |
download | rspamd-845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5.tar.gz rspamd-845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5.zip |
[Feature] Allow fuzzy workers to exchange blocked information
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 107 |
1 files changed, 104 insertions, 3 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 33f9d40e8..31a2c46e6 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -273,7 +273,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) (time_t)session->timestamp); if (elt) { - gboolean ratelimited = FALSE; + gboolean ratelimited = FALSE, new_ratelimit = FALSE; if (isnan (elt->cur)) { /* Ratelimit exceeded, preserve it for the whole ttl */ @@ -301,18 +301,40 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) rspamd_inet_address_to_string (masked), session->ctx->leaky_bucket_burst); elt->cur = NAN; + new_ratelimit = TRUE; } else { elt->cur ++; /* Allow one more request */ } } - rspamd_inet_address_free (masked); - if (ratelimited) { rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit"); } + if (new_ratelimit) { + struct rspamd_srv_command srv_cmd; + + srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED; + srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked); + + if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) { + socklen_t slen; + struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen); + + if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) { + memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen); + msg_debug("propagating blocked address to other workers"); + rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL); + } + else { + msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); + } + } + } + + rspamd_inet_address_free (masked); + return !ratelimited; } else { @@ -1907,6 +1929,83 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, } static gboolean +rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ + struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud; + struct rspamd_control_reply rep; + struct rspamd_leaky_bucket_elt *elt; + ev_tstamp now = ev_now (ctx->event_loop); + rspamd_inet_addr_t *addr = NULL; + + rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED; + rep.reply.fuzzy_blocked.status = 0; + + if (cmd->cmd.fuzzy_blocked.af == AF_INET) { + addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa, + sizeof (struct sockaddr_in)); + } + else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) { + addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa, + sizeof (struct sockaddr_in6)); + } + else { + msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af); + rep.reply.fuzzy_blocked.status = -1; + } + + if (addr) { + elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr, + (time_t) now); + + if (elt) { + if (isnan (elt->cur)) { + /* Already ratelimited, ignore */ + } + else { + elt->last = now; + elt->cur = NAN; + + msg_info ("propagating ratelimiting %s, %.1f max elts", + rspamd_inet_address_to_string(addr), + ctx->leaky_bucket_burst); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); + } + + rspamd_inet_address_free(addr); + + } + else { + /* New bucket */ + elt = g_malloc(sizeof(*elt)); + elt->addr = addr; /* transfer ownership */ + elt->cur = NAN; + elt->last = now; + + rspamd_lru_hash_insert(ctx->ratelimit_buckets, + addr, + elt, + (time_t)now, + ctx->leaky_bucket_ttl); + msg_info ("propagating ratelimiting %s, %.1f max elts", + rspamd_inet_address_to_string(addr), + ctx->leaky_bucket_burst); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); + } + } + + if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { + msg_err ("cannot write reply to the control socket: %s", + strerror (errno)); + } + + return TRUE; +} + +static gboolean rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, gint attached_fd, @@ -2856,6 +2955,8 @@ start_fuzzy (struct rspamd_worker *worker) rspamd_fuzzy_storage_stat, ctx); rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, rspamd_fuzzy_storage_sync, ctx); + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED, + rspamd_fuzzy_control_blocked, ctx); if (ctx->update_map != NULL) { |