From b4f39106c6f4020e9bd4e1fbf39214f2872c5112 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 28 Sep 2015 09:35:47 +0100 Subject: Fix duplicate cases in merge operation. --- src/rspamadm/fuzzy_merge.c | 114 +++++++++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/rspamadm/fuzzy_merge.c b/src/rspamadm/fuzzy_merge.c index 48adb519f..0f1e619e1 100644 --- a/src/rspamadm/fuzzy_merge.c +++ b/src/rspamadm/fuzzy_merge.c @@ -191,21 +191,37 @@ enum op_type { }; struct fuzzy_merge_op { enum op_type op; + guchar digest[64]; union { struct { guint flag; gint64 value; - guchar digest[64]; gint64 tm; } dgst; struct { guint number; gint64 value; - guchar digest[64]; } shgl; } data; }; +static guint +rspamadm_op_hash (gconstpointer p) +{ + const struct fuzzy_merge_op *op = p; + + /* Uniformly distributed */ + return *(guint *)op->digest; +} + +static gboolean +rspamadm_op_equal (gconstpointer a, gconstpointer b) +{ + const struct fuzzy_merge_op *op1 = a, *op2 = b; + + return memcmp (op1->digest, op2->digest, sizeof (op1->digest)) == 0; +} + static void rspamadm_fuzzy_merge (gint argc, gchar **argv) { @@ -214,14 +230,15 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) sqlite3 *dest_db; GPtrArray *source_dbs; GArray *prstmt; - GArray *ops; + GPtrArray *ops; + GHashTable *unique_ops; rspamd_mempool_t *pool; guint i, nsrc; guint64 old_count, inserted = 0, updated = 0, shingles_inserted = 0; - gint64 value, flag, tm, dig_id; + gint64 value, flag, tm, dig_id, src_value, src_flag; sqlite3 *src; sqlite3_stmt *stmt; - struct fuzzy_merge_op nop, *op; + struct fuzzy_merge_op *nop, *op; context = g_option_context_new ( "fuzzy_merge - merge fuzzy databases"); @@ -266,7 +283,8 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) nsrc = g_strv_length (sources); source_dbs = g_ptr_array_sized_new (nsrc); - ops = g_array_new (FALSE, FALSE, sizeof (nop)); + ops = g_ptr_array_new (); + unique_ops = g_hash_table_new (rspamadm_op_hash, rspamadm_op_equal); for (i = 0; i < nsrc; i++) { src = rspamd_sqlite3_open_or_create (pool, sources[i], NULL, &error); @@ -301,6 +319,8 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) while (sqlite3_step (stmt) == SQLITE_ROW) { /* id, flag, digest, value, time */ digest = sqlite3_column_text (stmt, 2); + src_value = sqlite3_column_int64 (stmt, 3); + src_flag = sqlite3_column_int64 (stmt, 1); /* Now search for this digest in the destination */ if (rspamd_sqlite3_run_prstmt (pool, @@ -313,30 +333,46 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) * We compare values and if src value is bigger than * local one then we replace dest value with the src value */ - gint64 src_value = sqlite3_column_int64 (stmt, 3); - gint64 src_flag = sqlite3_column_int64 (stmt, 1); - if (src_value > value && src_flag == flag) { - nop.op = OP_UPDATE; - memcpy (nop.data.dgst.digest, digest, - sizeof (nop.data.dgst.digest)); - nop.data.dgst.flag = flag; + nop = g_slice_alloc (sizeof (*nop)); + nop->op = OP_UPDATE; + memcpy (nop->digest, digest, + sizeof (nop->digest)); + nop->data.dgst.flag = flag; /* Update time as well */ - nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4); - g_array_append_val (ops, nop); + nop->data.dgst.tm = sqlite3_column_int64 (stmt, 4); + + if ((op = g_hash_table_lookup (unique_ops, nop)) == NULL) { + g_ptr_array_add (ops, nop); + g_hash_table_insert (unique_ops, nop, nop); + } + else { + if (op->data.dgst.value < nop->data.dgst.value) { + op->data.dgst.value = nop->data.dgst.value; + } + } } } else { sqlite3_stmt *shgl_stmt; /* Digest has not been found in the destination db, insert it */ - nop.op = OP_INSERT; - memcpy (nop.data.dgst.digest, digest, - sizeof (nop.data.dgst.digest)); - nop.data.dgst.flag = flag; + nop = g_slice_alloc (sizeof (*nop)); + nop->op = OP_INSERT; + memcpy (nop->digest, digest, + sizeof (nop->digest)); + nop->data.dgst.flag = src_flag; + nop->data.dgst.value = src_value; /* Update time as well */ - nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4); - g_array_append_val (ops, nop); + nop->data.dgst.tm = sqlite3_column_int64 (stmt, 4); + + if ((op = g_hash_table_lookup (unique_ops, nop)) == NULL) { + g_ptr_array_add (ops, nop); + g_hash_table_insert (unique_ops, nop, nop); + } + else { + continue; + } /* * If we have no digest registered, we also need to check @@ -352,13 +388,12 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) while (sqlite3_step (shgl_stmt) == SQLITE_ROW) { /* value, number, digest_id */ - nop.op = OP_INSERT_SHINGLE; - memcpy (nop.data.shgl.digest, digest, - sizeof (nop.data.shgl.digest)); - nop.data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1); - nop.data.shgl.value = sqlite3_column_int64 (shgl_stmt, + nop->op = OP_INSERT_SHINGLE; + memcpy (nop->digest, digest, sizeof (nop->digest)); + nop->data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1); + nop->data.shgl.value = sqlite3_column_int64 (shgl_stmt, 0); - g_array_append_val (ops, nop); + g_ptr_array_add (ops, nop); } sqlite3_finalize (shgl_stmt); @@ -387,7 +422,7 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) /* Now all ops are inside ops array, so we just iterate over it */ for (i = 0; i < ops->len; i ++) { - op = &g_array_index (ops, struct fuzzy_merge_op, i); + op = g_ptr_array_index (ops, i); switch (op->op) { case OP_INSERT: @@ -397,7 +432,7 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) prstmt, INSERT, (gint64)op->data.dgst.flag, - (gint64)sizeof (op->data.dgst.digest), op->data.dgst.digest, + (gint64)sizeof (op->digest), op->digest, op->data.dgst.value, op->data.dgst.tm) != SQLITE_OK) { rspamd_fprintf(stderr, "cannot insert digest: %s\n", @@ -414,8 +449,8 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) UPDATE, (gint64) op->data.dgst.value, op->data.dgst.tm, - (gint64) sizeof (op->data.dgst.digest), - op->data.dgst.digest) != SQLITE_OK) { + (gint64) sizeof (op->digest), + op->digest) != SQLITE_OK) { rspamd_fprintf(stderr, "cannot update digest: %s\n", sqlite3_errmsg (dest_db)); goto err; @@ -429,8 +464,8 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) dest_db, prstmt, CHECK_DIGEST_ID, - (gint64) sizeof (op->data.dgst.digest), - op->data.dgst.digest, + (gint64) sizeof (op->digest), + op->digest, &dig_id) == SQLITE_OK) { if (rspamd_sqlite3_run_prstmt (pool, dest_db, @@ -466,13 +501,18 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) rspamd_sqlite3_close_prstmt (dest_db, prstmt); sqlite3_close (dest_db); - g_array_free (ops, TRUE); + for (i = 0; i < ops->len; i++) { + op = g_ptr_array_index (ops, i); + g_slice_free1 (sizeof (*op), op); + } + g_ptr_array_free (ops, TRUE); rspamd_mempool_delete (pool); if (!quiet) { rspamd_printf ("Successfully merged data into %s\n%L hashes added, " "%L hashes updated, %L shingles inserted\nhashes count before update: " "%L\nhashes count after update: %L\n", + target, inserted, updated, shingles_inserted, old_count, old_count + inserted); } @@ -486,7 +526,11 @@ err: TRANSACTION_ROLLBACK); rspamd_sqlite3_close_prstmt (dest_db, prstmt); sqlite3_close (dest_db); - g_array_free (ops, TRUE); + for (i = 0; i < ops->len; i++) { + op = g_ptr_array_index (ops, i); + g_slice_free1 (sizeof (*op), op); + } + g_ptr_array_free (ops, TRUE); rspamd_mempool_delete (pool); -- cgit v1.2.3