]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement multiple-sources fuzzy storage
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 May 2016 17:16:11 +0000 (18:16 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 May 2016 17:16:11 +0000 (18:16 +0100)
src/fuzzy_storage.c
src/libserver/fuzzy_backend.c
src/libserver/fuzzy_backend.h

index b233fccd6ab1b74df2a1c17feb7c01d70169e92a..6c7289fee235661d702ef1e73061f492cca6fa6d 100644 (file)
@@ -44,6 +44,8 @@
 
 #define INVALID_NODE_TIME (guint64) - 1
 
+static const gchar *local_db_name = "local";
+
 /* Init functions */
 gpointer init_fuzzy (struct rspamd_config *cfg);
 void start_fuzzy (struct rspamd_worker *worker);
@@ -259,7 +261,7 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
        guint32 rev;
        const gchar *p;
 
-       rev = rspamd_fuzzy_backend_version (ctx->backend);
+       rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name);
        rev = GUINT32_TO_LE (rev);
        len = sizeof (guint32) * 2; /* revision + last chunk */
 
@@ -382,7 +384,8 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
 }
 
 static void
-rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
+               const gchar *source)
 {
        GList *cur;
        struct fuzzy_peer_cmd *io_cmd;
@@ -394,7 +397,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
 
        if (ctx->updates_pending &&
                        g_queue_get_length (ctx->updates_pending) > 0 &&
-                       rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
+                       rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) {
                cur = ctx->updates_pending->head;
 
                while (cur) {
@@ -420,7 +423,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
                        cur = g_list_next (cur);
                }
 
-               if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
+               if (rspamd_fuzzy_backend_finish_update (ctx->backend, source)) {
                        ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
 
                        for (i = 0; i < ctx->mirrors->len; i ++) {
@@ -440,7 +443,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
 
                        g_queue_clear (ctx->updates_pending);
                        msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
-                                       nupdates, rspamd_fuzzy_backend_version (ctx->backend));
+                                       nupdates, rspamd_fuzzy_backend_version (ctx->backend, source));
                }
                else {
                        msg_err ("cannot commit update transaction to fuzzy backend, "
@@ -887,6 +890,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                struct rspamd_http_message *msg)
 {
        const guchar *p;
+       gchar *src = NULL;
        gsize remain;
        guint32 revision, our_rev, len, cnt = 0;
        struct fuzzy_peer_cmd cmd, *pcmd;
@@ -897,12 +901,23 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
        } state = read_len;
        GList *updates = NULL, *cur;
 
-       if (!msg->body || msg->body->len == 0) {
+       if (!msg->body || msg->body->len == 0 || !msg->url || msg->url->len == 0) {
                msg_err ("empty update message, not processing");
 
                return;
        }
 
+       /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
+       remain = msg->url->len;
+       src = rspamd_fstringdup (msg->url);
+
+       while (remain--) {
+               if (src[remain] == '/') {
+                       src = &src[remain + 1];
+                       break;
+               }
+       }
+
        /*
         * Message format:
         * <uint32_le> - revision
@@ -918,11 +933,12 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
        if (remain > sizeof (guint32) * 2) {
                memcpy (&revision, p, sizeof (guint32));
                revision = GUINT32_TO_LE (revision);
-               our_rev = rspamd_fuzzy_backend_version (session->ctx->backend);
+               our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src);
 
                if (revision <= our_rev) {
                        msg_err ("remote revision:d %d is older than ours: %d, refusing update",
                                        revision, our_rev);
+                       g_free (src);
 
                        return;
                }
@@ -1006,11 +1022,13 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                cur->data = NULL;
        }
 
-       rspamd_fuzzy_process_updates_queue (session->ctx);
+       rspamd_fuzzy_process_updates_queue (session->ctx, src);
        msg_info ("processed updates from the master, %ud operations processed,"
                        " revision: %ud", cnt, revision);
 
 err:
+       g_free (src);
+
        if (updates) {
                /* We still need to clear queue */
                for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1253,7 +1271,7 @@ sync_callback (gint fd, short what, void *arg)
        ctx = worker->ctx;
 
        if (ctx->backend) {
-               rspamd_fuzzy_process_updates_queue (ctx);
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
                /* Call backend sync */
                old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
                rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -1287,7 +1305,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
        struct rspamd_control_reply rep;
 
        if (ctx->backend) {
-               rspamd_fuzzy_process_updates_queue (ctx);
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
                /* Call backend sync */
                old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
                rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -2112,7 +2130,7 @@ start_fuzzy (struct rspamd_worker *worker)
        rspamd_worker_block_signals ();
 
        if (worker->index == 0) {
-               rspamd_fuzzy_process_updates_queue (ctx);
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
                rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
        }
 
index 520eb3c0221f91cdb7211d346207d1d25ceac249..106e379f8f8164577e55a61490e82b2832c73507 100644 (file)
@@ -53,16 +53,20 @@ static const guint max_retries = 10;
 static const char *create_tables_sql =
                "BEGIN;"
                "CREATE TABLE IF NOT EXISTS digests("
-               "id INTEGER PRIMARY KEY,"
-               "flag INTEGER NOT NULL,"
-               "digest TEXT NOT NULL,"
-               "value INTEGER,"
-               "time INTEGER);"
+               "       id INTEGER PRIMARY KEY,"
+               "       flag INTEGER NOT NULL,"
+               "       digest TEXT NOT NULL,"
+               "       value INTEGER,"
+               "       time INTEGER);"
                "CREATE TABLE IF NOT EXISTS shingles("
-               "value INTEGER NOT NULL,"
-               "number INTEGER NOT NULL,"
-               "digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
-               "ON UPDATE CASCADE);"
+               "       value INTEGER NOT NULL,"
+               "       number INTEGER NOT NULL,"
+               "       digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+               "       ON UPDATE CASCADE);"
+               "CREATE TABLE IF NOT EXISTS sources("
+               "       name TEXT UNIQUE,"
+               "       version INTEGER,"
+               "       last INTEGER);"
                "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
                "CREATE INDEX IF NOT EXISTS t ON digests(time);"
                "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
@@ -93,7 +97,9 @@ enum rspamd_fuzzy_statement_idx {
        RSPAMD_FUZZY_BACKEND_EXPIRE,
        RSPAMD_FUZZY_BACKEND_VACUUM,
        RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+       RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
        RSPAMD_FUZZY_BACKEND_VERSION,
+       RSPAMD_FUZZY_BACKEND_SET_VERSION,
        RSPAMD_FUZZY_BACKEND_MAX
 };
 static struct rspamd_fuzzy_stmts {
@@ -213,13 +219,27 @@ static struct rspamd_fuzzy_stmts {
                .stmt = NULL,
                .result = SQLITE_DONE
        },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+               .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+               .args = "TII",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
        {
                .idx = RSPAMD_FUZZY_BACKEND_VERSION,
-               .sql = "PRAGMA user_version;",
-               .args = "",
+               .sql = "SELECT version FROM sources WHERE name=?1;",
+               .args = "T",
                .stmt = NULL,
                .result = SQLITE_ROW
        },
+       {
+               .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+               .sql = "UPDATE sources SET version=?1, last=?2 WHERE name=?3;",
+               .args = "IIT",
+               .stmt = NULL,
+               .result = SQLITE_DONE
+       },
 };
 
 static GQuark
@@ -610,7 +630,8 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
 }
 
 gboolean
-rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source)
 {
        gint rc;
 
@@ -729,42 +750,47 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
 }
 
 gboolean
-rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source)
 {
        gint rc, wal_frames, wal_checkpointed, ver;
-       gint64 version = 0;
-       gchar version_buf[128];
+
+       /* Get and update version */
+       ver = rspamd_fuzzy_backend_version (backend, source);
+       ++ver;
 
        rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                       RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+                               RSPAMD_FUZZY_BACKEND_SET_VERSION,
+                               (gint64)ver, (gint64)time (NULL), source);
 
-       if (rc != SQLITE_OK) {
-               msg_warn_fuzzy_backend ("cannot commit updates: %s",
-                               sqlite3_errmsg (backend->db));
-               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
-                               RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
-               return FALSE;
-       }
-       else {
-               if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
-                       msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+       if (rc == SQLITE_OK) {
+               rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+               if (rc != SQLITE_OK) {
+                       msg_warn_fuzzy_backend ("cannot commit updates: %s",
                                        sqlite3_errmsg (backend->db));
+                       rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                                       RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+                       return FALSE;
                }
-               else if (wal_checkpointed > 0) {
-                       msg_info_fuzzy_backend ("total number of frames in the wal file: "
-                                       "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+               else {
+                       if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
+                               msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+                                               sqlite3_errmsg (backend->db));
+                       }
+                       else if (wal_checkpointed > 0) {
+                               msg_info_fuzzy_backend ("total number of frames in the wal file: "
+                                               "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+                       }
                }
        }
-
-       /* Get and update version */
-       ver = rspamd_fuzzy_backend_version (backend);
-       ++ver;
-       rspamd_snprintf (version_buf, sizeof (version_buf), "PRAGMA user_version=%d;",
-                       ver);
-
-       if (sqlite3_exec (backend->db, version_buf, NULL, NULL, NULL) != SQLITE_OK) {
-                       msg_err_fuzzy_backend ("cannot set database version to %L: %s",
-                                       version, sqlite3_errmsg (backend->db));
+       else {
+               msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
+                               sqlite3_errmsg (backend->db));
+               rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+                               RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+               return FALSE;
        }
 
        return TRUE;
@@ -976,13 +1002,14 @@ rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend)
 }
 
 gint
-rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
+               const gchar *source)
 {
-       gint ret = 0;
+       gint ret = -1;
 
        if (backend) {
                if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
-                               RSPAMD_FUZZY_BACKEND_VERSION) == SQLITE_OK) {
+                               RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
                        ret = sqlite3_column_int64 (
                                        prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
                }
index bcd199d1a4a1b4958fdd328102052a22ed3629a8..91a613f2a836b9a89ea3e0e9aedd7b7956167db3 100644 (file)
@@ -46,7 +46,8 @@ struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
 /**
  * Prepare storage for updates (by starting transaction)
  */
-gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend);
+gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source);
 
 /**
  * Add digest to the database
@@ -72,7 +73,8 @@ gboolean rspamd_fuzzy_backend_del (
 /**
  * Commit updates to storage
  */
-gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend);
+gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+               const gchar *source);
 
 /**
  * Sync storage
@@ -90,7 +92,7 @@ gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
 void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);
 
 gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend);
-gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend);
+gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source);
 gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend);
 
 const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);