Browse Source

Allow restriction of update commands for fuzzy storage.

Now it is possible to specify ip or networks from which fuzzy
updates or removes are possible.

Rework sockets logic while I'm here.

Create universal utility for parsing ipv4/mask strings.
tags/0.5.5
Vsevolod Stakhov 11 years ago
parent
commit
89a411447e
6 changed files with 188 additions and 39 deletions
  1. 139
    26
      src/fuzzy_storage.c
  2. 0
    9
      src/fuzzy_storage.h
  3. 1
    1
      src/map.c
  4. 3
    3
      src/plugins/custom/ipmark/ipmark.c
  5. 36
    0
      src/util.c
  6. 9
    0
      src/util.h

+ 139
- 26
src/fuzzy_storage.c View File

@@ -37,6 +37,7 @@
#include "message.h"
#include "fuzzy.h"
#include "bloom.h"
#include "map.h"
#include "fuzzy_storage.h"

#ifdef WITH_JUDY
@@ -96,6 +97,9 @@ struct rspamd_fuzzy_storage_ctx {
guint32 expire;
guint32 frequent_score;
guint32 max_mods;
radix_tree_t *update_ips;
gchar *update_map;
struct event_base *ev_base;
};

struct rspamd_fuzzy_node {
@@ -105,6 +109,20 @@ struct rspamd_fuzzy_node {
fuzzy_hash_t h;
};

struct fuzzy_session {
struct rspamd_worker *worker;
struct fuzzy_cmd cmd;
gint fd;
u_char *pos;
socklen_t salen;
union {
struct sockaddr ss;
struct sockaddr_storage sa;
struct sockaddr_in s4;
struct sockaddr_in6 v6;
} client_addr;
struct rspamd_fuzzy_storage_ctx *ctx;
};

#ifndef HAVE_SA_SIGINFO
static void
@@ -189,7 +207,7 @@ sync_cache (struct rspamd_worker *wrk)
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
g_free (node);
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
@@ -221,7 +239,7 @@ sync_cache (struct rspamd_worker *wrk)
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
g_free (node);
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
@@ -276,7 +294,7 @@ sigusr2_handler (gint fd, short what, void *arg)
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
event_base_loopexit (ctx->ev_base, &tv);
mods = ctx->max_mods + 1;
sync_cache (worker);
return;
@@ -364,7 +382,7 @@ read_hashes_file (struct rspamd_worker *wrk)
}

