about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
author Vsevolod Stakhov <vsevolod@highsecure.ru> 2016-05-23 18:16:11 +0100
committer Vsevolod Stakhov <vsevolod@highsecure.ru> 2016-05-23 18:16:11 +0100
commit f3b08af57c535bf632b600ef7b4d53eae3306744 (patch)
tree 158b75c04aa1fc3363271cd6e6d83a730677bc71 /src
parent 163f058f1fdf93c7b557d6eb95f4d245337c8350 (diff)
download rspamd-f3b08af57c535bf632b600ef7b4d53eae3306744.tar.gz
rspamd-f3b08af57c535bf632b600ef7b4d53eae3306744.zip
[Feature] Implement multiple-sources fuzzy storage
Diffstat (limited to 'src')
-rw-r--r-- src/fuzzy_storage.c 40
-rw-r--r-- src/libserver/fuzzy_backend.c 111
-rw-r--r-- src/libserver/fuzzy_backend.h 8
3 files changed, 103 insertions, 56 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index b233fccd6..6c7289fee 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -44,6 +44,8 @@
#define INVALID_NODE_TIME (guint64) - 1
+static const gchar *local_db_name = "local";
+
/* Init functions */
gpointer init_fuzzy (struct rspamd_config *cfg);
void start_fuzzy (struct rspamd_worker *worker);
@@ -259,7 +261,7 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
guint32 rev;
const gchar *p;
- rev = rspamd_fuzzy_backend_version (ctx->backend);
+ rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name);
rev = GUINT32_TO_LE (rev);
len = sizeof (guint32) * 2; /* revision + last chunk */
@@ -382,7 +384,8 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
}
static void
-rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
+ const gchar *source)
{
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
@@ -394,7 +397,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0 &&
- rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
+ rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) {
cur = ctx->updates_pending->head;
while (cur) {
@@ -420,7 +423,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
cur = g_list_next (cur);
}
- if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
+ if (rspamd_fuzzy_backend_finish_update (ctx->backend, source)) {
ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
for (i = 0; i < ctx->mirrors->len; i ++) {
@@ -440,7 +443,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
g_queue_clear (ctx->updates_pending);
msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
- nupdates, rspamd_fuzzy_backend_version (ctx->backend));
+ nupdates, rspamd_fuzzy_backend_version (ctx->backend, source));
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
@@ -887,6 +890,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
struct rspamd_http_message *msg)
{
const guchar *p;
+ gchar *src = NULL;
gsize remain;
guint32 revision, our_rev, len, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
@@ -897,12 +901,23 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
} state = read_len;
GList *updates = NULL, *cur;
- if (!msg->body || msg->body->len == 0) {
+ if (!msg->body || msg->body->len == 0 || !msg->url || msg->url->len == 0) {
msg_err ("empty update message, not processing");
return;
}
+ /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
+ remain = msg->url->len;
+ src = rspamd_fstringdup (msg->url);
+
+ while (remain--) {
+ if (src[remain] == '/') {
+ src = &src[remain + 1];
+ break;
+ }
+ }
+
/*
* Message format:
* <uint32_le> - revision
@@ -918,11 +933,12 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
if (remain > sizeof (guint32) * 2) {
memcpy (&revision, p, sizeof (guint32));
revision = GUINT32_TO_LE (revision);
- our_rev = rspamd_fuzzy_backend_version (session->ctx->backend);
+ our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src);
if (revision <= our_rev) {
msg_err ("remote revision:d %d is older than ours: %d, refusing update",
revision, our_rev);
+ g_free (src);
return;
}
@@ -1006,11 +1022,13 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cur->data = NULL;
}
- rspamd_fuzzy_process_updates_queue (session->ctx);
+ rspamd_fuzzy_process_updates_queue (session->ctx, src);
msg_info ("processed updates from the master, %ud operations processed,"
" revision: %ud", cnt, revision);
err:
+ g_free (src);
+
if (updates) {
/* We still need to clear queue */
for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1253,7 +1271,7 @@ sync_callback (gint fd, short what, void *arg)
ctx = worker->ctx;
if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -1287,7 +1305,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_control_reply rep;
if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -2112,7 +2130,7 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_worker_block_signals ();
if (worker->index == 0) {
- rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
}
diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c
index 520eb3c02..106e379f8 100644
--- a/src/libserver/fuzzy_backend.c
+++ b/src/libserver/fuzzy_backend.c
@@ -53,16 +53,20 @@ static const guint max_retries = 10;
static const char *create_tables_sql =
"BEGIN;"
"CREATE TABLE IF NOT EXISTS digests("
- "id INTEGER PRIMARY KEY,"
- "flag INTEGER NOT NULL,"
- "digest TEXT NOT NULL,"
- "value INTEGER,"
- "time INTEGER);"
+ " id INTEGER PRIMARY KEY,"
+ " flag INTEGER NOT NULL,"
+ " digest TEXT NOT NULL,"
+ " value INTEGER,"
+ " time INTEGER);"
"CREATE TABLE IF NOT EXISTS shingles("
- "value INTEGER NOT NULL,"
- "number INTEGER NOT NULL,"
- "digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
- "ON UPDATE CASCADE);"
+ " value INTEGER NOT NULL,"
+ " number INTEGER NOT NULL,"
+ " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+ " ON UPDATE CASCADE);"
+ "CREATE TABLE IF NOT EXISTS sources("
+ " name TEXT UNIQUE,"
+ " version INTEGER,"
+ " last INTEGER);"
"CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
"CREATE INDEX IF NOT EXISTS t ON digests(time);"
"CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
@@ -93,7 +97,9 @@ enum rspamd_fuzzy_statement_idx {
RSPAMD_FUZZY_BACKEND_EXPIRE,
RSPAMD_FUZZY_BACKEND_VACUUM,
RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
RSPAMD_FUZZY_BACKEND_VERSION,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
RSPAMD_FUZZY_BACKEND_MAX
};
static struct rspamd_fuzzy_stmts {
@@ -214,12 +220,26 @@ static struct rspamd_fuzzy_stmts {
.result = SQLITE_DONE
},
{
+ .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+ .args = "TII",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
.idx = RSPAMD_FUZZY_BACKEND_VERSION,
- .sql = "PRAGMA user_version;",
- .args = "",
+ .sql = "SELECT version FROM sources WHERE name=?1;",
+ .args = "T",
.stmt = NULL,
.result = SQLITE_ROW
},
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ .sql = "UPDATE sources SET version=?1, last=?2 WHERE name=?3;",
+ .args = "IIT",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
};
static GQuark
@@ -610,7 +630,8 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
}
gboolean
-rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+ const gchar *source)
{
gint rc;
@@ -729,42 +750,47 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
}
gboolean
-rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+ const gchar *source)
{
gint rc, wal_frames, wal_checkpointed, ver;
- gint64 version = 0;
- gchar version_buf[128];
+
+ /* Get and update version */
+ ver = rspamd_fuzzy_backend_version (backend, source);
+ ++ver;
rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ (gint64)ver, (gint64)time (NULL), source);
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot commit updates: %s",
- sqlite3_errmsg (backend->db));
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
- return FALSE;
- }
- else {
- if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
- msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+ if (rc == SQLITE_OK) {
+ rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot commit updates: %s",
sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
}
- else if (wal_checkpointed > 0) {
- msg_info_fuzzy_backend ("total number of frames in the wal file: "
- "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+ else {
+ if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
+ msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ else if (wal_checkpointed > 0) {
+ msg_info_fuzzy_backend ("total number of frames in the wal file: "
+ "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+ }
}
}
-
- /* Get and update version */
- ver = rspamd_fuzzy_backend_version (backend);
- ++ver;
- rspamd_snprintf (version_buf, sizeof (version_buf), "PRAGMA user_version=%d;",
- ver);
-
- if (sqlite3_exec (backend->db, version_buf, NULL, NULL, NULL) != SQLITE_OK) {
- msg_err_fuzzy_backend ("cannot set database version to %L: %s",
- version, sqlite3_errmsg (backend->db));
+ else {
+ msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
+ sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
}
return TRUE;
@@ -976,13 +1002,14 @@ rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend)
}
gint
-rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
+ const gchar *source)
{
- gint ret = 0;
+ gint ret = -1;
if (backend) {
if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_VERSION) == SQLITE_OK) {
+ RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
ret = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
}
diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h
index bcd199d1a..91a613f2a 100644
--- a/src/libserver/fuzzy_backend.h
+++ b/src/libserver/fuzzy_backend.h
@@ -46,7 +46,8 @@ struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
/**
* Prepare storage for updates (by starting transaction)
*/
-gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend);
+gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+ const gchar *source);
/**
* Add digest to the database
@@ -72,7 +73,8 @@ gboolean rspamd_fuzzy_backend_del (
/**
* Commit updates to storage
*/
-gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend);
+gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+ const gchar *source);
/**
* Sync storage
@@ -90,7 +92,7 @@ gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);
gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend);
-gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend);
+gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source);
gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend);
const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);