Przeglądaj źródła

Start conversion of fuzzy to multiple workers

tags/1.1.0
Vsevolod Stakhov 8 lat temu
rodzic
commit
dbf63e9869
2 zmienionych plików z 73 dodań i 134 usunięć
  1. 53
    42
      src/fuzzy_storage.c
  2. 20
    92
      src/libserver/fuzzy_backend.c

+ 53
- 42
src/fuzzy_storage.c Wyświetl plik

@@ -58,7 +58,7 @@ worker_t fuzzy_worker = {
init_fuzzy, /* Init function */
start_fuzzy, /* Start function */
TRUE, /* No socket */
TRUE, /* Unique */
FALSE, /* Unique */
TRUE, /* Threaded */
FALSE, /* Non killable */
SOCK_DGRAM /* UDP socket */
@@ -388,34 +388,45 @@ accept_fuzzy_socket (gint fd, short what, void *arg)

/* Got some data */
if (what == EV_READ) {
worker->nconns++;

while ((r = rspamd_inet_address_recvfrom (fd, buf, sizeof (buf), 0,
&session.addr)) == -1) {
if (errno == EINTR) {
continue;
for (;;) {
worker->nconns++;

r = rspamd_inet_address_recvfrom (fd,
buf,
sizeof (buf),
0,
&session.addr);

if (r == -1) {
if (errno == EINTR) {
continue;
}
else if (errno == EAGAIN) {
return;
}

msg_err ("got error while reading from socket: %d, %s",
errno,
strerror (errno));
return;
}
msg_err ("got error while reading from socket: %d, %s",
errno,
strerror (errno));
return;
}

if (rspamd_fuzzy_cmd_from_wire (buf, r, &session)) {
/* Check shingles count sanity */
rspamd_fuzzy_process_command (&session);
}
else {
/* Discard input */
server_stat->fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH6] ++;
msg_debug ("invalid fuzzy command of size %d received", r);
}
if (rspamd_fuzzy_cmd_from_wire (buf, r, &session)) {
/* Check shingles count sanity */
rspamd_fuzzy_process_command (&session);
}
else {
/* Discard input */
server_stat->fuzzy_hashes_checked[RSPAMD_FUZZY_EPOCH6]++;
msg_debug ("invalid fuzzy command of size %d received", r);
}

rspamd_inet_address_destroy (session.addr);
worker->nconns --;
rspamd_inet_address_destroy (session.addr);
rspamd_explicit_memzero (session.nm, sizeof (session.nm));
worker->nconns--;
}
}

rspamd_explicit_memzero (session.nm, sizeof (session.nm));
}

static void
@@ -503,34 +514,34 @@ init_fuzzy (struct rspamd_config *cfg)
ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;

rspamd_rcl_register_worker_option (cfg, type, "hashfile",
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);

rspamd_rcl_register_worker_option (cfg, type, "hash_file",
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);

rspamd_rcl_register_worker_option (cfg, type, "file",
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);

rspamd_rcl_register_worker_option (cfg, type, "database",
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, hashfile), 0);

rspamd_rcl_register_worker_option (cfg, type, "sync",
rspamd_rcl_parse_struct_time, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
sync_timeout), RSPAMD_CL_FLAG_TIME_FLOAT);
rspamd_rcl_parse_struct_time, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
sync_timeout), RSPAMD_CL_FLAG_TIME_FLOAT);

rspamd_rcl_register_worker_option (cfg, type, "expire",
rspamd_rcl_parse_struct_time, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
expire), RSPAMD_CL_FLAG_TIME_FLOAT);
rspamd_rcl_parse_struct_time, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
expire), RSPAMD_CL_FLAG_TIME_FLOAT);

rspamd_rcl_register_worker_option (cfg, type, "allow_update",
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map), 0);
rspamd_rcl_parse_struct_string, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map), 0);

rspamd_rcl_register_worker_option (cfg, type, "keypair",
rspamd_rcl_parse_struct_keypair, ctx,
@@ -538,7 +549,8 @@ init_fuzzy (struct rspamd_config *cfg)

rspamd_rcl_register_worker_option (cfg, type, "keypair_cache_size",
rspamd_rcl_parse_struct_integer, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, keypair_cache_size),
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx,
keypair_cache_size),
RSPAMD_CL_FLAG_UINT);

return ctx;
@@ -559,7 +571,6 @@ start_fuzzy (struct rspamd_worker *worker)
accept_fuzzy_socket);
server_stat = worker->srv->stat;


/*
* Open DB and perform VACUUM
*/

+ 20
- 92
src/libserver/fuzzy_backend.c Wyświetl plik

@@ -27,6 +27,7 @@
#include "unix-std.h"

#include <sqlite3.h>
#include "libutil/sqlite_utils.h"

