Browse Source

[Feature] Allow fuzzy workers to exchange blocked information

tags/3.6
Vsevolod Stakhov 11 months ago
parent
commit
845ae26f6a
No account linked to committer's email address

+ 104
- 3
src/fuzzy_storage.c View File

@@ -273,7 +273,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
(time_t)session->timestamp);

if (elt) {
gboolean ratelimited = FALSE;
gboolean ratelimited = FALSE, new_ratelimit = FALSE;

if (isnan (elt->cur)) {
/* Ratelimit exceeded, preserve it for the whole ttl */
@@ -301,18 +301,40 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
rspamd_inet_address_to_string (masked),
session->ctx->leaky_bucket_burst);
elt->cur = NAN;
new_ratelimit = TRUE;
}
else {
elt->cur ++; /* Allow one more request */
}
}

rspamd_inet_address_free (masked);

if (ratelimited) {
rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
}

if (new_ratelimit) {
struct rspamd_srv_command srv_cmd;

srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);

if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
socklen_t slen;
struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);

if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
msg_debug("propagating blocked address to other workers");
rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
}
else {
msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
}
}
}

rspamd_inet_address_free (masked);

return !ratelimited;
}
else {
@@ -1906,6 +1928,83 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
return TRUE;
}

static gboolean
rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
gint attached_fd,
struct rspamd_control_command *cmd,
gpointer ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud;
struct rspamd_control_reply rep;
struct rspamd_leaky_bucket_elt *elt;
ev_tstamp now = ev_now (ctx->event_loop);
rspamd_inet_addr_t *addr = NULL;

rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
rep.reply.fuzzy_blocked.status = 0;

if (cmd->cmd.fuzzy_blocked.af == AF_INET) {
addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
sizeof (struct sockaddr_in));
}
else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) {
addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
sizeof (struct sockaddr_in6));
}
else {
msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af);
rep.reply.fuzzy_blocked.status = -1;
}

if (addr) {
elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr,
(time_t) now);

if (elt) {
if (isnan (elt->cur)) {
/* Already ratelimited, ignore */
}
else {
elt->last = now;
elt->cur = NAN;

msg_info ("propagating ratelimiting %s, %.1f max elts",
rspamd_inet_address_to_string(addr),
ctx->leaky_bucket_burst);
rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
}

rspamd_inet_address_free(addr);

}
else {
/* New bucket */
elt = g_malloc(sizeof(*elt));
elt->addr = addr; /* transfer ownership */
elt->cur = NAN;
elt->last = now;

rspamd_lru_hash_insert(ctx->ratelimit_buckets,
addr,
elt,
(time_t)now,
ctx->leaky_bucket_ttl);
msg_info ("propagating ratelimiting %s, %.1f max elts",
rspamd_inet_address_to_string(addr),
ctx->leaky_bucket_burst);
rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
}
}

if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
strerror (errno));
}

return TRUE;
}

static gboolean
rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
@@ -2856,6 +2955,8 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_fuzzy_storage_stat, ctx);
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
rspamd_fuzzy_storage_sync, ctx);
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED,
rspamd_fuzzy_control_blocked, ctx);


if (ctx->update_map != NULL) {

+ 1
- 1
src/libserver/hyperscan_tools.cxx View File

@@ -609,7 +609,7 @@ rspamd_hyperscan_notice_known(const char *fname)
(int)strlen(fname), fname, (int)sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
}
else {
notice_cmd.type = RSPAMD_NOTICE_HYPERSCAN_CACHE;
notice_cmd.type = RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE;
rspamd_strlcpy(notice_cmd.cmd.hyperscan_cache_file.path, fname, sizeof(notice_cmd.cmd.hyperscan_cache_file.path));
rspamd_srv_send_command(rspamd_current_worker,
rspamd_current_worker->srv->event_loop, &notice_cmd, -1,

+ 16
- 2
src/libserver/rspamd_control.c View File

@@ -629,6 +629,7 @@ rspamd_control_default_cmd_handler (gint fd,
case RSPAMD_CONTROL_FUZZY_SYNC:
case RSPAMD_CONTROL_LOG_PIPE:
case RSPAMD_CONTROL_CHILD_CHANGE:
case RSPAMD_CONTROL_FUZZY_BLOCKED:
break;
case RSPAMD_CONTROL_RERESOLVE:
if (cd->worker->srv->cfg) {
@@ -1020,12 +1021,22 @@ rspamd_srv_handler (EV_P_ ev_io *w, int revents)
case RSPAMD_SRV_HEALTH:
rspamd_fill_health_reply (srv, &rdata->rep);
break;
case RSPAMD_NOTICE_HYPERSCAN_CACHE:
case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
#ifdef WITH_HYPERSCAN
rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
#endif
rdata->rep.reply.hyperscan_cache_file.unused = 0;
break;
case RSPAMD_SRV_FUZZY_BLOCKED:
/* Broadcast command to all workers */
memset (&wcmd, 0, sizeof (wcmd));
wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
/* Ensure that memcpy is safe */
G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_ignore_io_handler, NULL, worker->pid);
break;
default:
msg_err ("unknown command type: %d", cmd.type);
break;
@@ -1354,9 +1365,12 @@ const gchar *rspamd_srv_command_to_string (enum rspamd_srv_type cmd)
case RSPAMD_SRV_HEALTH:
reply = "health";
break;
case RSPAMD_NOTICE_HYPERSCAN_CACHE:
case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
reply = "notice_hyperscan_cache";
break;
case RSPAMD_SRV_FUZZY_BLOCKED:
reply = "fuzzy_blocked";
break;
}

return reply;

+ 26
- 1
src/libserver/rspamd_control.h View File

@@ -36,6 +36,7 @@ enum rspamd_control_type {
RSPAMD_CONTROL_FUZZY_SYNC,
RSPAMD_CONTROL_MONITORED_CHANGE,
RSPAMD_CONTROL_CHILD_CHANGE,
RSPAMD_CONTROL_FUZZY_BLOCKED,
RSPAMD_CONTROL_MAX
};

@@ -47,7 +48,8 @@ enum rspamd_srv_type {
RSPAMD_SRV_ON_FORK,
RSPAMD_SRV_HEARTBEAT,
RSPAMD_SRV_HEALTH,
RSPAMD_NOTICE_HYPERSCAN_CACHE,
RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE,
RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
};

enum rspamd_log_pipe_type {
@@ -96,6 +98,14 @@ struct rspamd_control_command {
pid_t pid;
guint additional;
} child_change;
struct {
union {
struct sockaddr sa;
struct sockaddr_in s4;
struct sockaddr_in6 s6;
} addr;
sa_family_t af;
} fuzzy_blocked;
} cmd;
};

@@ -134,6 +144,9 @@ struct rspamd_control_reply {
struct {
guint status;
} fuzzy_sync;
struct {
guint status;
} fuzzy_blocked;
} reply;
};

@@ -179,6 +192,15 @@ struct rspamd_srv_command {
struct {
char path[CONTROL_PATHLEN];
} hyperscan_cache_file;
/* Send when one worker has blocked some IP address */
struct {
union {
struct sockaddr sa;
struct sockaddr_in s4;
struct sockaddr_in6 s6;
} addr;
sa_family_t af;
} fuzzy_blocked;
} cmd;
};

@@ -213,6 +235,9 @@ struct rspamd_srv_reply {
struct {
int unused;
} hyperscan_cache_file;
struct {
int unused;
} fuzzy_blocked;
} reply;
};


Loading…
Cancel
Save