]> source.dussan.org Git - rspamd.git/commitdiff
Add merge logic.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 26 Sep 2015 13:37:00 +0000 (14:37 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 26 Sep 2015 13:37:00 +0000 (14:37 +0100)
src/rspamadm/fuzzy_merge.c

index 278f84424901823a680998af67af634937949665..23622604762a6c2eef73c36b9f314921f6e0e310 100644 (file)
 
 #include "config.h"
 #include "rspamadm.h"
+#include "logger.h"
 #include "sqlite_utils.h"
 
 static gchar *target = NULL;
 static gchar **sources = NULL;
+static gboolean quiet;
 
 static void rspamadm_fuzzy_merge (gint argc, gchar **argv);
 static const char *rspamadm_fuzzy_merge_help (gboolean full_help);
@@ -44,6 +46,8 @@ static GOptionEntry entries[] = {
                                "Source for merge (can be repeated)",                    NULL},
                {"destination", 'd', 0, G_OPTION_ARG_STRING, &target,
                                "Destination db",     NULL},
+               {"quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet,
+                               "Supress output", NULL},
                {NULL,  0,   0, G_OPTION_ARG_NONE, NULL, NULL, NULL}
 };
 
@@ -67,7 +71,7 @@ static const gchar *create_tables_sql =
 static const gchar *select_digests_sql =
                                "SELECT * FROM digests;";
 static const gchar *select_shingles_sql =
