]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Add logic to deduplicate updates queue
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 5 Jul 2018 12:49:12 +0000 (13:49 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 5 Jul 2018 12:49:12 +0000 (13:49 +0100)
src/libserver/fuzzy_backend.c
src/libserver/fuzzy_backend_redis.c
src/libserver/fuzzy_wire.h

index b66ac49c07c079d3546a3b6689a2d783c1291ca1..13aef8ba8052bba5539bbec92f69c970585cb068 100644 (file)
@@ -19,6 +19,7 @@
 #include "fuzzy_backend_sqlite.h"
 #include "fuzzy_backend_redis.h"
 #include "cfg_file.h"
+#include "fuzzy_wire.h"
 
 #define DEFAULT_EXPIRE 172800L
 
@@ -324,6 +325,105 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
        bk->subr->check (bk, cmd, cb, ud, bk->subr_ud);
 }
 
+static guint
+rspamd_fuzzy_digest_hash (gconstpointer key)
+{
+       guint ret;
+
+       /* Distirbuted uniformly already */
+       memcpy (&ret, key, sizeof (ret));
+
+       return ret;
+}
+
+static gboolean
+rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2)
+{
+       return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0;
+}
+
+static void
+rspamd_fuzzy_backend_deduplicate_queue (GArray *updates)
+{
+       GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash,
+                       rspamd_fuzzy_digest_equal);
+       struct fuzzy_peer_cmd *io_cmd, *found;
+       struct rspamd_fuzzy_cmd *cmd;
+       guchar *digest;
+       guint i;
+
+       for (i = 0; i < updates->len; i ++) {
+               io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+               if (io_cmd->is_shingle) {
+                       cmd = &io_cmd->cmd.shingle.basic;
+               }
+               else {
+                       cmd = &io_cmd->cmd.normal;
+               }
+
+               digest = cmd->digest;
+
+               found = g_hash_table_lookup (seen, digest);
+
+               if (found == NULL) {
+                       /* Add to the seen list, if not a duplicate (huh?) */
+                       if (cmd->cmd != FUZZY_DUP) {
+                               g_hash_table_insert (seen, digest, io_cmd);
+                       }
+               }
+               else {
+                       if (found->cmd.normal.flag != cmd->flag) {
+                               /* TODO: deal with flags better at some point */
+                               continue;
+                       }
+
+                       /* Apply heuristic */
+                       switch (cmd->cmd) {
+                       case FUZZY_WRITE:
+                               if (found->cmd.normal.cmd == FUZZY_WRITE) {
+                                       /* Already seen */
+                                       found->cmd.normal.value += cmd->value;
+                                       cmd->cmd = FUZZY_DUP; /* Ignore this one */
+                               }
+                               else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+                                       /* Seen refresh command, remove it as write has higher priority */
+                                       g_hash_table_replace (seen, digest, io_cmd);
+                                       found->cmd.normal.cmd = FUZZY_DUP;
+                               }
+                               else if (found->cmd.normal.cmd == FUZZY_DEL) {
+                                       /* Request delete + add, weird, but ignore add */
+                                       cmd->cmd = FUZZY_DUP; /* Ignore this one */
+                               }
+                               break;
+                       case FUZZY_REFRESH:
+                               if (found->cmd.normal.cmd == FUZZY_WRITE) {
+                                       /* No need to expire, handled by addition */
+                                       cmd->cmd = FUZZY_DUP; /* Ignore this one */
+                               }
+                               else if (found->cmd.normal.cmd == FUZZY_DEL) {
+                                       /* Request delete + expire, ignore expire */
+                                       cmd->cmd = FUZZY_DUP; /* Ignore this one */
+                               }
+                               else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+                                       /* Already handled */
+                                       cmd->cmd = FUZZY_DUP; /* Ignore this one */
+                               }
+                               break;
+                       case FUZZY_DEL:
+                               /* Delete has priority over all other commands */
+                               g_hash_table_replace (seen, digest, io_cmd);
+                               found->cmd.normal.cmd = FUZZY_DUP;
+                               break;
+                       default:
+                               break;
+                       }
+               }
+       }
+
+       g_hash_table_unref (seen);
+}
+
 void
 rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
                GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
@@ -333,6 +433,7 @@ rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
        g_assert (updates != NULL);
 
        if (updates) {
+               rspamd_fuzzy_backend_deduplicate_queue (updates);
                bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud);
        }
        else if (cb) {
index 550e76f6fa61cb66744052055d99d743df2ac855..89bc6728f4b910d4183834d2a04dc630d7632a2e 100644 (file)
@@ -1191,6 +1191,9 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
                        return FALSE;
                }
        }
+       else if (cmd->cmd == FUZZY_DUP) {
+               /* Ignore */
+       }
        else {
                g_assert_not_reached ();
        }
@@ -1305,6 +1308,9 @@ rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
                                }
                        }
                }
+               else if (cmd->cmd == FUZZY_DUP) {
+                       /* Ignore */
+               }
                else {
                        g_assert_not_reached ();
                }
index 73d4c480c46541918983a7cf34318ee3db45d75d..4a1dc3ed4bf8445f0939eadcb1991fe81413ea42 100644 (file)
@@ -16,7 +16,8 @@
 #define FUZZY_STAT 3
 #define FUZZY_CLIENT_MAX 3
 /* Internal commands */
-#define FUZZY_REFRESH 100
+#define FUZZY_REFRESH 100 /* Update expire */
+#define FUZZY_DUP 101 /* Skip duplicate in update queue */
 
 /**
  * The epoch of the fuzzy client