for (;;) {
node = g_malloc (sizeof (struct rspamd_fuzzy_node));
node = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
if (version == 0) {
r = read (fd, &legacy_node, sizeof (legacy_node));
if (r != sizeof (legacy_node)) {
@@ -544,7 +562,7 @@ process_write_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *c
}
}

h = g_malloc (sizeof (struct rspamd_fuzzy_node));
h = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
h->h.block_size = cmd->blocksize;
h->time = (guint64) time (NULL);
@@ -585,7 +603,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
if (pvalue) {
data = *pvalue;
res = JudySLDel (&jtree, s->hash_pipe, PJE0);
g_free (data);
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), data);
bloom_del (bf, s->hash_pipe);
msg_info ("fuzzy hash was successfully deleted");
server_stat->fuzzy_hashes --;
@@ -600,7 +618,7 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s, struct rspamd_fuzzy_storage_ctx *ctx
while (cur) {
h = cur->data;
if (fuzzy_compare_hashes (&h->h, s) > LEV_LIMIT) {
g_free (h);
g_slice_free1 (sizeof (struct rspamd_fuzzy_node), h);
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hash, tmp);
@@ -653,15 +671,35 @@ process_delete_command (struct fuzzy_cmd *cmd, struct rspamd_fuzzy_storage_ctx *
return res;
}

/**
* Checks the client's address for update commands permission
*/
static gboolean
check_fuzzy_client (struct fuzzy_session *session)
{
if (session->ctx->update_ips != NULL) {
/* XXX: cannot work with ipv6 addresses */
if (session->client_addr.ss.sa_family != AF_INET) {
return FALSE;
}
if (radix32tree_find (session->ctx->update_ips,
ntohl (session->client_addr.s4.sin_addr.s_addr)) == RADIX_NO_VALUE) {
return FALSE;
}
}

return TRUE;
}

#define CMD_PROCESS(x) \
do { \
if (process_##x##_command (&session->cmd, session->worker->ctx)) { \
if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { \
if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { \
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
@@ -678,24 +716,45 @@ process_fuzzy_command (struct fuzzy_session *session)
r = process_check_command (&session->cmd, &flag, session->worker->ctx);
if (r != 0) {
r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF, r, flag);
if (sendto (session->fd, buf, r, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
if (sendto (session->fd, buf, r, 0,
&session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
else {
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
&session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
break;
case FUZZY_WRITE:
CMD_PROCESS (write);
if (!check_fuzzy_client (session)) {
msg_info ("try to insert a hash from an untrusted address");
if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1, 0,
&session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
else {
CMD_PROCESS (write);
}
break;
case FUZZY_DEL:
CMD_PROCESS (delete);
if (!check_fuzzy_client (session)) {
msg_info ("try to delete a hash from an untrusted address");
if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1, 0,
&session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
else {
CMD_PROCESS (delete);
}
break;
default:
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
&session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
break;
@@ -725,15 +784,20 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
session.worker = worker;
session.fd = fd;
session.pos = (u_char *) & session.cmd;
session.salen = sizeof (session.sa);
session.salen = sizeof (session.client_addr);
session.ctx = worker->ctx;

/* Got some data */
if (what == EV_READ) {
if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, (struct sockaddr *)&session.sa, &session.salen)) == -1) {
while ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd),
MSG_WAITALL, &session.client_addr.ss, &session.salen)) == -1) {
if (errno == EINTR) {
continue;
}
msg_err ("got error while reading from socket: %d, %s", errno, strerror (errno));
return;
}
else if (r == sizeof (struct fuzzy_cmd)) {
if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
process_fuzzy_command (&session);
}
@@ -757,9 +821,13 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
static void
sync_callback (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct rspamd_fuzzy_storage_ctx *ctx;

ctx = worker->ctx;
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
tmv.tv_usec = 0;
@@ -768,6 +836,29 @@ sync_callback (gint fd, short what, void *arg)
sync_cache (worker);
}

static gboolean
parse_fuzzy_update_list (struct rspamd_fuzzy_storage_ctx *ctx)
{
gchar **strvec, **cur;
struct in_addr ina;
guint32 mask;

strvec = g_strsplit_set (ctx->update_map, ",", 0);
cur = strvec;

while (*cur != NULL) {
/* XXX: handle only ipv4 addresses */
if (parse_ipmask_v4 (*cur, &ina, &mask)) {
if (ctx->update_ips == NULL) {
ctx->update_ips = radix_tree_create ();
}
radix32tree_add (ctx->update_ips, htonl (ina.s_addr), mask, 1);
}
}

return (ctx->update_ips != NULL);
}

gpointer
init_fuzzy (void)
{
@@ -787,11 +878,13 @@ init_fuzzy (void)
register_worker_opt (type, "max_mods", xml_handle_uint32, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_mods));
register_worker_opt (type, "frequent_score", xml_handle_uint32, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, frequent_score));
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, frequent_score));
register_worker_opt (type, "expire", xml_handle_seconds, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, expire));
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, expire));
register_worker_opt (type, "use_judy", xml_handle_boolean, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, use_judy));
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, use_judy));
register_worker_opt (type, "allow_update", xml_handle_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map));

return ctx;
}
@@ -802,13 +895,14 @@ init_fuzzy (void)
void
start_fuzzy (struct rspamd_worker *worker)
{
struct sigaction signals;
struct event sev;
gint retries = 0;
struct sigaction signals;
struct event sev;
gint retries = 0;
struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;

worker->srv->pid = getpid ();

event_init ();
ctx->ev_base = event_init ();

server_stat = worker->srv->stat;

@@ -817,13 +911,16 @@ start_fuzzy (struct rspamd_worker *worker)

/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
signal_add (&worker->sig_ev_usr2, NULL);

/* SIGUSR1 handler */
signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);

signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
event_base_set (ctx->ev_base, &sev);
signal_add (&sev, NULL);