-                               "SELECT * FROM shingles;";
+                               "SELECT * FROM shingles WHERE digest_id=?1;";
 
 enum statement_idx {
        TRANSACTION_START = 0,
@@ -77,7 +81,7 @@ enum statement_idx {
        UPDATE,
        INSERT_SHINGLE,
        CHECK,
-       CHECK_SHINGLE,
+       CHECK_DIGEST_ID,
        COUNT,
        STMAX
 };
@@ -127,9 +131,9 @@ static struct rspamd_sqlite3_prstmt prepared_stmts[STMAX] = {
                },
                [UPDATE] = {
                                .idx = UPDATE,
-                               .sql = "UPDATE digests SET value = value + ?1 WHERE "
-                                               "digest==?2;",
-                               .args = "IB",
+                               .sql = "UPDATE digests SET value=?1, time=?2 WHERE "
+                                               "digest==?3;",
+                               .args = "IIB",
                                .stmt = NULL,
                                .result = SQLITE_DONE,
                                .ret = ""
@@ -140,15 +144,15 @@ static struct rspamd_sqlite3_prstmt prepared_stmts[STMAX] = {
                                .args = "B",
                                .stmt = NULL,
                                .result = SQLITE_ROW,
-                               .ret = ""
+                               .ret = "III"
                },
-               [CHECK_SHINGLE] = {
-                               .idx = CHECK_SHINGLE,
-                               .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
-                               .args = "IS",
+               [CHECK_DIGEST_ID] = {
+                               .idx = CHECK_DIGEST_ID,
+                               .sql = "SELECT id FROM digests WHERE digest==?1",
+                               .args = "B",
                                .stmt = NULL,
                                .result = SQLITE_ROW,
-                               .ret = ""
+                               .ret = "I"
                },
                [COUNT] = {
                                .idx = COUNT,
@@ -197,7 +201,7 @@ struct fuzzy_merge_op {
                struct {
                        guint number;
                        gint64 value;
-                       gint64 dgst;
+                       guchar digest[64];
                } shgl;
        } data;
 };
@@ -213,8 +217,11 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv)
        GArray *ops;
        rspamd_mempool_t *pool;
        guint i, nsrc;
-       guint64 old_count, new_count, inserted = 0, updated = 0;
+       guint64 old_count, inserted = 0, updated = 0, shingles_inserted = 0;
+       gint64 value, flag, tm, dig_id;
+       sqlite3 *src;
        sqlite3_stmt *stmt;
+       struct fuzzy_merge_op nop, *op;
 
        context = g_option_context_new (
                        "fuzzy_merge - merge fuzzy databases");
@@ -259,10 +266,9 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv)
 
        nsrc = g_strv_length (sources);
        source_dbs = g_ptr_array_sized_new (nsrc);
+       ops = g_array_new (FALSE, FALSE, sizeof (nop));
 
        for (i = 0; i < nsrc; i++) {
-               sqlite3 *src;
-
                src = rspamd_sqlite3_open_or_create (pool, sources[i], NULL, &error);
 
                if (src == NULL) {
@@ -276,6 +282,199 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv)
        }
 
        for (i = 0; i < nsrc; i++) {
-               /* Select all digests */
+               const guchar *digest;
+
+               src = g_ptr_array_index (source_dbs, i);
+
+               if (sqlite3_prepare_v2 (src, select_digests_sql, -1, &stmt, NULL) !=
+                                       SQLITE_OK) {
+                       fprintf (stderr, "cannot prepare statement %s\n", select_digests_sql);
+                       exit (1);
+               }
+
+
+               while (sqlite3_step (stmt) == SQLITE_ROW) {
+                       /* id, flag, digest, value, time */
+                       digest = sqlite3_column_text (stmt, 2);
+
+                       /* Now search for this digest in the destination */
+                       if (rspamd_sqlite3_run_prstmt (pool,
+                                       dest_db,
+                                       prstmt,
+                                       CHECK,
+                                       (gint64)sqlite3_column_bytes (stmt, 2), digest,
+                                       &value, &tm, &flag) == SQLITE_OK) {
+                               /*
+                                * If the source value is larger than the destination one
+                                * and the flags match, queue an update of the destination
+                                */
+                               gint64 src_value = sqlite3_column_int64 (stmt, 3);
+                               gint64 src_flag = sqlite3_column_int64 (stmt, 1);
+
+                               if (src_value > value && src_flag == flag) {
+                                       nop.op = OP_UPDATE;
+                                       memcpy (nop.data.dgst.digest, digest,
+                                                       sizeof (nop.data.dgst.digest));
+                                       nop.data.dgst.flag = flag;
+                                       /* Update time as well */
+                                       nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4);
+                                       g_array_append_val (ops, nop);
+                               }
+                       }
+                       else {
+                               sqlite3_stmt *shgl_stmt;
+
+                               /* Digest has not been found in the destination db, insert it */
+                               nop.op = OP_INSERT;
+                               memcpy (nop.data.dgst.digest, digest,
+                                               sizeof (nop.data.dgst.digest));
+                               nop.data.dgst.flag = flag;
+                               /* Record the source time as well */
+                               nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4);
+                               g_array_append_val (ops, nop);
+
+                               /*
+                                * The digest is not registered in the destination, so we
+                                * also queue the source shingles associated with this digest
+                                */
+                               if (sqlite3_prepare_v2 (src,
+                                               select_shingles_sql,
+                                               -1,
+                                               &shgl_stmt,
+                                               NULL) != SQLITE_OK) {
+                                       sqlite3_bind_int64 (shgl_stmt,
+                                                       sqlite3_column_int64 (stmt, 0), 1);
+
+                                       while (sqlite3_step (shgl_stmt) == SQLITE_ROW) {
+                                               /* value, number, digest_id */
+                                               nop.op = OP_INSERT_SHINGLE;
+                                               memcpy (nop.data.shgl.digest, digest,
+                                                               sizeof (nop.data.shgl.digest));
+                                               nop.data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1);
+                                               nop.data.shgl.value = sqlite3_column_int64 (shgl_stmt,
+                                                               0);
+                                               g_array_append_val (ops, nop);
+                                       }
+
+                                       sqlite3_finalize (shgl_stmt);
+                               }
+                       }
+               }
+
+               /* Cleanup */
+               sqlite3_finalize (stmt);
+               sqlite3_close (src);
        }
