#include "fuzzy_backend_sqlite.h"
#include "fuzzy_backend_redis.h"
#include "cfg_file.h"
+#include "fuzzy_wire.h"
#define DEFAULT_EXPIRE 172800L
bk->subr->check (bk, cmd, cb, ud, bk->subr_ud);
}
+static guint
+rspamd_fuzzy_digest_hash (gconstpointer key)
+{
+ guint ret;
+
+ /* Distirbuted uniformly already */
+ memcpy (&ret, key, sizeof (ret));
+
+ return ret;
+}
+
+static gboolean
+rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2)
+{
+ return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0;
+}
+
+static void
+rspamd_fuzzy_backend_deduplicate_queue (GArray *updates)
+{
+ GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash,
+ rspamd_fuzzy_digest_equal);
+ struct fuzzy_peer_cmd *io_cmd, *found;
+ struct rspamd_fuzzy_cmd *cmd;
+ guchar *digest;
+ guint i;
+
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ digest = cmd->digest;
+
+ found = g_hash_table_lookup (seen, digest);
+
+ if (found == NULL) {
+ /* Add to the seen list, if not a duplicate (huh?) */
+ if (cmd->cmd != FUZZY_DUP) {
+ g_hash_table_insert (seen, digest, io_cmd);
+ }
+ }
+ else {
+ if (found->cmd.normal.flag != cmd->flag) {
+ /* TODO: deal with flags better at some point */
+ continue;
+ }
+
+ /* Apply heuristic */
+ switch (cmd->cmd) {
+ case FUZZY_WRITE:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* Already seen */
+ found->cmd.normal.value += cmd->value;
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Seen refresh command, remove it as write has higher priority */
+ g_hash_table_replace (seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + add, weird, but ignore add */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_REFRESH:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* No need to expire, handled by addition */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + expire, ignore expire */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Already handled */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_DEL:
+ /* Delete has priority over all other commands */
+ g_hash_table_replace (seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ g_hash_table_unref (seen);
+}
+
void
rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
g_assert (updates != NULL);
if (updates) {
+ rspamd_fuzzy_backend_deduplicate_queue (updates);
bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud);
}
else if (cb) {