Browse Source

More sync optimizations.

tags/1.0.3
Vsevolod Stakhov 8 years ago
parent
commit
f91453c705
3 changed files with 81 additions and 72 deletions
  1. 9
    8
      src/fuzzy_storage.c
  2. 70
    63
      src/libserver/fuzzy_backend.c
  3. 2
    1
      src/libserver/fuzzy_backend.h

+ 9
- 8
src/fuzzy_storage.c View File

@@ -267,18 +267,19 @@ sync_callback (gint fd, short what, void *arg)
gdouble next_check;

ctx = worker->ctx;
/* Call backend sync */
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, FALSE);

server_stat->fuzzy_hashes_expired = rspamd_fuzzy_backend_expired (ctx->backend);

/* Timer event */
event_del (&tev);
evtimer_set (&tev, sync_callback, worker);
event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
double_to_tv (next_check, &tmv);
evtimer_add (&tev, &tmv);

/* Call backend sync */
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire);

server_stat->fuzzy_hashes_expired = rspamd_fuzzy_backend_expired (ctx->backend);
}

gpointer
@@ -352,12 +353,12 @@ start_fuzzy (struct rspamd_worker *worker)

server_stat->fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);

rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
next_check = ctx->sync_timeout * (1. + ((gdouble)ottery_rand_uint32 ()) /
G_MAXUINT32);
next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
double_to_tv (next_check, &tmv);
evtimer_add (&tev, &tmv);

@@ -379,7 +380,7 @@ start_fuzzy (struct rspamd_worker *worker)

event_base_loop (ctx->ev_base, 0);

rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire);
rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (rspamd_main->logger);
exit (EXIT_SUCCESS);

+ 70
- 63
src/libserver/fuzzy_backend.c View File

@@ -179,8 +179,8 @@ static struct rspamd_fuzzy_stmts {
},
{
.idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
.sql = "DELETE FROM digests WHERE time < ?1;",
.args = "I",
.sql = "DELETE FROM digests WHERE id = (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
.args = "II",
.stmt = NULL,
.result = SQLITE_DONE
},
@@ -412,26 +412,13 @@ rspamd_fuzzy_backend_open_db (const gchar *path, GError **err)
struct rspamd_fuzzy_backend*
rspamd_fuzzy_backend_open (const gchar *path, GError **err)
{
struct orphaned_shingle_elt {
gint64 value;
gint64 number;
};

gchar *dir;
gint fd;
struct rspamd_fuzzy_backend *backend;
static const char sqlite_wal[] = "PRAGMA journal_mode=\"wal\";",
fallback_journal[] = "PRAGMA journal_mode=\"off\";",
foreign_keys[] = "PRAGMA foreign_keys=\"ON\";",
orphaned_shingles[] = "SELECT shingles.value,shingles.number "
"FROM shingles "
"LEFT JOIN digests ON "
"shingles.digest_id=digests.id WHERE "
"digests.id IS NULL;";
sqlite3_stmt *stmt;
GArray *orphaned;
struct orphaned_shingle_elt orphaned_elt, *pelt;
gint rc, i;
foreign_keys[] = "PRAGMA foreign_keys=\"ON\";";
gint rc;

if (path == NULL) {
g_set_error (err, rspamd_fuzzy_backend_quark (),
@@ -494,49 +481,6 @@ rspamd_fuzzy_backend_open (const gchar *path, GError **err)
sqlite3_errmsg (backend->db));
}

/* Cleanup database */
if ((rc = sqlite3_prepare_v2 (backend->db, orphaned_shingles, -1, &stmt,
NULL)) != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
sqlite3_errmsg (backend->db));
}
else {
orphaned = g_array_new (FALSE, FALSE, sizeof (struct orphaned_shingle_elt));

while (sqlite3_step (stmt) == SQLITE_ROW) {
orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
g_array_append_val (orphaned, orphaned_elt);
}

sqlite3_finalize (stmt);
msg_info_fuzzy_backend ("going to delete %ud orphaned shingles",
orphaned->len);

if (orphaned->len > 0) {
/* Need to delete orphaned elements */
rspamd_fuzzy_backend_run_simple (
RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
backend,
NULL);

for (i = 0; i < (gint)orphaned->len; i++) {
pelt = &g_array_index (orphaned, struct orphaned_shingle_elt, i);
rspamd_fuzzy_backend_run_stmt (backend,
RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
pelt->value, pelt->number);
}

rspamd_fuzzy_backend_run_simple (
RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
backend,
NULL);
msg_info_fuzzy_backend ("deleted %ud orphaned shingles", orphaned->len);
}

g_array_free (orphaned, TRUE);
}

rspamd_fuzzy_backend_run_simple (RSPAMD_FUZZY_BACKEND_VACUUM, backend, NULL);

if (rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT)
@@ -728,12 +672,29 @@ rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
}

gboolean
rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend, gint64 expire)
rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
gint64 expire,
gboolean clean_orphaned)
{
struct orphaned_shingle_elt {
gint64 value;
gint64 number;
};

/* Do not do more than 5k ops per step */
const guint64 max_changes = 5000;
gboolean ret = FALSE;
gint64 expire_lim, expired;
gint rc;
gint rc, i;
GError *err = NULL;
static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
"FROM shingles "
"LEFT JOIN digests ON "
"shingles.digest_id=digests.id WHERE "
"digests.id IS NULL;";
sqlite3_stmt *stmt;
GArray *orphaned;
struct orphaned_shingle_elt orphaned_elt, *pelt;

/* Perform expire */
if (expire > 0) {
@@ -741,7 +702,7 @@ rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend, gint64 expire)

if (expire_lim > 0) {
rc = rspamd_fuzzy_backend_run_stmt (backend,
RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim);
RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);

if (rc == SQLITE_OK) {
expired = sqlite3_changes (backend->db);
@@ -756,8 +717,54 @@ rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend, gint64 expire)
sqlite3_errmsg (backend->db));
}
}
}

/* Cleanup database */
if (clean_orphaned) {
if ((rc = sqlite3_prepare_v2 (backend->db, orphaned_shingles, -1, &stmt,
NULL)) != SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
sqlite3_errmsg (backend->db));
}
else {
orphaned = g_array_new (FALSE,
FALSE,
sizeof (struct orphaned_shingle_elt));

while (sqlite3_step (stmt) == SQLITE_ROW) {
orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
g_array_append_val (orphaned, orphaned_elt);

if (orphaned->len > max_changes) {
break;
}
}

sqlite3_finalize (stmt);

if (orphaned->len > 0) {
msg_info_fuzzy_backend ("going to delete %ud orphaned shingles",
orphaned->len);
/* Need to delete orphaned elements */

for (i = 0; i < (gint) orphaned->len; i++) {
pelt = &g_array_index (orphaned,
struct orphaned_shingle_elt,
i);
rspamd_fuzzy_backend_run_stmt (backend,
RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
pelt->value, pelt->number);
}

msg_info_fuzzy_backend ("deleted %ud orphaned shingles",
orphaned->len);
}

g_array_free (orphaned, TRUE);
}
}

ret = rspamd_fuzzy_backend_run_simple (RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
backend, &err);


+ 2
- 1
src/libserver/fuzzy_backend.h View File

@@ -75,7 +75,8 @@ gboolean rspamd_fuzzy_backend_del (
* @return
*/
gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
gint64 expire);
gint64 expire,
gboolean clean_orphaned);

/**
* Close storage

Loading…
Cancel
Save