};
struct fuzzy_merge_op {
enum op_type op;
+ guchar digest[64];
union {
struct {
guint flag;
gint64 value;
- guchar digest[64];
gint64 tm;
} dgst;
struct {
guint number;
gint64 value;
- guchar digest[64];
} shgl;
} data;
};
+static guint
+rspamadm_op_hash (gconstpointer p)
+{
+ const struct fuzzy_merge_op *op = p;
+
+ /* Uniformly distributed */
+ return *(guint *)op->digest;
+}
+
+static gboolean
+rspamadm_op_equal (gconstpointer a, gconstpointer b)
+{
+ const struct fuzzy_merge_op *op1 = a, *op2 = b;
+
+ return memcmp (op1->digest, op2->digest, sizeof (op1->digest)) == 0;
+}
+
static void
rspamadm_fuzzy_merge (gint argc, gchar **argv)
{
sqlite3 *dest_db;
GPtrArray *source_dbs;
GArray *prstmt;
- GArray *ops;
+ GPtrArray *ops;
+ GHashTable *unique_ops;
rspamd_mempool_t *pool;
guint i, nsrc;
guint64 old_count, inserted = 0, updated = 0, shingles_inserted = 0;
- gint64 value, flag, tm, dig_id;
+ gint64 value, flag, tm, dig_id, src_value, src_flag;
sqlite3 *src;
sqlite3_stmt *stmt;
- struct fuzzy_merge_op nop, *op;
+ struct fuzzy_merge_op *nop, *op;
context = g_option_context_new (
"fuzzy_merge - merge fuzzy databases");
nsrc = g_strv_length (sources);
source_dbs = g_ptr_array_sized_new (nsrc);
- ops = g_array_new (FALSE, FALSE, sizeof (nop));
+ ops = g_ptr_array_new ();
+ unique_ops = g_hash_table_new (rspamadm_op_hash, rspamadm_op_equal);
for (i = 0; i < nsrc; i++) {
src = rspamd_sqlite3_open_or_create (pool, sources[i], NULL, &error);
while (sqlite3_step (stmt) == SQLITE_ROW) {
/* id, flag, digest, value, time */
digest = sqlite3_column_text (stmt, 2);
+ src_value = sqlite3_column_int64 (stmt, 3);
+ src_flag = sqlite3_column_int64 (stmt, 1);
/* Now search for this digest in the destination */
if (rspamd_sqlite3_run_prstmt (pool,
* We compare values and if src value is bigger than
* local one then we replace dest value with the src value
*/
- gint64 src_value = sqlite3_column_int64 (stmt, 3);
- gint64 src_flag = sqlite3_column_int64 (stmt, 1);
-
if (src_value > value && src_flag == flag) {
- nop.op = OP_UPDATE;
- memcpy (nop.data.dgst.digest, digest,
- sizeof (nop.data.dgst.digest));
- nop.data.dgst.flag = flag;
+ nop = g_slice_alloc (sizeof (*nop));
+ nop->op = OP_UPDATE;
+ memcpy (nop->digest, digest,
+ sizeof (nop->digest));
+ nop->data.dgst.flag = flag;
/* Update time as well */
- nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4);
- g_array_append_val (ops, nop);
+ nop->data.dgst.tm = sqlite3_column_int64 (stmt, 4);
+
+ if ((op = g_hash_table_lookup (unique_ops, nop)) == NULL) {
+ g_ptr_array_add (ops, nop);
+ g_hash_table_insert (unique_ops, nop, nop);
+ }
+ else {
+ if (op->data.dgst.value < nop->data.dgst.value) {
+ op->data.dgst.value = nop->data.dgst.value;
+ }
+ }
}
}
else {
sqlite3_stmt *shgl_stmt;
/* Digest has not been found in the destination db, insert it */
- nop.op = OP_INSERT;
- memcpy (nop.data.dgst.digest, digest,
- sizeof (nop.data.dgst.digest));
- nop.data.dgst.flag = flag;
+ nop = g_slice_alloc (sizeof (*nop));
+ nop->op = OP_INSERT;
+ memcpy (nop->digest, digest,
+ sizeof (nop->digest));
+ nop->data.dgst.flag = src_flag;
+ nop->data.dgst.value = src_value;
/* Update time as well */
- nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4);
- g_array_append_val (ops, nop);
+ nop->data.dgst.tm = sqlite3_column_int64 (stmt, 4);
+
+ if ((op = g_hash_table_lookup (unique_ops, nop)) == NULL) {
+ g_ptr_array_add (ops, nop);
+ g_hash_table_insert (unique_ops, nop, nop);
+ }
+ else {
+ continue;
+ }
/*
* If we have no digest registered, we also need to check
while (sqlite3_step (shgl_stmt) == SQLITE_ROW) {
/* value, number, digest_id */
- nop.op = OP_INSERT_SHINGLE;
- memcpy (nop.data.shgl.digest, digest,
- sizeof (nop.data.shgl.digest));
- nop.data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1);
- nop.data.shgl.value = sqlite3_column_int64 (shgl_stmt,
+ nop->op = OP_INSERT_SHINGLE;
+ memcpy (nop->digest, digest, sizeof (nop->digest));
+ nop->data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1);
+ nop->data.shgl.value = sqlite3_column_int64 (shgl_stmt,
0);
- g_array_append_val (ops, nop);
+ g_ptr_array_add (ops, nop);
}
sqlite3_finalize (shgl_stmt);
/* Now all ops are inside ops array, so we just iterate over it */
for (i = 0; i < ops->len; i ++) {
- op = &g_array_index (ops, struct fuzzy_merge_op, i);
+ op = g_ptr_array_index (ops, i);
switch (op->op) {
case OP_INSERT:
prstmt,
INSERT,
(gint64)op->data.dgst.flag,
- (gint64)sizeof (op->data.dgst.digest), op->data.dgst.digest,
+ (gint64)sizeof (op->digest), op->digest,
op->data.dgst.value,
op->data.dgst.tm) != SQLITE_OK) {
rspamd_fprintf(stderr, "cannot insert digest: %s\n",
UPDATE,
(gint64) op->data.dgst.value,
op->data.dgst.tm,
- (gint64) sizeof (op->data.dgst.digest),
- op->data.dgst.digest) != SQLITE_OK) {
+ (gint64) sizeof (op->digest),
+ op->digest) != SQLITE_OK) {
rspamd_fprintf(stderr, "cannot update digest: %s\n",
sqlite3_errmsg (dest_db));
goto err;
dest_db,
prstmt,
CHECK_DIGEST_ID,
- (gint64) sizeof (op->data.dgst.digest),
- op->data.dgst.digest,
+ (gint64) sizeof (op->digest),
+ op->digest,
&dig_id) == SQLITE_OK) {
if (rspamd_sqlite3_run_prstmt (pool,
dest_db,
rspamd_sqlite3_close_prstmt (dest_db, prstmt);
sqlite3_close (dest_db);
- g_array_free (ops, TRUE);
+ for (i = 0; i < ops->len; i++) {
+ op = g_ptr_array_index (ops, i);
+ g_slice_free1 (sizeof (*op), op);
+ }
+ g_ptr_array_free (ops, TRUE);
rspamd_mempool_delete (pool);
if (!quiet) {
rspamd_printf ("Successfully merged data into %s\n%L hashes added, "
"%L hashes updated, %L shingles inserted\nhashes count before update: "
"%L\nhashes count after update: %L\n",
+ target,
inserted, updated, shingles_inserted,
old_count, old_count + inserted);
}
TRANSACTION_ROLLBACK);
rspamd_sqlite3_close_prstmt (dest_db, prstmt);
sqlite3_close (dest_db);
- g_array_free (ops, TRUE);
+ for (i = 0; i < ops->len; i++) {
+ op = g_ptr_array_index (ops, i);
+ g_slice_free1 (sizeof (*op), op);
+ }
+ g_ptr_array_free (ops, TRUE);
rspamd_mempool_delete (pool);