summaryrefslogtreecommitdiffstats
path: root/src/libserver/fuzzy_backend.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-07-05 13:49:12 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-07-05 13:49:12 +0100
commit72f2f16245dbbe2994f44b247114ceb07c42810e (patch)
tree641ae569f8872f07a5b97a5bae20f8a7d566a60c /src/libserver/fuzzy_backend.c
parent7c5d79db90db646cdaef6430482ad8e7dc530f39 (diff)
downloadrspamd-72f2f16245dbbe2994f44b247114ceb07c42810e.tar.gz
rspamd-72f2f16245dbbe2994f44b247114ceb07c42810e.zip
[Feature] Add logic to deduplicate updates queue
Diffstat (limited to 'src/libserver/fuzzy_backend.c')
-rw-r--r--src/libserver/fuzzy_backend.c101
1 files changed, 101 insertions, 0 deletions
diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c
index b66ac49c0..13aef8ba8 100644
--- a/src/libserver/fuzzy_backend.c
+++ b/src/libserver/fuzzy_backend.c
@@ -19,6 +19,7 @@
#include "fuzzy_backend_sqlite.h"
#include "fuzzy_backend_redis.h"
#include "cfg_file.h"
+#include "fuzzy_wire.h"
#define DEFAULT_EXPIRE 172800L
@@ -324,6 +325,105 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
bk->subr->check (bk, cmd, cb, ud, bk->subr_ud);
}
+static guint
+rspamd_fuzzy_digest_hash (gconstpointer key)
+{
+ guint ret;
+
+ /* Distirbuted uniformly already */
+ memcpy (&ret, key, sizeof (ret));
+
+ return ret;
+}
+
+static gboolean
+rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2)
+{
+ return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0;
+}
+
+static void
+rspamd_fuzzy_backend_deduplicate_queue (GArray *updates)
+{
+ GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash,
+ rspamd_fuzzy_digest_equal);
+ struct fuzzy_peer_cmd *io_cmd, *found;
+ struct rspamd_fuzzy_cmd *cmd;
+ guchar *digest;
+ guint i;
+
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ digest = cmd->digest;
+
+ found = g_hash_table_lookup (seen, digest);
+
+ if (found == NULL) {
+ /* Add to the seen list, if not a duplicate (huh?) */
+ if (cmd->cmd != FUZZY_DUP) {
+ g_hash_table_insert (seen, digest, io_cmd);
+ }
+ }
+ else {
+ if (found->cmd.normal.flag != cmd->flag) {
+ /* TODO: deal with flags better at some point */
+ continue;
+ }
+
+ /* Apply heuristic */
+ switch (cmd->cmd) {
+ case FUZZY_WRITE:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* Already seen */
+ found->cmd.normal.value += cmd->value;
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Seen refresh command, remove it as write has higher priority */
+ g_hash_table_replace (seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + add, weird, but ignore add */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_REFRESH:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* No need to expire, handled by addition */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + expire, ignore expire */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Already handled */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_DEL:
+ /* Delete has priority over all other commands */
+ g_hash_table_replace (seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ g_hash_table_unref (seen);
+}
+
void
rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
@@ -333,6 +433,7 @@ rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
g_assert (updates != NULL);
if (updates) {
+ rspamd_fuzzy_backend_deduplicate_queue (updates);
bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud);
}
else if (cb) {