-}
\ No newline at end of file
+
+       /* Start transaction */
+       if (rspamd_sqlite3_run_prstmt (pool,
+                       dest_db,
+                       prstmt,
+                       TRANSACTION_START) == SQLITE_OK) {
+               fprintf (stderr, "cannot start transaction in destination\n");
+               exit (1);
+       }
+
+       /* Now all ops are inside ops array, so we just iterate over it */
+       for (i = 0; i < ops->len; i ++) {
+               op = &g_array_index (ops, struct fuzzy_merge_op, i);
+
+               switch (op->op) {
+               case OP_INSERT:
+                       /* flag, digest, value, time */
+                       if (rspamd_sqlite3_run_prstmt (pool,
+                                       dest_db,
+                                       prstmt,
+                                       INSERT,
+                                       (gint64)op->data.dgst.flag,
+                                       (gint64)sizeof (op->data.dgst.digest), op->data.dgst.digest,
+                                       op->data.dgst.value,
+                                       op->data.dgst.tm) != SQLITE_OK) {
+                               fprintf (stderr, "cannot insert digest\n");
+                               goto err;
+                       }
+
+                       inserted ++;
+                       break;
+               case OP_UPDATE:
+                       if (rspamd_sqlite3_run_prstmt (pool,
+                                       dest_db,
+                                       prstmt,
+                                       UPDATE,
+                                       (gint64) op->data.dgst.value,
+                                       op->data.dgst.tm,
+                                       (gint64) sizeof (op->data.dgst.digest),
+                                       op->data.dgst.digest) != SQLITE_OK) {
+                               fprintf (stderr, "cannot update digest\n");
+                               goto err;
+                       }
+
+                       updated ++;
+                       break;
+               case OP_INSERT_SHINGLE:
+                       /* First select the appropriate digest */
+                       if (rspamd_sqlite3_run_prstmt (pool,
+                                       dest_db,
+                                       prstmt,
+                                       CHECK_DIGEST_ID,
+                                       (gint64) sizeof (op->data.dgst.digest),
+                                       op->data.dgst.digest,
+                                       &dig_id) == SQLITE_OK) {
+                               if (rspamd_sqlite3_run_prstmt (pool,
+                                               dest_db,
+                                               prstmt,
+                                               INSERT_SHINGLE,
+                                               (gint64)op->data.shgl.value,
+                                               (gint64)op->data.shgl.number,
+                                               dig_id) != SQLITE_OK) {
+                                       fprintf (stderr, "cannot insert shingle\n");
+                                       goto err;
+                               }
+
+                               shingles_inserted ++;
+                       }
+                       else {
+                               msg_warn_pool ("cannot find digest id for shingle");
+                       }
+
+                       break;
+               }
+       }
+
+       /* Normal closing */
+       rspamd_sqlite3_run_prstmt (pool,
+                       dest_db,
+                       prstmt,
+                       TRANSACTION_COMMIT);
+       rspamd_sqlite3_close_prstmt (dest_db, prstmt);
+       sqlite3_close (dest_db);
+       g_array_free (ops, TRUE);
+       rspamd_mempool_delete (pool);
+
+       if (!quiet) {
+               rspamd_printf ("Successfully merged data into %s\n%L hashes added, "
+                               "%L hashes updated, %L shingles inserted\nhashes count before update: "
+                               "%L\nhashes count after update: %L\n",
+                               inserted, updated, shingles_inserted,
+                               old_count, old_count + inserted);
+       }
+
+       exit (EXIT_SUCCESS);
+
+err:
+       rspamd_sqlite3_run_prstmt (pool,
+               dest_db,
+               prstmt,
+               TRANSACTION_ROLLBACK);
+       rspamd_sqlite3_close_prstmt (dest_db, prstmt);
+       sqlite3_close (dest_db);
+       g_array_free (ops, TRUE);
+       rspamd_mempool_delete (pool);
+
+
+       if (!quiet) {
+               rspamd_printf ("Merge failed, rolled back\n");
+       }
+
+       exit (EXIT_FAILURE);
+}