aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-02 14:33:49 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-02 14:33:49 +0100
commit7a93b2c7c4a2181c00cf3cf6ef24ebe76e7bb0b8 (patch)
treefbd58dc5c67fd7ecb2c20ec2d4f292f22f3200fe
parent2d90a61b3a70bc75735575435276c9226c7f2c0f (diff)
parente8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5 (diff)
downloadrspamd-7a93b2c7c4a2181c00cf3cf6ef24ebe76e7bb0b8.tar.gz
rspamd-7a93b2c7c4a2181c00cf3cf6ef24ebe76e7bb0b8.zip
Merge branch 'fuzzy-backend-rework'
-rw-r--r--src/controller.c2
-rw-r--r--src/fuzzy_storage.c472
-rw-r--r--src/libserver/CMakeLists.txt1
-rw-r--r--src/libserver/fuzzy_backend.c1196
-rw-r--r--src/libserver/fuzzy_backend.h102
-rw-r--r--src/libserver/fuzzy_backend_sqlite.c1055
-rw-r--r--src/libserver/fuzzy_backend_sqlite.h98
-rw-r--r--src/libserver/fuzzy_wire.h (renamed from src/fuzzy_storage.h)8
-rw-r--r--src/plugins/fuzzy_check.c2
-rw-r--r--src/rspamd.c1
10 files changed, 1747 insertions, 1190 deletions
diff --git a/src/controller.c b/src/controller.c
index 2b19a7dd7..5d001fbb6 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -24,7 +24,7 @@
#include "libserver/worker_util.h"
#include "cryptobox.h"
#include "ottery.h"
-#include "fuzzy_storage.h"
+#include "fuzzy_wire.h"
#include "libutil/rrd.h"
#include "unix-std.h"
#include "utlist.h"
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index e321aa0b6..d663a89e4 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -22,7 +22,7 @@
#include "util.h"
#include "rspamd.h"
#include "map.h"
-#include "fuzzy_storage.h"
+#include "fuzzy_wire.h"
#include "fuzzy_backend.h"
#include "ottery.h"
#include "libserver/worker_util.h"
@@ -36,15 +36,11 @@
#include "libutil/http_private.h"
#include "unix-std.h"
-/* This number is used as expire time in seconds for cache items (2 days) */
-#define DEFAULT_EXPIRE 172800L
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
#define DEFAULT_MASTER_TIMEOUT 10.0
-#define INVALID_NODE_TIME (guint64) - 1
-
static const gchar *local_db_name = "local";
#define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -77,10 +73,6 @@ worker_t fuzzy_worker = {
RSPAMD_WORKER_VER /* Version info */
};
-/* For evtimer */
-static struct timeval tmv;
-static struct event tev;
-
struct fuzzy_global_stat {
guint64 fuzzy_hashes;
/**< number of fuzzy hashes stored */
@@ -165,6 +157,7 @@ struct fuzzy_session {
} cmd;
struct rspamd_fuzzy_encrypted_reply reply;
+ struct fuzzy_key_stat *ip_stat;
enum rspamd_fuzzy_epoch epoch;
enum fuzzy_cmd_type cmd_type;
@@ -176,14 +169,6 @@ struct fuzzy_session {
guchar nm[rspamd_cryptobox_MAX_NMBYTES];
};
-struct fuzzy_peer_cmd {
- gboolean is_shingle;
- union {
- struct rspamd_fuzzy_cmd normal;
- struct rspamd_fuzzy_shingle_cmd shingle;
- } cmd;
-};
-
struct fuzzy_peer_request {
struct event io_ev;
struct fuzzy_peer_cmd cmd;
@@ -199,7 +184,10 @@ struct fuzzy_master_update_session {
const gchar *name;
gchar uid[16];
struct rspamd_http_connection *conn;
+ struct rspamd_http_message *msg;
struct rspamd_fuzzy_storage_ctx *ctx;
+ const gchar *src;
+ gchar *psrc;
rspamd_inet_addr_t *addr;
gboolean replied;
gint sock;
@@ -250,6 +238,14 @@ fuzzy_key_dtor (gpointer p)
g_slice_free1 (sizeof (*key), key);
}
+static void
+fuzzy_count_callback (guint64 count, void *ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = ud;
+
+ ctx->stat.fuzzy_hashes = count;
+}
+
struct fuzzy_slave_connection {
struct rspamd_cryptobox_keypair *local_key;
struct rspamd_cryptobox_pubkey *remote_key;
@@ -274,19 +270,34 @@ fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
}
}
+struct rspamd_fuzzy_updates_cbdata {
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct rspamd_http_message *msg;
+ struct fuzzy_slave_connection *conn;
+ struct rspamd_fuzzy_mirror *m;
+};
+
static void
-fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
- struct rspamd_http_message *msg)
+fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
{
+ struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
- guint32 len;
- guint32 rev;
+ guint32 rev32 = rev64, len;
const gchar *p;
rspamd_fstring_t *reply;
+ struct fuzzy_slave_connection *conn;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct rspamd_http_message *msg;
+ struct rspamd_fuzzy_mirror *m;
+ struct timeval tv;
- rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name);
- rev = GUINT32_TO_LE (rev);
+ conn = cbdata->conn;
+ ctx = cbdata->ctx;
+ msg = cbdata->msg;
+ m = cbdata->m;
+ g_slice_free1 (sizeof (*cbdata), cbdata);
+ rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */
for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
@@ -303,8 +314,8 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
}
reply = rspamd_fstring_sized_new (len);
- reply = rspamd_fstring_append (reply, (const char *)&rev,
- sizeof (rev));
+ reply = rspamd_fstring_append (reply, (const char *)&rev32,
+ sizeof (rev32));
for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
io_cmd = cur->data;
@@ -328,6 +339,30 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
len = 0;
reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+ double_to_tv (ctx->sync_timeout, &tv);
+ rspamd_http_connection_write_message (conn->http_conn,
+ msg, NULL, NULL, conn,
+ conn->sock,
+ &tv, ctx->ev_base);
+ msg_info ("send update request to %s", m->name);
+}
+
+static void
+fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
+ struct fuzzy_slave_connection *conn,
+ struct rspamd_fuzzy_storage_ctx *ctx,
+ struct rspamd_http_message *msg)
+{
+
+ struct rspamd_fuzzy_updates_cbdata *cbdata;
+
+ cbdata = g_slice_alloc (sizeof (*cbdata));
+ cbdata->ctx = ctx;
+ cbdata->msg = msg;
+ cbdata->conn = conn;
+ cbdata->m = m;
+ rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
+ fuzzy_mirror_updates_version_cb, cbdata);
}
static void
@@ -361,7 +396,6 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
- struct timeval tv;
conn = g_slice_alloc0 (sizeof (*conn));
conn->up = rspamd_upstream_get (m->u,
@@ -397,84 +431,85 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
- double_to_tv (ctx->sync_timeout, &tv);
- fuzzy_mirror_updates_to_http (ctx, msg);
+ fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
+}
- rspamd_http_connection_write_message (conn->http_conn,
- msg, NULL, NULL, conn,
- conn->sock,
- &tv, ctx->ev_base);
- msg_info ("send update request to %s", m->name);
+struct rspamd_updates_cbdata {
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ const gchar *source;
+};
+
+static void
+fuzzy_update_version_callback (guint64 ver, void *ud)
+{
+ msg_info ("updated fuzzy storage from %s: version: %d",
+ (const char *)ud, (gint)ver);
}
static void
-rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
- const gchar *source)
+rspamd_fuzzy_updates_cb (gboolean success, void *ud)
{
- GList *cur;
- struct fuzzy_peer_cmd *io_cmd;
- struct rspamd_fuzzy_cmd *cmd;
- gpointer ptr;
+ struct rspamd_updates_cbdata *cbdata = ud;
struct rspamd_fuzzy_mirror *m;
guint nupdates = 0, i;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ const gchar *source;
+ GList *cur;
+ struct fuzzy_peer_cmd *io_cmd;
- if (ctx->updates_pending &&
- g_queue_get_length (ctx->updates_pending) > 0 &&
- rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) {
- cur = ctx->updates_pending->head;
+ ctx = cbdata->ctx;
+ source = cbdata->source;
- while (cur) {
- io_cmd = cur->data;
+ if (success) {
+ rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
- if (io_cmd->is_shingle) {
- cmd = &io_cmd->cmd.shingle.basic;
- ptr = &io_cmd->cmd.shingle;
- }
- else {
- cmd = &io_cmd->cmd.normal;
- ptr = &io_cmd->cmd.normal;
- }
+ if (nupdates > 0) {
+ for (i = 0; i < ctx->mirrors->len; i ++) {
+ m = g_ptr_array_index (ctx->mirrors, i);
- if (cmd->cmd == FUZZY_WRITE) {
- rspamd_fuzzy_backend_add (ctx->backend, ptr);
- }
- else {
- rspamd_fuzzy_backend_del (ctx->backend, ptr);
+ rspamd_fuzzy_send_update_mirror (ctx, m);
}
+ }
- nupdates++;
+ /* Clear updates */
+ cur = ctx->updates_pending->head;
+
+ while (cur) {
+ io_cmd = cur->data;
+ g_slice_free1 (sizeof (*io_cmd), io_cmd);
cur = g_list_next (cur);
}
- if (rspamd_fuzzy_backend_finish_update (ctx->backend, source, nupdates > 0)) {
- ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+ g_queue_clear (ctx->updates_pending);
+ rspamd_fuzzy_backend_version (ctx->backend, source,
+ fuzzy_update_version_callback, (void *)source);
- if (nupdates > 0) {
- for (i = 0; i < ctx->mirrors->len; i ++) {
- m = g_ptr_array_index (ctx->mirrors, i);
+ }
+ else {
+ msg_err ("cannot commit update transaction to fuzzy backend, "
+ "%ud updates are still pending",
+ g_queue_get_length (ctx->updates_pending));
+ }
- rspamd_fuzzy_send_update_mirror (ctx, m);
- }
- }
+ g_slice_free1 (sizeof (*cbdata), cbdata);
+}
- /* Clear updates */
- cur = ctx->updates_pending->head;
+static void
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
+ const gchar *source)
+{
+
+ struct rspamd_updates_cbdata *cbdata;
+
+ if (ctx->updates_pending &&
+ g_queue_get_length (ctx->updates_pending) > 0) {
+ cbdata = g_slice_alloc (sizeof (*cbdata));
+ cbdata->ctx = ctx;
+ cbdata->source = source;
+ rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
+ source, rspamd_fuzzy_updates_cb, cbdata);
- while (cur) {
- io_cmd = cur->data;
- g_slice_free1 (sizeof (*io_cmd), io_cmd);
- cur = g_list_next (cur);
- }
- g_queue_clear (ctx->updates_pending);
- msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
- nupdates, rspamd_fuzzy_backend_version (ctx->backend, source));
- }
- else {
- msg_err ("cannot commit update transaction to fuzzy backend, "
- "%ud updates are still pending",
- g_queue_get_length (ctx->updates_pending));
- }
}
else if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0) {
@@ -607,6 +642,77 @@ rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
}
static void
+rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
+ struct rspamd_fuzzy_reply *result,
+ struct fuzzy_session *session,
+ gboolean encrypted, gboolean is_shingle)
+{
+ if (cmd) {
+ result->tag = cmd->tag;
+
+ memcpy (&session->reply.rep, result, sizeof (*result));
+
+ rspamd_fuzzy_update_stats (session->ctx,
+ session->epoch,
+ result->prob > 0.5,
+ is_shingle,
+ session->key_stat,
+ session->ip_stat,
+ cmd->cmd,
+ result->value);
+
+ if (encrypted) {
+ /* We need also to encrypt reply */
+ ottery_rand_bytes (session->reply.hdr.nonce,
+ sizeof (session->reply.hdr.nonce));
+ rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
+ sizeof (session->reply.rep),
+ session->reply.hdr.nonce,
+ session->nm,
+ session->reply.hdr.mac,
+ RSPAMD_CRYPTOBOX_MODE_25519);
+ }
+ }
+
+ rspamd_fuzzy_write_reply (session);
+}
+
+static void
+rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
+{
+ struct fuzzy_session *session = ud;
+ gboolean encrypted = FALSE, is_shingle = FALSE;
+ struct rspamd_fuzzy_cmd *cmd = NULL;
+ gsize up_len = 0;
+
+ switch (session->cmd_type) {
+ case CMD_NORMAL:
+ cmd = &session->cmd.normal;
+ up_len = sizeof (session->cmd.normal);
+ break;
+ case CMD_SHINGLE:
+ cmd = &session->cmd.shingle.basic;
+ up_len = sizeof (session->cmd.shingle);
+ is_shingle = TRUE;
+ break;
+ case CMD_ENCRYPTED_NORMAL:
+ cmd = &session->cmd.enc_normal.cmd;
+ up_len = sizeof (session->cmd.normal);
+ encrypted = TRUE;
+ break;
+ case CMD_ENCRYPTED_SHINGLE:
+ cmd = &session->cmd.enc_shingle.cmd.basic;
+ up_len = sizeof (session->cmd.shingle);
+ encrypted = TRUE;
+ is_shingle = TRUE;
+ break;
+ }
+
+ rspamd_fuzzy_make_reply (cmd, result, session, encrypted, is_shingle);
+ REF_RELEASE (session);
+}
+
+static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
gboolean encrypted = FALSE, is_shingle = FALSE;
@@ -645,14 +751,16 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
if (G_UNLIKELY (cmd == NULL || up_len == 0)) {
result.value = 500;
result.prob = 0.0;
- goto reply;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ return;
}
if (session->ctx->encrypted_only && !encrypted) {
/* Do not accept unencrypted commands */
result.value = 403;
result.prob = 0.0;
- goto reply;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ return;
}
if (session->key_stat) {
@@ -665,17 +773,21 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
rspamd_lru_hash_insert (session->key_stat->last_ips,
naddr, ip_stat, -1, 0);
}
+
+ session->ip_stat = ip_stat;
}
result.flag = cmd->flag;
if (cmd->cmd == FUZZY_CHECK) {
- result = rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
- session->ctx->expire);
+ REF_RETAIN (session);
+ rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
+ rspamd_fuzzy_check_callback, session);
}
else if (cmd->cmd == FUZZY_STAT) {
result.prob = 1.0;
result.value = 0;
- result.flag = rspamd_fuzzy_backend_count (session->ctx->backend);
+ result.flag = session->ctx->stat.fuzzy_hashes;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
else {
if (rspamd_fuzzy_check_client (session)) {
@@ -711,36 +823,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
result.value = 403;
result.prob = 0.0;
}
- }
-reply:
- if (cmd) {
- result.tag = cmd->tag;
-
- memcpy (&session->reply.rep, &result, sizeof (result));
-
- rspamd_fuzzy_update_stats (session->ctx,
- session->epoch,
- result.prob > 0.5,
- is_shingle,
- session->key_stat,
- ip_stat, cmd->cmd,
- result.value);
-
- if (encrypted) {
- /* We need also to encrypt reply */
- ottery_rand_bytes (session->reply.hdr.nonce,
- sizeof (session->reply.hdr.nonce));
- rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
- sizeof (session->reply.rep),
- session->reply.hdr.nonce,
- session->nm,
- session->reply.hdr.mac,
- RSPAMD_CRYPTOBOX_MODE_25519);
- }
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
-
- rspamd_fuzzy_write_reply (session);
}
@@ -924,12 +1009,11 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
static void
rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
- struct rspamd_http_message *msg)
+ struct rspamd_http_message *msg, guint our_rev)
{
const guchar *p;
- gchar *src = NULL, *psrc;
gsize remain;
- gint32 revision, our_rev;
+ gint32 revision;
guint32 len = 0, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
enum {
@@ -940,25 +1024,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
GList *updates = NULL, *cur;
gpointer flag_ptr;
- if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
- || msg->url->len == 0) {
- msg_err_fuzzy_update ("empty update message, not processing");
-
- return;
- }
-
- /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
- remain = msg->url->len;
- psrc = rspamd_fstringdup (msg->url);
- src = psrc;
-
- while (remain--) {
- if (src[remain] == '/') {
- src = &src[remain + 1];
- break;
- }
- }
-
/*
* Message format:
* <uint32_le> - revision
@@ -973,13 +1038,11 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
if (remain > sizeof (gint32) * 2) {
memcpy (&revision, p, sizeof (gint32));
revision = GINT32_TO_LE (revision);
- our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src);
if (revision <= our_rev) {
msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, "
"refusing update",
revision, our_rev);
- g_free (psrc);
return;
}
@@ -1084,7 +1147,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cur->data = NULL;
}
- rspamd_fuzzy_process_updates_queue (session->ctx, src);
+ rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
msg_info_fuzzy_update ("processed updates from the master %s, "
"%ud operations processed,"
" revision: %d (local revision: %d)",
@@ -1092,8 +1155,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cnt, revision, our_rev);
err:
- g_free (psrc);
-
if (updates) {
/* We still need to clear queue */
for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1127,6 +1188,10 @@ rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session
rspamd_http_connection_unref (session->conn);
rspamd_inet_address_destroy (session->addr);
close (session->sock);
+
+ if (session->psrc) {
+ g_free (session->psrc);
+ }
g_slice_free1 (sizeof (*session), session);
}
}
@@ -1159,6 +1224,15 @@ rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session,
session->ctx->ev_base);
}
+static void
+rspamd_fuzzy_update_version_callback (guint64 version, void *ud)
+{
+ struct fuzzy_master_update_session *session = ud;
+
+ rspamd_fuzzy_mirror_process_update (session, session->msg, version);
+ rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
+}
+
static gint
rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
@@ -1166,6 +1240,9 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct fuzzy_master_update_session *session = conn->ud;
const struct rspamd_cryptobox_pubkey *rk;
const gchar *err_str = NULL;
+ gchar *psrc;
+ const gchar *src = NULL;
+ gsize remain;
if (session->replied) {
rspamd_fuzzy_mirror_session_destroy (session);
@@ -1197,9 +1274,30 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s",
rspamd_inet_address_to_string (session->addr));
}
+ if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
+ || msg->url->len == 0) {
+ msg_err_fuzzy_update ("empty update message, not processing");
+
+ goto end;
+ }
- rspamd_fuzzy_mirror_process_update (session, msg);
- rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
+ /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
+ remain = msg->url->len;
+ psrc = rspamd_fstringdup (msg->url);
+ src = psrc;
+
+ while (remain--) {
+ if (src[remain] == '/') {
+ src = &src[remain + 1];
+ break;
+ }
+ }
+
+ session->src = src;
+ session->psrc = psrc;
+ session->msg = msg;
+ rspamd_fuzzy_backend_version (session->ctx->backend, src,
+ rspamd_fuzzy_update_version_callback, session);
return 0;
}
@@ -1360,38 +1458,6 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
}
}
-static void
-sync_callback (gint fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct rspamd_fuzzy_storage_ctx *ctx;
- gdouble next_check;
- guint64 old_expired, new_expired;
-
- ctx = worker->ctx;
-
- if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- /* Call backend sync */
- old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
- rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
- new_expired = rspamd_fuzzy_backend_expired (ctx->backend);
-
- if (old_expired < new_expired) {
- ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
- }
- }
-
- /* Timer event */
- event_del (&tev);
- evtimer_set (&tev, sync_callback, worker);
- event_base_set (ctx->ev_base, &tev);
- /* Plan event with jitter */
- next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
- double_to_tv (next_check, &tmv);
- evtimer_add (&tev, &tmv);
-}
-
static gboolean
rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
@@ -1400,29 +1466,13 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
gpointer ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;
- gdouble next_check;
- guint64 old_expired, new_expired;
struct rspamd_control_reply rep;
- if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- /* Call backend sync */
- old_expired = rspamd_fuzzy_backend_expired (ctx->backend);
- rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
- new_expired = rspamd_fuzzy_backend_expired (ctx->backend);
-
- if (old_expired < new_expired) {
- ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
- }
- }
-
rep.reply.fuzzy_sync.status = 0;
- /* Handle next timer event */
- event_del (&tev);
- next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
- double_to_tv (next_check, &tmv);
- evtimer_add (&tev, &tmv);
+ if (ctx->backend && worker->index == 0) {
+ rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ }
if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
@@ -1453,8 +1503,8 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
memset (&rep, 0, sizeof (rep));
rep.type = RSPAMD_CONTROL_RELOAD;
- if ((ctx->backend = rspamd_fuzzy_backend_open (ctx->hashfile,
- TRUE,
+ if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+ worker->cf->options,
&err)) == NULL) {
msg_err ("cannot open backend after reload: %e", err);
g_error_free (err);
@@ -1464,6 +1514,10 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
rep.reply.reload.status = 0;
}
+ if (ctx->backend && worker->index == 0) {
+ rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ }
+
if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
strerror (errno));
@@ -1909,7 +1963,6 @@ init_fuzzy (struct rspamd_config *cfg)
ctx->magic = rspamd_fuzzy_storage_magic;
ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
- ctx->expire = DEFAULT_EXPIRE;
ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
NULL, fuzzy_key_dtor);
@@ -2116,7 +2169,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
GList *cur;
struct rspamd_worker_listen_socket *ls;
struct event *accept_events;
- gdouble next_check;
ctx->peer_fd = rep_fd;
@@ -2165,14 +2217,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
event_base_set (ctx->ev_base, &ctx->peer_ev);
event_add (&ctx->peer_ev, NULL);
ctx->updates_pending = g_queue_new ();
-
- /* Timer event */
- evtimer_set (&tev, sync_callback, worker);
- event_base_set (ctx->ev_base, &tev);
- /* Plan event with jitter */
- next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
- double_to_tv (next_check, &tmv);
- evtimer_add (&tev, &tmv);
}
}
@@ -2196,13 +2240,14 @@ start_fuzzy (struct rspamd_worker *worker)
/*
* Open DB and perform VACUUM
*/
- if ((ctx->backend = rspamd_fuzzy_backend_open (ctx->hashfile, TRUE, &err)) == NULL) {
+ if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+ worker->cf->options, &err)) == NULL) {
msg_err ("cannot open backend: %e", err);
g_error_free (err);
exit (EXIT_SUCCESS);
}
- ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+ rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
if (ctx->keypair_cache_size > 0) {
/* Create keypairs cache */
@@ -2210,7 +2255,7 @@ start_fuzzy (struct rspamd_worker *worker)
}
if (worker->index == 0) {
- rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
+ rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
}
if (ctx->mirrors && ctx->mirrors->len != 0) {
@@ -2268,11 +2313,6 @@ start_fuzzy (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
- if (worker->index == 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE);
- }
-
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt
index 295ad59c8..4f3b9a260 100644
--- a/src/libserver/CMakeLists.txt
+++ b/src/libserver/CMakeLists.txt
@@ -9,6 +9,7 @@ SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_cfg.c
${CMAKE_CURRENT_SOURCE_DIR}/events.c
${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend_sqlite.c
${CMAKE_CURRENT_SOURCE_DIR}/html.c
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c
index 463fdd1f4..44493e17b 100644
--- a/src/libserver/fuzzy_backend.c
+++ b/src/libserver/fuzzy_backend.c
@@ -13,1042 +13,392 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#include "config.h"
-#include "rspamd.h"
#include "fuzzy_backend.h"
-#include "unix-std.h"
+#include "fuzzy_backend_sqlite.h"
-#include <sqlite3.h>
-#include "libutil/sqlite_utils.h"
+#define DEFAULT_EXPIRE 172800L
-struct rspamd_fuzzy_backend {
- sqlite3 *db;
- char *path;
- gchar id[MEMPOOL_UID_LEN];
- gsize count;
- gsize expired;
- rspamd_mempool_t *pool;
+enum rspamd_fuzzy_backend_type {
+ RSPAMD_FUZZY_BACKEND_SQLITE = 0,
+ // RSPAMD_FUZZY_BACKEND_REDIS
};
-static const gdouble sql_sleep_time = 0.1;
-static const guint max_retries = 10;
-
-#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
- backend->pool->tag.tagname, backend->pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_warn_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
- backend->pool->tag.tagname, backend->pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_info_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
- backend->pool->tag.tagname, backend->pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_debug_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
- backend->pool->tag.tagname, backend->pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
-
-static const char *create_tables_sql =
- "BEGIN;"
- "CREATE TABLE IF NOT EXISTS digests("
- " id INTEGER PRIMARY KEY,"
- " flag INTEGER NOT NULL,"
- " digest TEXT NOT NULL,"
- " value INTEGER,"
- " time INTEGER);"
- "CREATE TABLE IF NOT EXISTS shingles("
- " value INTEGER NOT NULL,"
- " number INTEGER NOT NULL,"
- " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
- " ON UPDATE CASCADE);"
- "CREATE TABLE IF NOT EXISTS sources("
- " name TEXT UNIQUE,"
- " version INTEGER,"
- " last INTEGER);"
- "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
- "CREATE INDEX IF NOT EXISTS t ON digests(time);"
- "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
- "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
- "COMMIT;";
-#if 0
-static const char *create_index_sql =
- "BEGIN;"
- "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
- "CREATE INDEX IF NOT EXISTS t ON digests(time);"
- "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
- "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
- "COMMIT;";
-#endif
-enum rspamd_fuzzy_statement_idx {
- RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
- RSPAMD_FUZZY_BACKEND_INSERT,
- RSPAMD_FUZZY_BACKEND_UPDATE,
- RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
- RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
- RSPAMD_FUZZY_BACKEND_CHECK,
- RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
- RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
- RSPAMD_FUZZY_BACKEND_DELETE,
- RSPAMD_FUZZY_BACKEND_COUNT,
- RSPAMD_FUZZY_BACKEND_EXPIRE,
- RSPAMD_FUZZY_BACKEND_VACUUM,
- RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
- RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
- RSPAMD_FUZZY_BACKEND_VERSION,
- RSPAMD_FUZZY_BACKEND_SET_VERSION,
- RSPAMD_FUZZY_BACKEND_MAX
+static void* rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, GError **err);
+static void rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
+ GQueue *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+static const gchar* rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+struct rspamd_fuzzy_backend_subr {
+ void* (*init) (struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj,
+ GError **err);
+ void (*check) (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+ void (*update) (struct rspamd_fuzzy_backend *bk,
+ GQueue *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+ void (*count) (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+ void (*version) (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+ const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*expire) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
};
-static struct rspamd_fuzzy_stmts {
- enum rspamd_fuzzy_statement_idx idx;
- const gchar *sql;
- const gchar *args;
- sqlite3_stmt *stmt;
- gint result;
-} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] =
-{
- {
- .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
- .sql = "BEGIN TRANSACTION;",
- .args = "",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
- .sql = "COMMIT;",
- .args = "",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
- .sql = "ROLLBACK;",
- .args = "",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_INSERT,
- .sql = "INSERT INTO digests(flag, digest, value, time) VALUES"
- "(?1, ?2, ?3, strftime('%s','now'));",
- .args = "SDI",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_UPDATE,
- .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE "
- "digest==?2;",
- .args = "ID",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
- .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE "
- "digest==?3;",
- .args = "IID",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
- .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) "
- "VALUES (?1, ?2, ?3);",
- .args = "III",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_CHECK,
- .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
- .args = "D",
- .stmt = NULL,
- .result = SQLITE_ROW
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
- .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
- .args = "IS",
- .stmt = NULL,
- .result = SQLITE_ROW
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
- .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1",
- .args = "I",
- .stmt = NULL,
- .result = SQLITE_ROW
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_DELETE,
- .sql = "DELETE FROM digests WHERE digest==?1;",
- .args = "D",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_COUNT,
- .sql = "SELECT COUNT(*) FROM digests;",
- .args = "",
- .stmt = NULL,
- .result = SQLITE_ROW
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
- .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
- .args = "II",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_VACUUM,
- .sql = "VACUUM;",
- .args = "",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
- .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;",
- .args = "II",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
- .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
- .args = "TII",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_VERSION,
- .sql = "SELECT version FROM sources WHERE name=?1;",
- .args = "T",
- .stmt = NULL,
- .result = SQLITE_ROW
- },
- {
- .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
- .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);",
- .args = "IIT",
- .stmt = NULL,
- .result = SQLITE_DONE
- },
+
+static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = {
+ [RSPAMD_FUZZY_BACKEND_SQLITE] = {
+ .init = rspamd_fuzzy_backend_init_sqlite,
+ .check = rspamd_fuzzy_backend_check_sqlite,
+ .update = rspamd_fuzzy_backend_update_sqlite,
+ .count = rspamd_fuzzy_backend_count_sqlite,
+ .version = rspamd_fuzzy_backend_version_sqlite,
+ .id = rspamd_fuzzy_backend_id_sqlite,
+ .expire = rspamd_fuzzy_backend_expire_sqlite,
+ .close = rspamd_fuzzy_backend_close_sqlite,
+ }
+};
+
+struct rspamd_fuzzy_backend {
+ enum rspamd_fuzzy_backend_type type;
+ gdouble expire;
+ gdouble sync;
+ struct event_base *ev_base;
+ const struct rspamd_fuzzy_backend_subr *subr;
+ void *subr_ud;
+ struct event expire_event;
};
static GQuark
-rspamd_fuzzy_backend_quark(void)
+rspamd_fuzzy_backend_quark (void)
{
- return g_quark_from_static_string ("fuzzy-storage-backend");
+ return g_quark_from_static_string ("fuzzy-backend");
}
-static gboolean
-rspamd_fuzzy_backend_prepare_stmts (struct rspamd_fuzzy_backend *bk, GError **err)
+static void*
+rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, GError **err)
{
- int i;
+ const ucl_object_t *elt;
- for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) {
- if (prepared_stmts[i].stmt != NULL) {
- /* Skip already prepared statements */
- continue;
- }
- if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1,
- &prepared_stmts[i].stmt, NULL) != SQLITE_OK) {
- g_set_error (err, rspamd_fuzzy_backend_quark (),
- -1, "Cannot initialize prepared sql `%s`: %s",
- prepared_stmts[i].sql, sqlite3_errmsg (bk->db));
+ elt = ucl_object_lookup_any (obj, "hashfile", "hash_file", "file",
+ "database", NULL);
- return FALSE;
- }
+ if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+ g_set_error (err, rspamd_fuzzy_backend_quark (),
+ EINVAL, "missing sqlite3 path");
+ return NULL;
}
- return TRUE;
+ return rspamd_fuzzy_backend_sqlite_open (ucl_object_tostring (elt),
+ FALSE, err);
}
-static int
-rspamd_fuzzy_backend_cleanup_stmt (struct rspamd_fuzzy_backend *backend,
- int idx)
+static void
+rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud)
{
- sqlite3_stmt *stmt;
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ struct rspamd_fuzzy_reply rep;
- if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+ rep = rspamd_fuzzy_backend_sqlite_check (sq, cmd, bk->expire);
- return -1;
+ if (cb) {
+ cb (&rep, ud);
}
-
- msg_debug_fuzzy_backend ("reseting `%s`", prepared_stmts[idx].sql);
- stmt = prepared_stmts[idx].stmt;
- sqlite3_clear_bindings (stmt);
- sqlite3_reset (stmt);
-
- return SQLITE_OK;
}
-static int
-rspamd_fuzzy_backend_run_stmt (struct rspamd_fuzzy_backend *backend,
- gboolean auto_cleanup,
- int idx, ...)
+static void
+rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
+ GQueue *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud)
{
- int retcode;
- va_list ap;
- sqlite3_stmt *stmt;
- int i;
- const char *argtypes;
- guint retries = 0;
- struct timespec ts;
-
- if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
-
- return -1;
- }
-
- stmt = prepared_stmts[idx].stmt;
- g_assert ((int)prepared_stmts[idx].idx == idx);
-
- if (stmt == NULL) {
- if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1,
- &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) {
- msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s",
- prepared_stmts[idx].sql, sqlite3_errmsg (backend->db));
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ gboolean success = FALSE;
+ GList *cur;
+ struct fuzzy_peer_cmd *io_cmd;
+ struct rspamd_fuzzy_cmd *cmd;
+ gpointer ptr;
+ guint nupdates = 0;
+
+ if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) {
+ cur = updates->head;
+
+ while (cur) {
+ io_cmd = cur->data;
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ ptr = &io_cmd->cmd.shingle;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ ptr = &io_cmd->cmd.normal;
+ }
- return retcode;
- }
- stmt = prepared_stmts[idx].stmt;
- }
+ if (cmd->cmd == FUZZY_WRITE) {
+ rspamd_fuzzy_backend_sqlite_add (sq, ptr);
+ }
+ else {
+ rspamd_fuzzy_backend_sqlite_del (sq, ptr);
+ }
- msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup",
- prepared_stmts[idx].sql, auto_cleanup ? "with" : "without");
- argtypes = prepared_stmts[idx].args;
- sqlite3_clear_bindings (stmt);
- sqlite3_reset (stmt);
- va_start (ap, idx);
-
- for (i = 0; argtypes[i] != '\0'; i++) {
- switch (argtypes[i]) {
- case 'T':
- sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1,
- SQLITE_STATIC);
- break;
- case 'I':
- sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64));
- break;
- case 'S':
- sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint));
- break;
- case 'D':
- /* Special case for digests variable */
- sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64,
- SQLITE_STATIC);
- break;
+ nupdates ++;
+ cur = g_list_next (cur);
}
- }
-
- va_end (ap);
-retry:
- retcode = sqlite3_step (stmt);
-
- if (retcode == prepared_stmts[idx].result) {
- retcode = SQLITE_OK;
- }
- else {
- if ((retcode == SQLITE_BUSY ||
- retcode == SQLITE_LOCKED) && retries++ < max_retries) {
- double_to_ts (sql_sleep_time, &ts);
- nanosleep (&ts, NULL);
- goto retry;
+ if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src,
+ nupdates > 0)) {
+ success = TRUE;
}
-
- msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql,
- retcode, sqlite3_errmsg (backend->db));
}
- if (auto_cleanup) {
- sqlite3_clear_bindings (stmt);
- sqlite3_reset (stmt);
+ if (cb) {
+ cb (success, ud);
}
-
- return retcode;
}
static void
-rspamd_fuzzy_backend_close_stmts (struct rspamd_fuzzy_backend *bk)
+rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud)
{
- int i;
-
- for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) {
- if (prepared_stmts[i].stmt != NULL) {
- sqlite3_finalize (prepared_stmts[i].stmt);
- prepared_stmts[i].stmt = NULL;
- }
- }
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ guint64 nhashes;
- return;
-}
+ nhashes = rspamd_fuzzy_backend_sqlite_count (sq);
-static gboolean
-rspamd_fuzzy_backend_run_sql (const gchar *sql, struct rspamd_fuzzy_backend *bk,
- GError **err)
-{
- guint retries = 0;
- struct timespec ts;
- gint ret;
-
- do {
- ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL);
- double_to_ts (sql_sleep_time, &ts);
- } while (ret == SQLITE_BUSY && retries++ < max_retries &&
- nanosleep (&ts, NULL) == 0);
-
- if (ret != SQLITE_OK) {
- g_set_error (err, rspamd_fuzzy_backend_quark (),
- -1, "Cannot execute raw sql `%s`: %s",
- sql, sqlite3_errmsg (bk->db));
- return FALSE;
+ if (cb) {
+ cb (nhashes, ud);
}
-
- return TRUE;
}
-static struct rspamd_fuzzy_backend *
-rspamd_fuzzy_backend_open_db (const gchar *path, GError **err)
+static void
+rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud)
{
- struct rspamd_fuzzy_backend *bk;
- rspamd_cryptobox_hash_state_t st;
- guchar hash_out[rspamd_cryptobox_HASHBYTES];
-
- g_assert (path != NULL);
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ guint64 rev;
- bk = g_slice_alloc (sizeof (*bk));
- bk->path = g_strdup (path);
- bk->expired = 0;
- bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend");
- bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path,
- create_tables_sql, 1, err);
+ rev = rspamd_fuzzy_backend_sqlite_version (sq, src);
- if (bk->db == NULL) {
- rspamd_fuzzy_backend_close (bk);
-
- return NULL;
+ if (cb) {
+ cb (rev, ud);
}
-
- if (!rspamd_fuzzy_backend_prepare_stmts (bk, err)) {
- rspamd_fuzzy_backend_close (bk);
-
- return NULL;
- }
-
- /* Set id for the backend */
- rspamd_cryptobox_hash_init (&st, NULL, 0);
- rspamd_cryptobox_hash_update (&st, path, strlen (path));
- rspamd_cryptobox_hash_final (&st, hash_out);
- rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out);
- memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid));
-
- return bk;
}
-struct rspamd_fuzzy_backend *
-rspamd_fuzzy_backend_open (const gchar *path,
- gboolean vacuum,
- GError **err)
+static const gchar*
+rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
{
- struct rspamd_fuzzy_backend *backend;
-
- if (path == NULL) {
- g_set_error (err, rspamd_fuzzy_backend_quark (),
- ENOENT, "Path has not been specified");
- return NULL;
- }
-
- /* Open database */
- if ((backend = rspamd_fuzzy_backend_open_db (path, err)) == NULL) {
- return NULL;
- }
-
- if (rspamd_fuzzy_backend_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT)
- == SQLITE_OK) {
- backend->count = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
- }
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
-
- return backend;
+ return rspamd_fuzzy_sqlite_backend_id (sq);
}
-
-static gint
-rspamd_fuzzy_backend_int64_cmp (const void *a, const void *b)
+static void
+rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
{
- gint64 ia = *(gint64 *)a, ib = *(gint64 *)b;
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
- return (ia - ib);
+ rspamd_fuzzy_backend_sqlite_sync (sq, bk->expire, TRUE);
}
-struct rspamd_fuzzy_reply
-rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend,
- const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
+static void
+rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
{
- struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0};
- const struct rspamd_fuzzy_shingle_cmd *shcmd;
- int rc;
- gint64 timestamp;
- gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id,
- cur_cnt, max_cnt;
-
- if (backend == NULL) {
- return rep;
- }
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
- /* Try direct match first of all */
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_CHECK,
- cmd->digest);
-
- if (rc == SQLITE_OK) {
- timestamp = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
- if (time (NULL) - timestamp > expire) {
- /* Expire element */
- msg_debug_fuzzy_backend ("requested hash has been expired");
- }
- else {
- rep.value = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
- rep.prob = 1.0;
- rep.flag = sqlite3_column_int (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
- }
- }
- else if (cmd->shingles_count > 0) {
- /* Fuzzy match */
-
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
- shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd;
-
- for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
- shcmd->sgl.hashes[i], i);
- if (rc == SQLITE_OK) {
- shingle_values[i] = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt,
- 0);
- }
- else {
- shingle_values[i] = -1;
- }
- msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i,
- shcmd->sgl.hashes[i], rc);
- }
+ rspamd_fuzzy_backend_sqlite_close (sq);
+}
- rspamd_fuzzy_backend_cleanup_stmt (backend,
- RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE);
- qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64),
- rspamd_fuzzy_backend_int64_cmp);
- sel_id = -1;
- cur_id = -1;
- cur_cnt = 0;
- max_cnt = 0;
+struct rspamd_fuzzy_backend *
+rspamd_fuzzy_backend_create (struct event_base *ev_base,
+ const ucl_object_t *config, GError **err)
+{
+ struct rspamd_fuzzy_backend *bk;
+ enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE;
+ const ucl_object_t *elt;
+ gdouble expire = DEFAULT_EXPIRE;
- for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
- if (shingle_values[i] == -1) {
- continue;
- }
+ if (config != NULL) {
+ elt = ucl_object_lookup (config, "type");
- /* We have some value here, so we need to check it */
- if (shingle_values[i] == cur_id) {
- cur_cnt ++;
+ if (elt != NULL && ucl_object_type (elt) == UCL_STRING) {
+ if (strcmp (ucl_object_tostring (elt), "sqlite") == 0) {
+ type = RSPAMD_FUZZY_BACKEND_SQLITE;
}
else {
- cur_id = shingle_values[i];
- if (cur_cnt >= max_cnt) {
- max_cnt = cur_cnt;
- sel_id = cur_id;
- }
- cur_cnt = 0;
+ g_set_error (err, rspamd_fuzzy_backend_quark (),
+ EINVAL, "invalid backend type: %s",
+ ucl_object_tostring (elt));
+ return NULL;
}
}
- if (cur_cnt > max_cnt) {
- max_cnt = cur_cnt;
- }
-
- if (sel_id != -1) {
- /* We have some id selected here */
- rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;
-
- if (rep.prob > 0.5) {
- msg_debug_fuzzy_backend (
- "found fuzzy hash with probability %.2f",
- rep.prob);
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
- if (rc == SQLITE_OK) {
- timestamp = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
- 2);
- if (time (NULL) - timestamp > expire) {
- /* Expire element */
- msg_debug_fuzzy_backend (
- "requested hash has been expired");
- rep.prob = 0.0;
- }
- else {
- rep.value = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
- 1);
- rep.flag = sqlite3_column_int (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
- 3);
- }
- }
- }
- else {
- /* Otherwise we assume that as error */
- rep.value = 0;
- }
+ elt = ucl_object_lookup (config, "expire");
- rspamd_fuzzy_backend_cleanup_stmt (backend,
- RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID);
+ if (elt != NULL) {
+ expire = ucl_object_todouble (elt);
}
}
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+ bk = g_slice_alloc0 (sizeof (*bk));
+ bk->ev_base = ev_base;
+ bk->expire = expire;
+ bk->type = type;
+ bk->subr = &fuzzy_subrs[type];
- return rep;
-}
-
-gboolean
-rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
- const gchar *source)
-{
- gint rc;
-
- if (backend == NULL) {
- return FALSE;
+ if ((bk->subr_ud = bk->subr->init (bk, config, err)) == NULL) {
+ g_slice_free1 (sizeof (*bk), bk);
}
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+ return bk;
+}
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot start transaction for updates: %s",
- sqlite3_errmsg (backend->db));
- return FALSE;
- }
- return TRUE;
+void
+rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud)
+{
+ g_assert (bk != NULL);
+
+ bk->subr->check (bk, cmd, cb, ud, bk->subr_ud);
}
-gboolean
-rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
- const struct rspamd_fuzzy_cmd *cmd)
+void
+rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
+ GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ void *ud)
{
- int rc, i;
- gint64 id, flag;
- const struct rspamd_fuzzy_shingle_cmd *shcmd;
+ g_assert (bk != NULL);
+ g_assert (updates != NULL);
- if (backend == NULL) {
- return FALSE;
+ if (g_queue_get_length (updates) > 0) {
+ bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud);
}
-
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_CHECK,
- cmd->digest);
-
- if (rc == SQLITE_OK) {
- /* Check flag */
- flag = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt,
- 2);
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-
- if (flag == cmd->flag) {
- /* We need to increase weight */
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_UPDATE,
- (gint64) cmd->value,
- cmd->digest);
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot update hash to %d -> "
- "%*xs: %s", (gint) cmd->flag,
- (gint) sizeof (cmd->digest), cmd->digest,
- sqlite3_errmsg (backend->db));
- }
- }
- else {
- /* We need to relearn actually */
-
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
- (gint64) cmd->value,
- (gint64) cmd->flag,
- cmd->digest);
-
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot update hash to %d -> "
- "%*xs: %s", (gint) cmd->flag,
- (gint) sizeof (cmd->digest), cmd->digest,
- sqlite3_errmsg (backend->db));
- }
- }
+ else if (cb) {
+ cb (TRUE, ud);
}
- else {
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_INSERT,
- (gint) cmd->flag,
- cmd->digest,
- (gint64) cmd->value);
-
- if (rc == SQLITE_OK) {
- if (cmd->shingles_count > 0) {
- id = sqlite3_last_insert_rowid (backend->db);
- shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
-
- for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
- shcmd->sgl.hashes[i], (gint64)i, id);
- msg_debug_fuzzy_backend ("add shingle %d -> %L: %L",
- i,
- shcmd->sgl.hashes[i],
- id);
-
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot add shingle %d -> "
- "%L: %L: %s", i,
- shcmd->sgl.hashes[i],
- id, sqlite3_errmsg (backend->db));
- }
- }
- }
- }
- else {
- msg_warn_fuzzy_backend ("cannot add hash to %d -> "
- "%*xs: %s", (gint)cmd->flag,
- (gint)sizeof (cmd->digest), cmd->digest,
- sqlite3_errmsg (backend->db));
- }
-
- rspamd_fuzzy_backend_cleanup_stmt (backend,
- RSPAMD_FUZZY_BACKEND_INSERT);
- }
-
- return (rc == SQLITE_OK);
}
-gboolean
-rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
- const gchar *source, gboolean version_bump)
-{
- gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver;
- /* Get and update version */
- if (version_bump) {
- ver = rspamd_fuzzy_backend_version (backend, source);
- ++ver;
+void
+rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud)
+{
+ g_assert (bk != NULL);
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_SET_VERSION,
- (gint64)ver, (gint64)time (NULL), source);
- }
+ bk->subr->count (bk, cb, ud, bk->subr_ud);
+}
- if (rc == SQLITE_OK) {
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot commit updates: %s",
- sqlite3_errmsg (backend->db));
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
- return FALSE;
- }
- else {
- if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
- msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
- sqlite3_errmsg (backend->db));
- }
- else if (wal_checkpointed > 0) {
- msg_info_fuzzy_backend ("total number of frames in the wal file: "
- "%d, checkpointed: %d", wal_frames, wal_checkpointed);
- }
- }
- }
- else {
- msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
- sqlite3_errmsg (backend->db));
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
- return FALSE;
- }
+void
+rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud)
+{
+ g_assert (bk != NULL);
- return TRUE;
+ bk->subr->version (bk, src, cb, ud, bk->subr_ud);
}
-gboolean
-rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend,
- const struct rspamd_fuzzy_cmd *cmd)
+const gchar *
+rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk)
{
- int rc = -1;
+ g_assert (bk != NULL);
- if (backend == NULL) {
- return FALSE;
+ if (bk->subr->id) {
+ return bk->subr->id (bk, bk->subr_ud);
}
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_CHECK,
- cmd->digest);
-
- if (rc == SQLITE_OK) {
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
-
- rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_DELETE,
- cmd->digest);
- if (rc != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot update hash to %d -> "
- "%*xs: %s", (gint) cmd->flag,
- (gint) sizeof (cmd->digest), cmd->digest,
- sqlite3_errmsg (backend->db));
- }
- }
- else {
- /* Hash is missing */
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
- }
-
- return (rc == SQLITE_OK);
+ return NULL;
}
-gboolean
-rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
- gint64 expire,
- gboolean clean_orphaned)
+static void
+rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud)
{
- struct orphaned_shingle_elt {
- gint64 value;
- gint64 number;
- };
-
- /* Do not do more than 5k ops per step */
- const guint64 max_changes = 5000;
- gboolean ret = FALSE;
- gint64 expire_lim, expired;
- gint rc, i, orphaned_cnt = 0;
- GError *err = NULL;
- static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
- "FROM shingles "
- "LEFT JOIN digests ON "
- "shingles.digest_id=digests.id WHERE "
- "digests.id IS NULL;";
- sqlite3_stmt *stmt;
- GArray *orphaned;
- struct orphaned_shingle_elt orphaned_elt, *pelt;
-
-
- if (backend == NULL) {
- return FALSE;
- }
-
- /* Perform expire */
- if (expire > 0) {
- expire_lim = time (NULL) - expire;
-
- if (expire_lim > 0) {
- ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
-
- if (ret == SQLITE_OK) {
-
- rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);
-
- if (rc == SQLITE_OK) {
- expired = sqlite3_changes (backend->db);
-
- if (expired > 0) {
- backend->expired += expired;
- msg_info_fuzzy_backend ("expired %L hashes", expired);
- }
- }
- else {
- msg_warn_fuzzy_backend (
- "cannot execute expired statement: %s",
- sqlite3_errmsg (backend->db));
- }
-
- rspamd_fuzzy_backend_cleanup_stmt (backend,
- RSPAMD_FUZZY_BACKEND_EXPIRE);
-
- ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
-
- if (ret != SQLITE_OK) {
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
- }
- }
- if (ret != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot expire db: %s",
- sqlite3_errmsg (backend->db));
- }
- }
- }
-
- /* Cleanup database */
- if (clean_orphaned) {
- ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
-
- if (ret == SQLITE_OK) {
- if ((rc = sqlite3_prepare_v2 (backend->db,
- orphaned_shingles,
- -1,
- &stmt,
- NULL)) != SQLITE_OK) {
- msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
- sqlite3_errmsg (backend->db));
- }
- else {
- orphaned = g_array_new (FALSE,
- FALSE,
- sizeof (struct orphaned_shingle_elt));
-
- while (sqlite3_step (stmt) == SQLITE_ROW) {
- orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
- orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
- g_array_append_val (orphaned, orphaned_elt);
-
- if (orphaned->len > max_changes) {
- break;
- }
- }
-
- sqlite3_finalize (stmt);
- orphaned_cnt = orphaned->len;
-
- if (orphaned_cnt > 0) {
- msg_info_fuzzy_backend (
- "going to delete %ud orphaned shingles",
- orphaned_cnt);
- /* Need to delete orphaned elements */
- for (i = 0; i < (gint) orphaned_cnt; i++) {
- pelt = &g_array_index (orphaned,
- struct orphaned_shingle_elt,
- i);
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
- pelt->value, pelt->number);
- }
- }
-
-
- g_array_free (orphaned, TRUE);
- }
-
- ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
-
- if (ret == SQLITE_OK) {
- msg_info_fuzzy_backend (
- "deleted %ud orphaned shingles",
- orphaned_cnt);
- }
- else {
- msg_warn_fuzzy_backend (
- "cannot synchronize fuzzy backend: %e",
- err);
- rspamd_fuzzy_backend_run_stmt (backend, TRUE,
- RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
- }
- }
- }
-
- return ret;
+ struct rspamd_fuzzy_backend *bk = ud;
+ gdouble jittered;
+ struct timeval tv;
+
+ jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
+ double_to_tv (jittered, &tv);
+ event_del (&bk->expire_event);
+ bk->subr->expire (bk, bk->subr_ud);
+ event_add (&bk->expire_event, &tv);
}
-
void
-rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend)
+rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *bk,
+ gdouble timeout)
{
- if (backend != NULL) {
- if (backend->db != NULL) {
- rspamd_fuzzy_backend_close_stmts (backend);
- sqlite3_close (backend->db);
- }
+ gdouble jittered;
+ struct timeval tv;
- if (backend->path != NULL) {
- g_free (backend->path);
- }
-
- if (backend->pool) {
- rspamd_mempool_delete (backend->pool);
- }
+ g_assert (bk != NULL);
- g_slice_free1 (sizeof (*backend), backend);
- }
-}
-
-
-gsize
-rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend)
-{
- if (backend) {
- if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) {
- backend->count = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ if (bk->subr->expire) {
+ if (bk->sync > 0.0) {
+ event_del (&bk->expire_event);
}
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
-
- return backend->count;
+ bk->subr->expire (bk, bk->subr_ud);
+ bk->sync = timeout;
+ jittered = rspamd_time_jitter (timeout, timeout / 2.0);
+ double_to_tv (jittered, &tv);
+ event_set (&bk->expire_event, -1, EV_TIMEOUT,
+ rspamd_fuzzy_backend_expire_cb, bk);
+ event_base_set (bk->ev_base, &bk->expire_event);
+ event_add (&bk->expire_event, &tv);
}
-
- return 0;
}
-gint
-rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend,
- const gchar *source)
+void
+rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
{
- gint ret = -1;
+ g_assert (bk != NULL);
- if (backend) {
- if (rspamd_fuzzy_backend_run_stmt (backend, FALSE,
- RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
- ret = sqlite3_column_int64 (
- prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
- }
+ bk->subr->close (bk, bk->subr_ud);
- rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION);
+ if (bk->sync > 0.0) {
+ bk->subr->expire (bk, bk->subr_ud);
+ event_del (&bk->expire_event);
}
- return ret;
-}
-
-gsize
-rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend)
-{
- return backend != NULL ? backend->expired : 0;
-}
-
-const gchar *
-rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend)
-{
- return backend != NULL ? backend->id : 0;
+ g_slice_free1 (sizeof (*bk), bk);
}
diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h
index a1736b676..adb7e5075 100644
--- a/src/libserver/fuzzy_backend.h
+++ b/src/libserver/fuzzy_backend.h
@@ -13,86 +13,92 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef FUZZY_BACKEND_H_
-#define FUZZY_BACKEND_H_
+#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_
+#define SRC_LIBSERVER_FUZZY_BACKEND_H_
#include "config.h"
-#include "fuzzy_storage.h"
-
+#include <event.h>
+#include "fuzzy_wire.h"
struct rspamd_fuzzy_backend;
+/*
+ * Callbacks for fuzzy methods
+ */
+typedef void (*rspamd_fuzzy_check_cb) (struct rspamd_fuzzy_reply *rep, void *ud);
+typedef void (*rspamd_fuzzy_update_cb) (gboolean success, void *ud);
+typedef void (*rspamd_fuzzy_version_cb) (guint64 rev, void *ud);
+typedef void (*rspamd_fuzzy_count_cb) (guint64 count, void *ud);
+
/**
* Open fuzzy backend
- * @param path file to open (legacy file will be converted automatically)
- * @param err error pointer
- * @return backend structure or NULL
+ * @param ev_base
+ * @param config
+ * @param err
+ * @return
*/
-struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_open (const gchar *path,
- gboolean vacuum,
- GError **err);
+struct rspamd_fuzzy_backend * rspamd_fuzzy_backend_create (struct event_base *ev_base,
+ const ucl_object_t *config, GError **err);
+
/**
- * Check specified fuzzy in the backend
- * @param backend
+ * Check a specific hash in storage
* @param cmd
- * @return reply with probability and weight
+ * @param cb
+ * @param ud
*/
-struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check (
- struct rspamd_fuzzy_backend *backend,
+void rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
const struct rspamd_fuzzy_cmd *cmd,
- gint64 expire);
+ rspamd_fuzzy_check_cb cb, void *ud);
/**
- * Prepare storage for updates (by starting transaction)
+ * Process updates for a specific queue
+ * @param bk
+ * @param updates queue of struct fuzzy_peer_cmd
+ * @param src
*/
-gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend,
- const gchar *source);
+void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
+ GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ void *ud);
/**
- * Add digest to the database
- * @param backend
- * @param cmd
- * @return
+ * Gets number of hashes from the backend
+ * @param bk
+ * @param cb
+ * @param ud
*/
-gboolean rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend,
- const struct rspamd_fuzzy_cmd *cmd);
+void rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud);
/**
- * Delete digest from the database
- * @param backend
- * @param cmd
- * @return
+ * Returns number of revision for a specific source
+ * @param bk
+ * @param src
+ * @param cb
+ * @param ud
*/
-gboolean rspamd_fuzzy_backend_del (
- struct rspamd_fuzzy_backend *backend,
- const struct rspamd_fuzzy_cmd *cmd);
+void rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud);
/**
- * Commit updates to storage
+ * Returns unique id for backend
+ * @param backend
+ * @return
*/
-gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend,
- const gchar *source, gboolean version_bump);
+const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
/**
- * Sync storage
+ * Starts expire process for the backend
* @param backend
- * @return
*/
-gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend,
- gint64 expire,
- gboolean clean_orphaned);
+void rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *backend,
+ gdouble timeout);
/**
- * Close storage
+ * Closes backend
* @param backend
*/
void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);
-gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend);
-gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source);
-gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend);
-
-const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
-
-#endif /* FUZZY_BACKEND_H_ */
+#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */
diff --git a/src/libserver/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend_sqlite.c
new file mode 100644
index 000000000..834d274c8
--- /dev/null
+++ b/src/libserver/fuzzy_backend_sqlite.c
@@ -0,0 +1,1055 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamd.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_sqlite.h"
+#include "unix-std.h"
+
+#include <sqlite3.h>
+#include "libutil/sqlite_utils.h"
+
+struct rspamd_fuzzy_backend_sqlite {
+ sqlite3 *db;
+ char *path;
+ gchar id[MEMPOOL_UID_LEN];
+ gsize count;
+ gsize expired;
+ rspamd_mempool_t *pool;
+};
+
+static const gdouble sql_sleep_time = 0.1;
+static const guint max_retries = 10;
+
+#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+static const char *create_tables_sql =
+ "BEGIN;"
+ "CREATE TABLE IF NOT EXISTS digests("
+ " id INTEGER PRIMARY KEY,"
+ " flag INTEGER NOT NULL,"
+ " digest TEXT NOT NULL,"
+ " value INTEGER,"
+ " time INTEGER);"
+ "CREATE TABLE IF NOT EXISTS shingles("
+ " value INTEGER NOT NULL,"
+ " number INTEGER NOT NULL,"
+ " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+ " ON UPDATE CASCADE);"
+ "CREATE TABLE IF NOT EXISTS sources("
+ " name TEXT UNIQUE,"
+ " version INTEGER,"
+ " last INTEGER);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+ "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+ "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+ "COMMIT;";
+#if 0
+static const char *create_index_sql =
+ "BEGIN;"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+ "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+ "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+ "COMMIT;";
+#endif
+enum rspamd_fuzzy_statement_idx {
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+ RSPAMD_FUZZY_BACKEND_INSERT,
+ RSPAMD_FUZZY_BACKEND_UPDATE,
+ RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+ RSPAMD_FUZZY_BACKEND_DELETE,
+ RSPAMD_FUZZY_BACKEND_COUNT,
+ RSPAMD_FUZZY_BACKEND_EXPIRE,
+ RSPAMD_FUZZY_BACKEND_VACUUM,
+ RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ RSPAMD_FUZZY_BACKEND_VERSION,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ RSPAMD_FUZZY_BACKEND_MAX
+};
+static struct rspamd_fuzzy_stmts {
+ enum rspamd_fuzzy_statement_idx idx;
+ const gchar *sql;
+ const gchar *args;
+ sqlite3_stmt *stmt;
+ gint result;
+} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] =
+{
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
+ .sql = "BEGIN TRANSACTION;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+ .sql = "COMMIT;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+ .sql = "ROLLBACK;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_INSERT,
+ .sql = "INSERT INTO digests(flag, digest, value, time) VALUES"
+ "(?1, ?2, ?3, strftime('%s','now'));",
+ .args = "SDI",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_UPDATE,
+ .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE "
+ "digest==?2;",
+ .args = "ID",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE "
+ "digest==?3;",
+ .args = "IID",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) "
+ "VALUES (?1, ?2, ?3);",
+ .args = "III",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_CHECK,
+ .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
+ .args = "D",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
+ .args = "IS",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+ .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1",
+ .args = "I",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_DELETE,
+ .sql = "DELETE FROM digests WHERE digest==?1;",
+ .args = "D",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_COUNT,
+ .sql = "SELECT COUNT(*) FROM digests;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
+ .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
+ .args = "II",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_VACUUM,
+ .sql = "VACUUM;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;",
+ .args = "II",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+ .args = "TII",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_VERSION,
+ .sql = "SELECT version FROM sources WHERE name=?1;",
+ .args = "T",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);",
+ .args = "IIT",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+};
+
+static GQuark
+rspamd_fuzzy_backend_sqlite_quark (void)
+{
+ return g_quark_from_static_string ("fuzzy-backend-sqlite");
+}
+
+static gboolean
+rspamd_fuzzy_backend_sqlite_prepare_stmts (struct rspamd_fuzzy_backend_sqlite *bk, GError **err)
+{
+ int i;
+
+ for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) {
+ if (prepared_stmts[i].stmt != NULL) {
+ /* Skip already prepared statements */
+ continue;
+ }
+ if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1,
+ &prepared_stmts[i].stmt, NULL) != SQLITE_OK) {
+ g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (),
+ -1, "Cannot initialize prepared sql `%s`: %s",
+ prepared_stmts[i].sql, sqlite3_errmsg (bk->db));
+
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+static int
+rspamd_fuzzy_backend_sqlite_cleanup_stmt (struct rspamd_fuzzy_backend_sqlite *backend,
+ int idx)
+{
+ sqlite3_stmt *stmt;
+
+ if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+ return -1;
+ }
+
+ msg_debug_fuzzy_backend ("reseting `%s`", prepared_stmts[idx].sql);
+ stmt = prepared_stmts[idx].stmt;
+ sqlite3_clear_bindings (stmt);
+ sqlite3_reset (stmt);
+
+ return SQLITE_OK;
+}
+
+static int
+rspamd_fuzzy_backend_sqlite_run_stmt (struct rspamd_fuzzy_backend_sqlite *backend,
+ gboolean auto_cleanup,
+ int idx, ...)
+{
+ int retcode;
+ va_list ap;
+ sqlite3_stmt *stmt;
+ int i;
+ const char *argtypes;
+ guint retries = 0;
+ struct timespec ts;
+
+ if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+ return -1;
+ }
+
+ stmt = prepared_stmts[idx].stmt;
+ g_assert ((int)prepared_stmts[idx].idx == idx);
+
+ if (stmt == NULL) {
+ if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1,
+ &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) {
+ msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s",
+ prepared_stmts[idx].sql, sqlite3_errmsg (backend->db));
+
+ return retcode;
+ }
+ stmt = prepared_stmts[idx].stmt;
+ }
+
+ msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup",
+ prepared_stmts[idx].sql, auto_cleanup ? "with" : "without");
+ argtypes = prepared_stmts[idx].args;
+ sqlite3_clear_bindings (stmt);
+ sqlite3_reset (stmt);
+ va_start (ap, idx);
+
+ for (i = 0; argtypes[i] != '\0'; i++) {
+ switch (argtypes[i]) {
+ case 'T':
+ sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1,
+ SQLITE_STATIC);
+ break;
+ case 'I':
+ sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64));
+ break;
+ case 'S':
+ sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint));
+ break;
+ case 'D':
+ /* Special case for digests variable */
+ sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64,
+ SQLITE_STATIC);
+ break;
+ }
+ }
+
+ va_end (ap);
+
+retry:
+ retcode = sqlite3_step (stmt);
+
+ if (retcode == prepared_stmts[idx].result) {
+ retcode = SQLITE_OK;
+ }
+ else {
+ if ((retcode == SQLITE_BUSY ||
+ retcode == SQLITE_LOCKED) && retries++ < max_retries) {
+ double_to_ts (sql_sleep_time, &ts);
+ nanosleep (&ts, NULL);
+ goto retry;
+ }
+
+ msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql,
+ retcode, sqlite3_errmsg (backend->db));
+ }
+
+ if (auto_cleanup) {
+ sqlite3_clear_bindings (stmt);
+ sqlite3_reset (stmt);
+ }
+
+ return retcode;
+}
+
+static void
+rspamd_fuzzy_backend_sqlite_close_stmts (struct rspamd_fuzzy_backend_sqlite *bk)
+{
+ int i;
+
+ for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) {
+ if (prepared_stmts[i].stmt != NULL) {
+ sqlite3_finalize (prepared_stmts[i].stmt);
+ prepared_stmts[i].stmt = NULL;
+ }
+ }
+
+ return;
+}
+
+static gboolean
+rspamd_fuzzy_backend_sqlite_run_sql (const gchar *sql, struct rspamd_fuzzy_backend_sqlite *bk,
+ GError **err)
+{
+ guint retries = 0;
+ struct timespec ts;
+ gint ret;
+
+ do {
+ ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL);
+ double_to_ts (sql_sleep_time, &ts);
+ } while (ret == SQLITE_BUSY && retries++ < max_retries &&
+ nanosleep (&ts, NULL) == 0);
+
+ if (ret != SQLITE_OK) {
+ g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (),
+ -1, "Cannot execute raw sql `%s`: %s",
+ sql, sqlite3_errmsg (bk->db));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static struct rspamd_fuzzy_backend_sqlite *
+rspamd_fuzzy_backend_sqlite_open_db (const gchar *path, GError **err)
+{
+ struct rspamd_fuzzy_backend_sqlite *bk;
+ rspamd_cryptobox_hash_state_t st;
+ guchar hash_out[rspamd_cryptobox_HASHBYTES];
+
+ g_assert (path != NULL);
+
+ bk = g_slice_alloc (sizeof (*bk));
+ bk->path = g_strdup (path);
+ bk->expired = 0;
+ bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend");
+ bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path,
+ create_tables_sql, 1, err);
+
+ if (bk->db == NULL) {
+ rspamd_fuzzy_backend_sqlite_close (bk);
+
+ return NULL;
+ }
+
+ if (!rspamd_fuzzy_backend_sqlite_prepare_stmts (bk, err)) {
+ rspamd_fuzzy_backend_sqlite_close (bk);
+
+ return NULL;
+ }
+
+ /* Set id for the backend */
+ rspamd_cryptobox_hash_init (&st, NULL, 0);
+ rspamd_cryptobox_hash_update (&st, path, strlen (path));
+ rspamd_cryptobox_hash_final (&st, hash_out);
+ rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out);
+ memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid));
+
+ return bk;
+}
+
+struct rspamd_fuzzy_backend_sqlite *
+rspamd_fuzzy_backend_sqlite_open (const gchar *path,
+ gboolean vacuum,
+ GError **err)
+{
+ struct rspamd_fuzzy_backend_sqlite *backend;
+
+ if (path == NULL) {
+ g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (),
+ ENOENT, "Path has not been specified");
+ return NULL;
+ }
+
+ /* Open database */
+ if ((backend = rspamd_fuzzy_backend_sqlite_open_db (path, err)) == NULL) {
+ return NULL;
+ }
+
+ if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT)
+ == SQLITE_OK) {
+ backend->count = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+ return backend;
+}
+
+static gint
+rspamd_fuzzy_backend_sqlite_int64_cmp (const void *a, const void *b)
+{
+ gint64 ia = *(gint64 *)a, ib = *(gint64 *)b;
+
+ return (ia - ib);
+}
+
+struct rspamd_fuzzy_reply
+rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
+{
+ struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0};
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+ int rc;
+ gint64 timestamp;
+ gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id,
+ cur_cnt, max_cnt;
+
+ if (backend == NULL) {
+ return rep;
+ }
+
+ /* Try direct match first of all */
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ timestamp = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
+ if (time (NULL) - timestamp > expire) {
+ /* Expire element */
+ msg_debug_fuzzy_backend ("requested hash has been expired");
+ }
+ else {
+ rep.value = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
+ rep.prob = 1.0;
+ rep.flag = sqlite3_column_int (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
+ }
+ }
+ else if (cmd->shingles_count > 0) {
+ /* Fuzzy match */
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ shcmd->sgl.hashes[i], i);
+ if (rc == SQLITE_OK) {
+ shingle_values[i] = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt,
+ 0);
+ }
+ else {
+ shingle_values[i] = -1;
+ }
+ msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i,
+ shcmd->sgl.hashes[i], rc);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE);
+
+ qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64),
+ rspamd_fuzzy_backend_sqlite_int64_cmp);
+ sel_id = -1;
+ cur_id = -1;
+ cur_cnt = 0;
+ max_cnt = 0;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ if (shingle_values[i] == -1) {
+ continue;
+ }
+
+ /* We have some value here, so we need to check it */
+ if (shingle_values[i] == cur_id) {
+ cur_cnt ++;
+ }
+ else {
+ cur_id = shingle_values[i];
+ if (cur_cnt >= max_cnt) {
+ max_cnt = cur_cnt;
+ sel_id = cur_id;
+ }
+ cur_cnt = 0;
+ }
+ }
+
+ if (cur_cnt > max_cnt) {
+ max_cnt = cur_cnt;
+ }
+
+ if (sel_id != -1) {
+ /* We have some id selected here */
+ rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;
+
+ if (rep.prob > 0.5) {
+ msg_debug_fuzzy_backend (
+ "found fuzzy hash with probability %.2f",
+ rep.prob);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
+ if (rc == SQLITE_OK) {
+ timestamp = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 2);
+ if (time (NULL) - timestamp > expire) {
+ /* Expire element */
+ msg_debug_fuzzy_backend (
+ "requested hash has been expired");
+ rep.prob = 0.0;
+ }
+ else {
+ rep.value = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 1);
+ rep.flag = sqlite3_column_int (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 3);
+ }
+ }
+ }
+ else {
+ /* Otherwise we assume that as error */
+ rep.value = 0;
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID);
+ }
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ return rep;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source)
+{
+ gint rc;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot start transaction for updates: %s",
+ sqlite3_errmsg (backend->db));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd)
+{
+ int rc, i;
+ gint64 id, flag;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ /* Check flag */
+ flag = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt,
+ 2);
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+ if (flag == cmd->flag) {
+ /* We need to increase weight */
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_UPDATE,
+ (gint64) cmd->value,
+ cmd->digest);
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+ "%*xs: %s", (gint) cmd->flag,
+ (gint) sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ else {
+ /* We need to relearn actually */
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ (gint64) cmd->value,
+ (gint64) cmd->flag,
+ cmd->digest);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+ "%*xs: %s", (gint) cmd->flag,
+ (gint) sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ }
+ else {
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_INSERT,
+ (gint) cmd->flag,
+ cmd->digest,
+ (gint64) cmd->value);
+
+ if (rc == SQLITE_OK) {
+ if (cmd->shingles_count > 0) {
+ id = sqlite3_last_insert_rowid (backend->db);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ shcmd->sgl.hashes[i], (gint64)i, id);
+ msg_debug_fuzzy_backend ("add shingle %d -> %L: %L",
+ i,
+ shcmd->sgl.hashes[i],
+ id);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot add shingle %d -> "
+ "%L: %L: %s", i,
+ shcmd->sgl.hashes[i],
+ id, sqlite3_errmsg (backend->db));
+ }
+ }
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend ("cannot add hash to %d -> "
+ "%*xs: %s", (gint)cmd->flag,
+ (gint)sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_INSERT);
+ }
+
+ return (rc == SQLITE_OK);
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source, gboolean version_bump)
+{
+ gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver;
+
+ /* Get and update version */
+ if (version_bump) {
+ ver = rspamd_fuzzy_backend_sqlite_version (backend, source);
+ ++ver;
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ (gint64)ver, (gint64)time (NULL), source);
+ }
+
+ if (rc == SQLITE_OK) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot commit updates: %s",
+ sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
+ }
+ else {
+ if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
+ msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ else if (wal_checkpointed > 0) {
+ msg_info_fuzzy_backend ("total number of frames in the wal file: "
+ "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+ }
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
+ sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_del (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd)
+{
+ int rc = -1;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_DELETE,
+ cmd->digest);
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+ "%*xs: %s", (gint) cmd->flag,
+ (gint) sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ else {
+ /* Hash is missing */
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ }
+
+ return (rc == SQLITE_OK);
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend,
+ gint64 expire,
+ gboolean clean_orphaned)
+{
+ struct orphaned_shingle_elt {
+ gint64 value;
+ gint64 number;
+ };
+
+ /* Do not do more than 5k ops per step */
+ const guint64 max_changes = 5000;
+ gboolean ret = FALSE;
+ gint64 expire_lim, expired;
+ gint rc, i, orphaned_cnt = 0;
+ GError *err = NULL;
+ static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
+ "FROM shingles "
+ "LEFT JOIN digests ON "
+ "shingles.digest_id=digests.id WHERE "
+ "digests.id IS NULL;";
+ sqlite3_stmt *stmt;
+ GArray *orphaned;
+ struct orphaned_shingle_elt orphaned_elt, *pelt;
+
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ /* Perform expire */
+ if (expire > 0) {
+ expire_lim = time (NULL) - expire;
+
+ if (expire_lim > 0) {
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (ret == SQLITE_OK) {
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);
+
+ if (rc == SQLITE_OK) {
+ expired = sqlite3_changes (backend->db);
+
+ if (expired > 0) {
+ backend->expired += expired;
+ msg_info_fuzzy_backend ("expired %L hashes", expired);
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend (
+ "cannot execute expired statement: %s",
+ sqlite3_errmsg (backend->db));
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_EXPIRE);
+
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (ret != SQLITE_OK) {
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ }
+ }
+ if (ret != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot expire db: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ }
+
+ /* Cleanup database */
+ if (clean_orphaned) {
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (ret == SQLITE_OK) {
+ if ((rc = sqlite3_prepare_v2 (backend->db,
+ orphaned_shingles,
+ -1,
+ &stmt,
+ NULL)) != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ else {
+ orphaned = g_array_new (FALSE,
+ FALSE,
+ sizeof (struct orphaned_shingle_elt));
+
+ while (sqlite3_step (stmt) == SQLITE_ROW) {
+ orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
+ orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
+ g_array_append_val (orphaned, orphaned_elt);
+
+ if (orphaned->len > max_changes) {
+ break;
+ }
+ }
+
+ sqlite3_finalize (stmt);
+ orphaned_cnt = orphaned->len;
+
+ if (orphaned_cnt > 0) {
+ msg_info_fuzzy_backend (
+ "going to delete %ud orphaned shingles",
+ orphaned_cnt);
+ /* Need to delete orphaned elements */
+ for (i = 0; i < (gint) orphaned_cnt; i++) {
+ pelt = &g_array_index (orphaned,
+ struct orphaned_shingle_elt,
+ i);
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ pelt->value, pelt->number);
+ }
+ }
+
+
+ g_array_free (orphaned, TRUE);
+ }
+
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (ret == SQLITE_OK) {
+ msg_info_fuzzy_backend (
+ "deleted %ud orphaned shingles",
+ orphaned_cnt);
+ }
+ else {
+ msg_warn_fuzzy_backend (
+ "cannot synchronize fuzzy backend: %e",
+ err);
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ }
+ }
+ }
+
+ return ret;
+}
+
+
+void
+rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ if (backend != NULL) {
+ if (backend->db != NULL) {
+ rspamd_fuzzy_backend_sqlite_close_stmts (backend);
+ sqlite3_close (backend->db);
+ }
+
+ if (backend->path != NULL) {
+ g_free (backend->path);
+ }
+
+ if (backend->pool) {
+ rspamd_mempool_delete (backend->pool);
+ }
+
+ g_slice_free1 (sizeof (*backend), backend);
+ }
+}
+
+
+gsize
+rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ if (backend) {
+ if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) {
+ backend->count = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+ return backend->count;
+ }
+
+ return 0;
+}
+
+gint
+rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source)
+{
+ gint ret = -1;
+
+ if (backend) {
+ if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
+ ret = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION);
+ }
+
+ return ret;
+}
+
+gsize
+rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ return backend != NULL ? backend->expired : 0;
+}
+
+const gchar *
+rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ return backend != NULL ? backend->id : 0;
+}
diff --git a/src/libserver/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend_sqlite.h
new file mode 100644
index 000000000..032d1e3cd
--- /dev/null
+++ b/src/libserver/fuzzy_backend_sqlite.h
@@ -0,0 +1,98 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FUZZY_BACKEND_H_
+#define FUZZY_BACKEND_H_
+
+#include "config.h"
+#include "fuzzy_wire.h"
+
+
+struct rspamd_fuzzy_backend_sqlite;
+
+/**
+ * Open fuzzy backend
+ * @param path file to open (legacy file will be converted automatically)
+ * @param err error pointer
+ * @return backend structure or NULL
+ */
+struct rspamd_fuzzy_backend_sqlite *rspamd_fuzzy_backend_sqlite_open (const gchar *path,
+ gboolean vacuum,
+ GError **err);
+
+/**
+ * Check specified fuzzy in the backend
+ * @param backend
+ * @param cmd
+ * @return reply with probability and weight
+ */
+struct rspamd_fuzzy_reply rspamd_fuzzy_backend_sqlite_check (
+ struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd,
+ gint64 expire);
+
+/**
+ * Prepare storage for updates (by starting transaction)
+ */
+gboolean rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source);
+
+/**
+ * Add digest to the database
+ * @param backend
+ * @param cmd
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Delete digest from the database
+ * @param backend
+ * @param cmd
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_del (
+ struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Commit updates to storage
+ */
+gboolean rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source, gboolean version_bump);
+
+/**
+ * Sync storage
+ * @param backend
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend,
+ gint64 expire,
+ gboolean clean_orphaned);
+
+/**
+ * Close storage
+ * @param backend
+ */
+void rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend);
+
+gsize rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend);
+gint rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend, const gchar *source);
+gsize rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend);
+
+const gchar * rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend);
+
+#endif /* FUZZY_BACKEND_H_ */
diff --git a/src/fuzzy_storage.h b/src/libserver/fuzzy_wire.h
index a9c3f174b..fb0cbf3ad 100644
--- a/src/fuzzy_storage.h
+++ b/src/libserver/fuzzy_wire.h
@@ -84,4 +84,12 @@ struct rspamd_fuzzy_stat_entry {
guint32 fuzzy_cnt;
};
+struct fuzzy_peer_cmd {
+ gboolean is_shingle;
+ union {
+ struct rspamd_fuzzy_cmd normal;
+ struct rspamd_fuzzy_shingle_cmd shingle;
+ } cmd;
+};
+
#endif
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
index 9952c26d9..bc04b753b 100644
--- a/src/plugins/fuzzy_check.c
+++ b/src/plugins/fuzzy_check.c
@@ -35,7 +35,7 @@
#include "libutil/map.h"
#include "libmime/images.h"
#include "libserver/worker_util.h"
-#include "fuzzy_storage.h"
+#include "fuzzy_wire.h"
#include "utlist.h"
#include "cryptobox.h"
#include "ottery.h"
diff --git a/src/rspamd.c b/src/rspamd.c
index 3ddee3ff4..eb2f37ecb 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -16,7 +16,6 @@
#include "config.h"
#include "rspamd.h"
#include "libutil/map.h"
-#include "fuzzy_storage.h"
#include "lua/lua_common.h"
#include "libserver/worker_util.h"
#include "libserver/rspamd_control.h"