From f3b08af57c535bf632b600ef7b4d53eae3306744 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 23 May 2016 18:16:11 +0100 Subject: [Feature] Implement multiple-sources fuzzy storage --- src/fuzzy_storage.c | 40 ++++++++++----- src/libserver/fuzzy_backend.c | 111 ++++++++++++++++++++++++++---------------- src/libserver/fuzzy_backend.h | 8 +-- 3 files changed, 103 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index b233fccd6..6c7289fee 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -44,6 +44,8 @@ #define INVALID_NODE_TIME (guint64) - 1 +static const gchar *local_db_name = "local"; + /* Init functions */ gpointer init_fuzzy (struct rspamd_config *cfg); void start_fuzzy (struct rspamd_worker *worker); @@ -259,7 +261,7 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, guint32 rev; const gchar *p; - rev = rspamd_fuzzy_backend_version (ctx->backend); + rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name); rev = GUINT32_TO_LE (rev); len = sizeof (guint32) * 2; /* revision + last chunk */ @@ -382,7 +384,8 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, } static void -rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) +rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, + const gchar *source) { GList *cur; struct fuzzy_peer_cmd *io_cmd; @@ -394,7 +397,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) if (ctx->updates_pending && g_queue_get_length (ctx->updates_pending) > 0 && - rspamd_fuzzy_backend_prepare_update (ctx->backend)) { + rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) { cur = ctx->updates_pending->head; while (cur) { @@ -420,7 +423,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) cur = g_list_next (cur); } - if (rspamd_fuzzy_backend_finish_update (ctx->backend)) { + if (rspamd_fuzzy_backend_finish_update 
(ctx->backend, source)) { ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend); for (i = 0; i < ctx->mirrors->len; i ++) { @@ -440,7 +443,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) g_queue_clear (ctx->updates_pending); msg_info ("updated fuzzy storage: %ud updates processed, version: %d", - nupdates, rspamd_fuzzy_backend_version (ctx->backend)); + nupdates, rspamd_fuzzy_backend_version (ctx->backend, source)); } else { msg_err ("cannot commit update transaction to fuzzy backend, " @@ -887,6 +890,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, struct rspamd_http_message *msg) { const guchar *p; + gchar *src = NULL; gsize remain; guint32 revision, our_rev, len, cnt = 0; struct fuzzy_peer_cmd cmd, *pcmd; @@ -897,12 +901,23 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, } state = read_len; GList *updates = NULL, *cur; - if (!msg->body || msg->body->len == 0) { + if (!msg->body || msg->body->len == 0 || !msg->url || msg->url->len == 0) { msg_err ("empty update message, not processing"); return; } + /* Detect source from url: /update_v1/<source>, so we look for the last '/' */ + remain = msg->url->len; + src = rspamd_fstringdup (msg->url); + + while (remain--) { + if (src[remain] == '/') { + src = &src[remain + 1]; + break; + } + } + /* * Message format: * - revision @@ -918,11 +933,12 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, if (remain > sizeof (guint32) * 2) { memcpy (&revision, p, sizeof (guint32)); revision = GUINT32_TO_LE (revision); - our_rev = rspamd_fuzzy_backend_version (session->ctx->backend); + our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src); if (revision <= our_rev) { msg_err ("remote revision:d %d is older than ours: %d, refusing update", revision, our_rev); + g_free (src); return; } @@ -1006,11 +1022,13 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session 
*session, cur->data = NULL; } - rspamd_fuzzy_process_updates_queue (session->ctx); + rspamd_fuzzy_process_updates_queue (session->ctx, src); msg_info ("processed updates from the master, %ud operations processed," " revision: %ud", cnt, revision); err: + g_free (src); + if (updates) { /* We still need to clear queue */ for (cur = updates; cur != NULL; cur = g_list_next (cur)) { @@ -1253,7 +1271,7 @@ sync_callback (gint fd, short what, void *arg) ctx = worker->ctx; if (ctx->backend) { - rspamd_fuzzy_process_updates_queue (ctx); + rspamd_fuzzy_process_updates_queue (ctx, local_db_name); /* Call backend sync */ old_expired = rspamd_fuzzy_backend_expired (ctx->backend); rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); @@ -1287,7 +1305,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, struct rspamd_control_reply rep; if (ctx->backend) { - rspamd_fuzzy_process_updates_queue (ctx); + rspamd_fuzzy_process_updates_queue (ctx, local_db_name); /* Call backend sync */ old_expired = rspamd_fuzzy_backend_expired (ctx->backend); rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); @@ -2112,7 +2130,7 @@ start_fuzzy (struct rspamd_worker *worker) rspamd_worker_block_signals (); if (worker->index == 0) { - rspamd_fuzzy_process_updates_queue (ctx); + rspamd_fuzzy_process_updates_queue (ctx, local_db_name); rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); } diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index 520eb3c02..106e379f8 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -53,16 +53,20 @@ static const guint max_retries = 10; static const char *create_tables_sql = "BEGIN;" "CREATE TABLE IF NOT EXISTS digests(" - "id INTEGER PRIMARY KEY," - "flag INTEGER NOT NULL," - "digest TEXT NOT NULL," - "value INTEGER," - "time INTEGER);" + " id INTEGER PRIMARY KEY," + " flag INTEGER NOT NULL," + " digest TEXT NOT NULL," + " value INTEGER," + " time INTEGER);" "CREATE TABLE IF NOT 
EXISTS shingles(" - "value INTEGER NOT NULL," - "number INTEGER NOT NULL," - "digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE " - "ON UPDATE CASCADE);" + " value INTEGER NOT NULL," + " number INTEGER NOT NULL," + " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE " + " ON UPDATE CASCADE);" + "CREATE TABLE IF NOT EXISTS sources(" + " name TEXT UNIQUE," + " version INTEGER," + " last INTEGER);" "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" "CREATE INDEX IF NOT EXISTS t ON digests(time);" "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" @@ -93,7 +97,9 @@ enum rspamd_fuzzy_statement_idx { RSPAMD_FUZZY_BACKEND_EXPIRE, RSPAMD_FUZZY_BACKEND_VACUUM, RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + RSPAMD_FUZZY_BACKEND_ADD_SOURCE, RSPAMD_FUZZY_BACKEND_VERSION, + RSPAMD_FUZZY_BACKEND_SET_VERSION, RSPAMD_FUZZY_BACKEND_MAX }; static struct rspamd_fuzzy_stmts { @@ -213,13 +219,27 @@ static struct rspamd_fuzzy_stmts { .stmt = NULL, .result = SQLITE_DONE }, + { + .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);", + .args = "TII", + .stmt = NULL, + .result = SQLITE_DONE + }, { .idx = RSPAMD_FUZZY_BACKEND_VERSION, - .sql = "PRAGMA user_version;", - .args = "", + .sql = "SELECT version FROM sources WHERE name=?1;", + .args = "T", .stmt = NULL, .result = SQLITE_ROW }, + { + .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION, + .sql = "UPDATE sources SET version=?1, last=?2 WHERE name=?3;", + .args = "IIT", + .stmt = NULL, + .result = SQLITE_DONE + }, }; static GQuark @@ -610,7 +630,8 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend, } gboolean -rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend) +rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend, + const gchar *source) { gint rc; @@ -729,42 +750,47 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend, } gboolean -rspamd_fuzzy_backend_finish_update (struct 
rspamd_fuzzy_backend *backend) +rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend, + const gchar *source) { gint rc, wal_frames, wal_checkpointed, ver; - gint64 version = 0; - gchar version_buf[128]; + + /* Get and update version */ + ver = rspamd_fuzzy_backend_version (backend, source); + ++ver; rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + RSPAMD_FUZZY_BACKEND_SET_VERSION, + (gint64)ver, (gint64)time (NULL), source); - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot commit updates: %s", - sqlite3_errmsg (backend->db)); - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); - return FALSE; - } - else { - if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) { - msg_warn_fuzzy_backend ("cannot commit checkpoint: %s", + if (rc == SQLITE_OK) { + rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot commit updates: %s", sqlite3_errmsg (backend->db)); + rspamd_fuzzy_backend_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; } - else if (wal_checkpointed > 0) { - msg_info_fuzzy_backend ("total number of frames in the wal file: " - "%d, checkpointed: %d", wal_frames, wal_checkpointed); + else { + if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) { + msg_warn_fuzzy_backend ("cannot commit checkpoint: %s", + sqlite3_errmsg (backend->db)); + } + else if (wal_checkpointed > 0) { + msg_info_fuzzy_backend ("total number of frames in the wal file: " + "%d, checkpointed: %d", wal_frames, wal_checkpointed); + } } } - - /* Get and update version */ - ver = rspamd_fuzzy_backend_version (backend); - ++ver; - rspamd_snprintf (version_buf, sizeof (version_buf), "PRAGMA user_version=%d;", - ver); - - if (sqlite3_exec (backend->db, version_buf, NULL, NULL, NULL) != SQLITE_OK) { - 
msg_err_fuzzy_backend ("cannot set database version to %L: %s", - version, sqlite3_errmsg (backend->db)); + else { + msg_warn_fuzzy_backend ("cannot update version for %s: %s", source, + sqlite3_errmsg (backend->db)); + rspamd_fuzzy_backend_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; } return TRUE; @@ -976,13 +1002,14 @@ rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend) } gint -rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend) +rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, + const gchar *source) { - gint ret = 0; + gint ret = -1; if (backend) { if (rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_VERSION) == SQLITE_OK) { + RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) { ret = sqlite3_column_int64 ( prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0); } diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h index bcd199d1a..91a613f2a 100644 --- a/src/libserver/fuzzy_backend.h +++ b/src/libserver/fuzzy_backend.h @@ -46,7 +46,8 @@ struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check ( /** * Prepare storage for updates (by starting transaction) */ -gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend); +gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend, + const gchar *source); /** * Add digest to the database @@ -72,7 +73,8 @@ gboolean rspamd_fuzzy_backend_del ( /** * Commit updates to storage */ -gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend); +gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend, + const gchar *source); /** * Sync storage @@ -90,7 +92,7 @@ gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend, void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend); gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend); -gint 
rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend); +gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source); gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend); const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend); -- cgit v1.2.3