/* Listen event */
@@ -844,16 +941,32 @@ start_fuzzy (struct rspamd_worker *worker)
}
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
tmv.tv_usec = 0;
evtimer_add (&tev, &tmv);

event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);

/* Create radix tree */
if (ctx->update_map != NULL) {
if (!add_map (worker->srv->cfg, ctx->update_map, "Allow fuzzy updates from specified addresses",
read_radix_list, fin_radix_list, (void **)&ctx->update_ips)) {
if (!parse_fuzzy_update_list (ctx)) {
msg_warn ("cannot load or parse ip list from '%s'", ctx->update_map);
}
}
}

/* Maps events */
start_map_watch (worker->srv->cfg, ctx->ev_base);

gperf_profiler_init (worker->srv->cfg, "fuzzy");

event_loop (0);
event_base_loop (ctx->ev_base, 0);
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}

+ 0
- 9
src/fuzzy_storage.h View File

@@ -18,13 +18,4 @@ struct fuzzy_cmd {
u_char hash[FUZZY_HASHLEN];
};

struct fuzzy_session {
struct rspamd_worker *worker;
struct fuzzy_cmd cmd;
gint fd;
u_char *pos;
socklen_t salen;
struct sockaddr_storage sa;
};

#endif

+ 1
- 1
src/map.c View File

@@ -996,7 +996,7 @@ check_map_proto (const gchar *map_line, gint *res, const gchar **pos)
*pos = map_line;
}
else {
msg_warn ("invalid map fetching protocol: %s", map_line);
msg_debug ("invalid map fetching protocol: %s", map_line);
return FALSE;
}


+ 3
- 3
src/plugins/custom/ipmark/ipmark.c View File

@@ -87,7 +87,7 @@ parse_ipmask (const char *begin, struct in_addr *ina, int *mask, int *value)
const char *pos;
char ip_buf[sizeof ("255.255.255.255")], mask_buf[3] = { '\0', '\0', '\0' }, *p;
int state = 1, dots = 0;
bzero (ip_buf, sizeof (ip_buf));
bzero (mask_buf, sizeof (mask_buf));
pos = begin;
@@ -164,8 +164,8 @@ parse_ipmask (const char *begin, struct in_addr *ina, int *mask, int *value)
*mask = 32;
}

*mask = 0xFFFFFFFF << (32 - *mask);
*mask = 0xFFFFFFFF << (32 - *mask);
return TRUE;
}


+ 36
- 0
src/util.c View File

@@ -1823,6 +1823,42 @@ rspamd_str_pool_copy (gconstpointer data, gpointer ud)
return data ? memory_pool_strdup (pool, data) : NULL;
}

gboolean
parse_ipmask_v4 (const char *line, struct in_addr *ina, int *mask)
{
const char *pos;
char ip_buf[INET_ADDRSTRLEN + 1], mask_buf[3] = { '\0', '\0', '\0' };

bzero (ip_buf, sizeof (ip_buf));

if ((pos = strchr (line, '/')) != NULL) {
rspamd_strlcpy (ip_buf, line, MIN ((gsize)(pos - line), sizeof (ip_buf)));
rspamd_strlcpy (mask_buf, pos + 1, sizeof (mask_buf));
}
else {
rspamd_strlcpy (ip_buf, line, sizeof (ip_buf));
}

if (!inet_aton (ip_buf, ina)) {
return FALSE;
}

if (mask_buf[0] != '\0') {
/* Also parse mask */
*mask = (mask_buf[0] - '0') * 10 + mask_buf[1] - '0';
if (*mask > 32) {
return FALSE;
}
}
else {
*mask = 32;
}

*mask = G_MAXUINT32 << (32 - *mask);

return TRUE;
}

/*
* vi:ts=4
*/

+ 9
- 0
src/util.h View File

@@ -381,4 +381,13 @@ void rspamd_hash_table_copy (GHashTable *src, GHashTable *dst,
*/
gpointer rspamd_str_pool_copy (gconstpointer data, gpointer ud);

/**
* Parse ipv4 address with optional mask in CIDR format
* @param line cidr notation of ipv4 address
* @param ina destination address
* @param mask destination mask
* @return
*/
gboolean parse_ipmask_v4 (const char *line, struct in_addr *ina, int *mask);

#endif

Loading…
Cancel
Save