From 72f2f16245dbbe2994f44b247114ceb07c42810e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 5 Jul 2018 13:49:12 +0100 Subject: [PATCH] [Feature] Add logic to deduplicate updates queue --- src/libserver/fuzzy_backend.c | 101 ++++++++++++++++++++++++++++ src/libserver/fuzzy_backend_redis.c | 6 ++ src/libserver/fuzzy_wire.h | 3 +- 3 files changed, 109 insertions(+), 1 deletion(-) diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index b66ac49c0..13aef8ba8 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -19,6 +19,7 @@ #include "fuzzy_backend_sqlite.h" #include "fuzzy_backend_redis.h" #include "cfg_file.h" +#include "fuzzy_wire.h" #define DEFAULT_EXPIRE 172800L @@ -324,6 +325,105 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, bk->subr->check (bk, cmd, cb, ud, bk->subr_ud); } +static guint +rspamd_fuzzy_digest_hash (gconstpointer key) +{ + guint ret; + + /* Distirbuted uniformly already */ + memcpy (&ret, key, sizeof (ret)); + + return ret; +} + +static gboolean +rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2) +{ + return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0; +} + +static void +rspamd_fuzzy_backend_deduplicate_queue (GArray *updates) +{ + GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash, + rspamd_fuzzy_digest_equal); + struct fuzzy_peer_cmd *io_cmd, *found; + struct rspamd_fuzzy_cmd *cmd; + guchar *digest; + guint i; + + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + digest = cmd->digest; + + found = g_hash_table_lookup (seen, digest); + + if (found == NULL) { + /* Add to the seen list, if not a duplicate (huh?) */ + if (cmd->cmd != FUZZY_DUP) { + g_hash_table_insert (seen, digest, io_cmd); + } + } + else { + if (found->cmd.normal.flag != cmd->flag) { + /* TODO: deal with flags better at some point */ + continue; + } + + /* Apply heuristic */ + switch (cmd->cmd) { + case FUZZY_WRITE: + if (found->cmd.normal.cmd == FUZZY_WRITE) { + /* Already seen */ + found->cmd.normal.value += cmd->value; + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_REFRESH) { + /* Seen refresh command, remove it as write has higher priority */ + g_hash_table_replace (seen, digest, io_cmd); + found->cmd.normal.cmd = FUZZY_DUP; + } + else if (found->cmd.normal.cmd == FUZZY_DEL) { + /* Request delete + add, weird, but ignore add */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + break; + case FUZZY_REFRESH: + if (found->cmd.normal.cmd == FUZZY_WRITE) { + /* No need to expire, handled by addition */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_DEL) { + /* Request delete + expire, ignore expire */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_REFRESH) { + /* Already handled */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + break; + case FUZZY_DEL: + /* Delete has priority over all other commands */ + g_hash_table_replace (seen, digest, io_cmd); + found->cmd.normal.cmd = FUZZY_DUP; + break; + default: + break; + } + } + } + + g_hash_table_unref (seen); +} + void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, @@ -333,6 +433,7 @@ rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, g_assert (updates != NULL); if (updates) { + rspamd_fuzzy_backend_deduplicate_queue (updates); bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud); } else if (cb) { diff --git a/src/libserver/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend_redis.c index 550e76f6f..89bc6728f 100644 --- a/src/libserver/fuzzy_backend_redis.c +++ b/src/libserver/fuzzy_backend_redis.c @@ -1191,6 +1191,9 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, return FALSE; } } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } else { g_assert_not_reached (); } @@ -1305,6 +1308,9 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, } } } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } else { g_assert_not_reached (); } diff --git a/src/libserver/fuzzy_wire.h b/src/libserver/fuzzy_wire.h index 73d4c480c..4a1dc3ed4 100644 --- a/src/libserver/fuzzy_wire.h +++ b/src/libserver/fuzzy_wire.h @@ -16,7 +16,8 @@ #define FUZZY_STAT 3 #define FUZZY_CLIENT_MAX 3 /* Internal commands */ -#define FUZZY_REFRESH 100 +#define FUZZY_REFRESH 100 /* Update expire */ +#define FUZZY_DUP 101 /* Skip duplicate in update queue */ /** * The epoch of the fuzzy client -- 2.39.5