Browse Source

Add finalize process operation.

We could use sqlite transaction for reading as well.
tags/1.0.0
Vsevolod Stakhov 9 years ago
parent
commit
f4d576701c

+ 5
- 0
src/libstat/backends/backends.h View File

@@ -47,6 +47,8 @@ struct rspamd_stat_backend {
struct rspamd_statfile_config *stcf, gboolean learn, gpointer ctx);
gboolean (*process_token)(struct rspamd_task *task, struct token_node_s *tok,
struct rspamd_token_result *res, gpointer ctx);
void (*finalize_process)(struct rspamd_task *task,
gpointer runtime, gpointer ctx);
gboolean (*learn_token)(struct rspamd_task *task, struct token_node_s *tok,
struct rspamd_token_result *res, gpointer ctx);
gulong (*total_learns)(struct rspamd_task *task,
@@ -71,6 +73,9 @@ struct rspamd_stat_backend {
struct token_node_s *tok, \
struct rspamd_token_result *res, \
gpointer ctx); \
void rspamd_##name##_finalize_process (struct rspamd_task *task, \
gpointer runtime, \
gpointer ctx); \
gboolean rspamd_##name##_learn_token (struct rspamd_task *task, \
struct token_node_s *tok, \
struct rspamd_token_result *res, \

+ 6
- 0
src/libstat/backends/mmaped_file.c View File

@@ -1089,3 +1089,9 @@ rspamd_mmaped_file_finalize_learn (struct rspamd_task *task, gpointer runtime,
msync (mf->map, mf->len, MS_INVALIDATE | MS_ASYNC);
}
}

void
rspamd_mmaped_file_finalize_process (struct rspamd_task *task, gpointer runtime,
gpointer ctx)
{
}

+ 98
- 6
src/libstat/backends/sqlite3_backend.c View File

@@ -81,7 +81,8 @@ static const char *create_tables_sql =
"COMMIT;";

enum rspamd_stat_sqlite3_stmt_idx {
RSPAMD_STAT_BACKEND_TRANSACTION_START = 0,
RSPAMD_STAT_BACKEND_TRANSACTION_START_IM = 0,
RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF,
RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT,
RSPAMD_STAT_BACKEND_TRANSACTION_ROLLBACK,
RSPAMD_STAT_BACKEND_GET_TOKEN,
@@ -102,13 +103,21 @@ static struct rspamd_sqlite3_prstmt {
} prepared_stmts[RSPAMD_STAT_BACKEND_MAX] =
{
{
.idx = RSPAMD_STAT_BACKEND_TRANSACTION_START,
.idx = RSPAMD_STAT_BACKEND_TRANSACTION_START_IM,
.sql = "BEGIN IMMEDIATE TRANSACTION;",
.args = "",
.stmt = NULL,
.result = SQLITE_DONE,
.ret = ""
},
{
.idx = RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF,
.sql = "BEGIN DEFERRED TRANSACTION;",
.args = "",
.stmt = NULL,
.result = SQLITE_DONE,
.ret = ""
},
{
.idx = RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT,
.sql = "COMMIT;",
@@ -313,6 +322,39 @@ rspamd_sqlite3_close_prstmt (struct rspamd_stat_sqlite3_db *db)
return;
}

static gboolean
rspamd_sqlite3_wait (const gchar *lock)
{
gint fd;
struct timespec sleep_ts = {
.tv_sec = 0,
.tv_nsec = 1000000
};

fd = open (lock, O_RDONLY);

if (fd == -1) {
msg_err ("cannot open lock file %s: %s", lock, strerror (errno));

return FALSE;
}

while (!rspamd_file_lock (fd, TRUE)) {
if (nanosleep (&sleep_ts, NULL) == -1 && errno != EINTR) {
close (fd);
msg_err ("cannot sleep open lock file %s: %s", lock, strerror (errno));

return FALSE;
}
}

rspamd_file_unlock (fd, FALSE);

close (fd);

return TRUE;
}

static struct rspamd_stat_sqlite3_db *
rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts,
gboolean create, GError **err)
@@ -320,15 +362,35 @@ rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts,
struct rspamd_stat_sqlite3_db *bk;
sqlite3 *sqlite;
sqlite3_stmt *stmt;
gint rc, flags;
static const char sqlite_wal[] = "PRAGMA journal_mode=WAL;",
fallback_journal[] = "PRAGMA journal_mode=OFF;",
gint rc, flags, lock_fd;
gchar lock_path[PATH_MAX];
static const char sqlite_wal[] = "PRAGMA journal_mode=\"wal\";",
fallback_journal[] = "PRAGMA journal_mode=\"off\";",
user_version[] = "PRAGMA user_version;";

flags = SQLITE_OPEN_READWRITE;

if (create) {
flags |= SQLITE_OPEN_CREATE;

rspamd_snprintf (lock_path, sizeof (lock_path), "%s.lock", path);
lock_fd = open (lock_path, O_WRONLY|O_CREAT|O_EXCL, 00600);

if (lock_fd == -1 && (errno == EEXIST || errno == EBUSY)) {
if (!rspamd_sqlite3_wait (lock_path)) {
g_set_error (err, rspamd_sqlite3_quark (),
errno, "cannot create sqlite file %s: %s",
path, strerror (errno));

return NULL;
}

/* At this point we have database created */
create = FALSE;
}
else {
g_assert (rspamd_file_lock (lock_fd, FALSE));
}
}
else if (access (path, R_OK) == -1) {
g_set_error (err, rspamd_sqlite3_quark (),
@@ -378,9 +440,16 @@ rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts,
-1, "cannot execute create sql `%s`: %s",
create_tables_sql, sqlite3_errmsg (sqlite));
sqlite3_close (sqlite);
rspamd_file_unlock (lock_fd, FALSE);
unlink (lock_path);
close (lock_fd);

return NULL;
}

rspamd_file_unlock (lock_fd, FALSE);
unlink (lock_path);
close (lock_fd);
}

