summaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-02 14:32:58 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-02 14:32:58 +0100
commite8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5 (patch)
tree3525b81285a2c8b47a9bfc9cc11e994daab5e307 /src/fuzzy_storage.c
parentb493b3628def373e40dcdedb21869dc566b3aea1 (diff)
downloadrspamd-e8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5.tar.gz
rspamd-e8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5.zip
[Rework] Adopt fuzzy storage for abstract backend
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c470
1 files changed, 259 insertions, 211 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 0d29cd197..d663a89e4 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -36,15 +36,11 @@
#include "libutil/http_private.h"
#include "unix-std.h"
-/* This number is used as expire time in seconds for cache items (2 days) */
-#define DEFAULT_EXPIRE 172800L
/* Resync value in seconds */
#define DEFAULT_SYNC_TIMEOUT 60.0
#define DEFAULT_KEYPAIR_CACHE_SIZE 512
#define DEFAULT_MASTER_TIMEOUT 10.0
-#define INVALID_NODE_TIME (guint64) - 1
-
static const gchar *local_db_name = "local";
#define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -77,10 +73,6 @@ worker_t fuzzy_worker = {
RSPAMD_WORKER_VER /* Version info */
};
-/* For evtimer */
-static struct timeval tmv;
-static struct event tev;
-
struct fuzzy_global_stat {
guint64 fuzzy_hashes;
/**< number of fuzzy hashes stored */
@@ -139,7 +131,7 @@ struct rspamd_fuzzy_storage_ctx {
gboolean encrypted_only;
struct rspamd_keypair_cache *keypair_cache;
rspamd_lru_hash_t *errors_ips;
- struct rspamd_fuzzy_backend_sqlite *backend;
+ struct rspamd_fuzzy_backend *backend;
GQueue *updates_pending;
struct rspamd_dns_resolver *resolver;
struct rspamd_config *cfg;
@@ -165,6 +157,7 @@ struct fuzzy_session {
} cmd;
struct rspamd_fuzzy_encrypted_reply reply;
+ struct fuzzy_key_stat *ip_stat;
enum rspamd_fuzzy_epoch epoch;
enum fuzzy_cmd_type cmd_type;
@@ -191,7 +184,10 @@ struct fuzzy_master_update_session {
const gchar *name;
gchar uid[16];
struct rspamd_http_connection *conn;
+ struct rspamd_http_message *msg;
struct rspamd_fuzzy_storage_ctx *ctx;
+ const gchar *src;
+ gchar *psrc;
rspamd_inet_addr_t *addr;
gboolean replied;
gint sock;
@@ -242,6 +238,14 @@ fuzzy_key_dtor (gpointer p)
g_slice_free1 (sizeof (*key), key);
}
+static void
+fuzzy_count_callback (guint64 count, void *ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = ud;
+
+ ctx->stat.fuzzy_hashes = count;
+}
+
struct fuzzy_slave_connection {
struct rspamd_cryptobox_keypair *local_key;
struct rspamd_cryptobox_pubkey *remote_key;
@@ -266,19 +270,34 @@ fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
}
}
+struct rspamd_fuzzy_updates_cbdata {
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct rspamd_http_message *msg;
+ struct fuzzy_slave_connection *conn;
+ struct rspamd_fuzzy_mirror *m;
+};
+
static void
-fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
- struct rspamd_http_message *msg)
+fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
{
+ struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
GList *cur;
struct fuzzy_peer_cmd *io_cmd;
- guint32 len;
- guint32 rev;
+ guint32 rev32 = rev64, len;
const gchar *p;
rspamd_fstring_t *reply;
+ struct fuzzy_slave_connection *conn;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct rspamd_http_message *msg;
+ struct rspamd_fuzzy_mirror *m;
+ struct timeval tv;
- rev = rspamd_fuzzy_backend_sqlite_version (ctx->backend, local_db_name);
- rev = GUINT32_TO_LE (rev);
+ conn = cbdata->conn;
+ ctx = cbdata->ctx;
+ msg = cbdata->msg;
+ m = cbdata->m;
+ g_slice_free1 (sizeof (*cbdata), cbdata);
+ rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */
for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
@@ -295,8 +314,8 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
}
reply = rspamd_fstring_sized_new (len);
- reply = rspamd_fstring_append (reply, (const char *)&rev,
- sizeof (rev));
+ reply = rspamd_fstring_append (reply, (const char *)&rev32,
+ sizeof (rev32));
for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
io_cmd = cur->data;
@@ -320,6 +339,30 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
len = 0;
reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+ double_to_tv (ctx->sync_timeout, &tv);
+ rspamd_http_connection_write_message (conn->http_conn,
+ msg, NULL, NULL, conn,
+ conn->sock,
+ &tv, ctx->ev_base);
+ msg_info ("send update request to %s", m->name);
+}
+
+static void
+fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
+ struct fuzzy_slave_connection *conn,
+ struct rspamd_fuzzy_storage_ctx *ctx,
+ struct rspamd_http_message *msg)
+{
+
+ struct rspamd_fuzzy_updates_cbdata *cbdata;
+
+ cbdata = g_slice_alloc (sizeof (*cbdata));
+ cbdata->ctx = ctx;
+ cbdata->msg = msg;
+ cbdata->conn = conn;
+ cbdata->m = m;
+ rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
+ fuzzy_mirror_updates_version_cb, cbdata);
}
static void
@@ -353,7 +396,6 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
- struct timeval tv;
conn = g_slice_alloc0 (sizeof (*conn));
conn->up = rspamd_upstream_get (m->u,
@@ -389,84 +431,85 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
- double_to_tv (ctx->sync_timeout, &tv);
- fuzzy_mirror_updates_to_http (ctx, msg);
+ fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
+}
- rspamd_http_connection_write_message (conn->http_conn,
- msg, NULL, NULL, conn,
- conn->sock,
- &tv, ctx->ev_base);
- msg_info ("send update request to %s", m->name);
+struct rspamd_updates_cbdata {
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ const gchar *source;
+};
+
+static void
+fuzzy_update_version_callback (guint64 ver, void *ud)
+{
+ msg_info ("updated fuzzy storage from %s: version: %d",
+ (const char *)ud, (gint)ver);
}
static void
-rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
- const gchar *source)
+rspamd_fuzzy_updates_cb (gboolean success, void *ud)
{
- GList *cur;
- struct fuzzy_peer_cmd *io_cmd;
- struct rspamd_fuzzy_cmd *cmd;
- gpointer ptr;
+ struct rspamd_updates_cbdata *cbdata = ud;
struct rspamd_fuzzy_mirror *m;
guint nupdates = 0, i;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ const gchar *source;
+ GList *cur;
+ struct fuzzy_peer_cmd *io_cmd;
- if (ctx->updates_pending &&
- g_queue_get_length (ctx->updates_pending) > 0 &&
- rspamd_fuzzy_backend_sqlite_prepare_update (ctx->backend, source)) {
- cur = ctx->updates_pending->head;
+ ctx = cbdata->ctx;
+ source = cbdata->source;
- while (cur) {
- io_cmd = cur->data;
+ if (success) {
+ rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
- if (io_cmd->is_shingle) {
- cmd = &io_cmd->cmd.shingle.basic;
- ptr = &io_cmd->cmd.shingle;
- }
- else {
- cmd = &io_cmd->cmd.normal;
- ptr = &io_cmd->cmd.normal;
- }
+ if (nupdates > 0) {
+ for (i = 0; i < ctx->mirrors->len; i ++) {
+ m = g_ptr_array_index (ctx->mirrors, i);
- if (cmd->cmd == FUZZY_WRITE) {
- rspamd_fuzzy_backend_sqlite_add (ctx->backend, ptr);
- }
- else {
- rspamd_fuzzy_backend_sqlite_del (ctx->backend, ptr);
+ rspamd_fuzzy_send_update_mirror (ctx, m);
}
+ }
+
+ /* Clear updates */
+ cur = ctx->updates_pending->head;
- nupdates++;
+ while (cur) {
+ io_cmd = cur->data;
+ g_slice_free1 (sizeof (*io_cmd), io_cmd);
cur = g_list_next (cur);
}
- if (rspamd_fuzzy_backend_sqlite_finish_update (ctx->backend, source, nupdates > 0)) {
- ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend);
+ g_queue_clear (ctx->updates_pending);
+ rspamd_fuzzy_backend_version (ctx->backend, source,
+ fuzzy_update_version_callback, (void *)source);
- if (nupdates > 0) {
- for (i = 0; i < ctx->mirrors->len; i ++) {
- m = g_ptr_array_index (ctx->mirrors, i);
+ }
+ else {
+ msg_err ("cannot commit update transaction to fuzzy backend, "
+ "%ud updates are still pending",
+ g_queue_get_length (ctx->updates_pending));
+ }
- rspamd_fuzzy_send_update_mirror (ctx, m);
- }
- }
+ g_slice_free1 (sizeof (*cbdata), cbdata);
+}
- /* Clear updates */
- cur = ctx->updates_pending->head;
+static void
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
+ const gchar *source)
+{
+
+ struct rspamd_updates_cbdata *cbdata;
+
+ if (ctx->updates_pending &&
+ g_queue_get_length (ctx->updates_pending) > 0) {
+ cbdata = g_slice_alloc (sizeof (*cbdata));
+ cbdata->ctx = ctx;
+ cbdata->source = source;
+ rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
+ source, rspamd_fuzzy_updates_cb, cbdata);
- while (cur) {
- io_cmd = cur->data;
- g_slice_free1 (sizeof (*io_cmd), io_cmd);
- cur = g_list_next (cur);
- }
- g_queue_clear (ctx->updates_pending);
- msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
- nupdates, rspamd_fuzzy_backend_sqlite_version (ctx->backend, source));
- }
- else {
- msg_err ("cannot commit update transaction to fuzzy backend, "
- "%ud updates are still pending",
- g_queue_get_length (ctx->updates_pending));
- }
}
else if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0) {
@@ -599,6 +642,77 @@ rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
}
static void
+rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
+ struct rspamd_fuzzy_reply *result,
+ struct fuzzy_session *session,
+ gboolean encrypted, gboolean is_shingle)
+{
+ if (cmd) {
+ result->tag = cmd->tag;
+
+ memcpy (&session->reply.rep, result, sizeof (*result));
+
+ rspamd_fuzzy_update_stats (session->ctx,
+ session->epoch,
+ result->prob > 0.5,
+ is_shingle,
+ session->key_stat,
+ session->ip_stat,
+ cmd->cmd,
+ result->value);
+
+ if (encrypted) {
+ /* We need also to encrypt reply */
+ ottery_rand_bytes (session->reply.hdr.nonce,
+ sizeof (session->reply.hdr.nonce));
+ rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
+ sizeof (session->reply.rep),
+ session->reply.hdr.nonce,
+ session->nm,
+ session->reply.hdr.mac,
+ RSPAMD_CRYPTOBOX_MODE_25519);
+ }
+ }
+
+ rspamd_fuzzy_write_reply (session);
+}
+
+static void
+rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
+{
+ struct fuzzy_session *session = ud;
+ gboolean encrypted = FALSE, is_shingle = FALSE;
+ struct rspamd_fuzzy_cmd *cmd = NULL;
+ gsize up_len = 0;
+
+ switch (session->cmd_type) {
+ case CMD_NORMAL:
+ cmd = &session->cmd.normal;
+ up_len = sizeof (session->cmd.normal);
+ break;
+ case CMD_SHINGLE:
+ cmd = &session->cmd.shingle.basic;
+ up_len = sizeof (session->cmd.shingle);
+ is_shingle = TRUE;
+ break;
+ case CMD_ENCRYPTED_NORMAL:
+ cmd = &session->cmd.enc_normal.cmd;
+ up_len = sizeof (session->cmd.normal);
+ encrypted = TRUE;
+ break;
+ case CMD_ENCRYPTED_SHINGLE:
+ cmd = &session->cmd.enc_shingle.cmd.basic;
+ up_len = sizeof (session->cmd.shingle);
+ encrypted = TRUE;
+ is_shingle = TRUE;
+ break;
+ }
+
+ rspamd_fuzzy_make_reply (cmd, result, session, encrypted, is_shingle);
+ REF_RELEASE (session);
+}
+
+static void
rspamd_fuzzy_process_command (struct fuzzy_session *session)
{
gboolean encrypted = FALSE, is_shingle = FALSE;
@@ -637,14 +751,16 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
if (G_UNLIKELY (cmd == NULL || up_len == 0)) {
result.value = 500;
result.prob = 0.0;
- goto reply;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ return;
}
if (session->ctx->encrypted_only && !encrypted) {
/* Do not accept unencrypted commands */
result.value = 403;
result.prob = 0.0;
- goto reply;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ return;
}
if (session->key_stat) {
@@ -657,17 +773,21 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
rspamd_lru_hash_insert (session->key_stat->last_ips,
naddr, ip_stat, -1, 0);
}
+
+ session->ip_stat = ip_stat;
}
result.flag = cmd->flag;
if (cmd->cmd == FUZZY_CHECK) {
- result = rspamd_fuzzy_backend_sqlite_check (session->ctx->backend, cmd,
- session->ctx->expire);
+ REF_RETAIN (session);
+ rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
+ rspamd_fuzzy_check_callback, session);
}
else if (cmd->cmd == FUZZY_STAT) {
result.prob = 1.0;
result.value = 0;
- result.flag = rspamd_fuzzy_backend_sqlite_count (session->ctx->backend);
+ result.flag = session->ctx->stat.fuzzy_hashes;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
else {
if (rspamd_fuzzy_check_client (session)) {
@@ -703,36 +823,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
result.value = 403;
result.prob = 0.0;
}
- }
-
-reply:
- if (cmd) {
- result.tag = cmd->tag;
- memcpy (&session->reply.rep, &result, sizeof (result));
-
- rspamd_fuzzy_update_stats (session->ctx,
- session->epoch,
- result.prob > 0.5,
- is_shingle,
- session->key_stat,
- ip_stat, cmd->cmd,
- result.value);
-
- if (encrypted) {
- /* We need also to encrypt reply */
- ottery_rand_bytes (session->reply.hdr.nonce,
- sizeof (session->reply.hdr.nonce));
- rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
- sizeof (session->reply.rep),
- session->reply.hdr.nonce,
- session->nm,
- session->reply.hdr.mac,
- RSPAMD_CRYPTOBOX_MODE_25519);
- }
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
}
-
- rspamd_fuzzy_write_reply (session);
}
@@ -916,12 +1009,11 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
static void
rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
- struct rspamd_http_message *msg)
+ struct rspamd_http_message *msg, guint our_rev)
{
const guchar *p;
- gchar *src = NULL, *psrc;
gsize remain;
- gint32 revision, our_rev;
+ gint32 revision;
guint32 len = 0, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
enum {
@@ -932,25 +1024,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
GList *updates = NULL, *cur;
gpointer flag_ptr;
- if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
- || msg->url->len == 0) {
- msg_err_fuzzy_update ("empty update message, not processing");
-
- return;
- }
-
- /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
- remain = msg->url->len;
- psrc = rspamd_fstringdup (msg->url);
- src = psrc;
-
- while (remain--) {
- if (src[remain] == '/') {
- src = &src[remain + 1];
- break;
- }
- }
-
/*
* Message format:
* <uint32_le> - revision
@@ -965,13 +1038,11 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
if (remain > sizeof (gint32) * 2) {
memcpy (&revision, p, sizeof (gint32));
revision = GINT32_TO_LE (revision);
- our_rev = rspamd_fuzzy_backend_sqlite_version (session->ctx->backend, src);
if (revision <= our_rev) {
msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, "
"refusing update",
revision, our_rev);
- g_free (psrc);
return;
}
@@ -1076,7 +1147,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cur->data = NULL;
}
- rspamd_fuzzy_process_updates_queue (session->ctx, src);
+ rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
msg_info_fuzzy_update ("processed updates from the master %s, "
"%ud operations processed,"
" revision: %d (local revision: %d)",
@@ -1084,8 +1155,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cnt, revision, our_rev);
err:
- g_free (psrc);
-
if (updates) {
/* We still need to clear queue */
for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1119,6 +1188,10 @@ rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session
rspamd_http_connection_unref (session->conn);
rspamd_inet_address_destroy (session->addr);
close (session->sock);
+
+ if (session->psrc) {
+ g_free (session->psrc);
+ }
g_slice_free1 (sizeof (*session), session);
}
}
@@ -1151,6 +1224,15 @@ rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session,
session->ctx->ev_base);
}
+static void
+rspamd_fuzzy_update_version_callback (guint64 version, void *ud)
+{
+ struct fuzzy_master_update_session *session = ud;
+
+ rspamd_fuzzy_mirror_process_update (session, session->msg, version);
+ rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
+}
+
static gint
rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
@@ -1158,6 +1240,9 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
struct fuzzy_master_update_session *session = conn->ud;
const struct rspamd_cryptobox_pubkey *rk;
const gchar *err_str = NULL;
+ gchar *psrc;
+ const gchar *src = NULL;
+ gsize remain;
if (session->replied) {
rspamd_fuzzy_mirror_session_destroy (session);
@@ -1189,9 +1274,30 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s",
rspamd_inet_address_to_string (session->addr));
}
+ if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
+ || msg->url->len == 0) {
+ msg_err_fuzzy_update ("empty update message, not processing");
- rspamd_fuzzy_mirror_process_update (session, msg);
- rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
+ goto end;
+ }
+
+ /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
+ remain = msg->url->len;
+ psrc = rspamd_fstringdup (msg->url);
+ src = psrc;
+
+ while (remain--) {
+ if (src[remain] == '/') {
+ src = &src[remain + 1];
+ break;
+ }
+ }
+
+ session->src = src;
+ session->psrc = psrc;
+ session->msg = msg;
+ rspamd_fuzzy_backend_version (session->ctx->backend, src,
+ rspamd_fuzzy_update_version_callback, session);
return 0;
}
@@ -1352,38 +1458,6 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
}
}
-static void
-sync_callback (gint fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct rspamd_fuzzy_storage_ctx *ctx;
- gdouble next_check;
- guint64 old_expired, new_expired;
-
- ctx = worker->ctx;
-
- if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- /* Call backend sync */
- old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
- rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
- new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
-
- if (old_expired < new_expired) {
- ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
- }
- }
-
- /* Timer event */
- event_del (&tev);
- evtimer_set (&tev, sync_callback, worker);
- event_base_set (ctx->ev_base, &tev);
- /* Plan event with jitter */
- next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
- double_to_tv (next_check, &tmv);
- evtimer_add (&tev, &tmv);
-}
-
static gboolean
rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
@@ -1392,29 +1466,13 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
gpointer ud)
{
struct rspamd_fuzzy_storage_ctx *ctx = ud;
- gdouble next_check;
- guint64 old_expired, new_expired;
struct rspamd_control_reply rep;
- if (ctx->backend) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- /* Call backend sync */
- old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
- rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
- new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
-
- if (old_expired < new_expired) {
- ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
- }
- }
-
rep.reply.fuzzy_sync.status = 0;
- /* Handle next timer event */
- event_del (&tev);
- next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
- double_to_tv (next_check, &tmv);
- evtimer_add (&tev, &tmv);
+ if (ctx->backend && worker->index == 0) {
+ rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ }
if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
@@ -1439,14 +1497,14 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
if (ctx->backend) {
/* Close backend and reopen it one more time */
- rspamd_fuzzy_backend_sqlite_close (ctx->backend);
+ rspamd_fuzzy_backend_close (ctx->backend);
}
memset (&rep, 0, sizeof (rep));
rep.type = RSPAMD_CONTROL_RELOAD;
- if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile,
- TRUE,
+ if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+ worker->cf->options,
&err)) == NULL) {
msg_err ("cannot open backend after reload: %e", err);
g_error_free (err);
@@ -1456,6 +1514,10 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
rep.reply.reload.status = 0;
}
+ if (ctx->backend && worker->index == 0) {
+ rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+ }
+
if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
msg_err ("cannot write reply to the control socket: %s",
strerror (errno));
@@ -1644,7 +1706,7 @@ rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main,
rep.reply.fuzzy_stat.status = 0;
memcpy (rep.reply.fuzzy_stat.storage_id,
- rspamd_fuzzy_sqlite_backend_id (ctx->backend),
+ rspamd_fuzzy_backend_id (ctx->backend),
sizeof (rep.reply.fuzzy_stat.storage_id));
obj = rspamd_fuzzy_stat_to_ucl (ctx, TRUE);
@@ -1901,7 +1963,6 @@ init_fuzzy (struct rspamd_config *cfg)
ctx->magic = rspamd_fuzzy_storage_magic;
ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
- ctx->expire = DEFAULT_EXPIRE;
ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
NULL, fuzzy_key_dtor);
@@ -2108,7 +2169,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
GList *cur;
struct rspamd_worker_listen_socket *ls;
struct event *accept_events;
- gdouble next_check;
ctx->peer_fd = rep_fd;
@@ -2157,14 +2217,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
event_base_set (ctx->ev_base, &ctx->peer_ev);
event_add (&ctx->peer_ev, NULL);
ctx->updates_pending = g_queue_new ();
-
- /* Timer event */
- evtimer_set (&tev, sync_callback, worker);
- event_base_set (ctx->ev_base, &tev);
- /* Plan event with jitter */
- next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
- double_to_tv (next_check, &tmv);
- evtimer_add (&tev, &tmv);
}
}
@@ -2188,13 +2240,14 @@ start_fuzzy (struct rspamd_worker *worker)
/*
* Open DB and perform VACUUM
*/
- if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile, TRUE, &err)) == NULL) {
+ if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+ worker->cf->options, &err)) == NULL) {
msg_err ("cannot open backend: %e", err);
g_error_free (err);
exit (EXIT_SUCCESS);
}
- ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend);
+ rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
if (ctx->keypair_cache_size > 0) {
/* Create keypairs cache */
@@ -2202,7 +2255,7 @@ start_fuzzy (struct rspamd_worker *worker)
}
if (worker->index == 0) {
- rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
+ rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
}
if (ctx->mirrors && ctx->mirrors->len != 0) {
@@ -2260,12 +2313,7 @@ start_fuzzy (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
- if (worker->index == 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
- }
-
- rspamd_fuzzy_backend_sqlite_close (ctx->backend);
+ rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);
if (ctx->peer_fd != -1) {