aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-14 13:38:10 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-14 13:38:10 +0100
commitd46a62b2cb8d30ef53ac04fa94cea1dc21fa40c1 (patch)
tree8ab9d0eb5cbae748a8c43dc637450ac425da83f2 /src/fuzzy_storage.c
parent8fd0795feffdde4dd3a9f2fbe8c6ba517be370bf (diff)
downloadrspamd-d46a62b2cb8d30ef53ac04fa94cea1dc21fa40c1.tar.gz
rspamd-d46a62b2cb8d30ef53ac04fa94cea1dc21fa40c1.zip
[Feature] Preliminary implementation of fuzzy master/slave updates
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c183
1 files changed, 173 insertions, 10 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index a1881aff3..d8f82104f 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -40,7 +40,7 @@
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
-
+#define DEFAULT_MASTER_TIMEOUT 10.0
#define INVALID_NODE_TIME (guint64) - 1
@@ -53,7 +53,7 @@ worker_t fuzzy_worker = {
init_fuzzy, /* Init function */
start_fuzzy, /* Start function */
RSPAMD_WORKER_HAS_SOCKET,
- RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */
+ RSPAMD_WORKER_SOCKET_UDP|RSPAMD_WORKER_SOCKET_TCP, /* Both socket */
RSPAMD_WORKER_VER /* Version info */
};
@@ -93,7 +93,12 @@ struct rspamd_fuzzy_storage_ctx {
gdouble expire;
gdouble sync_timeout;
radix_compressed_t *update_ips;
+ radix_compressed_t *master_ips;
+ struct rspamd_cryptobox_keypair *master_key;
+ struct timeval master_io_tv;
+ gdouble master_timeout;
gchar *update_map;
+ gchar *masters_map;
guint keypair_cache_size;
struct event_base *ev_base;
gint peer_fd;
@@ -160,6 +165,12 @@ struct fuzzy_key {
struct fuzzy_key_stat *stat;
};
+struct fuzzy_master_update_session {
+ struct rspamd_http_connection *conn;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ rspamd_inet_addr_t *addr;
+};
+
static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
static gboolean
@@ -703,6 +714,102 @@ fuzzy_session_destroy (gpointer d)
g_slice_free1 (sizeof (*session), session);
}
+static void
+rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session)
+{
+ if (session) {
+ rspamd_http_connection_unref (session->conn);
+ rspamd_inet_address_destroy (session->addr);
+ g_slice_free1 (sizeof (*session), session);
+ }
+}
+
+static void
+rspamd_fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct fuzzy_master_update_session *session = conn->ud;
+
+ msg_err ("abnormally closing connection from: %s, error: %e",
+ rspamd_inet_address_to_string (session->addr), err);
+ /* Terminate session immediately */
+ rspamd_fuzzy_mirror_session_destroy (session);
+}
+
+static gint
+rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct fuzzy_master_update_session *session = conn->ud;
+
+ /* TODO: implement updates */
+ rspamd_fuzzy_mirror_session_destroy (session);
+
+ return 0;
+}
+
+static void
+accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ rspamd_inet_addr_t *addr;
+ gint nfd;
+ struct rspamd_http_connection *http_conn;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct fuzzy_master_update_session *session;
+
+ if ((nfd =
+ rspamd_accept_from_socket (fd, &addr)) == -1) {
+ msg_warn ("accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ return;
+ }
+
+ ctx = worker->ctx;
+
+ if (!ctx->master_ips) {
+ msg_err ("deny update request from %s as no masters defined",
+ rspamd_inet_address_to_string (addr));
+ rspamd_inet_address_destroy (addr);
+ close (nfd);
+
+ return;
+ }
+ else if (radix_find_compressed_addr (ctx->master_ips, addr) == RADIX_NO_VALUE) {
+ msg_err ("deny update request from %s",
+ rspamd_inet_address_to_string (addr));
+ rspamd_inet_address_destroy (addr);
+ close (nfd);
+
+ return;
+ }
+
+ session = g_slice_alloc0 (sizeof (*session));
+ http_conn = rspamd_http_connection_new (
+ NULL,
+ rspamd_fuzzy_mirror_error_handler,
+ rspamd_fuzzy_mirror_finish_handler,
+ 0,
+ RSPAMD_HTTP_SERVER,
+ ctx->keypair_cache);
+
+ if (ctx->master_key) {
+ rspamd_http_connection_set_key (http_conn, ctx->master_key);
+ }
+
+ session->ctx = ctx;
+ session->conn = http_conn;
+ session->addr = addr;
+
+ rspamd_http_connection_read_message (http_conn,
+ session,
+ nfd,
+ &ctx->master_io_tv,
+ ctx->ev_base);
+}
+
/*
* Accept new connection and construct task
*/
@@ -1211,6 +1318,7 @@ init_fuzzy (struct rspamd_config *cfg)
ctx->magic = rspamd_fuzzy_storage_magic;
ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
+ ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
ctx->expire = DEFAULT_EXPIRE;
ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
@@ -1315,6 +1423,32 @@ init_fuzzy (struct rspamd_config *cfg)
0,
"Allow encrypted requests only (and forbid all unknown keys or plaintext requests)");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "master_timeout",
+ rspamd_rcl_parse_struct_time,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_timeout),
+ RSPAMD_CL_FLAG_TIME_FLOAT,
+ "Master protocol IO timeout");
+
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "master_key",
+ rspamd_rcl_parse_struct_keypair,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
+ 0,
+ "Encryption key for master/slave updates");
+
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "masters",
+ rspamd_rcl_parse_struct_string,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, masters_map),
+ 0,
+ "Allow master/slave updates from the following IP addresses");
return ctx;
}
@@ -1371,13 +1505,25 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
ls = cur->data;
if (ls->fd != -1) {
- accept_event = g_slice_alloc0 (sizeof (struct event));
- event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
- accept_fuzzy_socket, worker);
- event_base_set (ctx->ev_base, accept_event);
- event_add (accept_event, NULL);
- worker->accept_events = g_list_prepend (worker->accept_events,
- accept_event);
+ if (ls->type == RSPAMD_WORKER_SOCKET_UDP) {
+ accept_event = g_slice_alloc0 (sizeof (struct event));
+ event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+ accept_fuzzy_socket, worker);
+ event_base_set (ctx->ev_base, accept_event);
+ event_add (accept_event, NULL);
+ worker->accept_events = g_list_prepend (worker->accept_events,
+ accept_event);
+ }
+ else if (worker->index == 0) {
+ /* We allow TCP listeners only for a update worker */
+ accept_event = g_slice_alloc0 (sizeof (struct event));
+ event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+ accept_fuzzy_mirror_socket, worker);
+ event_base_set (ctx->ev_base, accept_event);
+ event_add (accept_event, NULL);
+ worker->accept_events = g_list_prepend (worker->accept_events,
+ accept_event);
+ }
}
cur = g_list_next (cur);
@@ -1415,6 +1561,7 @@ start_fuzzy (struct rspamd_worker *worker)
"fuzzy",
NULL);
ctx->peer_fd = -1;
+ double_to_tv (ctx->master_timeout, &ctx->master_io_tv);
/*
* Open DB and perform VACUUM
@@ -1443,7 +1590,7 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_fuzzy_storage_stat, ctx);
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
rspamd_fuzzy_storage_sync, ctx);
- /* Create radix tree */
+ /* Create radix trees */
if (ctx->update_map != NULL) {
if (!rspamd_map_is_map (ctx->update_map)) {
if (!radix_add_generic_iplist (ctx->update_map,
@@ -1460,6 +1607,22 @@ start_fuzzy (struct rspamd_worker *worker)
}
}
+ if (ctx->masters_map != NULL) {
+ if (!rspamd_map_is_map (ctx->masters_map)) {
+ if (!radix_add_generic_iplist (ctx->masters_map,
+ &ctx->master_ips)) {
+ msg_warn ("cannot load or parse ip list from '%s'",
+ ctx->masters_map);
+ }
+ }
+ else {
+ rspamd_map_add (worker->srv->cfg, ctx->masters_map,
+ "Allow fuzzy master/slave updates from specified addresses",
+ rspamd_radix_read, rspamd_radix_fin,
+ (void **)&ctx->master_ips);
+
+ }
+ }
/* Maps events */
ctx->resolver = dns_resolver_init (worker->srv->logger,