bk = g_slice_alloc0 (sizeof (*bk));
@@ -538,6 +607,11 @@ rspamd_sqlite3_process_token (struct rspamd_task *task, struct token_node_s *tok
return FALSE;
}

if (!bk->in_transaction) {
rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF);
bk->in_transaction = TRUE;
}

memcpy (&idx, tok->data, sizeof (idx));

/* TODO: language and user support */
@@ -559,6 +633,24 @@ rspamd_sqlite3_process_token (struct rspamd_task *task, struct token_node_s *tok
return TRUE;
}

void
rspamd_sqlite3_finalize_process (struct rspamd_task *task, gpointer runtime,
gpointer ctx)
{
struct rspamd_stat_sqlite3_rt *rt = runtime;
struct rspamd_stat_sqlite3_db *bk;

g_assert (rt != NULL);
bk = rt->db;

if (bk->in_transaction) {
rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT);
bk->in_transaction = FALSE;
}

return;
}

gboolean
rspamd_sqlite3_learn_token (struct rspamd_task *task, struct token_node_s *tok,
struct rspamd_token_result *res, gpointer p)
@@ -582,7 +674,7 @@ rspamd_sqlite3_learn_token (struct rspamd_task *task, struct token_node_s *tok,
}

if (!bk->in_transaction) {
rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START);
rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START_IM);
bk->in_transaction = TRUE;
}


+ 1
- 0
src/libstat/stat_config.c View File

@@ -60,6 +60,7 @@ static struct rspamd_stat_tokenizer stat_tokenizers[] = {
.init = rspamd_##eltn##_init, \
.runtime = rspamd_##eltn##_runtime, \
.process_token = rspamd_##eltn##_process_token, \
.finalize_process = rspamd_##eltn##_finalize_process, \
.learn_token = rspamd_##eltn##_learn_token, \
.finalize_learn = rspamd_##eltn##_finalize_learn, \
.total_learns = rspamd_##eltn##_total_learns, \

+ 12
- 1
src/libstat/stat_process.c View File

@@ -356,11 +356,12 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, GError **err)
struct rspamd_stat_classifier *cls;
struct rspamd_classifier_config *clcf;
struct rspamd_stat_ctx *st_ctx;
struct rspamd_statfile_runtime *st_run;
struct rspamd_tokenizer_runtime *tklist = NULL, *tok;
struct rspamd_classifier_runtime *cl_run;
struct classifier_ctx *cl_ctx;
GList *cl_runtimes;
GList *cur;
GList *cur, *curst;
gboolean ret = RSPAMD_STAT_PROCESS_ERROR, compat = TRUE;
const ucl_object_t *obj;

@@ -440,6 +441,16 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, GError **err)
}
}

curst = cl_run->st_runtime;

while (curst) {
st_run = curst->data;
st_run->backend->finalize_learn (task,
st_run->backend_runtime,
st_run->backend->ctx);
curst = g_list_next (curst);
}

cur = g_list_next (cur);
}


Loading…
Cancel
Save