Browse Source

Move all updates to a single transaction.

tags/1.1.0
Vsevolod Stakhov 8 years ago
parent
commit
0fc01045cf
3 changed files with 78 additions and 63 deletions
  1. 35
    15
      src/fuzzy_storage.c
  2. 33
    48
      src/libserver/fuzzy_backend.c
  3. 10
    0
      src/libserver/fuzzy_backend.h

+ 35
- 15
src/fuzzy_storage.c View File

@@ -150,26 +150,46 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
struct fuzzy_peer_cmd *cmd;
guint nupdates = 0;

cur = ctx->updates_pending->head;
while (cur) {
cmd = cur->data;
if (rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
cur = ctx->updates_pending->head;
while (cur) {
cmd = cur->data;

if (cmd->cmd.normal.cmd == FUZZY_WRITE) {
rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal);
}
else {
rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal);
if (cmd->cmd.normal.cmd == FUZZY_WRITE) {
rspamd_fuzzy_backend_add (ctx->backend, &cmd->cmd.normal);
}
else {
rspamd_fuzzy_backend_del (ctx->backend, &cmd->cmd.normal);
}

nupdates++;
cur = g_list_next (cur);
}

g_slice_free1 (sizeof (*cmd), cmd);
nupdates ++;
cur = g_list_next (cur);
}
if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
cur = ctx->updates_pending->head;

g_queue_clear (ctx->updates_pending);
server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
while (cur) {
cmd = cur->data;
g_slice_free1 (sizeof (*cmd), cmd);
cur = g_list_next (cur);
}

msg_info ("updated fuzzy storage: %ud updates processed", nupdates);
g_queue_clear (ctx->updates_pending);
msg_info ("updated fuzzy storage: %ud updates processed", nupdates);
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
"%ud updates are still pending",
g_queue_get_length (ctx->updates_pending));
}
}
else {
msg_err ("cannot start transaction in fuzzy backend, "
"%ud updates are still pending",
g_queue_get_length (ctx->updates_pending));
}
}

static void

+ 33
- 48
src/libserver/fuzzy_backend.c View File

@@ -577,12 +577,9 @@ rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
}

gboolean
rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
const struct rspamd_fuzzy_cmd *cmd)
rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend)
{
int rc, i;
gint64 id;
const struct rspamd_fuzzy_shingle_cmd *shcmd;
gint rc;

if (backend == NULL) {
return FALSE;
@@ -592,13 +589,26 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
RSPAMD_FUZZY_BACKEND_TRANSACTION_START);

if (rc != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot start transaction to add hash to %d -> "
"%*xs: %s", (gint) cmd->flag,
(gint) sizeof (cmd->digest), cmd->digest,
msg_warn_fuzzy_backend ("cannot start transaction for updates: %s",
sqlite3_errmsg (backend->db));
return FALSE;
}

return TRUE;
}

gboolean
rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
const struct rspamd_fuzzy_cmd *cmd)
{
int rc, i;
gint64 id;
const struct rspamd_fuzzy_shingle_cmd *shcmd;

if (backend == NULL) {
return FALSE;
}

rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_CHECK,
cmd->digest);
@@ -658,30 +668,28 @@ rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
RSPAMD_FUZZY_BACKEND_INSERT);
}

if (rc == SQLITE_OK) {
rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
}
else {
msg_warn_fuzzy_backend ("cannot add hash to %d -> "
"%*xs: %s", (gint) cmd->flag,
(gint) sizeof (cmd->digest), cmd->digest,
sqlite3_errmsg (backend->db));
rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
}
return (rc == SQLITE_OK);
}

gboolean
rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend)
{
gint rc;

rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);

if (rc != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot commit hash to %d -> "
"%*xs: %s", (gint) cmd->flag,
(gint) sizeof (cmd->digest), cmd->digest,
msg_warn_fuzzy_backend ("cannot commit updates: %s",
sqlite3_errmsg (backend->db));
rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
return FALSE;
}

return (rc == SQLITE_OK);
return TRUE;
}


gboolean
rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
const struct rspamd_fuzzy_cmd *cmd)
@@ -692,33 +700,10 @@ rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
return FALSE;
}

rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_START);

rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_DELETE,
cmd->digest);

if (rc == SQLITE_OK) {
rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);

if (rc != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot commit delete hash from %d -> "
"%*xs: %s", (gint) cmd->flag,
(gint) sizeof (cmd->digest), cmd->digest,
sqlite3_errmsg (backend->db));
}
}
else {
msg_warn_fuzzy_backend ("cannot delete hash from %d -> "
"%*xs: %s", (gint) cmd->flag,
(gint) sizeof (cmd->digest), cmd->digest,
sqlite3_errmsg (backend->db));
rspamd_fuzzy_backend_run_stmt (backend, TRUE,
RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
}

return (rc == SQLITE_OK);
}


+ 10
- 0
src/libserver/fuzzy_backend.h View File

@@ -50,6 +50,11 @@ struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
const struct rspamd_fuzzy_cmd *cmd,
gint64 expire);

/**
* Prepare storage for updates (by starting transaction)
*/
gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend);

/**
* Add digest to the database
* @param backend
@@ -70,6 +75,11 @@ gboolean rspamd_fuzzy_backend_del (
struct rspamd_fuzzy_backend *backend,
const struct rspamd_fuzzy_cmd *cmd);

/**
* Commit updates to storage
*/
gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend);

/**
* Sync storage
* @param backend

Loading…
Cancel
Save