diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-09-02 14:33:49 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-09-02 14:33:49 +0100 |
commit | 7a93b2c7c4a2181c00cf3cf6ef24ebe76e7bb0b8 (patch) | |
tree | fbd58dc5c67fd7ecb2c20ec2d4f292f22f3200fe | |
parent | 2d90a61b3a70bc75735575435276c9226c7f2c0f (diff) | |
parent | e8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5 (diff) | |
download | rspamd-7a93b2c7c4a2181c00cf3cf6ef24ebe76e7bb0b8.tar.gz rspamd-7a93b2c7c4a2181c00cf3cf6ef24ebe76e7bb0b8.zip |
Merge branch 'fuzzy-backend-rework'
-rw-r--r-- | src/controller.c | 2 | ||||
-rw-r--r-- | src/fuzzy_storage.c | 472 | ||||
-rw-r--r-- | src/libserver/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend.c | 1196 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend.h | 102 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend_sqlite.c | 1055 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend_sqlite.h | 98 | ||||
-rw-r--r-- | src/libserver/fuzzy_wire.h (renamed from src/fuzzy_storage.h) | 8 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 2 | ||||
-rw-r--r-- | src/rspamd.c | 1 |
10 files changed, 1747 insertions, 1190 deletions
diff --git a/src/controller.c b/src/controller.c index 2b19a7dd7..5d001fbb6 100644 --- a/src/controller.c +++ b/src/controller.c @@ -24,7 +24,7 @@ #include "libserver/worker_util.h" #include "cryptobox.h" #include "ottery.h" -#include "fuzzy_storage.h" +#include "fuzzy_wire.h" #include "libutil/rrd.h" #include "unix-std.h" #include "utlist.h" diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index e321aa0b6..d663a89e4 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -22,7 +22,7 @@ #include "util.h" #include "rspamd.h" #include "map.h" -#include "fuzzy_storage.h" +#include "fuzzy_wire.h" #include "fuzzy_backend.h" #include "ottery.h" #include "libserver/worker_util.h" @@ -36,15 +36,11 @@ #include "libutil/http_private.h" #include "unix-std.h" -/* This number is used as expire time in seconds for cache items (2 days) */ -#define DEFAULT_EXPIRE 172800L /* Resync value in seconds */ #define DEFAULT_SYNC_TIMEOUT 60.0 #define DEFAULT_KEYPAIR_CACHE_SIZE 512 #define DEFAULT_MASTER_TIMEOUT 10.0 -#define INVALID_NODE_TIME (guint64) - 1 - static const gchar *local_db_name = "local"; #define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ @@ -77,10 +73,6 @@ worker_t fuzzy_worker = { RSPAMD_WORKER_VER /* Version info */ }; -/* For evtimer */ -static struct timeval tmv; -static struct event tev; - struct fuzzy_global_stat { guint64 fuzzy_hashes; /**< number of fuzzy hashes stored */ @@ -165,6 +157,7 @@ struct fuzzy_session { } cmd; struct rspamd_fuzzy_encrypted_reply reply; + struct fuzzy_key_stat *ip_stat; enum rspamd_fuzzy_epoch epoch; enum fuzzy_cmd_type cmd_type; @@ -176,14 +169,6 @@ struct fuzzy_session { guchar nm[rspamd_cryptobox_MAX_NMBYTES]; }; -struct fuzzy_peer_cmd { - gboolean is_shingle; - union { - struct rspamd_fuzzy_cmd normal; - struct rspamd_fuzzy_shingle_cmd shingle; - } cmd; -}; - struct fuzzy_peer_request { struct event io_ev; struct fuzzy_peer_cmd cmd; @@ -199,7 +184,10 @@ struct fuzzy_master_update_session { const gchar *name; gchar uid[16]; struct rspamd_http_connection *conn; + struct rspamd_http_message *msg; struct rspamd_fuzzy_storage_ctx *ctx; + const gchar *src; + gchar *psrc; rspamd_inet_addr_t *addr; gboolean replied; gint sock; @@ -250,6 +238,14 @@ fuzzy_key_dtor (gpointer p) g_slice_free1 (sizeof (*key), key); } +static void +fuzzy_count_callback (guint64 count, void *ud) +{ + struct rspamd_fuzzy_storage_ctx *ctx = ud; + + ctx->stat.fuzzy_hashes = count; +} + struct fuzzy_slave_connection { struct rspamd_cryptobox_keypair *local_key; struct rspamd_cryptobox_pubkey *remote_key; @@ -274,19 +270,34 @@ fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn) } } +struct rspamd_fuzzy_updates_cbdata { + struct rspamd_fuzzy_storage_ctx *ctx; + struct rspamd_http_message *msg; + struct fuzzy_slave_connection *conn; + struct rspamd_fuzzy_mirror *m; +}; + static void -fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, - struct rspamd_http_message *msg) +fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) { + struct rspamd_fuzzy_updates_cbdata *cbdata = ud; GList *cur; struct fuzzy_peer_cmd *io_cmd; - guint32 len; - guint32 rev; + guint32 rev32 = rev64, len; const gchar *p; rspamd_fstring_t *reply; + struct fuzzy_slave_connection *conn; + struct rspamd_fuzzy_storage_ctx *ctx; + struct rspamd_http_message *msg; + struct rspamd_fuzzy_mirror *m; + struct timeval tv; - rev = rspamd_fuzzy_backend_version (ctx->backend, local_db_name); - rev = GUINT32_TO_LE (rev); + conn = cbdata->conn; + ctx = cbdata->ctx; + msg = cbdata->msg; + m = cbdata->m; + g_slice_free1 (sizeof (*cbdata), cbdata); + rev32 = GUINT32_TO_LE (rev32); len = sizeof (guint32) * 2; /* revision + last chunk */ for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { @@ -303,8 +314,8 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, } reply = rspamd_fstring_sized_new (len); - reply = rspamd_fstring_append (reply, (const char *)&rev, - sizeof (rev)); + reply = rspamd_fstring_append (reply, (const char *)&rev32, + sizeof (rev32)); for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { io_cmd = cur->data; @@ -328,6 +339,30 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, len = 0; reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len)); rspamd_http_message_set_body_from_fstring_steal (msg, reply); + double_to_tv (ctx->sync_timeout, &tv); + rspamd_http_connection_write_message (conn->http_conn, + msg, NULL, NULL, conn, + conn->sock, + &tv, ctx->ev_base); + msg_info ("send update request to %s", m->name); +} + +static void +fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m, + struct fuzzy_slave_connection *conn, + struct rspamd_fuzzy_storage_ctx *ctx, + struct rspamd_http_message *msg) +{ + + struct rspamd_fuzzy_updates_cbdata *cbdata; + + cbdata = g_slice_alloc (sizeof (*cbdata)); + cbdata->ctx = ctx; + cbdata->msg = msg; + cbdata->conn = conn; + cbdata->m = m; + rspamd_fuzzy_backend_version (ctx->backend, local_db_name, + fuzzy_mirror_updates_version_cb, cbdata); } static void @@ -361,7 +396,6 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, { struct fuzzy_slave_connection *conn; struct rspamd_http_message *msg; - struct timeval tv; conn = g_slice_alloc0 (sizeof (*conn)); conn->up = rspamd_upstream_get (m->u, @@ -397,84 +431,85 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, rspamd_http_connection_set_key (conn->http_conn, ctx->sync_keypair); msg->peer_key = rspamd_pubkey_ref (m->key); - double_to_tv (ctx->sync_timeout, &tv); - fuzzy_mirror_updates_to_http (ctx, msg); + fuzzy_mirror_updates_to_http (m, conn, ctx, msg); +} - rspamd_http_connection_write_message (conn->http_conn, - msg, NULL, NULL, conn, - conn->sock, - &tv, ctx->ev_base); - msg_info ("send update request to %s", m->name); +struct rspamd_updates_cbdata { + struct rspamd_fuzzy_storage_ctx *ctx; + const gchar *source; +}; + +static void +fuzzy_update_version_callback (guint64 ver, void *ud) +{ + msg_info ("updated fuzzy storage from %s: version: %d", + (const char *)ud, (gint)ver); } static void -rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, - const gchar *source) +rspamd_fuzzy_updates_cb (gboolean success, void *ud) { - GList *cur; - struct fuzzy_peer_cmd *io_cmd; - struct rspamd_fuzzy_cmd *cmd; - gpointer ptr; + struct rspamd_updates_cbdata *cbdata = ud; struct rspamd_fuzzy_mirror *m; guint nupdates = 0, i; + struct rspamd_fuzzy_storage_ctx *ctx; + const gchar *source; + GList *cur; + struct fuzzy_peer_cmd *io_cmd; - if (ctx->updates_pending && - g_queue_get_length (ctx->updates_pending) > 0 && - rspamd_fuzzy_backend_prepare_update (ctx->backend, source)) { - cur = ctx->updates_pending->head; + ctx = cbdata->ctx; + source = cbdata->source; - while (cur) { - io_cmd = cur->data; + if (success) { + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - if (io_cmd->is_shingle) { - cmd = &io_cmd->cmd.shingle.basic; - ptr = &io_cmd->cmd.shingle; - } - else { - cmd = &io_cmd->cmd.normal; - ptr = &io_cmd->cmd.normal; - } + if (nupdates > 0) { + for (i = 0; i < ctx->mirrors->len; i ++) { + m = g_ptr_array_index (ctx->mirrors, i); - if (cmd->cmd == FUZZY_WRITE) { - rspamd_fuzzy_backend_add (ctx->backend, ptr); - } - else { - rspamd_fuzzy_backend_del (ctx->backend, ptr); + rspamd_fuzzy_send_update_mirror (ctx, m); } + } - nupdates++; + /* Clear updates */ + cur = ctx->updates_pending->head; + + while (cur) { + io_cmd = cur->data; + g_slice_free1 (sizeof (*io_cmd), io_cmd); cur = g_list_next (cur); } - if (rspamd_fuzzy_backend_finish_update (ctx->backend, source, nupdates > 0)) { - ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend); + g_queue_clear (ctx->updates_pending); + rspamd_fuzzy_backend_version (ctx->backend, source, + fuzzy_update_version_callback, (void *)source); - if (nupdates > 0) { - for (i = 0; i < ctx->mirrors->len; i ++) { - m = g_ptr_array_index (ctx->mirrors, i); + } + else { + msg_err ("cannot commit update transaction to fuzzy backend, " + "%ud updates are still pending", + g_queue_get_length (ctx->updates_pending)); + } - rspamd_fuzzy_send_update_mirror (ctx, m); - } - } + g_slice_free1 (sizeof (*cbdata), cbdata); +} - /* Clear updates */ - cur = ctx->updates_pending->head; +static void +rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, + const gchar *source) +{ + + struct rspamd_updates_cbdata *cbdata; + + if (ctx->updates_pending && + g_queue_get_length (ctx->updates_pending) > 0) { + cbdata = g_slice_alloc (sizeof (*cbdata)); + cbdata->ctx = ctx; + cbdata->source = source; + rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending, + source, rspamd_fuzzy_updates_cb, cbdata); - while (cur) { - io_cmd = cur->data; - g_slice_free1 (sizeof (*io_cmd), io_cmd); - cur = g_list_next (cur); - } - g_queue_clear (ctx->updates_pending); - msg_info ("updated fuzzy storage: %ud updates processed, version: %d", - nupdates, rspamd_fuzzy_backend_version (ctx->backend, source)); - } - else { - msg_err ("cannot commit update transaction to fuzzy backend, " - "%ud updates are still pending", - g_queue_get_length (ctx->updates_pending)); - } } else if (ctx->updates_pending && g_queue_get_length (ctx->updates_pending) > 0) { @@ -607,6 +642,77 @@ rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx, } static void +rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd, + struct rspamd_fuzzy_reply *result, + struct fuzzy_session *session, + gboolean encrypted, gboolean is_shingle) +{ + if (cmd) { + result->tag = cmd->tag; + + memcpy (&session->reply.rep, result, sizeof (*result)); + + rspamd_fuzzy_update_stats (session->ctx, + session->epoch, + result->prob > 0.5, + is_shingle, + session->key_stat, + session->ip_stat, + cmd->cmd, + result->value); + + if (encrypted) { + /* We need also to encrypt reply */ + ottery_rand_bytes (session->reply.hdr.nonce, + sizeof (session->reply.hdr.nonce)); + rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep, + sizeof (session->reply.rep), + session->reply.hdr.nonce, + session->nm, + session->reply.hdr.mac, + RSPAMD_CRYPTOBOX_MODE_25519); + } + } + + rspamd_fuzzy_write_reply (session); +} + +static void +rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud) +{ + struct fuzzy_session *session = ud; + gboolean encrypted = FALSE, is_shingle = FALSE; + struct rspamd_fuzzy_cmd *cmd = NULL; + gsize up_len = 0; + + switch (session->cmd_type) { + case CMD_NORMAL: + cmd = &session->cmd.normal; + up_len = sizeof (session->cmd.normal); + break; + case CMD_SHINGLE: + cmd = &session->cmd.shingle.basic; + up_len = sizeof (session->cmd.shingle); + is_shingle = TRUE; + break; + case CMD_ENCRYPTED_NORMAL: + cmd = &session->cmd.enc_normal.cmd; + up_len = sizeof (session->cmd.normal); + encrypted = TRUE; + break; + case CMD_ENCRYPTED_SHINGLE: + cmd = &session->cmd.enc_shingle.cmd.basic; + up_len = sizeof (session->cmd.shingle); + encrypted = TRUE; + is_shingle = TRUE; + break; + } + + rspamd_fuzzy_make_reply (cmd, result, session, encrypted, is_shingle); + REF_RELEASE (session); +} + +static void rspamd_fuzzy_process_command (struct fuzzy_session *session) { gboolean encrypted = FALSE, is_shingle = FALSE; @@ -645,14 +751,16 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) if (G_UNLIKELY (cmd == NULL || up_len == 0)) { result.value = 500; result.prob = 0.0; - goto reply; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + return; } if (session->ctx->encrypted_only && !encrypted) { /* Do not accept unencrypted commands */ result.value = 403; result.prob = 0.0; - goto reply; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + return; } if (session->key_stat) { @@ -665,17 +773,21 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) rspamd_lru_hash_insert (session->key_stat->last_ips, naddr, ip_stat, -1, 0); } + + session->ip_stat = ip_stat; } result.flag = cmd->flag; if (cmd->cmd == FUZZY_CHECK) { - result = rspamd_fuzzy_backend_check (session->ctx->backend, cmd, - session->ctx->expire); + REF_RETAIN (session); + rspamd_fuzzy_backend_check (session->ctx->backend, cmd, + rspamd_fuzzy_check_callback, session); } else if (cmd->cmd == FUZZY_STAT) { result.prob = 1.0; result.value = 0; - result.flag = rspamd_fuzzy_backend_count (session->ctx->backend); + result.flag = session->ctx->stat.fuzzy_hashes; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); } else { if (rspamd_fuzzy_check_client (session)) { @@ -711,36 +823,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) result.value = 403; result.prob = 0.0; } - } -reply: - if (cmd) { - result.tag = cmd->tag; - - memcpy (&session->reply.rep, &result, sizeof (result)); - - rspamd_fuzzy_update_stats (session->ctx, - session->epoch, - result.prob > 0.5, - is_shingle, - session->key_stat, - ip_stat, cmd->cmd, - result.value); - - if (encrypted) { - /* We need also to encrypt reply */ - ottery_rand_bytes (session->reply.hdr.nonce, - sizeof (session->reply.hdr.nonce)); - rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep, - sizeof (session->reply.rep), - session->reply.hdr.nonce, - session->nm, - session->reply.hdr.mac, - RSPAMD_CRYPTOBOX_MODE_25519); - } + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); } - - rspamd_fuzzy_write_reply (session); } @@ -924,12 +1009,11 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s) static void rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, - struct rspamd_http_message *msg) + struct rspamd_http_message *msg, guint our_rev) { const guchar *p; - gchar *src = NULL, *psrc; gsize remain; - gint32 revision, our_rev; + gint32 revision; guint32 len = 0, cnt = 0; struct fuzzy_peer_cmd cmd, *pcmd; enum { @@ -940,25 +1024,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, GList *updates = NULL, *cur; gpointer flag_ptr; - if (!rspamd_http_message_get_body (msg, NULL) || !msg->url - || msg->url->len == 0) { - msg_err_fuzzy_update ("empty update message, not processing"); - - return; - } - - /* Detect source from url: /update_v1/<source>, so we look for the last '/' */ - remain = msg->url->len; - psrc = rspamd_fstringdup (msg->url); - src = psrc; - - while (remain--) { - if (src[remain] == '/') { - src = &src[remain + 1]; - break; - } - } - /* * Message format: * <uint32_le> - revision @@ -973,13 +1038,11 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, if (remain > sizeof (gint32) * 2) { memcpy (&revision, p, sizeof (gint32)); revision = GINT32_TO_LE (revision); - our_rev = rspamd_fuzzy_backend_version (session->ctx->backend, src); if (revision <= our_rev) { msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, " "refusing update", revision, our_rev); - g_free (psrc); return; } @@ -1084,7 +1147,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, cur->data = NULL; } - rspamd_fuzzy_process_updates_queue (session->ctx, src); + rspamd_fuzzy_process_updates_queue (session->ctx, session->src); msg_info_fuzzy_update ("processed updates from the master %s, " "%ud operations processed," " revision: %d (local revision: %d)", @@ -1092,8 +1155,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, cnt, revision, our_rev); err: - g_free (psrc); - if (updates) { /* We still need to clear queue */ for (cur = updates; cur != NULL; cur = g_list_next (cur)) { @@ -1127,6 +1188,10 @@ rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session rspamd_http_connection_unref (session->conn); rspamd_inet_address_destroy (session->addr); close (session->sock); + + if (session->psrc) { + g_free (session->psrc); + } g_slice_free1 (sizeof (*session), session); } } @@ -1159,6 +1224,15 @@ rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session, session->ctx->ev_base); } +static void +rspamd_fuzzy_update_version_callback (guint64 version, void *ud) +{ + struct fuzzy_master_update_session *session = ud; + + rspamd_fuzzy_mirror_process_update (session, session->msg, version); + rspamd_fuzzy_mirror_send_reply (session, 200, "OK"); +} + static gint rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) @@ -1166,6 +1240,9 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, struct fuzzy_master_update_session *session = conn->ud; const struct rspamd_cryptobox_pubkey *rk; const gchar *err_str = NULL; + gchar *psrc; + const gchar *src = NULL; + gsize remain; if (session->replied) { rspamd_fuzzy_mirror_session_destroy (session); @@ -1197,9 +1274,30 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s", rspamd_inet_address_to_string (session->addr)); } + if (!rspamd_http_message_get_body (msg, NULL) || !msg->url + || msg->url->len == 0) { + msg_err_fuzzy_update ("empty update message, not processing"); + + goto end; + } - rspamd_fuzzy_mirror_process_update (session, msg); - rspamd_fuzzy_mirror_send_reply (session, 200, "OK"); + /* Detect source from url: /update_v1/<source>, so we look for the last '/' */ + remain = msg->url->len; + psrc = rspamd_fstringdup (msg->url); + src = psrc; + + while (remain--) { + if (src[remain] == '/') { + src = &src[remain + 1]; + break; + } + } + + session->src = src; + session->psrc = psrc; + session->msg = msg; + rspamd_fuzzy_backend_version (session->ctx->backend, src, + rspamd_fuzzy_update_version_callback, session); return 0; } @@ -1360,38 +1458,6 @@ accept_fuzzy_socket (gint fd, short what, void *arg) } } -static void -sync_callback (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct rspamd_fuzzy_storage_ctx *ctx; - gdouble next_check; - guint64 old_expired, new_expired; - - ctx = worker->ctx; - - if (ctx->backend) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); - /* Call backend sync */ - old_expired = rspamd_fuzzy_backend_expired (ctx->backend); - rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); - new_expired = rspamd_fuzzy_backend_expired (ctx->backend); - - if (old_expired < new_expired) { - ctx->stat.fuzzy_hashes_expired += new_expired - old_expired; - } - } - - /* Timer event */ - event_del (&tev); - evtimer_set (&tev, sync_callback, worker); - event_base_set (ctx->ev_base, &tev); - /* Plan event with jitter */ - next_check = rspamd_time_jitter (ctx->sync_timeout, 0); - double_to_tv (next_check, &tmv); - evtimer_add (&tev, &tmv); -} - static gboolean rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, @@ -1400,29 +1466,13 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, gpointer ud) { struct rspamd_fuzzy_storage_ctx *ctx = ud; - gdouble next_check; - guint64 old_expired, new_expired; struct rspamd_control_reply rep; - if (ctx->backend) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); - /* Call backend sync */ - old_expired = rspamd_fuzzy_backend_expired (ctx->backend); - rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); - new_expired = rspamd_fuzzy_backend_expired (ctx->backend); - - if (old_expired < new_expired) { - ctx->stat.fuzzy_hashes_expired += new_expired - old_expired; - } - } - rep.reply.fuzzy_sync.status = 0; - /* Handle next timer event */ - event_del (&tev); - next_check = rspamd_time_jitter (ctx->sync_timeout, 0); - double_to_tv (next_check, &tmv); - evtimer_add (&tev, &tmv); + if (ctx->backend && worker->index == 0) { + rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout); + } if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", @@ -1453,8 +1503,8 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_RELOAD; - if ((ctx->backend = rspamd_fuzzy_backend_open (ctx->hashfile, - TRUE, + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, + worker->cf->options, &err)) == NULL) { msg_err ("cannot open backend after reload: %e", err); g_error_free (err); @@ -1464,6 +1514,10 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, rep.reply.reload.status = 0; } + if (ctx->backend && worker->index == 0) { + rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout); + } + if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", strerror (errno)); @@ -1909,7 +1963,6 @@ init_fuzzy (struct rspamd_config *cfg) ctx->magic = rspamd_fuzzy_storage_magic; ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT; ctx->master_timeout = DEFAULT_MASTER_TIMEOUT; - ctx->expire = DEFAULT_EXPIRE; ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE; ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal, NULL, fuzzy_key_dtor); @@ -2116,7 +2169,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker, GList *cur; struct rspamd_worker_listen_socket *ls; struct event *accept_events; - gdouble next_check; ctx->peer_fd = rep_fd; @@ -2165,14 +2217,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker, event_base_set (ctx->ev_base, &ctx->peer_ev); event_add (&ctx->peer_ev, NULL); ctx->updates_pending = g_queue_new (); - - /* Timer event */ - evtimer_set (&tev, sync_callback, worker); - event_base_set (ctx->ev_base, &tev); - /* Plan event with jitter */ - next_check = rspamd_time_jitter (ctx->sync_timeout, 0); - double_to_tv (next_check, &tmv); - evtimer_add (&tev, &tmv); } } @@ -2196,13 +2240,14 @@ start_fuzzy (struct rspamd_worker *worker) /* * Open DB and perform VACUUM */ - if ((ctx->backend = rspamd_fuzzy_backend_open (ctx->hashfile, TRUE, &err)) == NULL) { + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, + worker->cf->options, &err)) == NULL) { msg_err ("cannot open backend: %e", err); g_error_free (err); exit (EXIT_SUCCESS); } - ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend); + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); if (ctx->keypair_cache_size > 0) { /* Create keypairs cache */ @@ -2210,7 +2255,7 @@ start_fuzzy (struct rspamd_worker *worker) } if (worker->index == 0) { - rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); + rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout); } if (ctx->mirrors && ctx->mirrors->len != 0) { @@ -2268,11 +2313,6 @@ start_fuzzy (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); - if (worker->index == 0) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); - rspamd_fuzzy_backend_sync (ctx->backend, ctx->expire, TRUE); - } - rspamd_fuzzy_backend_close (ctx->backend); rspamd_log_close (worker->srv->logger); diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt index 295ad59c8..4f3b9a260 100644 --- a/src/libserver/CMakeLists.txt +++ b/src/libserver/CMakeLists.txt @@ -9,6 +9,7 @@ SET(LIBRSPAMDSERVERSRC ${CMAKE_CURRENT_SOURCE_DIR}/dynamic_cfg.c ${CMAKE_CURRENT_SOURCE_DIR}/events.c ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend.c + ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_backend_sqlite.c ${CMAKE_CURRENT_SOURCE_DIR}/html.c ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c diff --git a/src/libserver/fuzzy_backend.c b/src/libserver/fuzzy_backend.c index 463fdd1f4..44493e17b 100644 --- a/src/libserver/fuzzy_backend.c +++ b/src/libserver/fuzzy_backend.c @@ -13,1042 +13,392 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "config.h" -#include "rspamd.h" #include "fuzzy_backend.h" -#include "unix-std.h" +#include "fuzzy_backend_sqlite.h" -#include <sqlite3.h> -#include "libutil/sqlite_utils.h" +#define DEFAULT_EXPIRE 172800L -struct rspamd_fuzzy_backend { - sqlite3 *db; - char *path; - gchar id[MEMPOOL_UID_LEN]; - gsize count; - gsize expired; - rspamd_mempool_t *pool; +enum rspamd_fuzzy_backend_type { + RSPAMD_FUZZY_BACKEND_SQLITE = 0, + // RSPAMD_FUZZY_BACKEND_REDIS }; -static const gdouble sql_sleep_time = 0.1; -static const guint max_retries = 10; - -#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ - backend->pool->tag.tagname, backend->pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_warn_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ - backend->pool->tag.tagname, backend->pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_info_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ - backend->pool->tag.tagname, backend->pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_debug_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ - backend->pool->tag.tagname, backend->pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) - -static const char *create_tables_sql = - "BEGIN;" - "CREATE TABLE IF NOT EXISTS digests(" - " id INTEGER PRIMARY KEY," - " flag INTEGER NOT NULL," - " digest TEXT NOT NULL," - " value INTEGER," - " time INTEGER);" - "CREATE TABLE IF NOT EXISTS shingles(" - " value INTEGER NOT NULL," - " number INTEGER NOT NULL," - " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE " - " ON UPDATE CASCADE);" - "CREATE TABLE IF NOT EXISTS sources(" - " name TEXT UNIQUE," - " version INTEGER," - " last INTEGER);" - "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" - "CREATE INDEX IF NOT EXISTS t ON digests(time);" - "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" - "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" - "COMMIT;"; -#if 0 -static const char *create_index_sql = - "BEGIN;" - "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" - "CREATE INDEX IF NOT EXISTS t ON digests(time);" - "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" - "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" - "COMMIT;"; -#endif -enum rspamd_fuzzy_statement_idx { - RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0, - RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, - RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, - RSPAMD_FUZZY_BACKEND_INSERT, - RSPAMD_FUZZY_BACKEND_UPDATE, - RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, - RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, - RSPAMD_FUZZY_BACKEND_CHECK, - RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, - RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, - RSPAMD_FUZZY_BACKEND_DELETE, - RSPAMD_FUZZY_BACKEND_COUNT, - RSPAMD_FUZZY_BACKEND_EXPIRE, - RSPAMD_FUZZY_BACKEND_VACUUM, - RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, - RSPAMD_FUZZY_BACKEND_ADD_SOURCE, - RSPAMD_FUZZY_BACKEND_VERSION, - RSPAMD_FUZZY_BACKEND_SET_VERSION, - RSPAMD_FUZZY_BACKEND_MAX +static void* rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, GError **err); +static void rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, + GQueue *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); +static const gchar* rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud); +static void rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud); +static void rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +struct rspamd_fuzzy_backend_subr { + void* (*init) (struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj, + GError **err); + void (*check) (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); + void (*update) (struct rspamd_fuzzy_backend *bk, + GQueue *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); + void (*count) (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); + void (*version) (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); + const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud); + void (*expire) (struct rspamd_fuzzy_backend *bk, void *subr_ud); + void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud); }; -static struct rspamd_fuzzy_stmts { - enum rspamd_fuzzy_statement_idx idx; - const gchar *sql; - const gchar *args; - sqlite3_stmt *stmt; - gint result; -} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] = -{ - { - .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START, - .sql = "BEGIN TRANSACTION;", - .args = "", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, - .sql = "COMMIT;", - .args = "", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, - .sql = "ROLLBACK;", - .args = "", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_INSERT, - .sql = "INSERT INTO digests(flag, digest, value, time) VALUES" - "(?1, ?2, ?3, strftime('%s','now'));", - .args = "SDI", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_UPDATE, - .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE " - "digest==?2;", - .args = "ID", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, - .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE " - "digest==?3;", - .args = "IID", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, - .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) " - "VALUES (?1, ?2, ?3);", - .args = "III", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_CHECK, - .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;", - .args = "D", - .stmt = NULL, - .result = SQLITE_ROW - }, - { - .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, - .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2", - .args = "IS", - .stmt = NULL, - .result = SQLITE_ROW - }, - { - .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, - .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1", - .args = "I", - .stmt = NULL, - .result = SQLITE_ROW - }, - { - .idx = RSPAMD_FUZZY_BACKEND_DELETE, - .sql = "DELETE FROM digests WHERE digest==?1;", - .args = "D", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_COUNT, - .sql = "SELECT COUNT(*) FROM digests;", - .args = "", - .stmt = NULL, - .result = SQLITE_ROW - }, - { - .idx = RSPAMD_FUZZY_BACKEND_EXPIRE, - .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);", - .args = "II", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_VACUUM, - .sql = "VACUUM;", - .args = "", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, - .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;", - .args = "II", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE, - .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);", - .args = "TII", - .stmt = NULL, - .result = SQLITE_DONE - }, - { - .idx = RSPAMD_FUZZY_BACKEND_VERSION, - .sql = "SELECT version FROM sources WHERE name=?1;", - .args = "T", - .stmt = NULL, - .result = SQLITE_ROW - }, - { - .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION, - .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);", - .args = "IIT", - .stmt = NULL, - .result = SQLITE_DONE - }, + +static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = { + [RSPAMD_FUZZY_BACKEND_SQLITE] = { + .init = rspamd_fuzzy_backend_init_sqlite, + .check = rspamd_fuzzy_backend_check_sqlite, + .update = rspamd_fuzzy_backend_update_sqlite, + .count = rspamd_fuzzy_backend_count_sqlite, + .version = rspamd_fuzzy_backend_version_sqlite, + .id = rspamd_fuzzy_backend_id_sqlite, + .expire = rspamd_fuzzy_backend_expire_sqlite, + .close = rspamd_fuzzy_backend_close_sqlite, + } +}; + +struct rspamd_fuzzy_backend { + enum rspamd_fuzzy_backend_type type; + gdouble expire; + gdouble sync; + struct event_base *ev_base; + const struct rspamd_fuzzy_backend_subr *subr; + void *subr_ud; + struct event expire_event; }; static GQuark -rspamd_fuzzy_backend_quark(void) +rspamd_fuzzy_backend_quark (void) { - return g_quark_from_static_string ("fuzzy-storage-backend"); + return g_quark_from_static_string ("fuzzy-backend"); } -static gboolean -rspamd_fuzzy_backend_prepare_stmts (struct rspamd_fuzzy_backend *bk, GError **err) +static void* +rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, GError **err) { - int i; + const ucl_object_t *elt; - for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) { - if (prepared_stmts[i].stmt != NULL) { - /* Skip already prepared statements */ - continue; - } - if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1, - &prepared_stmts[i].stmt, NULL) != SQLITE_OK) { - g_set_error (err, rspamd_fuzzy_backend_quark (), - -1, "Cannot initialize prepared sql `%s`: %s", - prepared_stmts[i].sql, sqlite3_errmsg (bk->db)); + elt = ucl_object_lookup_any (obj, "hashfile", "hash_file", "file", + "database", NULL); - return FALSE; - } + if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { + g_set_error (err, rspamd_fuzzy_backend_quark (), + EINVAL, "missing sqlite3 path"); + return NULL; } - return TRUE; + return rspamd_fuzzy_backend_sqlite_open (ucl_object_tostring (elt), + FALSE, err); } -static int -rspamd_fuzzy_backend_cleanup_stmt (struct rspamd_fuzzy_backend *backend, - int idx) +static void +rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) { - sqlite3_stmt *stmt; + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + struct rspamd_fuzzy_reply rep; - if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + rep = rspamd_fuzzy_backend_sqlite_check (sq, cmd, bk->expire); - return -1; + if (cb) { + cb (&rep, ud); } - - msg_debug_fuzzy_backend ("reseting `%s`", prepared_stmts[idx].sql); - stmt = prepared_stmts[idx].stmt; - sqlite3_clear_bindings (stmt); - sqlite3_reset (stmt); - - return SQLITE_OK; } -static int -rspamd_fuzzy_backend_run_stmt (struct rspamd_fuzzy_backend *backend, - gboolean auto_cleanup, - int idx, ...) +static void +rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, + GQueue *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) { - int retcode; - va_list ap; - sqlite3_stmt *stmt; - int i; - const char *argtypes; - guint retries = 0; - struct timespec ts; - - if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { - - return -1; - } - - stmt = prepared_stmts[idx].stmt; - g_assert ((int)prepared_stmts[idx].idx == idx); - - if (stmt == NULL) { - if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1, - &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) { - msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s", - prepared_stmts[idx].sql, sqlite3_errmsg (backend->db)); + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + gboolean success = FALSE; + GList *cur; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd; + gpointer ptr; + guint nupdates = 0; + + if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) { + cur = updates->head; + + while (cur) { + io_cmd = cur->data; + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + ptr = &io_cmd->cmd.shingle; + } + else { + cmd = &io_cmd->cmd.normal; + ptr = &io_cmd->cmd.normal; + } - return retcode; - } - stmt = prepared_stmts[idx].stmt; - } + if (cmd->cmd == FUZZY_WRITE) { + rspamd_fuzzy_backend_sqlite_add (sq, ptr); + } + else { + rspamd_fuzzy_backend_sqlite_del (sq, ptr); + } - msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup", - prepared_stmts[idx].sql, auto_cleanup ? "with" : "without"); - argtypes = prepared_stmts[idx].args; - sqlite3_clear_bindings (stmt); - sqlite3_reset (stmt); - va_start (ap, idx); - - for (i = 0; argtypes[i] != '\0'; i++) { - switch (argtypes[i]) { - case 'T': - sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1, - SQLITE_STATIC); - break; - case 'I': - sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64)); - break; - case 'S': - sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint)); - break; - case 'D': - /* Special case for digests variable */ - sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64, - SQLITE_STATIC); - break; + nupdates ++; + cur = g_list_next (cur); } - } - - va_end (ap); -retry: - retcode = sqlite3_step (stmt); - - if (retcode == prepared_stmts[idx].result) { - retcode = SQLITE_OK; - } - else { - if ((retcode == SQLITE_BUSY || - retcode == SQLITE_LOCKED) && retries++ < max_retries) { - double_to_ts (sql_sleep_time, &ts); - nanosleep (&ts, NULL); - goto retry; + if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src, + nupdates > 0)) { + success = TRUE; } - - msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql, - retcode, sqlite3_errmsg (backend->db)); } - if (auto_cleanup) { - sqlite3_clear_bindings (stmt); - sqlite3_reset (stmt); + if (cb) { + cb (success, ud); } - - return retcode; } static void -rspamd_fuzzy_backend_close_stmts (struct rspamd_fuzzy_backend *bk) +rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) { - int i; - - for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) { - if (prepared_stmts[i].stmt != NULL) { - sqlite3_finalize (prepared_stmts[i].stmt); - prepared_stmts[i].stmt = NULL; - } - } + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + guint64 nhashes; - return; -} + nhashes = rspamd_fuzzy_backend_sqlite_count (sq); -static gboolean -rspamd_fuzzy_backend_run_sql (const gchar *sql, struct rspamd_fuzzy_backend *bk, - GError **err) -{ - guint retries = 0; - struct timespec ts; - gint ret; - - do { - ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL); - double_to_ts (sql_sleep_time, &ts); - } while (ret == SQLITE_BUSY && retries++ < max_retries && - nanosleep (&ts, NULL) == 0); - - if (ret != SQLITE_OK) { - g_set_error (err, rspamd_fuzzy_backend_quark (), - -1, "Cannot execute raw sql `%s`: %s", - sql, sqlite3_errmsg (bk->db)); - return FALSE; + if (cb) { + cb (nhashes, ud); } - - return TRUE; } -static struct rspamd_fuzzy_backend * -rspamd_fuzzy_backend_open_db (const gchar *path, GError **err) +static void +rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) { - struct rspamd_fuzzy_backend *bk; - rspamd_cryptobox_hash_state_t st; - guchar hash_out[rspamd_cryptobox_HASHBYTES]; - - g_assert (path != NULL); + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + guint64 rev; - bk = g_slice_alloc (sizeof (*bk)); - bk->path = g_strdup (path); - bk->expired = 0; - bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend"); - bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path, - create_tables_sql, 1, err); + rev = rspamd_fuzzy_backend_sqlite_version (sq, src); - if (bk->db == NULL) { - rspamd_fuzzy_backend_close (bk); - - return NULL; + if (cb) { + cb (rev, ud); } - - if (!rspamd_fuzzy_backend_prepare_stmts (bk, err)) { - rspamd_fuzzy_backend_close (bk); - - return NULL; - } - - /* Set id for the backend */ - rspamd_cryptobox_hash_init (&st, NULL, 0); - rspamd_cryptobox_hash_update (&st, path, strlen (path)); - rspamd_cryptobox_hash_final (&st, hash_out); - rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out); - memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid)); - - return bk; } -struct rspamd_fuzzy_backend * -rspamd_fuzzy_backend_open (const gchar *path, - gboolean vacuum, - GError **err) +static const gchar* +rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud) { - struct rspamd_fuzzy_backend *backend; - - if (path == NULL) { - g_set_error (err, rspamd_fuzzy_backend_quark (), - ENOENT, "Path has not been specified"); - return NULL; - } - - /* Open database */ - if ((backend = rspamd_fuzzy_backend_open_db (path, err)) == NULL) { - return NULL; - } - - if (rspamd_fuzzy_backend_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT) - == SQLITE_OK) { - backend->count = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); - } + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT); - - return backend; + return rspamd_fuzzy_sqlite_backend_id (sq); } - -static gint -rspamd_fuzzy_backend_int64_cmp (const void *a, const void *b) +static void +rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud) { - gint64 ia = *(gint64 *)a, ib = *(gint64 *)b; + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; - return (ia - ib); + rspamd_fuzzy_backend_sqlite_sync (sq, bk->expire, TRUE); } -struct rspamd_fuzzy_reply -rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *backend, - const struct rspamd_fuzzy_cmd *cmd, gint64 expire) +static void +rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud) { - struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0}; - const struct rspamd_fuzzy_shingle_cmd *shcmd; - int rc; - gint64 timestamp; - gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id, - cur_cnt, max_cnt; - - if (backend == NULL) { - return rep; - } + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; - /* Try direct match first of all */ - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_START); - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_CHECK, - cmd->digest); - - if (rc == SQLITE_OK) { - timestamp = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1); - if (time (NULL) - timestamp > expire) { - /* Expire element */ - msg_debug_fuzzy_backend ("requested hash has been expired"); - } - else { - rep.value = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0); - rep.prob = 1.0; - rep.flag = sqlite3_column_int ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2); - } - } - else if (cmd->shingles_count > 0) { - /* Fuzzy match */ - - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); - shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd; - - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, - shcmd->sgl.hashes[i], i); - if (rc == SQLITE_OK) { - shingle_values[i] = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt, - 0); - } - else { - shingle_values[i] = -1; - } - msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i, - shcmd->sgl.hashes[i], rc); - } + rspamd_fuzzy_backend_sqlite_close (sq); +} - rspamd_fuzzy_backend_cleanup_stmt (backend, - RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE); - qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64), - rspamd_fuzzy_backend_int64_cmp); - sel_id = -1; - cur_id = -1; - cur_cnt = 0; - max_cnt = 0; +struct rspamd_fuzzy_backend * +rspamd_fuzzy_backend_create (struct event_base *ev_base, + const ucl_object_t *config, GError **err) +{ + struct rspamd_fuzzy_backend *bk; + enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE; + const ucl_object_t *elt; + gdouble expire = DEFAULT_EXPIRE; - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { - if (shingle_values[i] == -1) { - continue; - } + if (config != NULL) { + elt = ucl_object_lookup (config, "type"); - /* We have some value here, so we need to check it */ - if (shingle_values[i] == cur_id) { - cur_cnt ++; + if (elt != NULL && ucl_object_type (elt) == UCL_STRING) { + if (strcmp (ucl_object_tostring (elt), "sqlite") == 0) { + type = RSPAMD_FUZZY_BACKEND_SQLITE; } else { - cur_id = shingle_values[i]; - if (cur_cnt >= max_cnt) { - max_cnt = cur_cnt; - sel_id = cur_id; - } - cur_cnt = 0; + g_set_error (err, rspamd_fuzzy_backend_quark (), + EINVAL, "invalid backend type: %s", + ucl_object_tostring (elt)); + return NULL; } } - if (cur_cnt > max_cnt) { - max_cnt = cur_cnt; - } - - if (sel_id != -1) { - /* We have some id selected here */ - rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE; - - if (rep.prob > 0.5) { - msg_debug_fuzzy_backend ( - "found fuzzy hash with probability %.2f", - rep.prob); - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id); - if (rc == SQLITE_OK) { - timestamp = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, - 2); - if (time (NULL) - timestamp > expire) { - /* Expire element */ - msg_debug_fuzzy_backend ( - "requested hash has been expired"); - rep.prob = 0.0; - } - else { - rep.value = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, - 1); - rep.flag = sqlite3_column_int ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, - 3); - } - } - } - else { - /* Otherwise we assume that as error */ - rep.value = 0; - } + elt = ucl_object_lookup (config, "expire"); - rspamd_fuzzy_backend_cleanup_stmt (backend, - RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID); + if (elt != NULL) { + expire = ucl_object_todouble (elt); } } - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + bk = g_slice_alloc0 (sizeof (*bk)); + bk->ev_base = ev_base; + bk->expire = expire; + bk->type = type; + bk->subr = &fuzzy_subrs[type]; - return rep; -} - -gboolean -rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend, - const gchar *source) -{ - gint rc; - - if (backend == NULL) { - return FALSE; + if ((bk->subr_ud = bk->subr->init (bk, config, err)) == NULL) { + g_slice_free1 (sizeof (*bk), bk); } - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + return bk; +} - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot start transaction for updates: %s", - sqlite3_errmsg (backend->db)); - return FALSE; - } - return TRUE; +void +rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud) +{ + g_assert (bk != NULL); + + bk->subr->check (bk, cmd, cb, ud, bk->subr_ud); } -gboolean -rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend, - const struct rspamd_fuzzy_cmd *cmd) +void +rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, + GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + void *ud) { - int rc, i; - gint64 id, flag; - const struct rspamd_fuzzy_shingle_cmd *shcmd; + g_assert (bk != NULL); + g_assert (updates != NULL); - if (backend == NULL) { - return FALSE; + if (g_queue_get_length (updates) > 0) { + bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud); } - - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_CHECK, - cmd->digest); - - if (rc == SQLITE_OK) { - /* Check flag */ - flag = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, - 2); - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); - - if (flag == cmd->flag) { - /* We need to increase weight */ - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_UPDATE, - (gint64) cmd->value, - cmd->digest); - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot update hash to %d -> " - "%*xs: %s", (gint) cmd->flag, - (gint) sizeof (cmd->digest), cmd->digest, - sqlite3_errmsg (backend->db)); - } - } - else { - /* We need to relearn actually */ - - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, - (gint64) cmd->value, - (gint64) cmd->flag, - cmd->digest); - - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot update hash to %d -> " - "%*xs: %s", (gint) cmd->flag, - (gint) sizeof (cmd->digest), cmd->digest, - sqlite3_errmsg (backend->db)); - } - } + else if (cb) { + cb (TRUE, ud); } - else { - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_INSERT, - (gint) cmd->flag, - cmd->digest, - (gint64) cmd->value); - - if (rc == SQLITE_OK) { - if (cmd->shingles_count > 0) { - id = sqlite3_last_insert_rowid (backend->db); - shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd; - - for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, - shcmd->sgl.hashes[i], (gint64)i, id); - msg_debug_fuzzy_backend ("add shingle %d -> %L: %L", - i, - shcmd->sgl.hashes[i], - id); - - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot add shingle %d -> " - "%L: %L: %s", i, - shcmd->sgl.hashes[i], - id, sqlite3_errmsg (backend->db)); - } - } - } - } - else { - msg_warn_fuzzy_backend ("cannot add hash to %d -> " - "%*xs: %s", (gint)cmd->flag, - (gint)sizeof (cmd->digest), cmd->digest, - sqlite3_errmsg (backend->db)); - } - - rspamd_fuzzy_backend_cleanup_stmt (backend, - RSPAMD_FUZZY_BACKEND_INSERT); - } - - return (rc == SQLITE_OK); } -gboolean -rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend, - const gchar *source, gboolean version_bump) -{ - gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver; - /* Get and update version */ - if (version_bump) { - ver = rspamd_fuzzy_backend_version (backend, source); - ++ver; +void +rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud) +{ + g_assert (bk != NULL); - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_SET_VERSION, - (gint64)ver, (gint64)time (NULL), source); - } + bk->subr->count (bk, cb, ud, bk->subr_ud); +} - if (rc == SQLITE_OK) { - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot commit updates: %s", - sqlite3_errmsg (backend->db)); - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); - return FALSE; - } - else { - if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) { - msg_warn_fuzzy_backend ("cannot commit checkpoint: %s", - sqlite3_errmsg (backend->db)); - } - else if (wal_checkpointed > 0) { - msg_info_fuzzy_backend ("total number of frames in the wal file: " - "%d, checkpointed: %d", wal_frames, wal_checkpointed); - } - } - } - else { - msg_warn_fuzzy_backend ("cannot update version for %s: %s", source, - sqlite3_errmsg (backend->db)); - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); - return FALSE; - } +void +rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud) +{ + g_assert (bk != NULL); - return TRUE; + bk->subr->version (bk, src, cb, ud, bk->subr_ud); } -gboolean -rspamd_fuzzy_backend_del (struct rspamd_fuzzy_backend *backend, - const struct rspamd_fuzzy_cmd *cmd) +const gchar * +rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk) { - int rc = -1; + g_assert (bk != NULL); - if (backend == NULL) { - return FALSE; + if (bk->subr->id) { + return bk->subr->id (bk, bk->subr_ud); } - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_CHECK, - cmd->digest); - - if (rc == SQLITE_OK) { - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); - - rc = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_DELETE, - cmd->digest); - if (rc != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot update hash to %d -> " - "%*xs: %s", (gint) cmd->flag, - (gint) sizeof (cmd->digest), cmd->digest, - sqlite3_errmsg (backend->db)); - } - } - else { - /* Hash is missing */ - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); - } - - return (rc == SQLITE_OK); + return NULL; } -gboolean -rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend, - gint64 expire, - gboolean clean_orphaned) +static void +rspamd_fuzzy_backend_expire_cb (gint fd, short what, void *ud) { - struct orphaned_shingle_elt { - gint64 value; - gint64 number; - }; - - /* Do not do more than 5k ops per step */ - const guint64 max_changes = 5000; - gboolean ret = FALSE; - gint64 expire_lim, expired; - gint rc, i, orphaned_cnt = 0; - GError *err = NULL; - static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number " - "FROM shingles " - "LEFT JOIN digests ON " - "shingles.digest_id=digests.id WHERE " - "digests.id IS NULL;"; - sqlite3_stmt *stmt; - GArray *orphaned; - struct orphaned_shingle_elt orphaned_elt, *pelt; - - - if (backend == NULL) { - return FALSE; - } - - /* Perform expire */ - if (expire > 0) { - expire_lim = time (NULL) - expire; - - if (expire_lim > 0) { - ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_START); - - if (ret == SQLITE_OK) { - - rc = rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes); - - if (rc == SQLITE_OK) { - expired = sqlite3_changes (backend->db); - - if (expired > 0) { - backend->expired += expired; - msg_info_fuzzy_backend ("expired %L hashes", expired); - } - } - else { - msg_warn_fuzzy_backend ( - "cannot execute expired statement: %s", - sqlite3_errmsg (backend->db)); - } - - rspamd_fuzzy_backend_cleanup_stmt (backend, - RSPAMD_FUZZY_BACKEND_EXPIRE); - - ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); - - if (ret != SQLITE_OK) { - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); - } - } - if (ret != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot expire db: %s", - sqlite3_errmsg (backend->db)); - } - } - } - - /* Cleanup database */ - if (clean_orphaned) { - ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_START); - - if (ret == SQLITE_OK) { - if ((rc = sqlite3_prepare_v2 (backend->db, - orphaned_shingles, - -1, - &stmt, - NULL)) != SQLITE_OK) { - msg_warn_fuzzy_backend ("cannot cleanup shingles: %s", - sqlite3_errmsg (backend->db)); - } - else { - orphaned = g_array_new (FALSE, - FALSE, - sizeof (struct orphaned_shingle_elt)); - - while (sqlite3_step (stmt) == SQLITE_ROW) { - orphaned_elt.value = sqlite3_column_int64 (stmt, 0); - orphaned_elt.number = sqlite3_column_int64 (stmt, 1); - g_array_append_val (orphaned, orphaned_elt); - - if (orphaned->len > max_changes) { - break; - } - } - - sqlite3_finalize (stmt); - orphaned_cnt = orphaned->len; - - if (orphaned_cnt > 0) { - msg_info_fuzzy_backend ( - "going to delete %ud orphaned shingles", - orphaned_cnt); - /* Need to delete orphaned elements */ - for (i = 0; i < (gint) orphaned_cnt; i++) { - pelt = &g_array_index (orphaned, - struct orphaned_shingle_elt, - i); - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, - pelt->value, pelt->number); - } - } - - - g_array_free (orphaned, TRUE); - } - - ret = rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); - - if (ret == SQLITE_OK) { - msg_info_fuzzy_backend ( - "deleted %ud orphaned shingles", - orphaned_cnt); - } - else { - msg_warn_fuzzy_backend ( - "cannot synchronize fuzzy backend: %e", - err); - rspamd_fuzzy_backend_run_stmt (backend, TRUE, - RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); - } - } - } - - return ret; + struct rspamd_fuzzy_backend *bk = ud; + gdouble jittered; + struct timeval tv; + + jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0); + double_to_tv (jittered, &tv); + event_del (&bk->expire_event); + bk->subr->expire (bk, bk->subr_ud); + event_add (&bk->expire_event, &tv); } - void -rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend) +rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *bk, + gdouble timeout) { - if (backend != NULL) { - if (backend->db != NULL) { - rspamd_fuzzy_backend_close_stmts (backend); - sqlite3_close (backend->db); - } + gdouble jittered; + struct timeval tv; - if (backend->path != NULL) { - g_free (backend->path); - } - - if (backend->pool) { - rspamd_mempool_delete (backend->pool); - } + g_assert (bk != NULL); - g_slice_free1 (sizeof (*backend), backend); - } -} - - -gsize -rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend) -{ - if (backend) { - if (rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) { - backend->count = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + if (bk->subr->expire) { + if (bk->sync > 0.0) { + event_del (&bk->expire_event); } - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT); - - return backend->count; + bk->subr->expire (bk, bk->subr_ud); + bk->sync = timeout; + jittered = rspamd_time_jitter (timeout, timeout / 2.0); + double_to_tv (jittered, &tv); + event_set (&bk->expire_event, -1, EV_TIMEOUT, + rspamd_fuzzy_backend_expire_cb, bk); + event_base_set (bk->ev_base, &bk->expire_event); + event_add (&bk->expire_event, &tv); } - - return 0; } -gint -rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, - const gchar *source) +void +rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk) { - gint ret = -1; + g_assert (bk != NULL); - if (backend) { - if (rspamd_fuzzy_backend_run_stmt (backend, FALSE, - RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) { - ret = sqlite3_column_int64 ( - prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0); - } + bk->subr->close (bk, bk->subr_ud); - rspamd_fuzzy_backend_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION); + if (bk->sync > 0.0) { + bk->subr->expire (bk, bk->subr_ud); + event_del (&bk->expire_event); } - return ret; -} - -gsize -rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend) -{ - return backend != NULL ? backend->expired : 0; -} - -const gchar * -rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend) -{ - return backend != NULL ? backend->id : 0; + g_slice_free1 (sizeof (*bk), bk); } diff --git a/src/libserver/fuzzy_backend.h b/src/libserver/fuzzy_backend.h index a1736b676..adb7e5075 100644 --- a/src/libserver/fuzzy_backend.h +++ b/src/libserver/fuzzy_backend.h @@ -13,86 +13,92 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef FUZZY_BACKEND_H_ -#define FUZZY_BACKEND_H_ +#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_ +#define SRC_LIBSERVER_FUZZY_BACKEND_H_ #include "config.h" -#include "fuzzy_storage.h" - +#include <event.h> +#include "fuzzy_wire.h" struct rspamd_fuzzy_backend; +/* + * Callbacks for fuzzy methods + */ +typedef void (*rspamd_fuzzy_check_cb) (struct rspamd_fuzzy_reply *rep, void *ud); +typedef void (*rspamd_fuzzy_update_cb) (gboolean success, void *ud); +typedef void (*rspamd_fuzzy_version_cb) (guint64 rev, void *ud); +typedef void (*rspamd_fuzzy_count_cb) (guint64 count, void *ud); + /** * Open fuzzy backend - * @param path file to open (legacy file will be converted automatically) - * @param err error pointer - * @return backend structure or NULL + * @param ev_base + * @param config + * @param err + * @return */ -struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_open (const gchar *path, - gboolean vacuum, - GError **err); +struct rspamd_fuzzy_backend * rspamd_fuzzy_backend_create (struct event_base *ev_base, + const ucl_object_t *config, GError **err); + /** - * Check specified fuzzy in the backend - * @param backend + * Check a specific hash in storage * @param cmd - * @return reply with probability and weight + * @param cb + * @param ud */ -struct rspamd_fuzzy_reply rspamd_fuzzy_backend_check ( - struct rspamd_fuzzy_backend *backend, +void rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, const struct rspamd_fuzzy_cmd *cmd, - gint64 expire); + rspamd_fuzzy_check_cb cb, void *ud); /** - * Prepare storage for updates (by starting transaction) + * Process updates for a specific queue + * @param bk + * @param updates queue of struct fuzzy_peer_cmd + * @param src */ -gboolean rspamd_fuzzy_backend_prepare_update (struct rspamd_fuzzy_backend *backend, - const gchar *source); +void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, + GQueue *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + void *ud); /** - * Add digest to the database - * @param backend - * @param cmd - * @return + * Gets number of hashes from the backend + * @param bk + * @param cb + * @param ud */ -gboolean rspamd_fuzzy_backend_add (struct rspamd_fuzzy_backend *backend, - const struct rspamd_fuzzy_cmd *cmd); +void rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud); /** - * Delete digest from the database - * @param backend - * @param cmd - * @return + * Returns number of revision for a specific source + * @param bk + * @param src + * @param cb + * @param ud */ -gboolean rspamd_fuzzy_backend_del ( - struct rspamd_fuzzy_backend *backend, - const struct rspamd_fuzzy_cmd *cmd); +void rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud); /** - * Commit updates to storage + * Returns unique id for backend + * @param backend + * @return */ -gboolean rspamd_fuzzy_backend_finish_update (struct rspamd_fuzzy_backend *backend, - const gchar *source, gboolean version_bump); +const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend); /** - * Sync storage + * Starts expire process for the backend * @param backend - * @return */ -gboolean rspamd_fuzzy_backend_sync (struct rspamd_fuzzy_backend *backend, - gint64 expire, - gboolean clean_orphaned); +void rspamd_fuzzy_backend_start_expire (struct rspamd_fuzzy_backend *backend, + gdouble timeout); /** - * Close storage + * Closes backend * @param backend */ void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend); -gsize rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *backend); -gint rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *backend, const gchar *source); -gsize rspamd_fuzzy_backend_expired (struct rspamd_fuzzy_backend *backend); - -const gchar * rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend); - -#endif /* FUZZY_BACKEND_H_ */ +#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */ diff --git a/src/libserver/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend_sqlite.c new file mode 100644 index 000000000..834d274c8 --- /dev/null +++ b/src/libserver/fuzzy_backend_sqlite.c @@ -0,0 +1,1055 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rspamd.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_sqlite.h" +#include "unix-std.h" + +#include <sqlite3.h> +#include "libutil/sqlite_utils.h" + +struct rspamd_fuzzy_backend_sqlite { + sqlite3 *db; + char *path; + gchar id[MEMPOOL_UID_LEN]; + gsize count; + gsize expired; + rspamd_mempool_t *pool; +}; + +static const gdouble sql_sleep_time = 0.1; +static const guint max_retries = 10; + +#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +static const char *create_tables_sql = + "BEGIN;" + "CREATE TABLE IF NOT EXISTS digests(" + " id INTEGER PRIMARY KEY," + " flag INTEGER NOT NULL," + " digest TEXT NOT NULL," + " value INTEGER," + " time INTEGER);" + "CREATE TABLE IF NOT EXISTS shingles(" + " value INTEGER NOT NULL," + " number INTEGER NOT NULL," + " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE " + " ON UPDATE CASCADE);" + "CREATE TABLE IF NOT EXISTS sources(" + " name TEXT UNIQUE," + " version INTEGER," + " last INTEGER);" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" + "CREATE INDEX IF NOT EXISTS t ON digests(time);" + "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" + "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" + "COMMIT;"; +#if 0 +static const char *create_index_sql = + "BEGIN;" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" + "CREATE INDEX IF NOT EXISTS t ON digests(time);" + "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" + "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" + "COMMIT;"; +#endif +enum rspamd_fuzzy_statement_idx { + RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, + RSPAMD_FUZZY_BACKEND_INSERT, + RSPAMD_FUZZY_BACKEND_UPDATE, + RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + RSPAMD_FUZZY_BACKEND_CHECK, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, + RSPAMD_FUZZY_BACKEND_DELETE, + RSPAMD_FUZZY_BACKEND_COUNT, + RSPAMD_FUZZY_BACKEND_EXPIRE, + RSPAMD_FUZZY_BACKEND_VACUUM, + RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + RSPAMD_FUZZY_BACKEND_VERSION, + RSPAMD_FUZZY_BACKEND_SET_VERSION, + RSPAMD_FUZZY_BACKEND_MAX +}; +static struct rspamd_fuzzy_stmts { + enum rspamd_fuzzy_statement_idx idx; + const gchar *sql; + const gchar *args; + sqlite3_stmt *stmt; + gint result; +} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] = +{ + { + .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START, + .sql = "BEGIN TRANSACTION;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, + .sql = "COMMIT;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, + .sql = "ROLLBACK;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_INSERT, + .sql = "INSERT INTO digests(flag, digest, value, time) VALUES" + "(?1, ?2, ?3, strftime('%s','now'));", + .args = "SDI", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_UPDATE, + .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE " + "digest==?2;", + .args = "ID", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE " + "digest==?3;", + .args = "IID", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) " + "VALUES (?1, ?2, ?3);", + .args = "III", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_CHECK, + .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;", + .args = "D", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2", + .args = "IS", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, + .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1", + .args = "I", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_DELETE, + .sql = "DELETE FROM digests WHERE digest==?1;", + .args = "D", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_COUNT, + .sql = "SELECT COUNT(*) FROM digests;", + .args = "", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_EXPIRE, + .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);", + .args = "II", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_VACUUM, + .sql = "VACUUM;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;", + .args = "II", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);", + .args = "TII", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_VERSION, + .sql = "SELECT version FROM sources WHERE name=?1;", + .args = "T", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION, + .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);", + .args = "IIT", + .stmt = NULL, + .result = SQLITE_DONE + }, +}; + +static GQuark +rspamd_fuzzy_backend_sqlite_quark (void) +{ + return g_quark_from_static_string ("fuzzy-backend-sqlite"); +} + +static gboolean +rspamd_fuzzy_backend_sqlite_prepare_stmts (struct rspamd_fuzzy_backend_sqlite *bk, GError **err) +{ + int i; + + for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) { + if (prepared_stmts[i].stmt != NULL) { + /* Skip already prepared statements */ + continue; + } + if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1, + &prepared_stmts[i].stmt, NULL) != SQLITE_OK) { + g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (), + -1, "Cannot initialize prepared sql `%s`: %s", + prepared_stmts[i].sql, sqlite3_errmsg (bk->db)); + + return FALSE; + } + } + + return TRUE; +} + +static int +rspamd_fuzzy_backend_sqlite_cleanup_stmt (struct rspamd_fuzzy_backend_sqlite *backend, + int idx) +{ + sqlite3_stmt *stmt; + + if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + + return -1; + } + + msg_debug_fuzzy_backend ("reseting `%s`", prepared_stmts[idx].sql); + stmt = prepared_stmts[idx].stmt; + sqlite3_clear_bindings (stmt); + sqlite3_reset (stmt); + + return SQLITE_OK; +} + +static int +rspamd_fuzzy_backend_sqlite_run_stmt (struct rspamd_fuzzy_backend_sqlite *backend, + gboolean auto_cleanup, + int idx, ...) +{ + int retcode; + va_list ap; + sqlite3_stmt *stmt; + int i; + const char *argtypes; + guint retries = 0; + struct timespec ts; + + if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + + return -1; + } + + stmt = prepared_stmts[idx].stmt; + g_assert ((int)prepared_stmts[idx].idx == idx); + + if (stmt == NULL) { + if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1, + &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) { + msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s", + prepared_stmts[idx].sql, sqlite3_errmsg (backend->db)); + + return retcode; + } + stmt = prepared_stmts[idx].stmt; + } + + msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup", + prepared_stmts[idx].sql, auto_cleanup ? "with" : "without"); + argtypes = prepared_stmts[idx].args; + sqlite3_clear_bindings (stmt); + sqlite3_reset (stmt); + va_start (ap, idx); + + for (i = 0; argtypes[i] != '\0'; i++) { + switch (argtypes[i]) { + case 'T': + sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1, + SQLITE_STATIC); + break; + case 'I': + sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64)); + break; + case 'S': + sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint)); + break; + case 'D': + /* Special case for digests variable */ + sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64, + SQLITE_STATIC); + break; + } + } + + va_end (ap); + +retry: + retcode = sqlite3_step (stmt); + + if (retcode == prepared_stmts[idx].result) { + retcode = SQLITE_OK; + } + else { + if ((retcode == SQLITE_BUSY || + retcode == SQLITE_LOCKED) && retries++ < max_retries) { + double_to_ts (sql_sleep_time, &ts); + nanosleep (&ts, NULL); + goto retry; + } + + msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql, + retcode, sqlite3_errmsg (backend->db)); + } + + if (auto_cleanup) { + sqlite3_clear_bindings (stmt); + sqlite3_reset (stmt); + } + + return retcode; +} + +static void +rspamd_fuzzy_backend_sqlite_close_stmts (struct rspamd_fuzzy_backend_sqlite *bk) +{ + int i; + + for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) { + if (prepared_stmts[i].stmt != NULL) { + sqlite3_finalize (prepared_stmts[i].stmt); + prepared_stmts[i].stmt = NULL; + } + } + + return; +} + +static gboolean +rspamd_fuzzy_backend_sqlite_run_sql (const gchar *sql, struct rspamd_fuzzy_backend_sqlite *bk, + GError **err) +{ + guint retries = 0; + struct timespec ts; + gint ret; + + do { + ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL); + double_to_ts (sql_sleep_time, &ts); + } while (ret == SQLITE_BUSY && retries++ < max_retries && + nanosleep (&ts, NULL) == 0); + + if (ret != SQLITE_OK) { + g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (), + -1, "Cannot execute raw sql `%s`: %s", + sql, sqlite3_errmsg (bk->db)); + return FALSE; + } + + return TRUE; +} + +static struct rspamd_fuzzy_backend_sqlite * +rspamd_fuzzy_backend_sqlite_open_db (const gchar *path, GError **err) +{ + struct rspamd_fuzzy_backend_sqlite *bk; + rspamd_cryptobox_hash_state_t st; + guchar hash_out[rspamd_cryptobox_HASHBYTES]; + + g_assert (path != NULL); + + bk = g_slice_alloc (sizeof (*bk)); + bk->path = g_strdup (path); + bk->expired = 0; + bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "fuzzy_backend"); + bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path, + create_tables_sql, 1, err); + + if (bk->db == NULL) { + rspamd_fuzzy_backend_sqlite_close (bk); + + return NULL; + } + + if (!rspamd_fuzzy_backend_sqlite_prepare_stmts (bk, err)) { + rspamd_fuzzy_backend_sqlite_close (bk); + + return NULL; + } + + /* Set id for the backend */ + rspamd_cryptobox_hash_init (&st, NULL, 0); + rspamd_cryptobox_hash_update (&st, path, strlen (path)); + rspamd_cryptobox_hash_final (&st, hash_out); + rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out); + memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid)); + + return bk; +} + +struct rspamd_fuzzy_backend_sqlite * +rspamd_fuzzy_backend_sqlite_open (const gchar *path, + gboolean vacuum, + GError **err) +{ + struct rspamd_fuzzy_backend_sqlite *backend; + + if (path == NULL) { + g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (), + ENOENT, "Path has not been specified"); + return NULL; + } + + /* Open database */ + if ((backend = rspamd_fuzzy_backend_sqlite_open_db (path, err)) == NULL) { + return NULL; + } + + if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT) + == SQLITE_OK) { + backend->count = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT); + + return backend; +} + +static gint +rspamd_fuzzy_backend_sqlite_int64_cmp (const void *a, const void *b) +{ + gint64 ia = *(gint64 *)a, ib = *(gint64 *)b; + + return (ia - ib); +} + +struct rspamd_fuzzy_reply +rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd, gint64 expire) +{ + struct rspamd_fuzzy_reply rep = {0, 0, 0, 0.0}; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + int rc; + gint64 timestamp; + gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id, + cur_cnt, max_cnt; + + if (backend == NULL) { + return rep; + } + + /* Try direct match first of all */ + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + timestamp = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1); + if (time (NULL) - timestamp > expire) { + /* Expire element */ + msg_debug_fuzzy_backend ("requested hash has been expired"); + } + else { + rep.value = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0); + rep.prob = 1.0; + rep.flag = sqlite3_column_int ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2); + } + } + else if (cmd->shingles_count > 0) { + /* Fuzzy match */ + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + shcmd->sgl.hashes[i], i); + if (rc == SQLITE_OK) { + shingle_values[i] = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt, + 0); + } + else { + shingle_values[i] = -1; + } + msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i, + shcmd->sgl.hashes[i], rc); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE); + + qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64), + rspamd_fuzzy_backend_sqlite_int64_cmp); + sel_id = -1; + cur_id = -1; + cur_cnt = 0; + max_cnt = 0; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + if (shingle_values[i] == -1) { + continue; + } + + /* We have some value here, so we need to check it */ + if (shingle_values[i] == cur_id) { + cur_cnt ++; + } + else { + cur_id = shingle_values[i]; + if (cur_cnt >= max_cnt) { + max_cnt = cur_cnt; + sel_id = cur_id; + } + cur_cnt = 0; + } + } + + if (cur_cnt > max_cnt) { + max_cnt = cur_cnt; + } + + if (sel_id != -1) { + /* We have some id selected here */ + rep.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE; + + if (rep.prob > 0.5) { + msg_debug_fuzzy_backend ( + "found fuzzy hash with probability %.2f", + rep.prob); + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id); + if (rc == SQLITE_OK) { + timestamp = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 2); + if (time (NULL) - timestamp > expire) { + /* Expire element */ + msg_debug_fuzzy_backend ( + "requested hash has been expired"); + rep.prob = 0.0; + } + else { + rep.value = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 1); + rep.flag = sqlite3_column_int ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 3); + } + } + } + else { + /* Otherwise we assume that as error */ + rep.value = 0; + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID); + } + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + return rep; +} + +gboolean +rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source) +{ + gint rc; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot start transaction for updates: %s", + sqlite3_errmsg (backend->db)); + return FALSE; + } + + return TRUE; +} + +gboolean +rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd) +{ + int rc, i; + gint64 id, flag; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + /* Check flag */ + flag = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, + 2); + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + + if (flag == cmd->flag) { + /* We need to increase weight */ + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_UPDATE, + (gint64) cmd->value, + cmd->digest); + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot update hash to %d -> " + "%*xs: %s", (gint) cmd->flag, + (gint) sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + } + else { + /* We need to relearn actually */ + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + (gint64) cmd->value, + (gint64) cmd->flag, + cmd->digest); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot update hash to %d -> " + "%*xs: %s", (gint) cmd->flag, + (gint) sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + } + } + else { + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_INSERT, + (gint) cmd->flag, + cmd->digest, + (gint64) cmd->value); + + if (rc == SQLITE_OK) { + if (cmd->shingles_count > 0) { + id = sqlite3_last_insert_rowid (backend->db); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + shcmd->sgl.hashes[i], (gint64)i, id); + msg_debug_fuzzy_backend ("add shingle %d -> %L: %L", + i, + shcmd->sgl.hashes[i], + id); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot add shingle %d -> " + "%L: %L: %s", i, + shcmd->sgl.hashes[i], + id, sqlite3_errmsg (backend->db)); + } + } + } + } + else { + msg_warn_fuzzy_backend ("cannot add hash to %d -> " + "%*xs: %s", (gint)cmd->flag, + (gint)sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_INSERT); + } + + return (rc == SQLITE_OK); +} + +gboolean +rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source, gboolean version_bump) +{ + gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver; + + /* Get and update version */ + if (version_bump) { + ver = rspamd_fuzzy_backend_sqlite_version (backend, source); + ++ver; + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_SET_VERSION, + (gint64)ver, (gint64)time (NULL), source); + } + + if (rc == SQLITE_OK) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot commit updates: %s", + sqlite3_errmsg (backend->db)); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; + } + else { + if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) { + msg_warn_fuzzy_backend ("cannot commit checkpoint: %s", + sqlite3_errmsg (backend->db)); + } + else if (wal_checkpointed > 0) { + msg_info_fuzzy_backend ("total number of frames in the wal file: " + "%d, checkpointed: %d", wal_frames, wal_checkpointed); + } + } + } + else { + msg_warn_fuzzy_backend ("cannot update version for %s: %s", source, + sqlite3_errmsg (backend->db)); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; + } + + return TRUE; +} + +gboolean +rspamd_fuzzy_backend_sqlite_del (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd) +{ + int rc = -1; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_DELETE, + cmd->digest); + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot update hash to %d -> " + "%*xs: %s", (gint) cmd->flag, + (gint) sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + } + else { + /* Hash is missing */ + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + } + + return (rc == SQLITE_OK); +} + +gboolean +rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend, + gint64 expire, + gboolean clean_orphaned) +{ + struct orphaned_shingle_elt { + gint64 value; + gint64 number; + }; + + /* Do not do more than 5k ops per step */ + const guint64 max_changes = 5000; + gboolean ret = FALSE; + gint64 expire_lim, expired; + gint rc, i, orphaned_cnt = 0; + GError *err = NULL; + static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number " + "FROM shingles " + "LEFT JOIN digests ON " + "shingles.digest_id=digests.id WHERE " + "digests.id IS NULL;"; + sqlite3_stmt *stmt; + GArray *orphaned; + struct orphaned_shingle_elt orphaned_elt, *pelt; + + + if (backend == NULL) { + return FALSE; + } + + /* Perform expire */ + if (expire > 0) { + expire_lim = time (NULL) - expire; + + if (expire_lim > 0) { + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (ret == SQLITE_OK) { + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes); + + if (rc == SQLITE_OK) { + expired = sqlite3_changes (backend->db); + + if (expired > 0) { + backend->expired += expired; + msg_info_fuzzy_backend ("expired %L hashes", expired); + } + } + else { + msg_warn_fuzzy_backend ( + "cannot execute expired statement: %s", + sqlite3_errmsg (backend->db)); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_EXPIRE); + + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (ret != SQLITE_OK) { + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + } + } + if (ret != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot expire db: %s", + sqlite3_errmsg (backend->db)); + } + } + } + + /* Cleanup database */ + if (clean_orphaned) { + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (ret == SQLITE_OK) { + if ((rc = sqlite3_prepare_v2 (backend->db, + orphaned_shingles, + -1, + &stmt, + NULL)) != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot cleanup shingles: %s", + sqlite3_errmsg (backend->db)); + } + else { + orphaned = g_array_new (FALSE, + FALSE, + sizeof (struct orphaned_shingle_elt)); + + while (sqlite3_step (stmt) == SQLITE_ROW) { + orphaned_elt.value = sqlite3_column_int64 (stmt, 0); + orphaned_elt.number = sqlite3_column_int64 (stmt, 1); + g_array_append_val (orphaned, orphaned_elt); + + if (orphaned->len > max_changes) { + break; + } + } + + sqlite3_finalize (stmt); + orphaned_cnt = orphaned->len; + + if (orphaned_cnt > 0) { + msg_info_fuzzy_backend ( + "going to delete %ud orphaned shingles", + orphaned_cnt); + /* Need to delete orphaned elements */ + for (i = 0; i < (gint) orphaned_cnt; i++) { + pelt = &g_array_index (orphaned, + struct orphaned_shingle_elt, + i); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + pelt->value, pelt->number); + } + } + + + g_array_free (orphaned, TRUE); + } + + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (ret == SQLITE_OK) { + msg_info_fuzzy_backend ( + "deleted %ud orphaned shingles", + orphaned_cnt); + } + else { + msg_warn_fuzzy_backend ( + "cannot synchronize fuzzy backend: %e", + err); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + } + } + } + + return ret; +} + + +void +rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend) +{ + if (backend != NULL) { + if (backend->db != NULL) { + rspamd_fuzzy_backend_sqlite_close_stmts (backend); + sqlite3_close (backend->db); + } + + if (backend->path != NULL) { + g_free (backend->path); + } + + if (backend->pool) { + rspamd_mempool_delete (backend->pool); + } + + g_slice_free1 (sizeof (*backend), backend); + } +} + + +gsize +rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend) +{ + if (backend) { + if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) { + backend->count = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT); + + return backend->count; + } + + return 0; +} + +gint +rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source) +{ + gint ret = -1; + + if (backend) { + if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) { + ret = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION); + } + + return ret; +} + +gsize +rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend) +{ + return backend != NULL ? backend->expired : 0; +} + +const gchar * +rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend) +{ + return backend != NULL ? backend->id : 0; +} diff --git a/src/libserver/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend_sqlite.h new file mode 100644 index 000000000..032d1e3cd --- /dev/null +++ b/src/libserver/fuzzy_backend_sqlite.h @@ -0,0 +1,98 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FUZZY_BACKEND_H_ +#define FUZZY_BACKEND_H_ + +#include "config.h" +#include "fuzzy_wire.h" + + +struct rspamd_fuzzy_backend_sqlite; + +/** + * Open fuzzy backend + * @param path file to open (legacy file will be converted automatically) + * @param err error pointer + * @return backend structure or NULL + */ +struct rspamd_fuzzy_backend_sqlite *rspamd_fuzzy_backend_sqlite_open (const gchar *path, + gboolean vacuum, + GError **err); + +/** + * Check specified fuzzy in the backend + * @param backend + * @param cmd + * @return reply with probability and weight + */ +struct rspamd_fuzzy_reply rspamd_fuzzy_backend_sqlite_check ( + struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd, + gint64 expire); + +/** + * Prepare storage for updates (by starting transaction) + */ +gboolean rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source); + +/** + * Add digest to the database + * @param backend + * @param cmd + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd); + +/** + * Delete digest from the database + * @param backend + * @param cmd + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_del ( + struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd); + +/** + * Commit updates to storage + */ +gboolean rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source, gboolean version_bump); + +/** + * Sync storage + * @param backend + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend, + gint64 expire, + gboolean clean_orphaned); + +/** + * Close storage + * @param backend + */ +void rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend); + +gsize rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend); +gint rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend, const gchar *source); +gsize rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend); + +const gchar * rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend); + +#endif /* FUZZY_BACKEND_H_ */ diff --git a/src/fuzzy_storage.h b/src/libserver/fuzzy_wire.h index a9c3f174b..fb0cbf3ad 100644 --- a/src/fuzzy_storage.h +++ b/src/libserver/fuzzy_wire.h @@ -84,4 +84,12 @@ struct rspamd_fuzzy_stat_entry { guint32 fuzzy_cnt; }; +struct fuzzy_peer_cmd { + gboolean is_shingle; + union { + struct rspamd_fuzzy_cmd normal; + struct rspamd_fuzzy_shingle_cmd shingle; + } cmd; +}; + #endif diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 9952c26d9..bc04b753b 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -35,7 +35,7 @@ #include "libutil/map.h" #include "libmime/images.h" #include "libserver/worker_util.h" -#include "fuzzy_storage.h" +#include "fuzzy_wire.h" #include "utlist.h" #include "cryptobox.h" #include "ottery.h" diff --git a/src/rspamd.c b/src/rspamd.c index 3ddee3ff4..eb2f37ecb 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -16,7 +16,6 @@ #include "config.h" #include "rspamd.h" #include "libutil/map.h" -#include "fuzzy_storage.h" #include "lua/lua_common.h" #include "libserver/worker_util.h" #include "libserver/rspamd_control.h" |