aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-23 12:18:00 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-23 12:18:00 +0100
commit40d5f05721ac5d96a99659407eac91eab2420f86 (patch)
tree6b720d2ea256514a1c03eb47ff08c04b6cbae63e /src/fuzzy_storage.c
parent21d10d44caed825b3c6f06faf92715de98c7dca3 (diff)
downloadrspamd-40d5f05721ac5d96a99659407eac91eab2420f86.tar.gz
rspamd-40d5f05721ac5d96a99659407eac91eab2420f86.zip
[Feature] Implement fuzzy storage updates
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c197
1 files changed, 189 insertions, 8 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index d8f82104f..db1557e47 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -94,7 +94,8 @@ struct rspamd_fuzzy_storage_ctx {
gdouble sync_timeout;
radix_compressed_t *update_ips;
radix_compressed_t *master_ips;
- struct rspamd_cryptobox_keypair *master_key;
+ struct rspamd_cryptobox_keypair *sync_keypair;
+ struct rspamd_cryptobox_pubkey *master_key;
struct timeval master_io_tv;
gdouble master_timeout;
gchar *update_map;
@@ -704,6 +705,144 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
}
static void
+rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
+ struct rspamd_http_message *msg)
+{
+ const guchar *p;
+ gsize remain;
+ guint32 revision, our_rev, len;
+ struct fuzzy_peer_cmd cmd, *pcmd;
+ enum {
+ read_len = 0,
+ read_data,
+ finish_processing
+ } state = read_len;
+ GList *updates = NULL, *cur;
+
+ if (!msg->body || msg->body->len == 0) {
+ msg_err ("empty update message, not processing");
+
+ return;
+ }
+
+ /*
+ * Message format:
+ * <uint32_le> - revision
+ * <uint32_le> - size of the next element
+ * <data> - command data
+ * ...
+ * <0> - end of data
+ * ... - ignored
+ */
+ p = (const guchar *)msg->body->str;
+ remain = msg->body->len;
+
+ if (remain > sizeof (guint32) * 2) {
+ memcpy (&revision, p, sizeof (guint32));
+ revision = GUINT32_TO_LE (revision);
+ our_rev = rspamd_fuzzy_backend_version (session->ctx->backend);
+
+ if (revision <= our_rev) {
+ msg_err ("remote revision:d %d is older than ours: %d, refusing update",
+ revision, our_rev);
+
+ return;
+ }
+ else if (revision - our_rev > 1) {
+ msg_warn ("remote revision:d %d is newer more than 1 revision "
+ "than ours: %d, cold sync is recommended",
+ revision, our_rev);
+ }
+
+ remain -= sizeof (guint32);
+ p += sizeof (guint32);
+ }
+ else {
+ msg_err ("short update message, not processing");
+ goto err;
+ }
+
+ while (remain > 0) {
+ switch (state) {
+ case read_len:
+ if (remain < sizeof (guint32)) {
+ msg_err ("short update message while reading length, not processing");
+ goto err;
+ }
+
+ memcpy (&len, p, sizeof (guint32));
+ len = GUINT32_TO_LE (len);
+ remain -= sizeof (guint32);
+ p += sizeof (guint32);
+
+ if (len == 0) {
+ remain = 0;
+ state = finish_processing;
+ }
+ else {
+ state = read_data;
+ }
+ break;
+ case read_data:
+ if (remain < len) {
+ msg_err ("short update message while reading data, not processing");
+ return;
+ }
+
+ if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) ||
+ len > sizeof (cmd)) {
+ /* Bad size command */
+ msg_err ("incorrect element size: %d, at least %d expected", len,
+ (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean)));
+ goto err;
+ }
+
+ memcpy (&cmd, p, sizeof (gboolean));
+ if (cmd.is_shingle && len < sizeof (cmd)) {
+ /* Short command */
+ msg_err ("incorrect element size: %d, at least %d expected", len,
+ (gint)(sizeof (cmd)));
+ goto err;
+ }
+
+ pcmd = g_slice_alloc (sizeof (cmd));
+ memcpy (pcmd, p, len);
+ updates = g_list_prepend (updates, pcmd);
+
+ p += len;
+ remain -= len;
+ len = 0;
+ state = read_len;
+ break;
+ case finish_processing:
+ /* Do nothing */
+ remain = 0;
+ break;
+ }
+ }
+
+ /* Insert elements to the updates from head */
+ for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
+ g_queue_push_head (session->ctx->updates_pending, cur->data);
+ cur->data = NULL;
+ }
+
+ rspamd_fuzzy_process_updates_queue (session->ctx);
+
+err:
+ if (updates) {
+ for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
+ if (cur->data) {
+ g_slice_free1 (sizeof (cmd), cur->data);
+ }
+ }
+
+ g_list_free (updates);
+ }
+}
+
+
+static void
fuzzy_session_destroy (gpointer d)
{
struct fuzzy_session *session = d;
@@ -740,8 +879,35 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
struct fuzzy_master_update_session *session = conn->ud;
+ const struct rspamd_cryptobox_pubkey *rk;
+
+ /* Check key */
+ if (!rspamd_http_connection_is_encrypted (conn)) {
+ msg_err ("refuse unencrypted update from: %s",
+ rspamd_inet_address_to_string (session->addr));
+ goto end;
+ }
+ else {
+
+ if (session->ctx->master_key) {
+ rk = rspamd_http_connection_get_peer_key (conn);
+ g_assert (rk != NULL);
+
+ if (!rspamd_pubkey_equal (rk, session->ctx->master_key)) {
+ msg_err ("refuse unknown pubkey update from: %s",
+ rspamd_inet_address_to_string (session->addr));
+ goto end;
+ }
+ }
+ else {
+ msg_warn ("no trusted key specified, accept any update from %s",
+ rspamd_inet_address_to_string (session->addr));
+ }
+
+ rspamd_fuzzy_mirror_process_update (session, msg);
+ }
- /* TODO: implement updates */
+end:
rspamd_fuzzy_mirror_session_destroy (session);
return 0;
@@ -786,6 +952,15 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
return;
}
+ if (!ctx->sync_keypair) {
+ msg_err ("deny update request from %s, as no local keypair is specified",
+ rspamd_inet_address_to_string (addr));
+ rspamd_inet_address_destroy (addr);
+ close (nfd);
+
+ return;
+ }
+
session = g_slice_alloc0 (sizeof (*session));
http_conn = rspamd_http_connection_new (
NULL,
@@ -795,10 +970,7 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
RSPAMD_HTTP_SERVER,
ctx->keypair_cache);
- if (ctx->master_key) {
- rspamd_http_connection_set_key (http_conn, ctx->master_key);
- }
-
+ rspamd_http_connection_set_key (http_conn, ctx->sync_keypair);
session->ctx = ctx;
session->conn = http_conn;
session->addr = addr;
@@ -1434,10 +1606,10 @@ init_fuzzy (struct rspamd_config *cfg)
rspamd_rcl_register_worker_option (cfg,
type,
- "master_key",
+ "sync_keypair",
rspamd_rcl_parse_struct_keypair,
ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, sync_keypair),
0,
"Encryption key for master/slave updates");
@@ -1450,6 +1622,15 @@ init_fuzzy (struct rspamd_config *cfg)
0,
"Allow master/slave updates from the following IP addresses");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "master_key",
+ rspamd_rcl_parse_struct_pubkey,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
+ 0,
+ "Allow master/slave updates merely using the specified key");
+
return ctx;
}