From: Vsevolod Stakhov Date: Sat, 26 Sep 2015 13:37:00 +0000 (+0100) Subject: Add merge logic. X-Git-Tag: 1.0.3~13 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=d0a51ab128ee0bfaa5daf6493fc8df595d234bc2;p=rspamd.git Add merge logic. --- diff --git a/src/rspamadm/fuzzy_merge.c b/src/rspamadm/fuzzy_merge.c index 278f84424..236226047 100644 --- a/src/rspamadm/fuzzy_merge.c +++ b/src/rspamadm/fuzzy_merge.c @@ -24,10 +24,12 @@ #include "config.h" #include "rspamadm.h" +#include "logger.h" #include "sqlite_utils.h" static gchar *target = NULL; static gchar **sources = NULL; +static gboolean quiet; static void rspamadm_fuzzy_merge (gint argc, gchar **argv); static const char *rspamadm_fuzzy_merge_help (gboolean full_help); @@ -44,6 +46,8 @@ static GOptionEntry entries[] = { "Source for merge (can be repeated)", NULL}, {"destination", 'd', 0, G_OPTION_ARG_STRING, &target, "Destination db", NULL}, + {"quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet, + "Supress output", NULL}, {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL} }; @@ -67,7 +71,7 @@ static const gchar *create_tables_sql = static const gchar *select_digests_sql = "SELECT * FROM digests;"; static const gchar *select_shingles_sql = - "SELECT * FROM shingles;"; + "SELECT * FROM shingles WHERE digest_id=?1;"; enum statement_idx { TRANSACTION_START = 0, @@ -77,7 +81,7 @@ enum statement_idx { UPDATE, INSERT_SHINGLE, CHECK, - CHECK_SHINGLE, + CHECK_DIGEST_ID, COUNT, STMAX }; @@ -127,9 +131,9 @@ static struct rspamd_sqlite3_prstmt prepared_stmts[STMAX] = { }, [UPDATE] = { .idx = UPDATE, - .sql = "UPDATE digests SET value = value + ?1 WHERE " - "digest==?2;", - .args = "IB", + .sql = "UPDATE digests SET value=?1, time=?2 WHERE " + "digest==?3;", + .args = "IIB", .stmt = NULL, .result = SQLITE_DONE, .ret = "" @@ -140,15 +144,15 @@ static struct rspamd_sqlite3_prstmt prepared_stmts[STMAX] = { .args = "B", .stmt = NULL, .result = SQLITE_ROW, - .ret = "" + .ret = "III" }, - [CHECK_SHINGLE] = { - .idx = CHECK_SHINGLE, - .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2", - .args = "IS", + [CHECK_DIGEST_ID] = { + .idx = CHECK_DIGEST_ID, + .sql = "SELECT id FROM digests WHERE digest==?1", + .args = "B", .stmt = NULL, .result = SQLITE_ROW, - .ret = "" + .ret = "I" }, [COUNT] = { .idx = COUNT, @@ -197,7 +201,7 @@ struct fuzzy_merge_op { struct { guint number; gint64 value; - gint64 dgst; + guchar digest[64]; } shgl; } data; }; @@ -213,8 +217,11 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) GArray *ops; rspamd_mempool_t *pool; guint i, nsrc; - guint64 old_count, new_count, inserted = 0, updated = 0; + guint64 old_count, inserted = 0, updated = 0, shingles_inserted = 0; + gint64 value, flag, tm, dig_id; + sqlite3 *src; sqlite3_stmt *stmt; + struct fuzzy_merge_op nop, *op; context = g_option_context_new ( "fuzzy_merge - merge fuzzy databases"); @@ -259,10 +266,9 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) nsrc = g_strv_length (sources); source_dbs = g_ptr_array_sized_new (nsrc); + ops = g_array_new (FALSE, FALSE, sizeof (nop)); for (i = 0; i < nsrc; i++) { - sqlite3 *src; - src = rspamd_sqlite3_open_or_create (pool, sources[i], NULL, &error); if (src == NULL) { @@ -276,6 +282,199 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv) } for (i = 0; i < nsrc; i++) { - /* Select all digests */ + const guchar *digest; + + src = g_ptr_array_index (source_dbs, i); + + if (sqlite3_prepare_v2 (src, select_digests_sql, -1, &stmt, NULL) != + SQLITE_OK) { + fprintf (stderr, "cannot prepare statement %s\n", select_digests_sql); + exit (1); + } + + + while (sqlite3_step (stmt) == SQLITE_ROW) { + /* id, flag, digest, value, time */ + digest = sqlite3_column_text (stmt, 2); + + /* Now search for this digest in the destination */ + if (rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + CHECK, + (gint64)sqlite3_column_bytes (stmt, 2), digest, + &value, &tm, &flag) == SQLITE_OK) { + /* + * We compare values and if src value is bigger than + * local one then we replace dest value with the src value + */ + gint64 src_value = sqlite3_column_int64 (stmt, 3); + gint64 src_flag = sqlite3_column_int64 (stmt, 1); + + if (src_value > value && src_flag == flag) { + nop.op = OP_UPDATE; + memcpy (nop.data.dgst.digest, digest, + sizeof (nop.data.dgst.digest)); + nop.data.dgst.flag = flag; + /* Update time as well */ + nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4); + g_array_append_val (ops, nop); + } + } + else { + sqlite3_stmt *shgl_stmt; + + /* Digest has not been found in the destination db, insert it */ + nop.op = OP_INSERT; + memcpy (nop.data.dgst.digest, digest, + sizeof (nop.data.dgst.digest)); + nop.data.dgst.flag = flag; + /* Update time as well */ + nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4); + g_array_append_val (ops, nop); + + /* + * If we have no digest registered, we also need to check + * shingles associated with this digest + */ + if (sqlite3_prepare_v2 (src, + select_shingles_sql, + -1, + &shgl_stmt, + NULL) != SQLITE_OK) { + sqlite3_bind_int64 (shgl_stmt, + sqlite3_column_int64 (stmt, 0), 1); + + while (sqlite3_step (shgl_stmt) == SQLITE_ROW) { + /* value, number, digest_id */ + nop.op = OP_INSERT_SHINGLE; + memcpy (nop.data.shgl.digest, digest, + sizeof (nop.data.shgl.digest)); + nop.data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1); + nop.data.shgl.value = sqlite3_column_int64 (shgl_stmt, + 0); + g_array_append_val (ops, nop); + } + + sqlite3_finalize (shgl_stmt); + } + } + } + + /* Cleanup */ + sqlite3_finalize (stmt); + sqlite3_close (src); } -} \ No newline at end of file + + /* Start transaction */ + if (rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + TRANSACTION_START) == SQLITE_OK) { + fprintf (stderr, "cannot start transaction in destination\n"); + exit (1); + } + + /* Now all ops are inside ops array, so we just iterate over it */ + for (i = 0; i < ops->len; i ++) { + op = &g_array_index (ops, struct fuzzy_merge_op, i); + + switch (op->op) { + case OP_INSERT: + /* flag, digest, value, time */ + if (rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + INSERT, + (gint64)op->data.dgst.flag, + (gint64)sizeof (op->data.dgst.digest), op->data.dgst.digest, + op->data.dgst.value, + op->data.dgst.tm) != SQLITE_OK) { + fprintf (stderr, "cannot insert digest\n"); + goto err; + } + + inserted ++; + break; + case OP_UPDATE: + if (rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + UPDATE, + (gint64) op->data.dgst.value, + op->data.dgst.tm, + (gint64) sizeof (op->data.dgst.digest), + op->data.dgst.digest) != SQLITE_OK) { + fprintf (stderr, "cannot update digest\n"); + goto err; + } + + updated ++; + break; + case OP_INSERT_SHINGLE: + /* First select the appropriate digest */ + if (rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + CHECK_DIGEST_ID, + (gint64) sizeof (op->data.dgst.digest), + op->data.dgst.digest, + &dig_id) == SQLITE_OK) { + if (rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + INSERT_SHINGLE, + (gint64)op->data.shgl.value, + (gint64)op->data.shgl.number, + dig_id) != SQLITE_OK) { + fprintf (stderr, "cannot insert shingle\n"); + goto err; + } + + shingles_inserted ++; + } + else { + msg_warn_pool ("cannot find digest id for shingle"); + } + + break; + } + } + + /* Normal closing */ + rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + TRANSACTION_COMMIT); + rspamd_sqlite3_close_prstmt (dest_db, prstmt); + sqlite3_close (dest_db); + g_array_free (ops, TRUE); + rspamd_mempool_delete (pool); + + if (!quiet) { + rspamd_printf ("Successfully merged data into %s\n%L hashes added, " + "%L hashes updated, %L shingles inserted\nhashes count before update: " + "%L\nhashes count after update: %L\n", + inserted, updated, shingles_inserted, + old_count, old_count + inserted); + } + + exit (EXIT_SUCCESS); + +err: + rspamd_sqlite3_run_prstmt (pool, + dest_db, + prstmt, + TRANSACTION_ROLLBACK); + rspamd_sqlite3_close_prstmt (dest_db, prstmt); + sqlite3_close (dest_db); + g_array_free (ops, TRUE); + rspamd_mempool_delete (pool); + + + if (!quiet) { + rspamd_printf ("Merge failed, rolled back\n"); + } + + exit (EXIT_FAILURE); +}