aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/fuzzy_backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/fuzzy_backend')
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend.c569
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend.h131
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_redis.c1564
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_redis.h67
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c1065
-rw-r--r--src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h107
6 files changed, 3503 insertions, 0 deletions
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.c b/src/libserver/fuzzy_backend/fuzzy_backend.c
new file mode 100644
index 000000000..f6dec1d6e
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend.c
@@ -0,0 +1,569 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_sqlite.h"
+#include "fuzzy_backend_redis.h"
+#include "cfg_file.h"
+#include "fuzzy_wire.h"
+
+#define DEFAULT_EXPIRE 172800L
+
+enum rspamd_fuzzy_backend_type {
+ RSPAMD_FUZZY_BACKEND_SQLITE = 0,
+ RSPAMD_FUZZY_BACKEND_REDIS = 1,
+};
+
+static void* rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, struct rspamd_config *cfg, GError **err);
+static void rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+static const gchar* rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+static void rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+struct rspamd_fuzzy_backend_subr {
+ void* (*init) (struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj,
+ struct rspamd_config *cfg,
+ GError **err);
+ void (*check) (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+ void (*update) (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+ void (*count) (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+ void (*version) (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+ const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*periodic) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+ void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud);
+};
+
+static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = {
+ [RSPAMD_FUZZY_BACKEND_SQLITE] = {
+ .init = rspamd_fuzzy_backend_init_sqlite,
+ .check = rspamd_fuzzy_backend_check_sqlite,
+ .update = rspamd_fuzzy_backend_update_sqlite,
+ .count = rspamd_fuzzy_backend_count_sqlite,
+ .version = rspamd_fuzzy_backend_version_sqlite,
+ .id = rspamd_fuzzy_backend_id_sqlite,
+ .periodic = rspamd_fuzzy_backend_expire_sqlite,
+ .close = rspamd_fuzzy_backend_close_sqlite,
+ },
+#ifdef WITH_HIREDIS
+ [RSPAMD_FUZZY_BACKEND_REDIS] = {
+ .init = rspamd_fuzzy_backend_init_redis,
+ .check = rspamd_fuzzy_backend_check_redis,
+ .update = rspamd_fuzzy_backend_update_redis,
+ .count = rspamd_fuzzy_backend_count_redis,
+ .version = rspamd_fuzzy_backend_version_redis,
+ .id = rspamd_fuzzy_backend_id_redis,
+ .periodic = rspamd_fuzzy_backend_expire_redis,
+ .close = rspamd_fuzzy_backend_close_redis,
+ }
+#endif
+};
+
+struct rspamd_fuzzy_backend {
+ enum rspamd_fuzzy_backend_type type;
+ gdouble expire;
+ gdouble sync;
+ struct ev_loop *event_loop;
+ rspamd_fuzzy_periodic_cb periodic_cb;
+ void *periodic_ud;
+ const struct rspamd_fuzzy_backend_subr *subr;
+ void *subr_ud;
+ ev_timer periodic_event;
+};
+
+static GQuark
+rspamd_fuzzy_backend_quark (void)
+{
+ return g_quark_from_static_string ("fuzzy-backend");
+}
+
+static void*
+rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
+{
+ const ucl_object_t *elt;
+
+ elt = ucl_object_lookup_any (obj, "hashfile", "hash_file", "file",
+ "database", NULL);
+
+ if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+ g_set_error (err, rspamd_fuzzy_backend_quark (),
+ EINVAL, "missing sqlite3 path");
+ return NULL;
+ }
+
+ return rspamd_fuzzy_backend_sqlite_open (ucl_object_tostring (elt),
+ FALSE, err);
+}
+
+static void
+rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ struct rspamd_fuzzy_reply rep;
+
+ rep = rspamd_fuzzy_backend_sqlite_check (sq, cmd, bk->expire);
+
+ if (cb) {
+ cb (&rep, ud);
+ }
+}
+
+static void
+rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ gboolean success = FALSE;
+ guint i;
+ struct fuzzy_peer_cmd *io_cmd;
+ struct rspamd_fuzzy_cmd *cmd;
+ gpointer ptr;
+ guint nupdates = 0, nadded = 0, ndeleted = 0, nextended = 0, nignored = 0;
+
+ if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) {
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ ptr = &io_cmd->cmd.shingle;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ ptr = &io_cmd->cmd.normal;
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ rspamd_fuzzy_backend_sqlite_add (sq, ptr);
+ nadded ++;
+ nupdates ++;
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ rspamd_fuzzy_backend_sqlite_del (sq, ptr);
+ ndeleted ++;
+ nupdates ++;
+ }
+ else {
+ if (cmd->cmd == FUZZY_REFRESH) {
+ nextended ++;
+ }
+ else {
+ nignored ++;
+ }
+ }
+ }
+
+ if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src,
+ nupdates > 0)) {
+ success = TRUE;
+ }
+ }
+
+ if (cb) {
+ cb (success, nadded, ndeleted, nextended, nignored, ud);
+ }
+}
+
+static void
+rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ guint64 nhashes;
+
+ nhashes = rspamd_fuzzy_backend_sqlite_count (sq);
+
+ if (cb) {
+ cb (nhashes, ud);
+ }
+}
+
+static void
+rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+ guint64 rev;
+
+ rev = rspamd_fuzzy_backend_sqlite_version (sq, src);
+
+ if (cb) {
+ cb (rev, ud);
+ }
+}
+
+static const gchar*
+rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+
+ return rspamd_fuzzy_sqlite_backend_id (sq);
+}
+static void
+rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+
+ rspamd_fuzzy_backend_sqlite_sync (sq, bk->expire, TRUE);
+}
+
+static void
+rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_sqlite *sq = subr_ud;
+
+ rspamd_fuzzy_backend_sqlite_close (sq);
+}
+
+
+struct rspamd_fuzzy_backend *
+rspamd_fuzzy_backend_create (struct ev_loop *ev_base,
+ const ucl_object_t *config,
+ struct rspamd_config *cfg,
+ GError **err)
+{
+ struct rspamd_fuzzy_backend *bk;
+ enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE;
+ const ucl_object_t *elt;
+ gdouble expire = DEFAULT_EXPIRE;
+
+ if (config != NULL) {
+ elt = ucl_object_lookup (config, "backend");
+
+ if (elt != NULL && ucl_object_type (elt) == UCL_STRING) {
+ if (strcmp (ucl_object_tostring (elt), "sqlite") == 0) {
+ type = RSPAMD_FUZZY_BACKEND_SQLITE;
+ }
+ else if (strcmp (ucl_object_tostring (elt), "redis") == 0) {
+ type = RSPAMD_FUZZY_BACKEND_REDIS;
+ }
+ else {
+ g_set_error (err, rspamd_fuzzy_backend_quark (),
+ EINVAL, "invalid backend type: %s",
+ ucl_object_tostring (elt));
+ return NULL;
+ }
+ }
+
+ elt = ucl_object_lookup (config, "expire");
+
+ if (elt != NULL) {
+ expire = ucl_object_todouble (elt);
+ }
+ }
+
+ bk = g_malloc0 (sizeof (*bk));
+ bk->event_loop = ev_base;
+ bk->expire = expire;
+ bk->type = type;
+ bk->subr = &fuzzy_subrs[type];
+
+ if ((bk->subr_ud = bk->subr->init (bk, config, cfg, err)) == NULL) {
+ g_free (bk);
+
+ return NULL;
+ }
+
+ return bk;
+}
+
+
+void
+rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud)
+{
+ g_assert (bk != NULL);
+
+ bk->subr->check (bk, cmd, cb, ud, bk->subr_ud);
+}
+
+static guint
+rspamd_fuzzy_digest_hash (gconstpointer key)
+{
+ guint ret;
+
+ /* Distirbuted uniformly already */
+ memcpy (&ret, key, sizeof (ret));
+
+ return ret;
+}
+
+static gboolean
+rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2)
+{
+ return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0;
+}
+
+static void
+rspamd_fuzzy_backend_deduplicate_queue (GArray *updates)
+{
+ GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash,
+ rspamd_fuzzy_digest_equal);
+ struct fuzzy_peer_cmd *io_cmd, *found;
+ struct rspamd_fuzzy_cmd *cmd;
+ guchar *digest;
+ guint i;
+
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ digest = cmd->digest;
+
+ found = g_hash_table_lookup (seen, digest);
+
+ if (found == NULL) {
+ /* Add to the seen list, if not a duplicate (huh?) */
+ if (cmd->cmd != FUZZY_DUP) {
+ g_hash_table_insert (seen, digest, io_cmd);
+ }
+ }
+ else {
+ if (found->cmd.normal.flag != cmd->flag) {
+ /* TODO: deal with flags better at some point */
+ continue;
+ }
+
+ /* Apply heuristic */
+ switch (cmd->cmd) {
+ case FUZZY_WRITE:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* Already seen */
+ found->cmd.normal.value += cmd->value;
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Seen refresh command, remove it as write has higher priority */
+ g_hash_table_replace (seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + add, weird, but ignore add */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_REFRESH:
+ if (found->cmd.normal.cmd == FUZZY_WRITE) {
+ /* No need to expire, handled by addition */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_DEL) {
+ /* Request delete + expire, ignore expire */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ else if (found->cmd.normal.cmd == FUZZY_REFRESH) {
+ /* Already handled */
+ cmd->cmd = FUZZY_DUP; /* Ignore this one */
+ }
+ break;
+ case FUZZY_DEL:
+ /* Delete has priority over all other commands */
+ g_hash_table_replace (seen, digest, io_cmd);
+ found->cmd.normal.cmd = FUZZY_DUP;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ g_hash_table_unref (seen);
+}
+
+void
+rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ void *ud)
+{
+ g_assert (bk != NULL);
+ g_assert (updates != NULL);
+
+ if (updates) {
+ rspamd_fuzzy_backend_deduplicate_queue (updates);
+ bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud);
+ }
+ else if (cb) {
+ cb (TRUE, 0, 0, 0, 0, ud);
+ }
+}
+
+
+void
+rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud)
+{
+ g_assert (bk != NULL);
+
+ bk->subr->count (bk, cb, ud, bk->subr_ud);
+}
+
+
+void
+rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud)
+{
+ g_assert (bk != NULL);
+
+ bk->subr->version (bk, src, cb, ud, bk->subr_ud);
+}
+
+const gchar *
+rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk)
+{
+ g_assert (bk != NULL);
+
+ if (bk->subr->id) {
+ return bk->subr->id (bk, bk->subr_ud);
+ }
+
+ return NULL;
+}
+
+static inline void
+rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk)
+{
+ if (bk->periodic_cb) {
+ if (bk->periodic_cb (bk->periodic_ud)) {
+ if (bk->subr->periodic) {
+ bk->subr->periodic (bk, bk->subr_ud);
+ }
+ }
+ }
+ else {
+ if (bk->subr->periodic) {
+ bk->subr->periodic (bk, bk->subr_ud);
+ }
+ }
+}
+
+static void
+rspamd_fuzzy_backend_periodic_cb (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *)w->data;
+ gdouble jittered;
+
+ jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
+ w->repeat = jittered;
+ rspamd_fuzzy_backend_periodic_sync (bk);
+ ev_timer_again (EV_A_ w);
+}
+
+void
+rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk,
+ gdouble timeout,
+ rspamd_fuzzy_periodic_cb cb,
+ void *ud)
+{
+ gdouble jittered;
+
+ g_assert (bk != NULL);
+
+ if (bk->subr->periodic) {
+ if (bk->sync > 0.0) {
+ ev_timer_stop (bk->event_loop, &bk->periodic_event);
+ }
+
+ if (cb) {
+ bk->periodic_cb = cb;
+ bk->periodic_ud = ud;
+ }
+
+ rspamd_fuzzy_backend_periodic_sync (bk);
+ bk->sync = timeout;
+ jittered = rspamd_time_jitter (timeout, timeout / 2.0);
+
+ bk->periodic_event.data = bk;
+ ev_timer_init (&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb,
+ jittered, 0.0);
+ ev_timer_start (bk->event_loop, &bk->periodic_event);
+ }
+}
+
+void
+rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk)
+{
+ g_assert (bk != NULL);
+
+ if (bk->sync > 0.0) {
+ rspamd_fuzzy_backend_periodic_sync (bk);
+ ev_timer_stop (bk->event_loop, &bk->periodic_event);
+ }
+
+ bk->subr->close (bk, bk->subr_ud);
+
+ g_free (bk);
+}
+
+struct ev_loop*
+rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
+{
+ return backend->event_loop;
+}
+
+gdouble
+rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend)
+{
+ return backend->expire;
+}
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.h b/src/libserver/fuzzy_backend/fuzzy_backend.h
new file mode 100644
index 000000000..23b9b68ef
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend.h
@@ -0,0 +1,131 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_
+#define SRC_LIBSERVER_FUZZY_BACKEND_H_
+
+#include "config.h"
+#include "contrib/libev/ev.h"
+#include "fuzzy_wire.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rspamd_fuzzy_backend;
+struct rspamd_config;
+
+/*
+ * Callbacks for fuzzy methods
+ */
+typedef void (*rspamd_fuzzy_check_cb) (struct rspamd_fuzzy_reply *rep, void *ud);
+
+typedef void (*rspamd_fuzzy_update_cb) (gboolean success,
+ guint nadded,
+ guint ndeleted,
+ guint nextended,
+ guint nignored,
+ void *ud);
+
+typedef void (*rspamd_fuzzy_version_cb) (guint64 rev, void *ud);
+
+typedef void (*rspamd_fuzzy_count_cb) (guint64 count, void *ud);
+
+typedef gboolean (*rspamd_fuzzy_periodic_cb) (void *ud);
+
+/**
+ * Open fuzzy backend
+ * @param ev_base
+ * @param config
+ * @param err
+ * @return
+ */
+struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_create (struct ev_loop *ev_base,
+ const ucl_object_t *config,
+ struct rspamd_config *cfg,
+ GError **err);
+
+
+/**
+ * Check a specific hash in storage
+ * @param cmd
+ * @param cb
+ * @param ud
+ */
+void rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud);
+
+/**
+ * Process updates for a specific queue
+ * @param bk
+ * @param updates queue of struct fuzzy_peer_cmd
+ * @param src
+ */
+void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb,
+ void *ud);
+
+/**
+ * Gets number of hashes from the backend
+ * @param bk
+ * @param cb
+ * @param ud
+ */
+void rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud);
+
+/**
+ * Returns number of revision for a specific source
+ * @param bk
+ * @param src
+ * @param cb
+ * @param ud
+ */
+void rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud);
+
+/**
+ * Returns unique id for backend
+ * @param backend
+ * @return
+ */
+const gchar *rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend);
+
+/**
+ * Starts expire process for the backend
+ * @param backend
+ */
+void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend,
+ gdouble timeout,
+ rspamd_fuzzy_periodic_cb cb,
+ void *ud);
+
+struct ev_loop *rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend);
+
+gdouble rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend);
+
+/**
+ * Closes backend
+ * @param backend
+ */
+void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c
new file mode 100644
index 000000000..7070773b3
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c
@@ -0,0 +1,1564 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "ref.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_redis.h"
+#include "redis_pool.h"
+#include "cryptobox.h"
+#include "str_util.h"
+#include "upstream.h"
+#include "contrib/hiredis/hiredis.h"
+#include "contrib/hiredis/async.h"
+#include "lua/lua_common.h"
+
+#define REDIS_DEFAULT_PORT 6379
+#define REDIS_DEFAULT_OBJECT "fuzzy"
+#define REDIS_DEFAULT_TIMEOUT 2.0
+
+#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+ "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+ "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_redis_session(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(fuzzy_redis)
+
+struct rspamd_fuzzy_backend_redis {
+ lua_State *L;
+ const gchar *redis_object;
+ const gchar *password;
+ const gchar *dbname;
+ gchar *id;
+ struct rspamd_redis_pool *pool;
+ gdouble timeout;
+ gint conf_ref;
+ ref_entry_t ref;
+};
+
+enum rspamd_fuzzy_redis_command {
+ RSPAMD_FUZZY_REDIS_COMMAND_COUNT,
+ RSPAMD_FUZZY_REDIS_COMMAND_VERSION,
+ RSPAMD_FUZZY_REDIS_COMMAND_UPDATES,
+ RSPAMD_FUZZY_REDIS_COMMAND_CHECK
+};
+
+struct rspamd_fuzzy_redis_session {
+ struct rspamd_fuzzy_backend_redis *backend;
+ redisAsyncContext *ctx;
+ ev_timer timeout;
+ const struct rspamd_fuzzy_cmd *cmd;
+ struct ev_loop *event_loop;
+ float prob;
+ gboolean shingles_checked;
+
+ enum rspamd_fuzzy_redis_command command;
+ guint nargs;
+
+ guint nadded;
+ guint ndeleted;
+ guint nextended;
+ guint nignored;
+
+ union {
+ rspamd_fuzzy_check_cb cb_check;
+ rspamd_fuzzy_update_cb cb_update;
+ rspamd_fuzzy_version_cb cb_version;
+ rspamd_fuzzy_count_cb cb_count;
+ } callback;
+ void *cbdata;
+
+ gchar **argv;
+ gsize *argv_lens;
+ struct upstream *up;
+ guchar found_digest[rspamd_cryptobox_HASHBYTES];
+};
+
+static inline struct upstream_list *
+rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx,
+ const gchar *what)
+{
+ lua_State *L = ctx->L;
+ struct upstream_list *res;
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref);
+ lua_pushstring (L, what);
+ lua_gettable (L, -2);
+ res = *((struct upstream_list**)lua_touserdata (L, -1));
+ lua_settop (L, 0);
+
+ return res;
+}
+
+static inline void
+rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session)
+{
+ guint i;
+
+ if (session->argv) {
+ for (i = 0; i < session->nargs; i ++) {
+ g_free (session->argv[i]);
+ }
+
+ g_free (session->argv);
+ g_free (session->argv_lens);
+ }
+}
+static void
+rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session,
+ gboolean is_fatal)
+{
+ redisAsyncContext *ac;
+
+
+ if (session->ctx) {
+ ac = session->ctx;
+ session->ctx = NULL;
+ rspamd_redis_pool_release_connection (session->backend->pool,
+ ac,
+ is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT);
+ }
+
+ ev_timer_stop (session->event_loop, &session->timeout);
+ rspamd_fuzzy_redis_session_free_args (session);
+
+ REF_RELEASE (session->backend);
+ g_free (session);
+}
+
+static void
+rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend)
+{
+ lua_State *L = backend->L;
+
+ if (backend->conf_ref) {
+ luaL_unref (L, LUA_REGISTRYINDEX, backend->conf_ref);
+ }
+
+ if (backend->id) {
+ g_free (backend->id);
+ }
+
+ g_free (backend);
+}
+
+void*
+rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj, struct rspamd_config *cfg, GError **err)
+{
+ struct rspamd_fuzzy_backend_redis *backend;
+ const ucl_object_t *elt;
+ gboolean ret = FALSE;
+ guchar id_hash[rspamd_cryptobox_HASHBYTES];
+ rspamd_cryptobox_hash_state_t st;
+ lua_State *L = (lua_State *)cfg->lua_state;
+ gint conf_ref = -1;
+
+ backend = g_malloc0 (sizeof (*backend));
+
+ backend->timeout = REDIS_DEFAULT_TIMEOUT;
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
+ backend->L = L;
+
+ ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref);
+
+ /* Now try global redis settings */
+ if (!ret) {
+ elt = ucl_object_lookup (cfg->rcl_obj, "redis");
+
+ if (elt) {
+ const ucl_object_t *specific_obj;
+
+ specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage",
+ NULL);
+
+ if (specific_obj) {
+ ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref);
+ }
+ else {
+ ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref);
+ }
+ }
+ }
+
+ if (!ret) {
+ msg_err_config ("cannot init redis backend for fuzzy storage");
+ g_free (backend);
+
+ return NULL;
+ }
+
+ elt = ucl_object_lookup (obj, "prefix");
+ if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
+ }
+ else {
+ backend->redis_object = ucl_object_tostring (elt);
+ }
+
+ backend->conf_ref = conf_ref;
+
+ /* Check some common table values */
+ lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref);
+
+ lua_pushstring (L, "timeout");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TNUMBER) {
+ backend->timeout = lua_tonumber (L, -1);
+ }
+ lua_pop (L, 1);
+
+ lua_pushstring (L, "db");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TSTRING) {
+ backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool,
+ lua_tostring (L, -1));
+ }
+ lua_pop (L, 1);
+
+ lua_pushstring (L, "password");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TSTRING) {
+ backend->password = rspamd_mempool_strdup (cfg->cfg_pool,
+ lua_tostring (L, -1));
+ }
+ lua_pop (L, 1);
+
+ lua_settop (L, 0);
+
+ REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor);
+ backend->pool = cfg->redis_pool;
+ rspamd_cryptobox_hash_init (&st, NULL, 0);
+ rspamd_cryptobox_hash_update (&st, backend->redis_object,
+ strlen (backend->redis_object));
+
+ if (backend->dbname) {
+ rspamd_cryptobox_hash_update (&st, backend->dbname,
+ strlen (backend->dbname));
+ }
+
+ if (backend->password) {
+ rspamd_cryptobox_hash_update (&st, backend->password,
+ strlen (backend->password));
+ }
+
+ rspamd_cryptobox_hash_final (&st, id_hash);
+ backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash));
+
+ return backend;
+}
+
+static void
+rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_fuzzy_redis_session *session =
+ (struct rspamd_fuzzy_redis_session *)w->data;
+ redisAsyncContext *ac;
+ static char errstr[128];
+
+ if (session->ctx) {
+ ac = session->ctx;
+ session->ctx = NULL;
+ ac->err = REDIS_ERR_IO;
+ /* Should be safe as in hiredis it is char[128] */
+ rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT));
+ ac->errstr = errstr;
+
+ /* This will cause session closing */
+ rspamd_redis_pool_release_connection (session->backend->pool,
+ ac, RSPAMD_REDIS_RELEASE_FATAL);
+ }
+}
+
+static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv);
+
+struct _rspamd_fuzzy_shingles_helper {
+ guchar digest[64];
+ guint found;
+};
+
+static gint
+rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b)
+{
+ const struct _rspamd_fuzzy_shingles_helper *sha = a,
+ *shb = b;
+
+ return memcmp (sha->digest, shb->digest, sizeof (sha->digest));
+}
+
+static void
+rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r, *cur;
+ struct rspamd_fuzzy_reply rep;
+ GString *key;
+ struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL;
+ guint i, found = 0, max_found = 0, cur_found = 0;
+
+ ev_timer_stop (session->event_loop, &session->timeout);
+ memset (&rep, 0, sizeof (rep));
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY &&
+ reply->elements == RSPAMD_SHINGLE_SIZE) {
+ shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) *
+ RSPAMD_SHINGLE_SIZE);
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ cur = reply->element[i];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ shingles[i].found = 1;
+ memcpy (shingles[i].digest, cur->str, MIN (64, cur->len));
+ found ++;
+ }
+ else {
+ memset (shingles[i].digest, 0, sizeof (shingles[i].digest));
+ shingles[i].found = 0;
+ }
+ }
+
+ if (found > RSPAMD_SHINGLE_SIZE / 2) {
+ /* Now sort to find the most frequent element */
+ qsort (shingles, RSPAMD_SHINGLE_SIZE,
+ sizeof (struct _rspamd_fuzzy_shingles_helper),
+ rspamd_fuzzy_backend_redis_shingles_cmp);
+
+ prev = &shingles[0];
+
+ for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ if (!shingles[i].found) {
+ continue;
+ }
+
+ if (memcmp (shingles[i].digest, prev->digest, 64) == 0) {
+ cur_found ++;
+
+ if (cur_found > max_found) {
+ max_found = cur_found;
+ sel = &shingles[i];
+ }
+ }
+ else {
+ cur_found = 1;
+ prev = &shingles[i];
+ }
+ }
+
+ if (max_found > RSPAMD_SHINGLE_SIZE / 2) {
+ session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE;
+ rep.v1.prob = session->prob;
+
+ g_assert (sel != NULL);
+
+ /* Prepare new check command */
+ rspamd_fuzzy_redis_session_free_args (session);
+ session->nargs = 5;
+ session->argv = g_malloc (sizeof (gchar *) * session->nargs);
+ session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
+
+ key = g_string_new (session->backend->redis_object);
+ g_string_append_len (key, sel->digest, sizeof (sel->digest));
+ session->argv[0] = g_strdup ("HMGET");
+ session->argv_lens[0] = 5;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ session->argv[2] = g_strdup ("V");
+ session->argv_lens[2] = 1;
+ session->argv[3] = g_strdup ("F");
+ session->argv_lens[3] = 1;
+ session->argv[4] = g_strdup ("C");
+ session->argv_lens[4] = 1;
+ g_string_free (key, FALSE); /* Do not free underlying array */
+ memcpy (session->found_digest, sel->digest,
+ sizeof (session->cmd->digest));
+
+ g_assert (session->ctx != NULL);
+ if (redisAsyncCommandArgv (session->ctx,
+ rspamd_fuzzy_redis_check_callback,
+ session, session->nargs,
+ (const gchar **)session->argv,
+ session->argv_lens) != REDIS_OK) {
+
+ if (session->callback.cb_check) {
+ memset (&rep, 0, sizeof (rep));
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
+ }
+
+ return;
+ }
+ }
+ }
+
+ if (session->callback.cb_check) {
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session ("error getting shingles: %s", c->errstr);
+ }
+
+ rspamd_upstream_fail (session->up, FALSE, strerror (errno));
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, FALSE);
+}
+
+static void
+rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session)
+{
+ struct rspamd_fuzzy_reply rep;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+ GString *key;
+ guint i, init_len;
+
+ rspamd_fuzzy_redis_session_free_args (session);
+ /* First of all check digest */
+ session->nargs = RSPAMD_SHINGLE_SIZE + 1;
+ session->argv = g_malloc (sizeof (gchar *) * session->nargs);
+ session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd;
+
+ session->argv[0] = g_strdup ("MGET");
+ session->argv_lens[0] = 4;
+ init_len = strlen (session->backend->redis_object);
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+
+ key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616"));
+ rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object,
+ i, shcmd->sgl.hashes[i]);
+ session->argv[i + 1] = key->str;
+ session->argv_lens[i + 1] = key->len;
+ g_string_free (key, FALSE); /* Do not free underlying array */
+ }
+
+ session->shingles_checked = TRUE;
+
+ g_assert (session->ctx != NULL);
+
+ if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback,
+ session, session->nargs,
+ (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
+ msg_err ("cannot execute redis command: %s", session->ctx->errstr);
+
+ if (session->callback.cb_check) {
+ memset (&rep, 0, sizeof (rep));
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
+ }
+}
+
+static void
+rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r, *cur;
+ struct rspamd_fuzzy_reply rep;
+ gulong value;
+ guint found_elts = 0;
+
+ ev_timer_stop (session->event_loop, &session->timeout);
+ memset (&rep, 0, sizeof (rep));
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) {
+ cur = reply->element[0];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ value = strtoul (cur->str, NULL, 10);
+ rep.v1.value = value;
+ found_elts ++;
+ }
+
+ cur = reply->element[1];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ value = strtoul (cur->str, NULL, 10);
+ rep.v1.flag = value;
+ found_elts ++;
+ }
+
+ if (found_elts >= 2) {
+ rep.v1.prob = session->prob;
+ memcpy (rep.digest, session->found_digest, sizeof (rep.digest));
+ }
+
+ rep.ts = 0;
+
+ if (reply->elements > 2) {
+ cur = reply->element[2];
+
+ if (cur->type == REDIS_REPLY_STRING) {
+ rep.ts = strtoul (cur->str, NULL, 10);
+ }
+ }
+ }
+
+ if (found_elts != 2) {
+ if (session->cmd->shingles_count > 0 && !session->shingles_checked) {
+ /* We also need to check all shingles here */
+ rspamd_fuzzy_backend_check_shingles (session);
+ /* Do not free session */
+ return;
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_check) {
+ session->callback.cb_check (&rep, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session ("error getting hashes: %s", c->errstr);
+ }
+
+ rspamd_upstream_fail (session->up, FALSE, strerror (errno));
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, FALSE);
+}
+
+void
+rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_fuzzy_reply rep;
+ GString *key;
+
+ g_assert (backend != NULL);
+
+ session = g_malloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ session->callback.cb_check = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK;
+ session->cmd = cmd;
+ session->prob = 1.0;
+ memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest));
+ memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest));
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
+
+ /* First of all check digest */
+ session->nargs = 5;
+ session->argv = g_malloc (sizeof (gchar *) * session->nargs);
+ session->argv_lens = g_malloc (sizeof (gsize) * session->nargs);
+
+ key = g_string_new (backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ session->argv[0] = g_strdup ("HMGET");
+ session->argv_lens[0] = 5;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ session->argv[2] = g_strdup ("V");
+ session->argv_lens[2] = 1;
+ session->argv[3] = g_strdup ("F");
+ session->argv_lens[3] = 1;
+ session->argv[4] = g_strdup ("C");
+ session->argv_lens[4] = 1;
+ g_string_free (key, FALSE); /* Do not free underlying array */
+
+ ups = rspamd_redis_get_servers (backend, "read_servers");
+ up = rspamd_upstream_get (ups,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr_next (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail (up, TRUE, strerror (errno));
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ memset (&rep, 0, sizeof (rep));
+ cb (&rep, ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback,
+ session, session->nargs,
+ (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ memset (&rep, 0, sizeof (rep));
+ cb (&rep, ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
+ }
+ }
+}
+
+static void
+rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+ gulong nelts;
+
+ ev_timer_stop (session->event_loop, &session->timeout);
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ if (session->callback.cb_count) {
+ session->callback.cb_count (reply->integer, session->cbdata);
+ }
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ nelts = strtoul (reply->str, NULL, 10);
+
+ if (session->callback.cb_count) {
+ session->callback.cb_count (nelts, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_count) {
+ session->callback.cb_count (0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_count) {
+ session->callback.cb_count (0, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session ("error getting count: %s", c->errstr);
+ }
+
+ rspamd_upstream_fail (session->up, FALSE, strerror (errno));
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, FALSE);
+}
+
+void
+rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ GString *key;
+
+ g_assert (backend != NULL);
+
+ session = g_malloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ session->callback.cb_count = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT;
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
+
+ session->nargs = 2;
+ session->argv = g_malloc (sizeof (gchar *) * 2);
+ session->argv_lens = g_malloc (sizeof (gsize) * 2);
+ key = g_string_new (backend->redis_object);
+ g_string_append (key, "_count");
+ session->argv[0] = g_strdup ("GET");
+ session->argv_lens[0] = 3;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ g_string_free (key, FALSE); /* Do not free underlying array */
+
+ ups = rspamd_redis_get_servers (backend, "read_servers");
+ up = rspamd_upstream_get (ups,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr_next (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail (up, TRUE, strerror (errno));
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ cb (0, ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback,
+ session, session->nargs,
+ (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ cb (0, ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
+ }
+ }
+}
+
+static void
+rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+ gulong nelts;
+
+ ev_timer_stop (session->event_loop, &session->timeout);
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ if (session->callback.cb_version) {
+ session->callback.cb_version (reply->integer, session->cbdata);
+ }
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ nelts = strtoul (reply->str, NULL, 10);
+
+ if (session->callback.cb_version) {
+ session->callback.cb_version (nelts, session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_version) {
+ session->callback.cb_version (0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_version) {
+ session->callback.cb_version (0, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session ("error getting version: %s", c->errstr);
+ }
+
+ rspamd_upstream_fail (session->up, FALSE, strerror (errno));
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, FALSE);
+}
+
+void
+rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ GString *key;
+
+ g_assert (backend != NULL);
+
+ session = g_malloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ session->callback.cb_version = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION;
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
+
+ session->nargs = 2;
+ session->argv = g_malloc (sizeof (gchar *) * 2);
+ session->argv_lens = g_malloc (sizeof (gsize) * 2);
+ key = g_string_new (backend->redis_object);
+ g_string_append (key, src);
+ session->argv[0] = g_strdup ("GET");
+ session->argv_lens[0] = 3;
+ session->argv[1] = key->str;
+ session->argv_lens[1] = key->len;
+ g_string_free (key, FALSE); /* Do not free underlying array */
+
+ ups = rspamd_redis_get_servers (backend, "read_servers");
+ up = rspamd_upstream_get (ups,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr_next (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail (up, FALSE, strerror (errno));
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ cb (0, ud);
+ }
+ }
+ else {
+ if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback,
+ session, session->nargs,
+ (const gchar **)session->argv, session->argv_lens) != REDIS_OK) {
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ cb (0, ud);
+ }
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
+ }
+ }
+}
+
+const gchar*
+rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ g_assert (backend != NULL);
+
+ return backend->id;
+}
+
+void
+rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+
+ g_assert (backend != NULL);
+}
+
+static gboolean
+rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk,
+ struct rspamd_fuzzy_redis_session *session,
+ struct fuzzy_peer_cmd *io_cmd, guint *shift)
+{
+ GString *key, *value;
+ guint cur_shift = *shift;
+ guint i, klen;
+ struct rspamd_fuzzy_cmd *cmd;
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ /*
+ * For each normal hash addition we do 5 redis commands:
+ * HSET <key> F <flag>
+ * HSETNX <key> C <time>
+ * HINCRBY <key> V <weight>
+ * EXPIRE <key> <expire>
+ * Where <key> is <prefix> || <digest>
+ */
+
+ /* HSET */
+ klen = strlen (session->backend->redis_object) +
+ sizeof (cmd->digest) + 1;
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (sizeof ("4294967296"));
+ rspamd_printf_gstring (value, "%d", cmd->flag);
+ session->argv[cur_shift] = g_strdup ("HSET");
+ session->argv_lens[cur_shift++] = sizeof ("HSET") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup ("F");
+ session->argv_lens[cur_shift++] = sizeof ("F") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 4,
+ (const gchar **)&session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* HSETNX */
+ klen = strlen (session->backend->redis_object) +
+ sizeof (cmd->digest) + 1;
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (sizeof ("18446744073709551616"));
+ rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ());
+ session->argv[cur_shift] = g_strdup ("HSETNX");
+ session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup ("C");
+ session->argv_lens[cur_shift++] = sizeof ("C") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 4,
+ (const gchar **)&session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* HINCRBY */
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (sizeof ("4294967296"));
+ rspamd_printf_gstring (value, "%d", cmd->value);
+ session->argv[cur_shift] = g_strdup ("HINCRBY");
+ session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = g_strdup ("V");
+ session->argv_lens[cur_shift++] = sizeof ("V") - 1;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 4,
+ (const gchar **)&session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* EXPIRE */
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (sizeof ("4294967296"));
+ rspamd_printf_gstring (value, "%d",
+ (gint)rspamd_fuzzy_backend_get_expire (bk));
+ session->argv[cur_shift] = g_strdup ("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 3,
+ (const gchar **)&session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* INCR */
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append (key, "_count");
+ session->argv[cur_shift] = g_strdup ("INCR");
+ session->argv_lens[cur_shift++] = sizeof ("INCR") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free (key, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 2,
+ (const gchar **)&session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ /* DEL */
+ klen = strlen (session->backend->redis_object) +
+ sizeof (cmd->digest) + 1;
+
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ session->argv[cur_shift] = g_strdup ("DEL");
+ session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free (key, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 2,
+ (const gchar **)&session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+
+ /* DECR */
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append (key, "_count");
+ session->argv[cur_shift] = g_strdup ("DECR");
+ session->argv_lens[cur_shift++] = sizeof ("DECR") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free (key, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 2,
+ (const gchar **)&session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ /*
+ * Issue refresh command by just EXPIRE command
+ * EXPIRE <key> <expire>
+ * Where <key> is <prefix> || <digest>
+ */
+
+ klen = strlen (session->backend->redis_object) +
+ sizeof (cmd->digest) + 1;
+
+ /* EXPIRE */
+ key = g_string_sized_new (klen);
+ g_string_append (key, session->backend->redis_object);
+ g_string_append_len (key, cmd->digest, sizeof (cmd->digest));
+ value = g_string_sized_new (sizeof ("4294967296"));
+ rspamd_printf_gstring (value, "%d",
+ (gint)rspamd_fuzzy_backend_get_expire (bk));
+ session->argv[cur_shift] = g_strdup ("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 3,
+ (const gchar **)&session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ else if (cmd->cmd == FUZZY_DUP) {
+ /* Ignore */
+ }
+ else {
+ g_assert_not_reached ();
+ }
+
+ if (io_cmd->is_shingle) {
+ if (cmd->cmd == FUZZY_WRITE) {
+ klen = strlen (session->backend->redis_object) +
+ 64 + 1;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ guchar *hval;
+ /*
+ * For each command with shingles we additionally emit 32 commands:
+ * SETEX <prefix>_<number>_<value> <expire> <digest>
+ */
+
+ /* SETEX */
+ key = g_string_sized_new (klen);
+ rspamd_printf_gstring (key, "%s_%d_%uL",
+ session->backend->redis_object,
+ i,
+ io_cmd->cmd.shingle.sgl.hashes[i]);
+ value = g_string_sized_new (sizeof ("4294967296"));
+ rspamd_printf_gstring (value, "%d",
+ (gint)rspamd_fuzzy_backend_get_expire (bk));
+ hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest));
+ memcpy (hval, io_cmd->cmd.shingle.basic.digest,
+ sizeof (io_cmd->cmd.shingle.basic.digest));
+ session->argv[cur_shift] = g_strdup ("SETEX");
+ session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ session->argv[cur_shift] = hval;
+ session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest);
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 4,
+ (const gchar **)&session->argv[cur_shift - 4],
+ &session->argv_lens[cur_shift - 4]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ klen = strlen (session->backend->redis_object) +
+ 64 + 1;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ key = g_string_sized_new (klen);
+ rspamd_printf_gstring (key, "%s_%d_%uL",
+ session->backend->redis_object,
+ i,
+ io_cmd->cmd.shingle.sgl.hashes[i]);
+ session->argv[cur_shift] = g_strdup ("DEL");
+ session->argv_lens[cur_shift++] = sizeof ("DEL") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ g_string_free (key, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 2,
+ (const gchar **)&session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ klen = strlen (session->backend->redis_object) +
+ 64 + 1;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ /*
+ * For each command with shingles we additionally emit 32 commands:
+ * EXPIRE <prefix>_<number>_<value> <expire>
+ */
+
+ /* Expire */
+ key = g_string_sized_new (klen);
+ rspamd_printf_gstring (key, "%s_%d_%uL",
+ session->backend->redis_object,
+ i,
+ io_cmd->cmd.shingle.sgl.hashes[i]);
+ value = g_string_sized_new (sizeof ("18446744073709551616"));
+ rspamd_printf_gstring (value, "%d",
+ (gint)rspamd_fuzzy_backend_get_expire (bk));
+ session->argv[cur_shift] = g_strdup ("EXPIRE");
+ session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift++] = key->len;
+ session->argv[cur_shift] = value->str;
+ session->argv_lens[cur_shift++] = value->len;
+ g_string_free (key, FALSE);
+ g_string_free (value, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 3,
+ (const gchar **)&session->argv[cur_shift - 3],
+ &session->argv_lens[cur_shift - 3]) != REDIS_OK) {
+
+ return FALSE;
+ }
+ }
+ }
+ else if (cmd->cmd == FUZZY_DUP) {
+ /* Ignore */
+ }
+ else {
+ g_assert_not_reached ();
+ }
+ }
+
+ *shift = cur_shift;
+
+ return TRUE;
+}
+
+static void
+rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r,
+ gpointer priv)
+{
+ struct rspamd_fuzzy_redis_session *session = priv;
+ redisReply *reply = r;
+
+ ev_timer_stop (session->event_loop, &session->timeout);
+
+ if (c->err == 0) {
+ rspamd_upstream_ok (session->up);
+
+ if (reply->type == REDIS_REPLY_ARRAY) {
+ /* TODO: check all replies somehow */
+ if (session->callback.cb_update) {
+ session->callback.cb_update (TRUE,
+ session->nadded,
+ session->ndeleted,
+ session->nextended,
+ session->nignored,
+ session->cbdata);
+ }
+ }
+ else {
+ if (session->callback.cb_update) {
+ session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
+ }
+ }
+ }
+ else {
+ if (session->callback.cb_update) {
+ session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata);
+ }
+
+ if (c->errstr) {
+ msg_err_redis_session ("error sending update to redis: %s", c->errstr);
+ }
+
+ rspamd_upstream_fail (session->up, FALSE, strerror (errno));
+ }
+
+ rspamd_fuzzy_redis_session_dtor (session, FALSE);
+}
+
+void
+rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+ struct rspamd_fuzzy_redis_session *session;
+ struct upstream *up;
+ struct upstream_list *ups;
+ rspamd_inet_addr_t *addr;
+ guint i;
+ GString *key;
+ struct fuzzy_peer_cmd *io_cmd;
+ struct rspamd_fuzzy_cmd *cmd = NULL;
+ guint nargs, ncommands, cur_shift;
+
+ g_assert (backend != NULL);
+
+ session = g_malloc0 (sizeof (*session));
+ session->backend = backend;
+ REF_RETAIN (session->backend);
+
+ /*
+ * For each normal hash addition we do 3 redis commands:
+ * HSET <key> F <flag>
+ * HINCRBY <key> V <weight>
+ * EXPIRE <key> <expire>
+ * INCR <prefix||fuzzy_count>
+ *
+ * Where <key> is <prefix> || <digest>
+ *
+ * For each command with shingles we additionally emit 32 commands:
+ * SETEX <prefix>_<number>_<value> <expire> <digest>
+ *
+ * For each delete command we emit:
+ * DEL <key>
+ *
+ * For each delete command with shingles we emit also 32 commands:
+ * DEL <prefix>_<number>_<value>
+ * DECR <prefix||fuzzy_count>
+ */
+
+ ncommands = 3; /* For MULTI + EXEC + INCR <src> */
+ nargs = 4;
+
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+ if (io_cmd->is_shingle) {
+ cmd = &io_cmd->cmd.shingle.basic;
+ }
+ else {
+ cmd = &io_cmd->cmd.normal;
+ }
+
+ if (cmd->cmd == FUZZY_WRITE) {
+ ncommands += 5;
+ nargs += 17;
+ session->nadded ++;
+
+ if (io_cmd->is_shingle) {
+ ncommands += RSPAMD_SHINGLE_SIZE;
+ nargs += RSPAMD_SHINGLE_SIZE * 4;
+ }
+
+ }
+ else if (cmd->cmd == FUZZY_DEL) {
+ ncommands += 2;
+ nargs += 4;
+ session->ndeleted ++;
+
+ if (io_cmd->is_shingle) {
+ ncommands += RSPAMD_SHINGLE_SIZE;
+ nargs += RSPAMD_SHINGLE_SIZE * 2;
+ }
+ }
+ else if (cmd->cmd == FUZZY_REFRESH) {
+ ncommands += 1;
+ nargs += 3;
+ session->nextended ++;
+
+ if (io_cmd->is_shingle) {
+ ncommands += RSPAMD_SHINGLE_SIZE;
+ nargs += RSPAMD_SHINGLE_SIZE * 3;
+ }
+ }
+ else {
+ session->nignored ++;
+ }
+ }
+
+ /* Now we need to create a new request */
+ session->callback.cb_update = cb;
+ session->cbdata = ud;
+ session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES;
+ session->cmd = cmd;
+ session->prob = 1.0;
+ session->event_loop = rspamd_fuzzy_backend_event_base (bk);
+
+ /* First of all check digest */
+ session->nargs = nargs;
+ session->argv = g_malloc0 (sizeof (gchar *) * session->nargs);
+ session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs);
+
+ ups = rspamd_redis_get_servers (backend, "write_servers");
+ up = rspamd_upstream_get (ups,
+ RSPAMD_UPSTREAM_MASTER_SLAVE,
+ NULL,
+ 0);
+
+ session->up = up;
+ addr = rspamd_upstream_addr_next (up);
+ g_assert (addr != NULL);
+ session->ctx = rspamd_redis_pool_connect (backend->pool,
+ backend->dbname, backend->password,
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ if (session->ctx == NULL) {
+ rspamd_upstream_fail (up, TRUE, strerror (errno));
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ if (cb) {
+ cb (FALSE, 0, 0, 0, 0, ud);
+ }
+ }
+ else {
+ /* Start with MULTI command */
+ session->argv[0] = g_strdup ("MULTI");
+ session->argv_lens[0] = 5;
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 1,
+ (const gchar **)session->argv,
+ session->argv_lens) != REDIS_OK) {
+
+ if (cb) {
+ cb (FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ return;
+ }
+
+ /* Now split the rest of commands in packs and emit them command by command */
+ cur_shift = 1;
+
+ for (i = 0; i < updates->len; i ++) {
+ io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i);
+
+ if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd,
+ &cur_shift)) {
+ if (cb) {
+ cb (FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ return;
+ }
+ }
+
+ /* Now INCR command for the source */
+ key = g_string_new (backend->redis_object);
+ g_string_append (key, src);
+ session->argv[cur_shift] = g_strdup ("INCR");
+ session->argv_lens[cur_shift ++] = 4;
+ session->argv[cur_shift] = key->str;
+ session->argv_lens[cur_shift ++] = key->len;
+ g_string_free (key, FALSE);
+
+ if (redisAsyncCommandArgv (session->ctx, NULL, NULL,
+ 2,
+ (const gchar **)&session->argv[cur_shift - 2],
+ &session->argv_lens[cur_shift - 2]) != REDIS_OK) {
+
+ if (cb) {
+ cb (FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ return;
+ }
+
+ /* Finally we call EXEC with a specific callback */
+ session->argv[cur_shift] = g_strdup ("EXEC");
+ session->argv_lens[cur_shift] = 4;
+
+ if (redisAsyncCommandArgv (session->ctx,
+ rspamd_fuzzy_redis_update_callback, session,
+ 1,
+ (const gchar **)&session->argv[cur_shift],
+ &session->argv_lens[cur_shift]) != REDIS_OK) {
+
+ if (cb) {
+ cb (FALSE, 0, 0, 0, 0, ud);
+ }
+ rspamd_fuzzy_redis_session_dtor (session, TRUE);
+
+ return;
+ }
+ else {
+ /* Add timeout */
+ session->timeout.data = session;
+ ev_timer_init (&session->timeout,
+ rspamd_fuzzy_redis_timeout,
+ session->backend->timeout, 0.0);
+ ev_timer_start (session->event_loop, &session->timeout);
+ }
+ }
+}
+
+void
+rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud)
+{
+ struct rspamd_fuzzy_backend_redis *backend = subr_ud;
+
+ g_assert (backend != NULL);
+
+ REF_RELEASE (backend);
+}
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.h b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h
new file mode 100644
index 000000000..544b20f60
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h
@@ -0,0 +1,67 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_
+#define SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_
+
+#include "config.h"
+#include "fuzzy_backend.h"
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Subroutines for fuzzy_backend
+ */
+void *rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk,
+ const ucl_object_t *obj,
+ struct rspamd_config *cfg,
+ GError **err);
+
+void rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
+ const struct rspamd_fuzzy_cmd *cmd,
+ rspamd_fuzzy_check_cb cb, void *ud,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
+ GArray *updates, const gchar *src,
+ rspamd_fuzzy_update_cb cb, void *ud,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
+ rspamd_fuzzy_count_cb cb, void *ud,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
+ const gchar *src,
+ rspamd_fuzzy_version_cb cb, void *ud,
+ void *subr_ud);
+
+const gchar *rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+void rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk,
+ void *subr_ud);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ */
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c
new file mode 100644
index 000000000..0f9b3c1ee
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c
@@ -0,0 +1,1065 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamd.h"
+#include "fuzzy_backend.h"
+#include "fuzzy_backend_sqlite.h"
+#include "unix-std.h"
+
+#include <sqlite3.h>
+#include "libutil/sqlite_utils.h"
+
+struct rspamd_fuzzy_backend_sqlite {
+ sqlite3 *db;
+ char *path;
+ gchar id[MEMPOOL_UID_LEN];
+ gsize count;
+ gsize expired;
+ rspamd_mempool_t *pool;
+};
+
+static const gdouble sql_sleep_time = 0.1;
+static const guint max_retries = 10;
+
+#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+ backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_fuzzy_backend(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_fuzzy_sqlite_log_id, backend->pool->tag.tagname, backend->pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(fuzzy_sqlite)
+
+static const char *create_tables_sql =
+ "BEGIN;"
+ "CREATE TABLE IF NOT EXISTS digests("
+ " id INTEGER PRIMARY KEY,"
+ " flag INTEGER NOT NULL,"
+ " digest TEXT NOT NULL,"
+ " value INTEGER,"
+ " time INTEGER);"
+ "CREATE TABLE IF NOT EXISTS shingles("
+ " value INTEGER NOT NULL,"
+ " number INTEGER NOT NULL,"
+ " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE "
+ " ON UPDATE CASCADE);"
+ "CREATE TABLE IF NOT EXISTS sources("
+ " name TEXT UNIQUE,"
+ " version INTEGER,"
+ " last INTEGER);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+ "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+ "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+ "COMMIT;";
+#if 0
+static const char *create_index_sql =
+ "BEGIN;"
+ "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);"
+ "CREATE INDEX IF NOT EXISTS t ON digests(time);"
+ "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);"
+ "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);"
+ "COMMIT;";
+#endif
+enum rspamd_fuzzy_statement_idx {
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+ RSPAMD_FUZZY_BACKEND_INSERT,
+ RSPAMD_FUZZY_BACKEND_UPDATE,
+ RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+ RSPAMD_FUZZY_BACKEND_DELETE,
+ RSPAMD_FUZZY_BACKEND_COUNT,
+ RSPAMD_FUZZY_BACKEND_EXPIRE,
+ RSPAMD_FUZZY_BACKEND_VACUUM,
+ RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ RSPAMD_FUZZY_BACKEND_VERSION,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ RSPAMD_FUZZY_BACKEND_MAX
+};
+static struct rspamd_fuzzy_stmts {
+ enum rspamd_fuzzy_statement_idx idx;
+ const gchar *sql;
+ const gchar *args;
+ sqlite3_stmt *stmt;
+ gint result;
+} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] =
+{
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START,
+ .sql = "BEGIN TRANSACTION;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT,
+ .sql = "COMMIT;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK,
+ .sql = "ROLLBACK;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_INSERT,
+ .sql = "INSERT INTO digests(flag, digest, value, time) VALUES"
+ "(?1, ?2, ?3, strftime('%s','now'));",
+ .args = "SDI",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_UPDATE,
+ .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE "
+ "digest==?2;",
+ .args = "ID",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE "
+ "digest==?3;",
+ .args = "IID",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) "
+ "VALUES (?1, ?2, ?3);",
+ .args = "III",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_CHECK,
+ .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;",
+ .args = "D",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2",
+ .args = "IS",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID,
+ .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1",
+ .args = "I",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_DELETE,
+ .sql = "DELETE FROM digests WHERE digest==?1;",
+ .args = "D",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_COUNT,
+ .sql = "SELECT COUNT(*) FROM digests;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_EXPIRE,
+ .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);",
+ .args = "II",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_VACUUM,
+ .sql = "VACUUM;",
+ .args = "",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;",
+ .args = "II",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE,
+ .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);",
+ .args = "TII",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_VERSION,
+ .sql = "SELECT version FROM sources WHERE name=?1;",
+ .args = "T",
+ .stmt = NULL,
+ .result = SQLITE_ROW
+ },
+ {
+ .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);",
+ .args = "IIT",
+ .stmt = NULL,
+ .result = SQLITE_DONE
+ },
+};
+
+static GQuark
+rspamd_fuzzy_backend_sqlite_quark (void)
+{
+ return g_quark_from_static_string ("fuzzy-backend-sqlite");
+}
+
+static gboolean
+rspamd_fuzzy_backend_sqlite_prepare_stmts (struct rspamd_fuzzy_backend_sqlite *bk, GError **err)
+{
+ int i;
+
+ for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) {
+ if (prepared_stmts[i].stmt != NULL) {
+ /* Skip already prepared statements */
+ continue;
+ }
+ if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1,
+ &prepared_stmts[i].stmt, NULL) != SQLITE_OK) {
+ g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (),
+ -1, "Cannot initialize prepared sql `%s`: %s",
+ prepared_stmts[i].sql, sqlite3_errmsg (bk->db));
+
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+static int
+rspamd_fuzzy_backend_sqlite_cleanup_stmt (struct rspamd_fuzzy_backend_sqlite *backend,
+ int idx)
+{
+ sqlite3_stmt *stmt;
+
+ if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+ return -1;
+ }
+
+ msg_debug_fuzzy_backend ("resetting `%s`", prepared_stmts[idx].sql);
+ stmt = prepared_stmts[idx].stmt;
+ sqlite3_clear_bindings (stmt);
+ sqlite3_reset (stmt);
+
+ return SQLITE_OK;
+}
+
+static int
+rspamd_fuzzy_backend_sqlite_run_stmt (struct rspamd_fuzzy_backend_sqlite *backend,
+ gboolean auto_cleanup,
+ int idx, ...)
+{
+ int retcode;
+ va_list ap;
+ sqlite3_stmt *stmt;
+ int i;
+ const char *argtypes;
+ guint retries = 0;
+ struct timespec ts;
+
+ if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) {
+
+ return -1;
+ }
+
+ stmt = prepared_stmts[idx].stmt;
+ g_assert ((int)prepared_stmts[idx].idx == idx);
+
+ if (stmt == NULL) {
+ if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1,
+ &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) {
+ msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s",
+ prepared_stmts[idx].sql, sqlite3_errmsg (backend->db));
+
+ return retcode;
+ }
+ stmt = prepared_stmts[idx].stmt;
+ }
+
+ msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup",
+ prepared_stmts[idx].sql, auto_cleanup ? "with" : "without");
+ argtypes = prepared_stmts[idx].args;
+ sqlite3_clear_bindings (stmt);
+ sqlite3_reset (stmt);
+ va_start (ap, idx);
+
+ for (i = 0; argtypes[i] != '\0'; i++) {
+ switch (argtypes[i]) {
+ case 'T':
+ sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1,
+ SQLITE_STATIC);
+ break;
+ case 'I':
+ sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64));
+ break;
+ case 'S':
+ sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint));
+ break;
+ case 'D':
+ /* Special case for digests variable */
+ sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64,
+ SQLITE_STATIC);
+ break;
+ }
+ }
+
+ va_end (ap);
+
+retry:
+ retcode = sqlite3_step (stmt);
+
+ if (retcode == prepared_stmts[idx].result) {
+ retcode = SQLITE_OK;
+ }
+ else {
+ if ((retcode == SQLITE_BUSY ||
+ retcode == SQLITE_LOCKED) && retries++ < max_retries) {
+ double_to_ts (sql_sleep_time, &ts);
+ nanosleep (&ts, NULL);
+ goto retry;
+ }
+
+ msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql,
+ retcode, sqlite3_errmsg (backend->db));
+ }
+
+ if (auto_cleanup) {
+ sqlite3_clear_bindings (stmt);
+ sqlite3_reset (stmt);
+ }
+
+ return retcode;
+}
+
+static void
+rspamd_fuzzy_backend_sqlite_close_stmts (struct rspamd_fuzzy_backend_sqlite *bk)
+{
+ int i;
+
+ for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) {
+ if (prepared_stmts[i].stmt != NULL) {
+ sqlite3_finalize (prepared_stmts[i].stmt);
+ prepared_stmts[i].stmt = NULL;
+ }
+ }
+
+ return;
+}
+
+static gboolean
+rspamd_fuzzy_backend_sqlite_run_sql (const gchar *sql, struct rspamd_fuzzy_backend_sqlite *bk,
+ GError **err)
+{
+ guint retries = 0;
+ struct timespec ts;
+ gint ret;
+
+ do {
+ ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL);
+ double_to_ts (sql_sleep_time, &ts);
+ } while (ret == SQLITE_BUSY && retries++ < max_retries &&
+ nanosleep (&ts, NULL) == 0);
+
+ if (ret != SQLITE_OK) {
+ g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (),
+ -1, "Cannot execute raw sql `%s`: %s",
+ sql, sqlite3_errmsg (bk->db));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static struct rspamd_fuzzy_backend_sqlite *
+rspamd_fuzzy_backend_sqlite_open_db (const gchar *path, GError **err)
+{
+ struct rspamd_fuzzy_backend_sqlite *bk;
+ rspamd_cryptobox_hash_state_t st;
+ guchar hash_out[rspamd_cryptobox_HASHBYTES];
+
+ g_assert (path != NULL);
+
+ bk = g_malloc0 (sizeof (*bk));
+ bk->path = g_strdup (path);
+ bk->expired = 0;
+ bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
+ "fuzzy_backend", 0);
+ bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path,
+ create_tables_sql, 1, err);
+
+ if (bk->db == NULL) {
+ rspamd_fuzzy_backend_sqlite_close (bk);
+
+ return NULL;
+ }
+
+ if (!rspamd_fuzzy_backend_sqlite_prepare_stmts (bk, err)) {
+ rspamd_fuzzy_backend_sqlite_close (bk);
+
+ return NULL;
+ }
+
+ /* Set id for the backend */
+ rspamd_cryptobox_hash_init (&st, NULL, 0);
+ rspamd_cryptobox_hash_update (&st, path, strlen (path));
+ rspamd_cryptobox_hash_final (&st, hash_out);
+ rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out);
+ memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid));
+
+ return bk;
+}
+
+struct rspamd_fuzzy_backend_sqlite *
+rspamd_fuzzy_backend_sqlite_open (const gchar *path,
+ gboolean vacuum,
+ GError **err)
+{
+ struct rspamd_fuzzy_backend_sqlite *backend;
+
+ if (path == NULL) {
+ g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (),
+ ENOENT, "Path has not been specified");
+ return NULL;
+ }
+
+ /* Open database */
+ if ((backend = rspamd_fuzzy_backend_sqlite_open_db (path, err)) == NULL) {
+ return NULL;
+ }
+
+ if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT)
+ == SQLITE_OK) {
+ backend->count = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+ return backend;
+}
+
+static gint
+rspamd_fuzzy_backend_sqlite_int64_cmp (const void *a, const void *b)
+{
+ gint64 ia = *(gint64 *)a, ib = *(gint64 *)b;
+
+ return (ia - ib);
+}
+
+struct rspamd_fuzzy_reply
+rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd, gint64 expire)
+{
+ struct rspamd_fuzzy_reply rep;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+ int rc;
+ gint64 timestamp;
+ gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id,
+ cur_cnt, max_cnt;
+
+ memset (&rep, 0, sizeof (rep));
+ memcpy (rep.digest, cmd->digest, sizeof (rep.digest));
+
+ if (backend == NULL) {
+ return rep;
+ }
+
+ /* Try direct match first of all */
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ timestamp = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1);
+ if (time (NULL) - timestamp > expire) {
+ /* Expire element */
+ msg_debug_fuzzy_backend ("requested hash has been expired");
+ }
+ else {
+ rep.v1.value = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0);
+ rep.v1.prob = 1.0;
+ rep.v1.flag = sqlite3_column_int (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2);
+ }
+ }
+ else if (cmd->shingles_count > 0) {
+ /* Fuzzy match */
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE,
+ shcmd->sgl.hashes[i], i);
+ if (rc == SQLITE_OK) {
+ shingle_values[i] = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt,
+ 0);
+ }
+ else {
+ shingle_values[i] = -1;
+ }
+ msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i,
+ shcmd->sgl.hashes[i], rc);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE);
+
+ qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64),
+ rspamd_fuzzy_backend_sqlite_int64_cmp);
+ sel_id = -1;
+ cur_id = -1;
+ cur_cnt = 0;
+ max_cnt = 0;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) {
+ if (shingle_values[i] == -1) {
+ continue;
+ }
+
+ /* We have some value here, so we need to check it */
+ if (shingle_values[i] == cur_id) {
+ cur_cnt ++;
+ }
+ else {
+ cur_id = shingle_values[i];
+ if (cur_cnt >= max_cnt) {
+ max_cnt = cur_cnt;
+ sel_id = cur_id;
+ }
+ cur_cnt = 0;
+ }
+ }
+
+ if (cur_cnt > max_cnt) {
+ max_cnt = cur_cnt;
+ }
+
+ if (sel_id != -1) {
+ /* We have some id selected here */
+ rep.v1.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE;
+
+ if (rep.v1.prob > 0.5) {
+ msg_debug_fuzzy_backend (
+ "found fuzzy hash with probability %.2f",
+ rep.v1.prob);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id);
+ if (rc == SQLITE_OK) {
+ timestamp = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 2);
+ if (time (NULL) - timestamp > expire) {
+ /* Expire element */
+ msg_debug_fuzzy_backend (
+ "requested hash has been expired");
+ rep.v1.prob = 0.0;
+ }
+ else {
+ rep.ts = timestamp;
+ memcpy (rep.digest, sqlite3_column_blob (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 0), sizeof (rep.digest));
+ rep.v1.value = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 1);
+ rep.v1.flag = sqlite3_column_int (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt,
+ 3);
+ }
+ }
+ }
+ else {
+ /* Otherwise we assume that as error */
+ rep.v1.value = 0;
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID);
+ }
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ return rep;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source)
+{
+ gint rc;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot start transaction for updates: %s",
+ sqlite3_errmsg (backend->db));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd)
+{
+ int rc, i;
+ gint64 id, flag;
+ const struct rspamd_fuzzy_shingle_cmd *shcmd;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ /* Check flag */
+ flag = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt,
+ 2);
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+ if (flag == cmd->flag) {
+ /* We need to increase weight */
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_UPDATE,
+ (gint64) cmd->value,
+ cmd->digest);
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+ "%*xs: %s", (gint) cmd->flag,
+ (gint) sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ else {
+ /* We need to relearn actually */
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_UPDATE_FLAG,
+ (gint64) cmd->value,
+ (gint64) cmd->flag,
+ cmd->digest);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+ "%*xs: %s", (gint) cmd->flag,
+ (gint) sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ }
+ else {
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_INSERT,
+ (gint) cmd->flag,
+ cmd->digest,
+ (gint64) cmd->value);
+
+ if (rc == SQLITE_OK) {
+ if (cmd->shingles_count > 0) {
+ id = sqlite3_last_insert_rowid (backend->db);
+ shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd;
+
+ for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE,
+ shcmd->sgl.hashes[i], (gint64)i, id);
+ msg_debug_fuzzy_backend ("add shingle %d -> %L: %L",
+ i,
+ shcmd->sgl.hashes[i],
+ id);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot add shingle %d -> "
+ "%L: %L: %s", i,
+ shcmd->sgl.hashes[i],
+ id, sqlite3_errmsg (backend->db));
+ }
+ }
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend ("cannot add hash to %d -> "
+ "%*xs: %s", (gint)cmd->flag,
+ (gint)sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_INSERT);
+ }
+
+ return (rc == SQLITE_OK);
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source, gboolean version_bump)
+{
+ gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver;
+
+ /* Get and update version */
+ if (version_bump) {
+ ver = rspamd_fuzzy_backend_sqlite_version (backend, source);
+ ++ver;
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_SET_VERSION,
+ (gint64)ver, (gint64)time (NULL), source);
+ }
+
+ if (rc == SQLITE_OK) {
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot commit updates: %s",
+ sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
+ }
+ else {
+ if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) {
+ msg_warn_fuzzy_backend ("cannot commit checkpoint: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ else if (wal_checkpointed > 0) {
+ msg_info_fuzzy_backend ("total number of frames in the wal file: "
+ "%d, checkpointed: %d", wal_frames, wal_checkpointed);
+ }
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend ("cannot update version for %s: %s", source,
+ sqlite3_errmsg (backend->db));
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_del (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd)
+{
+ int rc = -1;
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_CHECK,
+ cmd->digest);
+
+ if (rc == SQLITE_OK) {
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_DELETE,
+ cmd->digest);
+ if (rc != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot update hash to %d -> "
+ "%*xs: %s", (gint) cmd->flag,
+ (gint) sizeof (cmd->digest), cmd->digest,
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ else {
+ /* Hash is missing */
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK);
+ }
+
+ return (rc == SQLITE_OK);
+}
+
+gboolean
+rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend,
+ gint64 expire,
+ gboolean clean_orphaned)
+{
+ struct orphaned_shingle_elt {
+ gint64 value;
+ gint64 number;
+ };
+
+ /* Do not do more than 5k ops per step */
+ const guint64 max_changes = 5000;
+ gboolean ret = FALSE;
+ gint64 expire_lim, expired;
+ gint rc, i, orphaned_cnt = 0;
+ GError *err = NULL;
+ static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number "
+ "FROM shingles "
+ "LEFT JOIN digests ON "
+ "shingles.digest_id=digests.id WHERE "
+ "digests.id IS NULL;";
+ sqlite3_stmt *stmt;
+ GArray *orphaned;
+ struct orphaned_shingle_elt orphaned_elt, *pelt;
+
+
+ if (backend == NULL) {
+ return FALSE;
+ }
+
+ /* Perform expire */
+ if (expire > 0) {
+ expire_lim = time (NULL) - expire;
+
+ if (expire_lim > 0) {
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (ret == SQLITE_OK) {
+
+ rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes);
+
+ if (rc == SQLITE_OK) {
+ expired = sqlite3_changes (backend->db);
+
+ if (expired > 0) {
+ backend->expired += expired;
+ msg_info_fuzzy_backend ("expired %L hashes", expired);
+ }
+ }
+ else {
+ msg_warn_fuzzy_backend (
+ "cannot execute expired statement: %s",
+ sqlite3_errmsg (backend->db));
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend,
+ RSPAMD_FUZZY_BACKEND_EXPIRE);
+
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (ret != SQLITE_OK) {
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ }
+ }
+ if (ret != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot expire db: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ }
+ }
+
+ /* Cleanup database */
+ if (clean_orphaned) {
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_START);
+
+ if (ret == SQLITE_OK) {
+ if ((rc = sqlite3_prepare_v2 (backend->db,
+ orphaned_shingles,
+ -1,
+ &stmt,
+ NULL)) != SQLITE_OK) {
+ msg_warn_fuzzy_backend ("cannot cleanup shingles: %s",
+ sqlite3_errmsg (backend->db));
+ }
+ else {
+ orphaned = g_array_new (FALSE,
+ FALSE,
+ sizeof (struct orphaned_shingle_elt));
+
+ while (sqlite3_step (stmt) == SQLITE_ROW) {
+ orphaned_elt.value = sqlite3_column_int64 (stmt, 0);
+ orphaned_elt.number = sqlite3_column_int64 (stmt, 1);
+ g_array_append_val (orphaned, orphaned_elt);
+
+ if (orphaned->len > max_changes) {
+ break;
+ }
+ }
+
+ sqlite3_finalize (stmt);
+ orphaned_cnt = orphaned->len;
+
+ if (orphaned_cnt > 0) {
+ msg_info_fuzzy_backend (
+ "going to delete %ud orphaned shingles",
+ orphaned_cnt);
+ /* Need to delete orphaned elements */
+ for (i = 0; i < (gint) orphaned_cnt; i++) {
+ pelt = &g_array_index (orphaned,
+ struct orphaned_shingle_elt,
+ i);
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED,
+ pelt->value, pelt->number);
+ }
+ }
+
+
+ g_array_free (orphaned, TRUE);
+ }
+
+ ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT);
+
+ if (ret == SQLITE_OK) {
+ msg_info_fuzzy_backend (
+ "deleted %ud orphaned shingles",
+ orphaned_cnt);
+ }
+ else {
+ msg_warn_fuzzy_backend (
+ "cannot synchronize fuzzy backend: %e",
+ err);
+ rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE,
+ RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK);
+ }
+ }
+ }
+
+ return ret;
+}
+
+
+void
+rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ if (backend != NULL) {
+ if (backend->db != NULL) {
+ rspamd_fuzzy_backend_sqlite_close_stmts (backend);
+ sqlite3_close (backend->db);
+ }
+
+ if (backend->path != NULL) {
+ g_free (backend->path);
+ }
+
+ if (backend->pool) {
+ rspamd_mempool_delete (backend->pool);
+ }
+
+ g_free (backend);
+ }
+}
+
+
+gsize
+rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ if (backend) {
+ if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) {
+ backend->count = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT);
+
+ return backend->count;
+ }
+
+ return 0;
+}
+
+gint
+rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source)
+{
+ gint ret = 0;
+
+ if (backend) {
+ if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE,
+ RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) {
+ ret = sqlite3_column_int64 (
+ prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0);
+ }
+
+ rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION);
+ }
+
+ return ret;
+}
+
+gsize
+rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ return backend != NULL ? backend->expired : 0;
+}
+
+const gchar *
+rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend)
+{
+ return backend != NULL ? backend->id : 0;
+}
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h
new file mode 100644
index 000000000..33dc94f30
--- /dev/null
+++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h
@@ -0,0 +1,107 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FUZZY_BACKEND_H_
+#define FUZZY_BACKEND_H_
+
+#include "config.h"
+#include "fuzzy_wire.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rspamd_fuzzy_backend_sqlite;
+
+/**
+ * Open fuzzy backend
+ * @param path file to open (legacy file will be converted automatically)
+ * @param err error pointer
+ * @return backend structure or NULL
+ */
+struct rspamd_fuzzy_backend_sqlite *rspamd_fuzzy_backend_sqlite_open (const gchar *path,
+ gboolean vacuum,
+ GError **err);
+
+/**
+ * Check specified fuzzy in the backend
+ * @param backend
+ * @param cmd
+ * @return reply with probability and weight
+ */
+struct rspamd_fuzzy_reply rspamd_fuzzy_backend_sqlite_check (
+ struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd,
+ gint64 expire);
+
+/**
+ * Prepare storage for updates (by starting transaction)
+ */
+gboolean rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source);
+
+/**
+ * Add digest to the database
+ * @param backend
+ * @param cmd
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Delete digest from the database
+ * @param backend
+ * @param cmd
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_del (
+ struct rspamd_fuzzy_backend_sqlite *backend,
+ const struct rspamd_fuzzy_cmd *cmd);
+
+/**
+ * Commit updates to storage
+ */
+gboolean rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend,
+ const gchar *source, gboolean version_bump);
+
+/**
+ * Sync storage
+ * @param backend
+ * @return
+ */
+gboolean rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend,
+ gint64 expire,
+ gboolean clean_orphaned);
+
+/**
+ * Close storage
+ * @param backend
+ */
+void rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend);
+
+gsize rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend);
+
+gint rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend, const gchar *source);
+
+gsize rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend);
+
+const gchar *rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* FUZZY_BACKEND_H_ */