Browse Source

[Feature] Implement multiple-sources fuzzy storage

tags/1.3.0
Vsevolod Stakhov 8 years ago
parent
commit
f3b08af57c
3 changed files with 103 additions and 56 deletions
  1. 29
    11
      src/fuzzy_storage.c
  2. 69
    42
      src/libserver/fuzzy_backend.c
  3. 5
    3
      src/libserver/fuzzy_backend.h

+ 29
- 11
src/fuzzy_storage.c View File

@@ -44,6 +44,8 @@

#define INVALID_NODE_TIME (guint64) - 1

static const gchar *local_db_name = "local";

/* Init functions */
gpointer init_fuzzy (struct rspamd_config *cfg);
void start_fuzzy (struct rspamd_worker *worker);
@@ -259,7 +261,7 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
guint32 rev;
const gchar *p;

rev = rspamd_fuzzy_backend_version (ctx->backend);
rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name);
rev = GUINT32_TO_LE (rev);
len = sizeof (guint32) * 2; /* revision + last chunk */

@@ -382,7 +384,8 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
}

static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
const gchar *source)
{
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
@@ -394,7 +397,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)

if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0 &&
rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) {
cur = ctx->updates_pending->head;

while (cur) {
@@ -420,7 +423,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
cur = g_list_next (cur);
}

if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
if (rspamd_fuzzy_backend_finish_update (ctx->backend, source)) {
ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);

for (i = 0; i < ctx->mirrors->len; i ++) {
@@ -440,7 +443,7 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)

g_queue_clear (ctx->updates_pending);
msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
nupdates, rspamd_fuzzy_backend_version (ctx->backend));
nupdates, rspamd_fuzzy_backend_version (ctx->backend, source));
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
@@ -887,6 +890,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
struct rspamd_http_message *msg)
{
const guchar *p;
gchar *src = NULL;
gsize remain;
guint32 revision, our_rev, len, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
@@ -897,12 +901,23 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
} state = read_len;
GList *updates = NULL, *cur;

if (!msg->body || msg->body->len == 0) {
if (!msg->body || msg->body->len == 0 || !msg->url || msg->url->len == 0) {
msg_err ("empty update message, not processing");

return;
}

/* Detect source from url: /update_v1/<source>, so we look for the last '/' */
remain = msg->url->len;
src = rspamd_fstringdup (msg->url);

while (remain--) {
if (src[remain] == '/') {
src = &src[remain + 1];
break;
}
}

/*
* Message format:
* <uint32_le> - revision
@@ -918,11 +933,12 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
if (remain > sizeof (guint32) * 2) {
memcpy (&revision, p, sizeof (guint32));
revision = GUINT32_TO_LE (revision);
our_rev = rspamd_fuzzy_backend_version (session->ctx->backend);
our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src);

if (revision <= our_rev) {
msg_err ("remote revision:d %d is older than ours: %d, refusing update",
revision, our_rev);
g_free (src);

return;
}
@@ -1006,11 +1022,13 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cur->data = NULL;
}

rspamd_fuzzy_process_updates_queue (session->ctx);
rspamd_fuzzy_process_updates_queue (session->ctx, src);
msg_info ("processed updates from the master, %ud operations processed,"
" revision: %ud", cnt, revision);

err:
g_free (src);

if (updates) {
/* We still need to clear queue */
for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1253,7 +1271,7 @@ sync_callback (gint fd, short what, void *arg)
ctx = worker->ctx;

if (ctx->backend) {
rspamd_fuzzy_process_updates_queue (ctx);
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -1287,7 +1305,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_control_reply rep;

if (ctx->backend) {
rspamd_fuzzy_process_updates_queue (ctx);
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
/* Call backend sync */
old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
@@ -2112,7 +2130,7 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_worker_block_signals ();

if (worker->index == 0) {
rspamd_fuzzy_process_updates_queue (ctx);
rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
}


+ 69
- 42
src/libserver/fuzzy_backend.c View File

@@ -53,16 +53,20 @@ static const guint max_retries = 10;
static const char *create_tables_sql =
"BEGIN;"
"CREATE TABLE IF NOT EXISTS digests("
"id INTEGER PRIMARY KEY,"
"flag INTEGER NOT NULL,"
"digest TEXT NOT NULL,"
"value INTEGER,"
"time INTEGER);"
" id INTEGER PRIMARY KEY,"
" flag INTEGER NOT NULL,"
" digest TEXT NOT NULL,"
" value INTEGER,"
" time INTEGER);"
"CREATE TABLE IF NOT EXISTS shingles("
"value INTEGER NOT NULL,"
"number INTEGER NOT NULL,"
"digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
"ON UPDATE CASCADE);"
" value INTEGER NOT NULL,"
" number INTEGER NOT NULL,"
" digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
" ON UPDATE CASCADE);"
"CREATE TABLE IF NOT EXISTS sources("
" name TEXT UNIQUE,"
" version INTEGER,"
" last INTEGER);"
"CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
"CREATE INDEX IF NOT EXISTS t ON digests(time);"
"CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
@@ -93,7 +97,9 @@ enum rspamd_fuzzy_statement_idx {
RSPAMD_FUZZY_BACKEND_EXPIRE,
RSPAMD_FUZZY_BACKEND_VACUUM,
RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
RSPAMD_FUZZY_BACKEND_VERSION,
RSPAMD_FUZZY_BACKEND_SET_VERSION,
RSPAMD_FUZZY_BACKEND_MAX
};
static struct rspamd_fuzzy_stmts {
@@ -213,13 +219,27 @@ static struct rspamd_fuzzy_stmts {
.stmt = NULL,
.result = SQLITE_DONE
},
{
.idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
.sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
.args = "TII",
.stmt = NULL,
.result = SQLITE_DONE
},
{
.idx = RSPAMD_FUZZY_BACKEND_VERSION,
.sql = "PRAGMA user_version;",
.args = "",
.sql = "SELECT version FROM sources WHERE name=?1;",
.args = "T",
.stmt = NULL,
.result = SQLITE_ROW
},
{
.idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
.sql = "UPDATE sources SET version=?1, last=?2 WHERE name=?3;",
.args = "IIT",
.stmt = NULL,
.result = SQLITE_DONE
},
};

static GQuark
@@ -610,7 +630,8 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
}

gboolean
rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend)
rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
const gchar *source)
{
gint rc;

@@ -729,42 +750,47 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
}

