#define INVALID_NODE_TIME (guint64) - 1
+static const gchar *local_db_name = "local";
+
/* Init functions */
gpointer init_fuzzy (struct rspamd_config *cfg);
void start_fuzzy (struct rspamd_worker *worker);
guint32 rev;
const gchar *p;
- rev = rspamd_fuzzy_backend_version (ctx->backend);
+ rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name);
rev = GUINT32_TO_LE (rev);
len = sizeof (guint32) * 2; /* revision + last chunk */
}
static void
-rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
+ const gchar *source)
{
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0 &&
- rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
+ rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) {
cur = ctx->updates_pending->head;
while (cur) {
cur = g_list_next (cur);
}
- if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
+ if (rspamd_fuzzy_backend_finish_update (ctx->backend, source)) {
ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
for (i = 0; i < ctx->mirrors->len; i ++) {
g_queue_clear (ctx->updates_pending);
msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
- nupdates, rspamd_fuzzy_backend_version (ctx->backend));
+ nupdates, rspamd_fuzzy_backend_version (ctx->backend, source));
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
struct rspamd_http_message *msg)
{
const guchar *p;
+ gchar *src = NULL;
gsize remain;
guint32 revision, our_rev, len, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
} state = read_len;
GList *updates = NULL, *cur;
- if (!msg->body || msg->body->len == 0) {
+ if (!msg->body || msg->body->len == 0 || !msg->url || msg->url->len == 0) {
msg_err ("empty update message, not processing");
return;
}
+ /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
+ remain = msg->url->len;
+ src = rspamd_fstringdup (msg->url);
+
+ while (remain--) {
+ if (src[remain] == '/') {
+ src = &src[remain + 1];
+ break;
+ }
+ }
+
/*
* Message format:
* <uint32_le> - revision
if (remain > sizeof (guint32) * 2) {
memcpy (&revision, p, sizeof (guint32));
revision = GUINT32_TO_LE (revision);
- our_rev = rspamd_fuzzy_backend_version (session->ctx->backend);
+ our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src);
if (revision <= our_rev) {
msg_err ("remote revision:d %d is older than ours: %d, refusing update",
revision, our_rev);
+ g_free (src);
return;
}
cur->data = NULL;
}
- rspamd_fuzzy_process_updates_queue (session->ctx);
+ rspamd_fuzzy_process_updates_queue (session->ctx, src);
msg_info ("processed updates from the master, %ud operations processed,"
" revision: %ud", cnt, revision);
err:
+ g_free (src);
+
if (updates) {
/* We still need to clear queue */
for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
ctx = worker->ctx;
if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
struct rspamd_control_reply rep;
if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
rspamd_worker_block_signals ();
if (worker->index == 0) {
- rspamd_fuzzy_process_updates_queue (ctx);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
}
static const char *create_tables_sql =
"BEGIN;"
"CREATE TABLE IF NOT EXISTS digests("
- "id INTEGER PRIMARY KEY,"
- "flag INTEGER NOT NULL,"
- "digest TEXT NOT NULL,"
- "value INTEGER,"
- "time INTEGER);"
+ " id INTEGER PRIMARY KEY,"
+ " flag INTEGER NOT NULL,"
+ " digest TEXT NOT NULL,"
+ " value INTEGER,"
+ " time INTEGER);"
"CREATE TABLE IF NOT EXISTS shingles("
- "value INTEGER NOT NULL,"
- "number INTEGER NOT NULL,"
- "digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
- "ON UPDATE CASCADE);"
+ " value INTEGER NOT NULL,"
+ " number INTEGER NOT NULL,"
+ " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+ " ON UPDATE CASCADE);"
+ "CREATE TABLE IF NOT EXISTS sources("
+ " name TEXT UNIQUE,"
+ " version INTEGER,"
+ " last INTEGER);"
"CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
"CREATE INDEX IF NOT EXISTS t ON digests(time);"
"CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
RSPAMD_FUZZY_BACKEND_EXPIRE,
RSPAMD_FUZZY_BACKEND_VACUUM,
RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
RSPAMD_FUZZY_BACKEND_VERSION,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
RSPAMD_FUZZY_BACKEND_MAX
};
static struct rspamd_fuzzy_stmts {
.stmt = NULL,
.result = SQLITE_DONE
},
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+ .args = "TII",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
{
.idx = RSPAMD_FUZZY_BACKEND_VERSION,
- .sql = "PRAGMA user_version;",
- .args = "",
+ .sql = "SELECT version FROM sources WHERE name=?1;",
+ .args = "T",
.stmt = NULL,
.result = SQLITE_ROW
},
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ .sql = "UPDATE sources SET version=?1, last=?2 WHERE name=?3;",
+ .args = "IIT",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
};
static GQuark
}
gboolean
-rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
+ const gchar *source)
{
gint rc;
}
gboolean
-rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
+ const gchar *source)
{
gint rc, wal_frames, wal_checkpointed, ver;
- gint64 version = 0;
- gchar version_buf[128];
+
+ /* Get and update version */
+ ver = rspamd_fuzzy_backend_version (backend, source);
+ ++ver;
rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ (gint64)ver, (gint64)time (NULL), source);
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot commit updates: %s",
- sqlite3_errmsg (backend->db));
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
- return FALSE;
- }
- else {
- if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
- msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+ if (rc == SQLITE_OK) {
+ rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot commit updates: %s",
sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
}
- else if (wal_checkpointed > 0) {
- msg_info_fuzzy_backend ("total number of frames in the wal file: "
- "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+ else {
+ if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
+ msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ else if (wal_checkpointed > 0) {
+ msg_info_fuzzy_backend ("total number of frames in the wal file: "
+ "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+ }
}
}
-
- /* Get and update version */
- ver = rspamd_fuzzy_backend_version (backend);
- ++ver;
- rspamd_snprintf (version_buf, sizeof (version_buf), "PRAGMA user_version=%d;",
- ver);
-
- if (sqlite3_exec (backend->db, version_buf, NULL, NULL, NULL) != SQLITE_OK) {
- msg_err_fuzzy_backend ("cannot set database version to %L: %s",
- version, sqlite3_errmsg (backend->db));
+ else {
+ msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
+ sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
}
return TRUE;
}
gint
-rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
+ const gchar *source)
{
- gint ret = 0;
+ gint ret = -1;
if (backend) {
if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_VERSION) == SQLITE_OK) {
+ RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
ret = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
}