diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-07-01 13:32:22 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-07-01 13:32:22 +0100 |
commit | 845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5 (patch) | |
tree | 64cada48b4680b0efe30251cb414733fa784bff9 | |
parent | 0e4932734ca0d7f5e1e5e90e554b186e271a0a4f (diff) | |
download | rspamd-845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5.tar.gz rspamd-845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5.zip |
[Feature] Allow fuzzy workers to exchange blocked information
-rw-r--r-- | src/fuzzy_storage.c | 107 | ||||
-rw-r--r-- | src/libserver/hyperscan_tools.cxx | 2 | ||||
-rw-r--r-- | src/libserver/rspamd_control.c | 18 | ||||
-rw-r--r-- | src/libserver/rspamd_control.h | 27 |
4 files changed, 147 insertions, 7 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 33f9d40e8..31a2c46e6 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -273,7 +273,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) (time_t)session->timestamp); if (elt) { - gboolean ratelimited = FALSE; + gboolean ratelimited = FALSE, new_ratelimit = FALSE; if (isnan (elt->cur)) { /* Ratelimit exceeded, preserve it for the whole ttl */ @@ -301,18 +301,40 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session) rspamd_inet_address_to_string (masked), session->ctx->leaky_bucket_burst); elt->cur = NAN; + new_ratelimit = TRUE; } else { elt->cur ++; /* Allow one more request */ } } - rspamd_inet_address_free (masked); - if (ratelimited) { rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit"); } + if (new_ratelimit) { + struct rspamd_srv_command srv_cmd; + + srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED; + srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked); + + if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) { + socklen_t slen; + struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen); + + if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) { + memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen); + msg_debug("propagating blocked address to other workers"); + rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL); + } + else { + msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr)); + } + } + } + + rspamd_inet_address_free (masked); + return !ratelimited; } else { @@ -1907,6 +1929,83 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, } static gboolean +rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, gint fd, + gint attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ + struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud; + struct rspamd_control_reply rep; + struct rspamd_leaky_bucket_elt *elt; + ev_tstamp now = ev_now (ctx->event_loop); + rspamd_inet_addr_t *addr = NULL; + + rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED; + rep.reply.fuzzy_blocked.status = 0; + + if (cmd->cmd.fuzzy_blocked.af == AF_INET) { + addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa, + sizeof (struct sockaddr_in)); + } + else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) { + addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa, + sizeof (struct sockaddr_in6)); + } + else { + msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af); + rep.reply.fuzzy_blocked.status = -1; + } + + if (addr) { + elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr, + (time_t) now); + + if (elt) { + if (isnan (elt->cur)) { + /* Already ratelimited, ignore */ + } + else { + elt->last = now; + elt->cur = NAN; + + msg_info ("propagating ratelimiting %s, %.1f max elts", + rspamd_inet_address_to_string(addr), + ctx->leaky_bucket_burst); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); + } + + rspamd_inet_address_free(addr); + + } + else { + /* New bucket */ + elt = g_malloc(sizeof(*elt)); + elt->addr = addr; /* transfer ownership */ + elt->cur = NAN; + elt->last = now; + + rspamd_lru_hash_insert(ctx->ratelimit_buckets, + addr, + elt, + (time_t)now, + ctx->leaky_bucket_ttl); + msg_info ("propagating ratelimiting %s, %.1f max elts", + rspamd_inet_address_to_string(addr), + ctx->leaky_bucket_burst); + rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit"); + } + } + + if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { + msg_err ("cannot write reply to the control socket: %s", + strerror (errno)); + } + + return TRUE; +} + +static gboolean rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, gint attached_fd, @@ -2856,6 +2955,8 @@ start_fuzzy (struct rspamd_worker *worker) rspamd_fuzzy_storage_stat, ctx); rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, rspamd_fuzzy_storage_sync, ctx); + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED, + rspamd_fuzzy_control_blocked, ctx); if (ctx->update_map != NULL) { diff --git a/src/libserver/hyperscan_tools.cxx b/src/libserver/hyperscan_tools.cxx index 8159cbd26..6fdc72e87 100644 --- a/src/libserver/hyperscan_tools.cxx +++ b/src/libserver/hyperscan_tools.cxx @@ -609,7 +609,7 @@ rspamd_hyperscan_notice_known(const char *fname) (int)strlen(fname), fname, (int)sizeof(notice_cmd.cmd.hyperscan_cache_file.path)); } else { - notice_cmd.type = RSPAMD_NOTICE_HYPERSCAN_CACHE; + notice_cmd.type = RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE; rspamd_strlcpy(notice_cmd.cmd.hyperscan_cache_file.path, fname, sizeof(notice_cmd.cmd.hyperscan_cache_file.path)); rspamd_srv_send_command(rspamd_current_worker, rspamd_current_worker->srv->event_loop, ¬ice_cmd, -1, diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 82913a19f..cbafec270 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -629,6 +629,7 @@ rspamd_control_default_cmd_handler (gint fd, case RSPAMD_CONTROL_FUZZY_SYNC: case RSPAMD_CONTROL_LOG_PIPE: case RSPAMD_CONTROL_CHILD_CHANGE: + case RSPAMD_CONTROL_FUZZY_BLOCKED: break; case RSPAMD_CONTROL_RERESOLVE: if (cd->worker->srv->cfg) { @@ -1020,12 +1021,22 @@ rspamd_srv_handler (EV_P_ ev_io *w, int revents) case RSPAMD_SRV_HEALTH: rspamd_fill_health_reply (srv, &rdata->rep); break; - case RSPAMD_NOTICE_HYPERSCAN_CACHE: + case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE: #ifdef WITH_HYPERSCAN rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path); #endif rdata->rep.reply.hyperscan_cache_file.unused = 0; break; + case RSPAMD_SRV_FUZZY_BLOCKED: + /* Broadcast command to all workers */ + memset (&wcmd, 0, sizeof (wcmd)); + wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED; + /* Ensure that memcpy is safe */ + G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked)); + memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked)); + rspamd_control_broadcast_cmd (srv, &wcmd, rfd, + rspamd_control_ignore_io_handler, NULL, worker->pid); + break; default: msg_err ("unknown command type: %d", cmd.type); break; @@ -1354,9 +1365,12 @@ const gchar *rspamd_srv_command_to_string (enum rspamd_srv_type cmd) case RSPAMD_SRV_HEALTH: reply = "health"; break; - case RSPAMD_NOTICE_HYPERSCAN_CACHE: + case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE: reply = "notice_hyperscan_cache"; break; + case RSPAMD_SRV_FUZZY_BLOCKED: + reply = "fuzzy_blocked"; + break; } return reply; diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index 049c9b80c..dd661c145 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -36,6 +36,7 @@ enum rspamd_control_type { RSPAMD_CONTROL_FUZZY_SYNC, RSPAMD_CONTROL_MONITORED_CHANGE, RSPAMD_CONTROL_CHILD_CHANGE, + RSPAMD_CONTROL_FUZZY_BLOCKED, RSPAMD_CONTROL_MAX }; @@ -47,7 +48,8 @@ enum rspamd_srv_type { RSPAMD_SRV_ON_FORK, RSPAMD_SRV_HEARTBEAT, RSPAMD_SRV_HEALTH, - RSPAMD_NOTICE_HYPERSCAN_CACHE, + RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE, + RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */ }; enum rspamd_log_pipe_type { @@ -96,6 +98,14 @@ struct rspamd_control_command { pid_t pid; guint additional; } child_change; + struct { + union { + struct sockaddr sa; + struct sockaddr_in s4; + struct sockaddr_in6 s6; + } addr; + sa_family_t af; + } fuzzy_blocked; } cmd; }; @@ -134,6 +144,9 @@ struct rspamd_control_reply { struct { guint status; } fuzzy_sync; + struct { + guint status; + } fuzzy_blocked; } reply; }; @@ -179,6 +192,15 @@ struct rspamd_srv_command { struct { char path[CONTROL_PATHLEN]; } hyperscan_cache_file; + /* Send when one worker has blocked some IP address */ + struct { + union { + struct sockaddr sa; + struct sockaddr_in s4; + struct sockaddr_in6 s6; + } addr; + sa_family_t af; + } fuzzy_blocked; } cmd; }; @@ -213,6 +235,9 @@ struct rspamd_srv_reply { struct { int unused; } hyperscan_cache_file; + struct { + int unused; + } fuzzy_blocked; } reply; }; |