summaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2013-06-04 14:51:42 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2013-06-04 14:51:42 +0100
commit89a411447ebc068efbe4c841c95adeaf9a1bbcd0 (patch)
tree4808e0e1e26ba10735718c2a6cc3711a5b4e8620 /src/fuzzy_storage.c
parentd0544d42a0cd4aa6f1b8d52c90c0e65b9d1c335b (diff)
downloadrspamd-89a411447ebc068efbe4c841c95adeaf9a1bbcd0.tar.gz
rspamd-89a411447ebc068efbe4c841c95adeaf9a1bbcd0.zip
Allow restriction of update commands for fuzzy storage.
Now it is possible to specify ip or networks from which fuzzy updates or removes are possible. Rework sockets logic while I'm here. Create universal utility for parsing ipv4/mask strings.
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c165
1 files changed, 139 insertions, 26 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 6a064621a..105e3b95c 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -37,6 +37,7 @@
#include "message.h"
#include "fuzzy.h"
#include "bloom.h"
+#include "map.h"
#include "fuzzy_storage.h"
#ifdef WITH_JUDY
@@ -96,6 +97,9 @@ struct rspamd_fuzzy_storage_ctx {
guint32 expire;
guint32 frequent_score;
guint32 max_mods;
+ radix_tree_t *update_ips;
+ gchar *update_map;
+ struct event_base *ev_base;
};
struct rspamd_fuzzy_node {
@@ -105,6 +109,20 @@ struct rspamd_fuzzy_node {
fuzzy_hash_t h;
};
+struct fuzzy_session {
+ struct rspamd_worker *worker;
+ struct fuzzy_cmd cmd;
+ gint fd;
+ u_char *pos;
+ socklen_t salen;
+ union {
+ struct sockaddr ss;
+ struct sockaddr_storage sa;
+ struct sockaddr_in s4;
+ struct sockaddr_in6 v6;
+ } client_addr;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+};
#ifndef HAVE_SA_SIGINFO
static void
@@ -189,7 +207,7 @@ sync_cache (struct rspamd_worker *wrk)
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
- g_free (node);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
@@ -221,7 +239,7 @@ sync_cache (struct rspamd_worker *wrk)
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
- g_free (node);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
@@ -276,7 +294,7 @@ sigusr2_handler (gint fd, short what, void *arg)
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
- event_loopexit (&tv);
+ event_base_loopexit (ctx->ev_base, &tv);
mods = ctx->max_mods + 1;
sync_cache (worker);
return;
@@ -364,7 +382,7 @@ read_hashes_file (struct rspamd_worker *wrk)
}
for (;;) {
- node = g_malloc (sizeof (struct rspamd_fuzzy_node));
+ node = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
if (version == 0) {
r = read (fd, &legacy_node, sizeof (legacy_node));
if (r != sizeof (legacy_node)) {
@@ -544,7 +562,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c
}
}
- h = g_malloc (sizeof (struct rspamd_fuzzy_node));
+ h = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
h->h.block_size = cmd->blocksize;
h->time = (guint64) time (NULL);
@@ -585,7 +603,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
if (pvalue) {
data = *pvalue;
res = JudySLDel (&jtree, s->hash_pipe, PJE0);
- g_free (data);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), data);
bloom_del (bf, s->hash_pipe);
msg_info ("fuzzy hash was successfully deleted");
server_stat->fuzzy_hashes --;
@@ -600,7 +618,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
while (cur) {
h = cur->data;
if (fuzzy_compare_hashes (&h->h, s) > LEV_LIMIT) {
- g_free (h);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), h);
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hash, tmp);
@@ -653,15 +671,35 @@ process_delete_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *
return res;
}
+/**
+ * Checks the client's address for update commands permission
+ */
+static gboolean
+check_fuzzy_client (struct fuzzy_session *session)
+{
+ if (session->ctx->update_ips != NULL) {
+ /* XXX: cannot work with ipv6 addresses */
+ if (session->client_addr.ss.sa_family != AF_INET) {
+ return FALSE;
+ }
+ if (radix32tree_find (session->ctx->update_ips,
+ ntohl (session->client_addr.s4.sin_addr.s_addr)) == RADIX_NO_VALUE) {
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
#define CMD_PROCESS(x) \
do { \
if (process_##x##_command (&session->cmd, session->worker->ctx)) { \
- if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { \
+ if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
- if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { \
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
@@ -678,24 +716,45 @@ process_fuzzy_command (struct fuzzy_session *session)
r = process_check_command (&session->cmd, &flag, session->worker->ctx);
if (r != 0) {
r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF, r, flag);
- if (sendto (session->fd, buf, r, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ if (sendto (session->fd, buf, r, 0,
+ &session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
else {
- if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
break;
case FUZZY_WRITE:
- CMD_PROCESS (write);
+ if (!check_fuzzy_client (session)) {
+ msg_info ("try to insert a hash from an untrusted address");
+ if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+ else {
+ CMD_PROCESS (write);
+ }
break;
case FUZZY_DEL:
- CMD_PROCESS (delete);
+ if (!check_fuzzy_client (session)) {
+ msg_info ("try to delete a hash from an untrusted address");
+ if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+ else {
+ CMD_PROCESS (delete);
+ }
break;
default:
- if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
break;
@@ -725,15 +784,20 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
session.worker = worker;
session.fd = fd;
session.pos = (u_char *) & session.cmd;
- session.salen = sizeof (session.sa);
+ session.salen = sizeof (session.client_addr);
+ session.ctx = worker->ctx;
/* Got some data */
if (what == EV_READ) {
- if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, (struct sockaddr *)&session.sa, &session.salen)) == -1) {
+ while ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd),
+ MSG_WAITALL, &session.client_addr.ss, &session.salen)) == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
msg_err ("got error while reading from socket: %d, %s", errno, strerror (errno));
return;
}
- else if (r == sizeof (struct fuzzy_cmd)) {
+ if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
process_fuzzy_command (&session);
}
@@ -757,9 +821,13 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
static void
sync_callback (gint fd, short what, void *arg)
{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+
+ ctx = worker->ctx;
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
+ event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
tmv.tv_usec = 0;
@@ -768,6 +836,29 @@ sync_callback (gint fd, short what, void *arg)
sync_cache (worker);
}
+static gboolean
+parse_fuzzy_update_list (struct rspamd_fuzzy_storage_ctx *ctx)
+{
+ gchar **strvec, **cur;
+ struct in_addr ina;
+ guint32 mask;
+
+ strvec = g_strsplit_set (ctx->update_map, ",", 0);
+ cur = strvec;
+
+ while (*cur != NULL) {
+ /* XXX: handle only ipv4 addresses */
+ if (parse_ipmask_v4 (*cur, &ina, &mask)) {
+ if (ctx->update_ips == NULL) {
+ ctx->update_ips = radix_tree_create ();
+ }
+ radix32tree_add (ctx->update_ips, htonl (ina.s_addr), mask, 1);
+ }
+ }
+
+ return (ctx->update_ips != NULL);
+}
+
gpointer
init_fuzzy (void)
{
@@ -787,11 +878,13 @@ init_fuzzy (void)
register_worker_opt (type, "max_mods", xml_handle_uint32, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_mods));
register_worker_opt (type, "frequent_score", xml_handle_uint32, ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, frequent_score));
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, frequent_score));
register_worker_opt (type, "expire", xml_handle_seconds, ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, expire));
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, expire));
register_worker_opt (type, "use_judy", xml_handle_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, use_judy));
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, use_judy));
+ register_worker_opt (type, "allow_update", xml_handle_string, ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map));
return ctx;
}
@@ -802,13 +895,14 @@ init_fuzzy (void)
void
start_fuzzy (struct rspamd_worker *worker)
{
- struct sigaction signals;
- struct event sev;
- gint retries = 0;
+ struct sigaction signals;
+ struct event sev;
+ gint retries = 0;
+ struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
worker->srv->pid = getpid ();
- event_init ();
+ ctx->ev_base = event_init ();
server_stat = worker->srv->stat;
@@ -817,13 +911,16 @@ start_fuzzy (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
signal_add (&worker->sig_ev_usr2, NULL);
/* SIGUSR1 handler */
signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &sev);
signal_add (&sev, NULL);
/* Listen event */
@@ -844,16 +941,32 @@ start_fuzzy (struct rspamd_worker *worker)
}
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
+ event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
tmv.tv_usec = 0;
evtimer_add (&tev, &tmv);
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
+ /* Create radix tree */
+ if (ctx->update_map != NULL) {
+ if (!add_map (worker->srv->cfg, ctx->update_map, "Allow fuzzy updates from specified addresses",
+ read_radix_list, fin_radix_list, (void **)&ctx->update_ips)) {
+ if (!parse_fuzzy_update_list (ctx)) {
+ msg_warn ("cannot load or parse ip list from '%s'", ctx->update_map);
+ }
+ }
+ }
+
+ /* Maps events */
+ start_map_watch (worker->srv->cfg, ctx->ev_base);
+
gperf_profiler_init (worker->srv->cfg, "fuzzy");
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
+ close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}