gboolean
rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend)
rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
const gchar *source)
{
gint rc, wal_frames, wal_checkpointed, ver;
gint64 version = 0;
gchar version_buf[128];

/* Get and update version */
ver = rspamd_fuzzy_backend_version (backend, source);
++ver;

rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
RSPAMD_FUZZY_BACKEND_SET_VERSION,
(gint64)ver, (gint64)time (NULL), source);

if (rc != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot commit updates: %s",
sqlite3_errmsg (backend->db));
rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
return FALSE;
}
else {
if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
if (rc == SQLITE_OK) {
rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);

if (rc != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot commit updates: %s",
sqlite3_errmsg (backend->db));
rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
return FALSE;
}
else if (wal_checkpointed > 0) {
msg_info_fuzzy_backend ("total number of frames in the wal file: "
"%d, checkpointed: %d", wal_frames, wal_checkpointed);
else {
if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
sqlite3_errmsg (backend->db));
}
else if (wal_checkpointed > 0) {
msg_info_fuzzy_backend ("total number of frames in the wal file: "
"%d, checkpointed: %d", wal_frames, wal_checkpointed);
}
}
}

/* Get and update version */
ver = rspamd_fuzzy_backend_version (backend);
++ver;
rspamd_snprintf (version_buf, sizeof (version_buf), "PRAGMA user_version=%d;",
ver);

if (sqlite3_exec (backend->db, version_buf, NULL, NULL, NULL) != SQLITE_OK) {
msg_err_fuzzy_backend ("cannot set database version to %L: %s",
version, sqlite3_errmsg (backend->db));
else {
msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
sqlite3_errmsg (backend->db));
rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
return FALSE;
}

return TRUE;
@@ -976,13 +1002,14 @@ rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend)
}

gint
rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend)
rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
const gchar *source)
{
gint ret = 0;
gint ret = -1;

if (backend) {
if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
RSPAMD_FUZZY_BACKEND_VERSION) == SQLITE_OK) {
RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
ret = sqlite3_column_int64 (
prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
}

+ 5
- 3
src/libserver/fuzzy_backend.h View File

@@ -46,7 +46,8 @@ struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
/**
* Prepare storage for updates (by starting transaction)
*/
gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend);
gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
const gchar *source);

/**
* Add digest to the database
@@ -72,7 +73,8 @@ gboolean rspamd_fuzzy_backend_del (
/**
* Commit updates to storage
*/
gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend);
gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
const gchar *source);

/**
* Sync storage
@@ -90,7 +92,7 @@ gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);

gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend);
gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend);
gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source);
gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend);

const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);

Loading…
Cancel
Save