struct rspamd_fuzzy_backend {
sqlite3 *db;
@@ -66,6 +67,10 @@ static const char *create_tables_sql =
"number INTEGER NOT NULL,"
"digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
"ON UPDATE CASCADE);"
"CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
"CREATE INDEX IF NOT EXISTS t ON digests(time);"
"CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
"CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
"COMMIT;";
static const char *create_index_sql =
"BEGIN;"
@@ -386,23 +391,25 @@ static struct rspamd_fuzzy_backend *
rspamd_fuzzy_backend_open_db (const gchar *path, GError **err)
{
struct rspamd_fuzzy_backend *bk;
sqlite3 *sqlite;
int rc;

if ((rc = sqlite3_open_v2 (path, &sqlite,
SQLITE_OPEN_READWRITE|SQLITE_OPEN_NOMUTEX, NULL)) != SQLITE_OK) {
g_set_error (err, rspamd_fuzzy_backend_quark (),
rc, "Cannot open sqlite db %s: %d",
path, rc);

return NULL;
}

bk = g_slice_alloc (sizeof (*bk));
bk->path = g_strdup (path);
bk->db = sqlite;
bk->expired = 0;
bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend");
bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path,
create_tables_sql, err);

if (bk->db == NULL) {
rspamd_fuzzy_backend_close (bk);

return NULL;
}

if (!rspamd_fuzzy_backend_prepare_stmts (bk, err)) {
rspamd_fuzzy_backend_close (bk);

return NULL;
}

return bk;
}
@@ -412,94 +419,17 @@ rspamd_fuzzy_backend_open (const gchar *path,
gboolean vacuum,
GError **err)
{
gchar *dir;
gint fd;
struct rspamd_fuzzy_backend *backend;
static const char sqlite_wal[] = "PRAGMA journal_mode=\"wal\";",
fallback_journal[] = "PRAGMA journal_mode=\"off\";",
foreign_keys[] = "PRAGMA foreign_keys=\"ON\";",
secure_delete[] = "PRAGMA secure_delete=\"OFF\";",
enable_mmap[] = "PRAGMA mmap_size=268435456;";
gint rc;

if (path == NULL) {
g_set_error (err, rspamd_fuzzy_backend_quark (),
ENOENT, "Path has not been specified");
return NULL;
}
/* First of all we check path for existence */
dir = g_path_get_dirname (path);
if (dir == NULL) {
g_set_error (err, rspamd_fuzzy_backend_quark (),
errno, "Cannot get directory name for %s: %s", path,
strerror (errno));
return NULL;
}

if (access (path, W_OK) == -1 && access (dir, W_OK) == -1) {
g_set_error (err, rspamd_fuzzy_backend_quark (),
errno, "Cannot access directory %s to create database: %s",
dir, strerror (errno));
g_free (dir);

return NULL;
}

g_free (dir);

if ((fd = open (path, O_RDONLY)) == -1) {
if (errno != ENOENT) {
g_set_error (err, rspamd_fuzzy_backend_quark (),
errno, "Cannot open file %s: %s",
path, strerror (errno));

return NULL;
}
}

close (fd);

/* Open database */
if ((backend = rspamd_fuzzy_backend_open_db (path, err)) == NULL) {
GError *tmp = NULL;

if ((backend = rspamd_fuzzy_backend_create_db (path, TRUE, &tmp)) == NULL) {
g_clear_error (err);
g_propagate_error (err, tmp);
return NULL;
}
g_clear_error (err);
}

if ((rc = sqlite3_exec (backend->db, sqlite_wal, NULL, NULL, NULL)) != SQLITE_OK) {
msg_warn_fuzzy_backend ("WAL mode is not supported (%s), locking issues might occur",
sqlite3_errmsg (backend->db));
sqlite3_exec (backend->db, fallback_journal, NULL, NULL, NULL);
}

if ((rc = sqlite3_exec (backend->db, foreign_keys, NULL, NULL, NULL)) !=
SQLITE_OK) {
msg_warn_fuzzy_backend ("foreign keys are not supported: %s",
sqlite3_errmsg (backend->db));
}

if ((rc = sqlite3_exec (backend->db, secure_delete, NULL, NULL, NULL)) !=
SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot disable secure delete: %s",
sqlite3_errmsg (backend->db));
}

if (sizeof (gpointer) >= 8 &&
(rc = sqlite3_exec (backend->db, enable_mmap, NULL, NULL, NULL)) !=
SQLITE_OK) {
msg_warn_fuzzy_backend ("cannot enable mmap: %s",
sqlite3_errmsg (backend->db));
}

if (vacuum) {
rspamd_fuzzy_backend_run_simple (RSPAMD_FUZZY_BACKEND_VACUUM,
backend,
NULL);
return NULL;
}

if (rspamd_fuzzy_backend_run_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT)
@@ -508,8 +438,6 @@ rspamd_fuzzy_backend_open (const gchar *path,
prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
}

rspamd_fuzzy_backend_run_sql (create_index_sql, backend, NULL);

rspamd_fuzzy_backend_run_simple (RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
backend, NULL);


Ładowanie…
Anuluj
Zapisz