diff options
-rw-r--r-- | src/libstat/backends/backends.h | 5 | ||||
-rw-r--r-- | src/libstat/backends/mmaped_file.c | 6 | ||||
-rw-r--r-- | src/libstat/backends/sqlite3_backend.c | 104 | ||||
-rw-r--r-- | src/libstat/stat_config.c | 1 | ||||
-rw-r--r-- | src/libstat/stat_process.c | 13 |
5 files changed, 122 insertions, 7 deletions
diff --git a/src/libstat/backends/backends.h b/src/libstat/backends/backends.h index e40bb6c9c..55d3535f7 100644 --- a/src/libstat/backends/backends.h +++ b/src/libstat/backends/backends.h @@ -47,6 +47,8 @@ struct rspamd_stat_backend { struct rspamd_statfile_config *stcf, gboolean learn, gpointer ctx); gboolean (*process_token)(struct rspamd_task *task, struct token_node_s *tok, struct rspamd_token_result *res, gpointer ctx); + void (*finalize_process)(struct rspamd_task *task, + gpointer runtime, gpointer ctx); gboolean (*learn_token)(struct rspamd_task *task, struct token_node_s *tok, struct rspamd_token_result *res, gpointer ctx); gulong (*total_learns)(struct rspamd_task *task, @@ -71,6 +73,9 @@ struct rspamd_stat_backend { struct token_node_s *tok, \ struct rspamd_token_result *res, \ gpointer ctx); \ + void rspamd_##name##_finalize_process (struct rspamd_task *task, \ + gpointer runtime, \ + gpointer ctx); \ gboolean rspamd_##name##_learn_token (struct rspamd_task *task, \ struct token_node_s *tok, \ struct rspamd_token_result *res, \ diff --git a/src/libstat/backends/mmaped_file.c b/src/libstat/backends/mmaped_file.c index b7e5650d3..fb3c4fd43 100644 --- a/src/libstat/backends/mmaped_file.c +++ b/src/libstat/backends/mmaped_file.c @@ -1089,3 +1089,9 @@ rspamd_mmaped_file_finalize_learn (struct rspamd_task *task, gpointer runtime, msync (mf->map, mf->len, MS_INVALIDATE | MS_ASYNC); } } + +void +rspamd_mmaped_file_finalize_process (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ +} diff --git a/src/libstat/backends/sqlite3_backend.c b/src/libstat/backends/sqlite3_backend.c index 31b593c64..836886e1f 100644 --- a/src/libstat/backends/sqlite3_backend.c +++ b/src/libstat/backends/sqlite3_backend.c @@ -81,7 +81,8 @@ static const char *create_tables_sql = "COMMIT;"; enum rspamd_stat_sqlite3_stmt_idx { - RSPAMD_STAT_BACKEND_TRANSACTION_START = 0, + RSPAMD_STAT_BACKEND_TRANSACTION_START_IM = 0, + RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF, RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT, RSPAMD_STAT_BACKEND_TRANSACTION_ROLLBACK, RSPAMD_STAT_BACKEND_GET_TOKEN, @@ -102,7 +103,7 @@ static struct rspamd_sqlite3_prstmt { } prepared_stmts[RSPAMD_STAT_BACKEND_MAX] = { { - .idx = RSPAMD_STAT_BACKEND_TRANSACTION_START, + .idx = RSPAMD_STAT_BACKEND_TRANSACTION_START_IM, .sql = "BEGIN IMMEDIATE TRANSACTION;", .args = "", .stmt = NULL, @@ -110,6 +111,14 @@ static struct rspamd_sqlite3_prstmt { .ret = "" }, { + .idx = RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF, + .sql = "BEGIN DEFERRED TRANSACTION;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE, + .ret = "" + }, + { .idx = RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT, .sql = "COMMIT;", .args = "", @@ -313,6 +322,39 @@ rspamd_sqlite3_close_prstmt (struct rspamd_stat_sqlite3_db *db) return; } +static gboolean +rspamd_sqlite3_wait (const gchar *lock) +{ + gint fd; + struct timespec sleep_ts = { + .tv_sec = 0, + .tv_nsec = 1000000 + }; + + fd = open (lock, O_RDONLY); + + if (fd == -1) { + msg_err ("cannot open lock file %s: %s", lock, strerror (errno)); + + return FALSE; + } + + while (!rspamd_file_lock (fd, TRUE)) { + if (nanosleep (&sleep_ts, NULL) == -1 && errno != EINTR) { + close (fd); + msg_err ("cannot sleep open lock file %s: %s", lock, strerror (errno)); + + return FALSE; + } + } + + rspamd_file_unlock (fd, FALSE); + + close (fd); + + return TRUE; +} + static struct rspamd_stat_sqlite3_db * rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts, gboolean create, GError **err) @@ -320,15 +362,35 @@ rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts, struct rspamd_stat_sqlite3_db *bk; sqlite3 *sqlite; sqlite3_stmt *stmt; - gint rc, flags; - static const char sqlite_wal[] = "PRAGMA journal_mode=WAL;", - fallback_journal[] = "PRAGMA journal_mode=OFF;", + gint rc, flags, lock_fd; + gchar lock_path[PATH_MAX]; + static const char sqlite_wal[] = "PRAGMA journal_mode=\"wal\";", + fallback_journal[] = "PRAGMA journal_mode=\"off\";", user_version[] = "PRAGMA user_version;"; flags = SQLITE_OPEN_READWRITE; if (create) { flags |= SQLITE_OPEN_CREATE; + + rspamd_snprintf (lock_path, sizeof (lock_path), "%s.lock", path); + lock_fd = open (lock_path, O_WRONLY|O_CREAT|O_EXCL, 00600); + + if (lock_fd == -1 && (errno == EEXIST || errno == EBUSY)) { + if (!rspamd_sqlite3_wait (lock_path)) { + g_set_error (err, rspamd_sqlite3_quark (), + errno, "cannot create sqlite file %s: %s", + path, strerror (errno)); + + return NULL; + } + + /* At this point we have database created */ + create = FALSE; + } + else { + g_assert (rspamd_file_lock (lock_fd, FALSE)); + } } else if (access (path, R_OK) == -1) { g_set_error (err, rspamd_sqlite3_quark (), @@ -378,9 +440,16 @@ rspamd_sqlite3_opendb (const gchar *path, const ucl_object_t *opts, -1, "cannot execute create sql `%s`: %s", create_tables_sql, sqlite3_errmsg (sqlite)); sqlite3_close (sqlite); + rspamd_file_unlock (lock_fd, FALSE); + unlink (lock_path); + close (lock_fd); return NULL; } + + rspamd_file_unlock (lock_fd, FALSE); + unlink (lock_path); + close (lock_fd); } bk = g_slice_alloc0 (sizeof (*bk)); @@ -538,6 +607,11 @@ rspamd_sqlite3_process_token (struct rspamd_task *task, struct token_node_s *tok return FALSE; } + if (!bk->in_transaction) { + rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START_DEF); + bk->in_transaction = TRUE; + } + memcpy (&idx, tok->data, sizeof (idx)); /* TODO: language and user support */ @@ -559,6 +633,24 @@ rspamd_sqlite3_process_token (struct rspamd_task *task, struct token_node_s *tok return TRUE; } +void +rspamd_sqlite3_finalize_process (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct rspamd_stat_sqlite3_rt *rt = runtime; + struct rspamd_stat_sqlite3_db *bk; + + g_assert (rt != NULL); + bk = rt->db; + + if (bk->in_transaction) { + rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_COMMIT); + bk->in_transaction = FALSE; + } + + return; +} + gboolean rspamd_sqlite3_learn_token (struct rspamd_task *task, struct token_node_s *tok, struct rspamd_token_result *res, gpointer p) @@ -582,7 +674,7 @@ rspamd_sqlite3_learn_token (struct rspamd_task *task, struct token_node_s *tok, } if (!bk->in_transaction) { - rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START); + rspamd_sqlite3_run_prstmt (bk, RSPAMD_STAT_BACKEND_TRANSACTION_START_IM); bk->in_transaction = TRUE; } diff --git a/src/libstat/stat_config.c b/src/libstat/stat_config.c index c75b02fe0..6f27d1c10 100644 --- a/src/libstat/stat_config.c +++ b/src/libstat/stat_config.c @@ -60,6 +60,7 @@ static struct rspamd_stat_tokenizer stat_tokenizers[] = { .init = rspamd_##eltn##_init, \ .runtime = rspamd_##eltn##_runtime, \ .process_token = rspamd_##eltn##_process_token, \ + .finalize_process = rspamd_##eltn##_finalize_process, \ .learn_token = rspamd_##eltn##_learn_token, \ .finalize_learn = rspamd_##eltn##_finalize_learn, \ .total_learns = rspamd_##eltn##_total_learns, \ diff --git a/src/libstat/stat_process.c b/src/libstat/stat_process.c index 978264ac4..9c261eccd 100644 --- a/src/libstat/stat_process.c +++ b/src/libstat/stat_process.c @@ -356,11 +356,12 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, GError **err) struct rspamd_stat_classifier *cls; struct rspamd_classifier_config *clcf; struct rspamd_stat_ctx *st_ctx; + struct rspamd_statfile_runtime *st_run; struct rspamd_tokenizer_runtime *tklist = NULL, *tok; struct rspamd_classifier_runtime *cl_run; struct classifier_ctx *cl_ctx; GList *cl_runtimes; - GList *cur; + GList *cur, *curst; gboolean ret = RSPAMD_STAT_PROCESS_ERROR, compat = TRUE; const ucl_object_t *obj; @@ -440,6 +441,16 @@ rspamd_stat_classify (struct rspamd_task *task, lua_State *L, GError **err) } } + curst = cl_run->st_runtime; + + while (curst) { + st_run = curst->data; + st_run->backend->finalize_learn (task, + st_run->backend_runtime, + st_run->backend->ctx); + curst = g_list_next (curst); + } + cur = g_list_next (cur); } |