aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-09-26 14:37:00 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-09-26 14:37:00 +0100
commitd0a51ab128ee0bfaa5daf6493fc8df595d234bc2 (patch)
tree3cbd83e6f6d4ca003b251c5aedcd6d9551f664d4
parent67f20b684d802fd534c9d43e163d2b52a6cdddd6 (diff)
downloadrspamd-d0a51ab128ee0bfaa5daf6493fc8df595d234bc2.tar.gz
rspamd-d0a51ab128ee0bfaa5daf6493fc8df595d234bc2.zip
Add merge logic.
-rw-r--r--src/rspamadm/fuzzy_merge.c233
1 files changed, 216 insertions, 17 deletions
diff --git a/src/rspamadm/fuzzy_merge.c b/src/rspamadm/fuzzy_merge.c
index 278f84424..236226047 100644
--- a/src/rspamadm/fuzzy_merge.c
+++ b/src/rspamadm/fuzzy_merge.c
@@ -24,10 +24,12 @@
#include "config.h"
#include "rspamadm.h"
+#include "logger.h"
#include "sqlite_utils.h"
static gchar *target = NULL;
static gchar **sources = NULL;
+static gboolean quiet;
static void rspamadm_fuzzy_merge (gint argc, gchar **argv);
static const char *rspamadm_fuzzy_merge_help (gboolean full_help);
@@ -44,6 +46,8 @@ static GOptionEntry entries[] = {
"Source for merge (can be repeated)", NULL},
{"destination", 'd', 0, G_OPTION_ARG_STRING, &target,
"Destination db", NULL},
+ {"quiet", 'q', 0, G_OPTION_ARG_NONE, &quiet,
+ "Supress output", NULL},
{NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}
};
@@ -67,7 +71,7 @@ static const gchar *create_tables_sql =
static const gchar *select_digests_sql =
"SELECT * FROM digests;";
static const gchar *select_shingles_sql =
- "SELECT * FROM shingles;";
+ "SELECT * FROM shingles WHERE digest_id=?1;";
enum statement_idx {
TRANSACTION_START = 0,
@@ -77,7 +81,7 @@ enum statement_idx {
UPDATE,
INSERT_SHINGLE,
CHECK,
- CHECK_SHINGLE,
+ CHECK_DIGEST_ID,
COUNT,
STMAX
};
@@ -127,9 +131,9 @@ static struct rspamd_sqlite3_prstmt prepared_stmts[STMAX] = {
},
[UPDATE] = {
.idx = UPDATE,
- .sql = "UPDATE digests SET value = value + ?1 WHERE "
- "digest==?2;",
- .args = "IB",
+ .sql = "UPDATE digests SET value=?1, time=?2 WHERE "
+ "digest==?3;",
+ .args = "IIB",
.stmt = NULL,
.result = SQLITE_DONE,
.ret = ""
@@ -140,15 +144,15 @@ static struct rspamd_sqlite3_prstmt prepared_stmts[STMAX] = {
.args = "B",
.stmt = NULL,
.result = SQLITE_ROW,
- .ret = ""
+ .ret = "III"
},
- [CHECK_SHINGLE] = {
- .idx = CHECK_SHINGLE,
- .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
- .args = "IS",
+ [CHECK_DIGEST_ID] = {
+ .idx = CHECK_DIGEST_ID,
+ .sql = "SELECT id FROM digests WHERE digest==?1",
+ .args = "B",
.stmt = NULL,
.result = SQLITE_ROW,
- .ret = ""
+ .ret = "I"
},
[COUNT] = {
.idx = COUNT,
@@ -197,7 +201,7 @@ struct fuzzy_merge_op {
struct {
guint number;
gint64 value;
- gint64 dgst;
+ guchar digest[64];
} shgl;
} data;
};
@@ -213,8 +217,11 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv)
GArray *ops;
rspamd_mempool_t *pool;
guint i, nsrc;
- guint64 old_count, new_count, inserted = 0, updated = 0;
+ guint64 old_count, inserted = 0, updated = 0, shingles_inserted = 0;
+ gint64 value, flag, tm, dig_id;
+ sqlite3 *src;
sqlite3_stmt *stmt;
+ struct fuzzy_merge_op nop, *op;
context = g_option_context_new (
"fuzzy_merge - merge fuzzy databases");
@@ -259,10 +266,9 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv)
nsrc = g_strv_length (sources);
source_dbs = g_ptr_array_sized_new (nsrc);
+ ops = g_array_new (FALSE, FALSE, sizeof (nop));
for (i = 0; i < nsrc; i++) {
- sqlite3 *src;
-
src = rspamd_sqlite3_open_or_create (pool, sources[i], NULL, &error);
if (src == NULL) {
@@ -276,6 +282,199 @@ rspamadm_fuzzy_merge (gint argc, gchar **argv)
}
for (i = 0; i < nsrc; i++) {
- /* Select all digests */
+ const guchar *digest;
+
+ src = g_ptr_array_index (source_dbs, i);
+
+ if (sqlite3_prepare_v2 (src, select_digests_sql, -1, &stmt, NULL) !=
+ SQLITE_OK) {
+ fprintf (stderr, "cannot prepare statement %s\n", select_digests_sql);
+ exit (1);
+ }
+
+
+ while (sqlite3_step (stmt) == SQLITE_ROW) {
+ /* id, flag, digest, value, time */
+ digest = sqlite3_column_text (stmt, 2);
+
+ /* Now search for this digest in the destination */
+ if (rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ CHECK,
+ (gint64)sqlite3_column_bytes (stmt, 2), digest,
+ &value, &tm, &flag) == SQLITE_OK) {
+ /*
+ * We compare values and if src value is bigger than
+ * local one then we replace dest value with the src value
+ */
+ gint64 src_value = sqlite3_column_int64 (stmt, 3);
+ gint64 src_flag = sqlite3_column_int64 (stmt, 1);
+
+ if (src_value > value && src_flag == flag) {
+ nop.op = OP_UPDATE;
+ memcpy (nop.data.dgst.digest, digest,
+ sizeof (nop.data.dgst.digest));
+ nop.data.dgst.flag = flag;
+ /* Update time as well */
+ nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4);
+ g_array_append_val (ops, nop);
+ }
+ }
+ else {
+ sqlite3_stmt *shgl_stmt;
+
+ /* Digest has not been found in the destination db, insert it */
+ nop.op = OP_INSERT;
+ memcpy (nop.data.dgst.digest, digest,
+ sizeof (nop.data.dgst.digest));
+ nop.data.dgst.flag = flag;
+ /* Update time as well */
+ nop.data.dgst.tm = sqlite3_column_int64 (stmt, 4);
+ g_array_append_val (ops, nop);
+
+ /*
+ * If we have no digest registered, we also need to check
+ * shingles associated with this digest
+ */
+ if (sqlite3_prepare_v2 (src,
+ select_shingles_sql,
+ -1,
+ &shgl_stmt,
+ NULL) != SQLITE_OK) {
+ sqlite3_bind_int64 (shgl_stmt,
+ sqlite3_column_int64 (stmt, 0), 1);
+
+ while (sqlite3_step (shgl_stmt) == SQLITE_ROW) {
+ /* value, number, digest_id */
+ nop.op = OP_INSERT_SHINGLE;
+ memcpy (nop.data.shgl.digest, digest,
+ sizeof (nop.data.shgl.digest));
+ nop.data.shgl.number = sqlite3_column_int64 (shgl_stmt, 1);
+ nop.data.shgl.value = sqlite3_column_int64 (shgl_stmt,
+ 0);
+ g_array_append_val (ops, nop);
+ }
+
+ sqlite3_finalize (shgl_stmt);
+ }
+ }
+ }
+
+ /* Cleanup */
+ sqlite3_finalize (stmt);
+ sqlite3_close (src);
}
-} \ No newline at end of file
+
+ /* Start transaction */
+ if (rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ TRANSACTION_START) == SQLITE_OK) {
+ fprintf (stderr, "cannot start transaction in destination\n");
+ exit (1);
+ }
+
+ /* Now all ops are inside ops array, so we just iterate over it */
+ for (i = 0; i < ops->len; i ++) {
+ op = &g_array_index (ops, struct fuzzy_merge_op, i);
+
+ switch (op->op) {
+ case OP_INSERT:
+ /* flag, digest, value, time */
+ if (rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ INSERT,
+ (gint64)op->data.dgst.flag,
+ (gint64)sizeof (op->data.dgst.digest), op->data.dgst.digest,
+ op->data.dgst.value,
+ op->data.dgst.tm) != SQLITE_OK) {
+ fprintf (stderr, "cannot insert digest\n");
+ goto err;
+ }
+
+ inserted ++;
+ break;
+ case OP_UPDATE:
+ if (rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ UPDATE,
+ (gint64) op->data.dgst.value,
+ op->data.dgst.tm,
+ (gint64) sizeof (op->data.dgst.digest),
+ op->data.dgst.digest) != SQLITE_OK) {
+ fprintf (stderr, "cannot update digest\n");
+ goto err;
+ }
+
+ updated ++;
+ break;
+ case OP_INSERT_SHINGLE:
+ /* First select the appropriate digest */
+ if (rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ CHECK_DIGEST_ID,
+ (gint64) sizeof (op->data.dgst.digest),
+ op->data.dgst.digest,
+ &dig_id) == SQLITE_OK) {
+ if (rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ INSERT_SHINGLE,
+ (gint64)op->data.shgl.value,
+ (gint64)op->data.shgl.number,
+ dig_id) != SQLITE_OK) {
+ fprintf (stderr, "cannot insert shingle\n");
+ goto err;
+ }
+
+ shingles_inserted ++;
+ }
+ else {
+ msg_warn_pool ("cannot find digest id for shingle");
+ }
+
+ break;
+ }
+ }
+
+ /* Normal closing */
+ rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ TRANSACTION_COMMIT);
+ rspamd_sqlite3_close_prstmt (dest_db, prstmt);
+ sqlite3_close (dest_db);
+ g_array_free (ops, TRUE);
+ rspamd_mempool_delete (pool);
+
+ if (!quiet) {
+ rspamd_printf ("Successfully merged data into %s\n%L hashes added, "
+ "%L hashes updated, %L shingles inserted\nhashes count before update: "
+ "%L\nhashes count after update: %L\n",
+ inserted, updated, shingles_inserted,
+ old_count, old_count + inserted);
+ }
+
+ exit (EXIT_SUCCESS);
+
+err:
+ rspamd_sqlite3_run_prstmt (pool,
+ dest_db,
+ prstmt,
+ TRANSACTION_ROLLBACK);
+ rspamd_sqlite3_close_prstmt (dest_db, prstmt);
+ sqlite3_close (dest_db);
+ g_array_free (ops, TRUE);
+ rspamd_mempool_delete (pool);
+
+
+ if (!quiet) {
+ rspamd_printf ("Merge failed, rolled back\n");
+ }
+
+ exit (EXIT_FAILURE);
+}