aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2023-07-01 13:32:22 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2023-07-01 13:32:22 +0100
commit845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5 (patch)
tree64cada48b4680b0efe30251cb414733fa784bff9 /src
parent0e4932734ca0d7f5e1e5e90e554b186e271a0a4f (diff)
downloadrspamd-845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5.tar.gz
rspamd-845ae26f6ae5c1bac83fdf49ad7b9ef34482a9a5.zip
[Feature] Allow fuzzy workers to exchange blocked information
Diffstat (limited to 'src')
-rw-r--r--src/fuzzy_storage.c107
-rw-r--r--src/libserver/hyperscan_tools.cxx2
-rw-r--r--src/libserver/rspamd_control.c18
-rw-r--r--src/libserver/rspamd_control.h27
4 files changed, 147 insertions, 7 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 33f9d40e8..31a2c46e6 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -273,7 +273,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
(time_t)session->timestamp);
if (elt) {
- gboolean ratelimited = FALSE;
+ gboolean ratelimited = FALSE, new_ratelimit = FALSE;
if (isnan (elt->cur)) {
/* Ratelimit exceeded, preserve it for the whole ttl */
@@ -301,18 +301,40 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
rspamd_inet_address_to_string (masked),
session->ctx->leaky_bucket_burst);
elt->cur = NAN;
+ new_ratelimit = TRUE;
}
else {
elt->cur ++; /* Allow one more request */
}
}
- rspamd_inet_address_free (masked);
-
if (ratelimited) {
rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
}
+ if (new_ratelimit) {
+ struct rspamd_srv_command srv_cmd;
+
+ srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+ srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+ if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+ socklen_t slen;
+ struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+ if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+ memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+ msg_debug("propagating blocked address to other workers");
+ rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+ }
+ else {
+ msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+ }
+ }
+ }
+
+ rspamd_inet_address_free (masked);
+
return !ratelimited;
}
else {
@@ -1907,6 +1929,83 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
}
static gboolean
+rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud;
+ struct rspamd_control_reply rep;
+ struct rspamd_leaky_bucket_elt *elt;
+ ev_tstamp now = ev_now (ctx->event_loop);
+ rspamd_inet_addr_t *addr = NULL;
+
+ rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+ rep.reply.fuzzy_blocked.status = 0;
+
+ if (cmd->cmd.fuzzy_blocked.af == AF_INET) {
+ addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+ sizeof (struct sockaddr_in));
+ }
+ else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) {
+ addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+ sizeof (struct sockaddr_in6));
+ }
+ else {
+ msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af);
+ rep.reply.fuzzy_blocked.status = -1;
+ }
+
+ if (addr) {
+ elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr,
+ (time_t) now);
+
+ if (elt) {
+ if (isnan (elt->cur)) {
+ /* Already ratelimited, ignore */
+ }
+ else {
+ elt->last = now;
+ elt->cur = NAN;
+
+ msg_info ("propagating ratelimiting %s, %.1f max elts",
+ rspamd_inet_address_to_string(addr),
+ ctx->leaky_bucket_burst);
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+ }
+
+ rspamd_inet_address_free(addr);
+
+ }
+ else {
+ /* New bucket */
+ elt = g_malloc(sizeof(*elt));
+ elt->addr = addr; /* transfer ownership */
+ elt->cur = NAN;
+ elt->last = now;
+
+ rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+ addr,
+ elt,
+ (time_t)now,
+ ctx->leaky_bucket_ttl);
+ msg_info ("propagating ratelimiting %s, %.1f max elts",
+ rspamd_inet_address_to_string(addr),
+ ctx->leaky_bucket_burst);
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+ }
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
+static gboolean
rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
gint attached_fd,
@@ -2856,6 +2955,8 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_fuzzy_storage_stat, ctx);
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
rspamd_fuzzy_storage_sync, ctx);
+ rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED,
+ rspamd_fuzzy_control_blocked, ctx);
if (ctx->update_map != NULL) {
diff --git a/src/libserver/hyperscan_tools.cxx b/src/libserver/hyperscan_tools.cxx
index 8159cbd26..6fdc72e87 100644
--- a/src/libserver/hyperscan_tools.cxx
+++ b/src/libserver/hyperscan_tools.cxx
@@ -609,7 +609,7 @@ rspamd_hyperscan_notice_known(const char *fname)
(int)strlen(fname), fname, (int)sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
}
else {
- notice_cmd.type = RSPAMD_NOTICE_HYPERSCAN_CACHE;
+ notice_cmd.type = RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE;
rspamd_strlcpy(notice_cmd.cmd.hyperscan_cache_file.path, fname, sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
rspamd_srv_send_command(rspamd_current_worker,
rspamd_current_worker->srv->event_loop, &notice_cmd, -1,
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 82913a19f..cbafec270 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -629,6 +629,7 @@ rspamd_control_default_cmd_handler (gint fd,
case RSPAMD_CONTROL_FUZZY_SYNC:
case RSPAMD_CONTROL_LOG_PIPE:
case RSPAMD_CONTROL_CHILD_CHANGE:
+ case RSPAMD_CONTROL_FUZZY_BLOCKED:
break;
case RSPAMD_CONTROL_RERESOLVE:
if (cd->worker->srv->cfg) {
@@ -1020,12 +1021,22 @@ rspamd_srv_handler (EV_P_ ev_io *w, int revents)
case RSPAMD_SRV_HEALTH:
rspamd_fill_health_reply (srv, &rdata->rep);
break;
- case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+ case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
#ifdef WITH_HYPERSCAN
rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
#endif
rdata->rep.reply.hyperscan_cache_file.unused = 0;
break;
+ case RSPAMD_SRV_FUZZY_BLOCKED:
+ /* Broadcast command to all workers */
+ memset (&wcmd, 0, sizeof (wcmd));
+ wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+ /* Ensure that memcpy is safe */
+ G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
+ memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
+ rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+ rspamd_control_ignore_io_handler, NULL, worker->pid);
+ break;
default:
msg_err ("unknown command type: %d", cmd.type);
break;
@@ -1354,9 +1365,12 @@ const gchar *rspamd_srv_command_to_string (enum rspamd_srv_type cmd)
case RSPAMD_SRV_HEALTH:
reply = "health";
break;
- case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+ case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
reply = "notice_hyperscan_cache";
break;
+ case RSPAMD_SRV_FUZZY_BLOCKED:
+ reply = "fuzzy_blocked";
+ break;
}
return reply;
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index 049c9b80c..dd661c145 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -36,6 +36,7 @@ enum rspamd_control_type {
RSPAMD_CONTROL_FUZZY_SYNC,
RSPAMD_CONTROL_MONITORED_CHANGE,
RSPAMD_CONTROL_CHILD_CHANGE,
+ RSPAMD_CONTROL_FUZZY_BLOCKED,
RSPAMD_CONTROL_MAX
};
@@ -47,7 +48,8 @@ enum rspamd_srv_type {
RSPAMD_SRV_ON_FORK,
RSPAMD_SRV_HEARTBEAT,
RSPAMD_SRV_HEALTH,
- RSPAMD_NOTICE_HYPERSCAN_CACHE,
+ RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE,
+ RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
};
enum rspamd_log_pipe_type {
@@ -96,6 +98,14 @@ struct rspamd_control_command {
pid_t pid;
guint additional;
} child_change;
+ struct {
+ union {
+ struct sockaddr sa;
+ struct sockaddr_in s4;
+ struct sockaddr_in6 s6;
+ } addr;
+ sa_family_t af;
+ } fuzzy_blocked;
} cmd;
};
@@ -134,6 +144,9 @@ struct rspamd_control_reply {
struct {
guint status;
} fuzzy_sync;
+ struct {
+ guint status;
+ } fuzzy_blocked;
} reply;
};
@@ -179,6 +192,15 @@ struct rspamd_srv_command {
struct {
char path[CONTROL_PATHLEN];
} hyperscan_cache_file;
+ /* Send when one worker has blocked some IP address */
+ struct {
+ union {
+ struct sockaddr sa;
+ struct sockaddr_in s4;
+ struct sockaddr_in6 s6;
+ } addr;
+ sa_family_t af;
+ } fuzzy_blocked;
} cmd;
};
@@ -213,6 +235,9 @@ struct rspamd_srv_reply {
struct {
int unused;
} hyperscan_cache_file;
+ struct {
+ int unused;
+ } fuzzy_blocked;
} reply;
};