]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Adopt fuzzy storage for abstract backend
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 2 Sep 2016 13:32:58 +0000 (14:32 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 2 Sep 2016 13:32:58 +0000 (14:32 +0100)
src/fuzzy_storage.c
src/rspamd.c

index 0d29cd19717fdc35a6b1ba1f9b7691b0b2c86d50..d663a89e4fd68757a7caff39df1ebebf76b2317c 100644 (file)
 #include "libutil/http_private.h"
 #include "unix-std.h"
 
-/* This number is used as expire time in seconds for cache items  (2 days) */
-#define DEFAULT_EXPIRE 172800L
 /* Resync value in seconds */
 #define DEFAULT_SYNC_TIMEOUT 60.0
 #define DEFAULT_KEYPAIR_CACHE_SIZE 512
 #define DEFAULT_MASTER_TIMEOUT 10.0
 
-#define INVALID_NODE_TIME (guint64) - 1
-
 static const gchar *local_db_name = "local";
 
 #define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -77,10 +73,6 @@ worker_t fuzzy_worker = {
                RSPAMD_WORKER_VER           /* Version info */
 };
 
-/* For evtimer */
-static struct timeval tmv;
-static struct event tev;
-
 struct fuzzy_global_stat {
        guint64 fuzzy_hashes;
        /**< number of fuzzy hashes stored                                      */
@@ -139,7 +131,7 @@ struct rspamd_fuzzy_storage_ctx {
        gboolean encrypted_only;
        struct rspamd_keypair_cache *keypair_cache;
        rspamd_lru_hash_t *errors_ips;
-       struct rspamd_fuzzy_backend_sqlite *backend;
+       struct rspamd_fuzzy_backend *backend;
        GQueue *updates_pending;
        struct rspamd_dns_resolver *resolver;
        struct rspamd_config *cfg;
@@ -165,6 +157,7 @@ struct fuzzy_session {
        } cmd;
 
        struct rspamd_fuzzy_encrypted_reply reply;
+       struct fuzzy_key_stat *ip_stat;
 
        enum rspamd_fuzzy_epoch epoch;
        enum fuzzy_cmd_type cmd_type;
@@ -191,7 +184,10 @@ struct fuzzy_master_update_session {
        const gchar *name;
        gchar uid[16];
        struct rspamd_http_connection *conn;
+       struct rspamd_http_message *msg;
        struct rspamd_fuzzy_storage_ctx *ctx;
+       const gchar *src;
+       gchar *psrc;
        rspamd_inet_addr_t *addr;
        gboolean replied;
        gint sock;
@@ -242,6 +238,14 @@ fuzzy_key_dtor (gpointer p)
        g_slice_free1 (sizeof (*key), key);
 }
 
+static void
+fuzzy_count_callback (guint64 count, void *ud)
+{
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
+
+       ctx->stat.fuzzy_hashes = count;
+}
+
 struct fuzzy_slave_connection {
        struct rspamd_cryptobox_keypair *local_key;
        struct rspamd_cryptobox_pubkey *remote_key;
@@ -266,19 +270,34 @@ fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
        }
 }
 
+struct rspamd_fuzzy_updates_cbdata {
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct rspamd_http_message *msg;
+       struct fuzzy_slave_connection *conn;
+       struct rspamd_fuzzy_mirror *m;
+};
+
 static void
-fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
-               struct rspamd_http_message *msg)
+fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
 {
+       struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
        GList *cur;
        struct fuzzy_peer_cmd *io_cmd;
-       guint32 len;
-       guint32 rev;
+       guint32 rev32 = rev64, len;
        const gchar *p;
        rspamd_fstring_t *reply;
+       struct fuzzy_slave_connection *conn;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct rspamd_http_message *msg;
+       struct rspamd_fuzzy_mirror *m;
+       struct timeval tv;
 
-       rev = rspamd_fuzzy_backend_sqlite_version (ctx->backend, local_db_name);
-       rev = GUINT32_TO_LE (rev);
+       conn = cbdata->conn;
+       ctx = cbdata->ctx;
+       msg = cbdata->msg;
+       m = cbdata->m;
+       g_slice_free1 (sizeof (*cbdata), cbdata);
+       rev32 = GUINT32_TO_LE (rev32);
        len = sizeof (guint32) * 2; /* revision + last chunk */
 
        for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
@@ -295,8 +314,8 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
        }
 
        reply = rspamd_fstring_sized_new (len);
-       reply = rspamd_fstring_append (reply, (const char *)&rev,
-                       sizeof (rev));
+       reply = rspamd_fstring_append (reply, (const char *)&rev32,
+                       sizeof (rev32));
 
        for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
                io_cmd = cur->data;
@@ -320,6 +339,30 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
        len = 0;
        reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
        rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+       double_to_tv (ctx->sync_timeout, &tv);
+       rspamd_http_connection_write_message (conn->http_conn,
+                       msg, NULL, NULL, conn,
+                       conn->sock,
+                       &tv, ctx->ev_base);
+       msg_info ("send update request to %s", m->name);
+}
+
+static void
+fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
+               struct fuzzy_slave_connection *conn,
+               struct rspamd_fuzzy_storage_ctx *ctx,
+               struct rspamd_http_message *msg)
+{
+
+       struct rspamd_fuzzy_updates_cbdata *cbdata;
+
+       cbdata = g_slice_alloc (sizeof (*cbdata));
+       cbdata->ctx = ctx;
+       cbdata->msg = msg;
+       cbdata->conn = conn;
+       cbdata->m = m;
+       rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
+                       fuzzy_mirror_updates_version_cb, cbdata);
 }
 
 static void
@@ -353,7 +396,6 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
 {
        struct fuzzy_slave_connection *conn;
        struct rspamd_http_message *msg;
-       struct timeval tv;
 
        conn = g_slice_alloc0 (sizeof (*conn));
        conn->up = rspamd_upstream_get (m->u,
@@ -389,84 +431,85 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
        rspamd_http_connection_set_key (conn->http_conn,
                        ctx->sync_keypair);
        msg->peer_key = rspamd_pubkey_ref (m->key);
-       double_to_tv (ctx->sync_timeout, &tv);
-       fuzzy_mirror_updates_to_http (ctx, msg);
+       fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
+}
 
-       rspamd_http_connection_write_message (conn->http_conn,
-                       msg, NULL, NULL, conn,
-                       conn->sock,
-                       &tv, ctx->ev_base);
-       msg_info ("send update request to %s", m->name);
+struct rspamd_updates_cbdata {
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       const gchar *source;
+};
+
+static void
+fuzzy_update_version_callback (guint64 ver, void *ud)
+{
+       msg_info ("updated fuzzy storage from %s: version: %d",
+               (const char *)ud, (gint)ver);
 }
 
 static void
-rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
-               const gchar *source)
+rspamd_fuzzy_updates_cb (gboolean success, void *ud)
 {
-       GList *cur;
-       struct fuzzy_peer_cmd *io_cmd;
-       struct rspamd_fuzzy_cmd *cmd;
-       gpointer ptr;
+       struct rspamd_updates_cbdata *cbdata = ud;
        struct rspamd_fuzzy_mirror *m;
        guint nupdates = 0, i;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       const gchar *source;
+       GList *cur;
+       struct fuzzy_peer_cmd *io_cmd;
 
-       if (ctx->updates_pending &&
-                       g_queue_get_length (ctx->updates_pending) > 0 &&
-                       rspamd_fuzzy_backend_sqlite_prepare_update (ctx->backend, source)) {
-               cur = ctx->updates_pending->head;
+       ctx = cbdata->ctx;
+       source = cbdata->source;
 
-               while (cur) {
-                       io_cmd = cur->data;
+       if (success) {
+               rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
 
-                       if (io_cmd->is_shingle) {
-                               cmd = &io_cmd->cmd.shingle.basic;
-                               ptr = &io_cmd->cmd.shingle;
-                       }
-                       else {
-                               cmd = &io_cmd->cmd.normal;
-                               ptr = &io_cmd->cmd.normal;
-                       }
+               if (nupdates > 0) {
+                       for (i = 0; i < ctx->mirrors->len; i ++) {
+                               m = g_ptr_array_index (ctx->mirrors, i);
 
-                       if (cmd->cmd == FUZZY_WRITE) {
-                               rspamd_fuzzy_backend_sqlite_add (ctx->backend, ptr);
-                       }
-                       else {
-                               rspamd_fuzzy_backend_sqlite_del (ctx->backend, ptr);
+                               rspamd_fuzzy_send_update_mirror (ctx, m);
                        }
+               }
+
+               /* Clear updates */
+               cur = ctx->updates_pending->head;
 
-                       nupdates++;
+               while (cur) {
+                       io_cmd = cur->data;
+                       g_slice_free1 (sizeof (*io_cmd), io_cmd);
                        cur = g_list_next (cur);
                }
 
-               if (rspamd_fuzzy_backend_sqlite_finish_update (ctx->backend, source, nupdates > 0)) {
-                       ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend);
+               g_queue_clear (ctx->updates_pending);
+               rspamd_fuzzy_backend_version (ctx->backend, source,
+                               fuzzy_update_version_callback, (void *)source);
 
-                       if (nupdates > 0) {
-                               for (i = 0; i < ctx->mirrors->len; i ++) {
-                                       m = g_ptr_array_index (ctx->mirrors, i);
+       }
+       else {
+               msg_err ("cannot commit update transaction to fuzzy backend, "
+                               "%ud updates are still pending",
+                               g_queue_get_length (ctx->updates_pending));
+       }
 
-                                       rspamd_fuzzy_send_update_mirror (ctx, m);
-                               }
-                       }
+       g_slice_free1 (sizeof (*cbdata), cbdata);
+}
 
-                       /* Clear updates */
-                       cur = ctx->updates_pending->head;
+static void
+rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
+               const gchar *source)
+{
+
+       struct rspamd_updates_cbdata *cbdata;
+
+       if (ctx->updates_pending &&
+                       g_queue_get_length (ctx->updates_pending) > 0) {
+               cbdata = g_slice_alloc (sizeof (*cbdata));
+               cbdata->ctx = ctx;
+               cbdata->source = source;
+               rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
+                               source, rspamd_fuzzy_updates_cb, cbdata);
 
-                       while (cur) {
-                               io_cmd = cur->data;
-                               g_slice_free1 (sizeof (*io_cmd), io_cmd);
-                               cur = g_list_next (cur);
-                       }
 
-                       g_queue_clear (ctx->updates_pending);
-                       msg_info ("updated fuzzy storage: %ud updates processed, version: %d",
-                                       nupdates, rspamd_fuzzy_backend_sqlite_version (ctx->backend, source));
-               }
-               else {
-                       msg_err ("cannot commit update transaction to fuzzy backend, "
-                                       "%ud updates are still pending",
-                                       g_queue_get_length (ctx->updates_pending));
-               }
        }
        else if (ctx->updates_pending &&
                        g_queue_get_length (ctx->updates_pending) > 0) {
@@ -598,6 +641,77 @@ rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
        }
 }
 
+static void
+rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
+               struct rspamd_fuzzy_reply *result,
+               struct fuzzy_session *session,
+               gboolean encrypted, gboolean is_shingle)
+{
+       if (cmd) {
+               result->tag = cmd->tag;
+
+               memcpy (&session->reply.rep, result, sizeof (*result));
+
+               rspamd_fuzzy_update_stats (session->ctx,
+                               session->epoch,
+                               result->prob > 0.5,
+                               is_shingle,
+                               session->key_stat,
+                               session->ip_stat,
+                               cmd->cmd,
+                               result->value);
+
+               if (encrypted) {
+                       /* We need also to encrypt reply */
+                       ottery_rand_bytes (session->reply.hdr.nonce,
+                                       sizeof (session->reply.hdr.nonce));
+                       rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
+                                       sizeof (session->reply.rep),
+                                       session->reply.hdr.nonce,
+                                       session->nm,
+                                       session->reply.hdr.mac,
+                                       RSPAMD_CRYPTOBOX_MODE_25519);
+               }
+       }
+
+       rspamd_fuzzy_write_reply (session);
+}
+
+static void
+rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
+{
+       struct fuzzy_session *session = ud;
+       gboolean encrypted = FALSE, is_shingle = FALSE;
+       struct rspamd_fuzzy_cmd *cmd = NULL;
+       gsize up_len = 0;
+
+       switch (session->cmd_type) {
+       case CMD_NORMAL:
+               cmd = &session->cmd.normal;
+               up_len = sizeof (session->cmd.normal);
+               break;
+       case CMD_SHINGLE:
+               cmd = &session->cmd.shingle.basic;
+               up_len = sizeof (session->cmd.shingle);
+               is_shingle = TRUE;
+               break;
+       case CMD_ENCRYPTED_NORMAL:
+               cmd = &session->cmd.enc_normal.cmd;
+               up_len = sizeof (session->cmd.normal);
+               encrypted = TRUE;
+               break;
+       case CMD_ENCRYPTED_SHINGLE:
+               cmd = &session->cmd.enc_shingle.cmd.basic;
+               up_len = sizeof (session->cmd.shingle);
+               encrypted = TRUE;
+               is_shingle = TRUE;
+               break;
+       }
+
+       rspamd_fuzzy_make_reply (cmd, result, session, encrypted, is_shingle);
+       REF_RELEASE (session);
+}
+
 static void
 rspamd_fuzzy_process_command (struct fuzzy_session *session)
 {
@@ -637,14 +751,16 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
        if (G_UNLIKELY (cmd == NULL || up_len == 0)) {
                result.value = 500;
                result.prob = 0.0;
-               goto reply;
+               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+               return;
        }
 
        if (session->ctx->encrypted_only && !encrypted) {
                /* Do not accept unencrypted commands */
                result.value = 403;
                result.prob = 0.0;
-               goto reply;
+               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+               return;
        }
 
        if (session->key_stat) {
@@ -657,17 +773,21 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
                        rspamd_lru_hash_insert (session->key_stat->last_ips,
                                        naddr, ip_stat, -1, 0);
                }
+
+               session->ip_stat = ip_stat;
        }
 
        result.flag = cmd->flag;
        if (cmd->cmd == FUZZY_CHECK) {
-               result = rspamd_fuzzy_backend_sqlite_check (session->ctx->backend, cmd,
-                               session->ctx->expire);
+               REF_RETAIN (session);
+               rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
+                               rspamd_fuzzy_check_callback, session);
        }
        else if (cmd->cmd == FUZZY_STAT) {
                result.prob = 1.0;
                result.value = 0;
-               result.flag = rspamd_fuzzy_backend_sqlite_count (session->ctx->backend);
+               result.flag = session->ctx->stat.fuzzy_hashes;
+               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
        }
        else {
                if (rspamd_fuzzy_check_client (session)) {
@@ -703,36 +823,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
                        result.value = 403;
                        result.prob = 0.0;
                }
-       }
-
-reply:
-       if (cmd) {
-               result.tag = cmd->tag;
 
-               memcpy (&session->reply.rep, &result, sizeof (result));
-
-               rspamd_fuzzy_update_stats (session->ctx,
-                               session->epoch,
-                               result.prob > 0.5,
-                               is_shingle,
-                               session->key_stat,
-                               ip_stat, cmd->cmd,
-                               result.value);
-
-               if (encrypted) {
-                       /* We need also to encrypt reply */
-                       ottery_rand_bytes (session->reply.hdr.nonce,
-                                       sizeof (session->reply.hdr.nonce));
-                       rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep,
-                                       sizeof (session->reply.rep),
-                                       session->reply.hdr.nonce,
-                                       session->nm,
-                                       session->reply.hdr.mac,
-                                       RSPAMD_CRYPTOBOX_MODE_25519);
-               }
+               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
        }
-
-       rspamd_fuzzy_write_reply (session);
 }
 
 
@@ -916,12 +1009,11 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
 
 static void
 rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
-               struct rspamd_http_message *msg)
+               struct rspamd_http_message *msg, guint our_rev)
 {
        const guchar *p;
-       gchar *src = NULL, *psrc;
        gsize remain;
-       gint32 revision, our_rev;
+       gint32 revision;
        guint32 len = 0, cnt = 0;
        struct fuzzy_peer_cmd cmd, *pcmd;
        enum {
@@ -932,25 +1024,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
        GList *updates = NULL, *cur;
        gpointer flag_ptr;
 
-       if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
-                       || msg->url->len == 0) {
-               msg_err_fuzzy_update ("empty update message, not processing");
-
-               return;
-       }
-
-       /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
-       remain = msg->url->len;
-       psrc = rspamd_fstringdup (msg->url);
-       src = psrc;
-
-       while (remain--) {
-               if (src[remain] == '/') {
-                       src = &src[remain + 1];
-                       break;
-               }
-       }
-
        /*
         * Message format:
         * <uint32_le> - revision
@@ -965,13 +1038,11 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
        if (remain > sizeof (gint32) * 2) {
                memcpy (&revision, p, sizeof (gint32));
                revision = GINT32_TO_LE (revision);
-               our_rev = rspamd_fuzzy_backend_sqlite_version (session->ctx->backend, src);
 
                if (revision <= our_rev) {
                        msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, "
                                        "refusing update",
                                        revision, our_rev);
-                       g_free (psrc);
 
                        return;
                }
@@ -1076,7 +1147,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                cur->data = NULL;
        }
 
-       rspamd_fuzzy_process_updates_queue (session->ctx, src);
+       rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
        msg_info_fuzzy_update ("processed updates from the master %s, "
                        "%ud operations processed,"
                        " revision: %d (local revision: %d)",
@@ -1084,8 +1155,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                        cnt, revision, our_rev);
 
 err:
-       g_free (psrc);
-
        if (updates) {
                /* We still need to clear queue */
                for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
@@ -1119,6 +1188,10 @@ rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session
                rspamd_http_connection_unref (session->conn);
                rspamd_inet_address_destroy (session->addr);
                close (session->sock);
+
+               if (session->psrc) {
+                       g_free (session->psrc);
+               }
                g_slice_free1 (sizeof (*session), session);
        }
 }
@@ -1151,6 +1224,15 @@ rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session,
                        session->ctx->ev_base);
 }
 
+static void
+rspamd_fuzzy_update_version_callback (guint64 version, void *ud)
+{
+       struct fuzzy_master_update_session *session = ud;
+
+       rspamd_fuzzy_mirror_process_update (session, session->msg, version);
+       rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
+}
+
 static gint
 rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
        struct rspamd_http_message *msg)
@@ -1158,6 +1240,9 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
        struct fuzzy_master_update_session *session = conn->ud;
        const struct rspamd_cryptobox_pubkey *rk;
        const gchar *err_str = NULL;
+       gchar *psrc;
+       const gchar *src = NULL;
+       gsize remain;
 
        if (session->replied) {
                rspamd_fuzzy_mirror_session_destroy (session);
@@ -1189,9 +1274,30 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
                        msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s",
                                        rspamd_inet_address_to_string (session->addr));
                }
+               if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
+                               || msg->url->len == 0) {
+                       msg_err_fuzzy_update ("empty update message, not processing");
 
-               rspamd_fuzzy_mirror_process_update (session, msg);
-               rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
+                       goto end;
+               }
+
+               /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
+               remain = msg->url->len;
+               psrc = rspamd_fstringdup (msg->url);
+               src = psrc;
+
+               while (remain--) {
+                       if (src[remain] == '/') {
+                               src = &src[remain + 1];
+                               break;
+                       }
+               }
+
+               session->src = src;
+               session->psrc = psrc;
+               session->msg = msg;
+               rspamd_fuzzy_backend_version (session->ctx->backend, src,
+                               rspamd_fuzzy_update_version_callback, session);
 
                return 0;
        }
@@ -1352,38 +1458,6 @@ accept_fuzzy_socket (gint fd, short what, void *arg)
        }
 }
 
-static void
-sync_callback (gint fd, short what, void *arg)
-{
-       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       gdouble next_check;
-       guint64 old_expired, new_expired;
-
-       ctx = worker->ctx;
-
-       if (ctx->backend) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
-               /* Call backend sync */
-               old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
-               rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
-               new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
-
-               if (old_expired < new_expired) {
-                       ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
-               }
-       }
-
-       /* Timer event */
-       event_del (&tev);
-       evtimer_set (&tev, sync_callback, worker);
-       event_base_set (ctx->ev_base, &tev);
-       /* Plan event with jitter */
-       next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
-       double_to_tv (next_check, &tmv);
-       evtimer_add (&tev, &tmv);
-}
-
 static gboolean
 rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
                struct rspamd_worker *worker, gint fd,
@@ -1392,29 +1466,13 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
                gpointer ud)
 {
        struct rspamd_fuzzy_storage_ctx *ctx = ud;
-       gdouble next_check;
-       guint64 old_expired, new_expired;
        struct rspamd_control_reply rep;
 
-       if (ctx->backend) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
-               /* Call backend sync */
-               old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
-               rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
-               new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend);
-
-               if (old_expired < new_expired) {
-                       ctx->stat.fuzzy_hashes_expired += new_expired - old_expired;
-               }
-       }
-
        rep.reply.fuzzy_sync.status = 0;
 
-       /* Handle next timer event */
-       event_del (&tev);
-       next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
-       double_to_tv (next_check, &tmv);
-       evtimer_add (&tev, &tmv);
+       if (ctx->backend && worker->index == 0) {
+               rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+       }
 
        if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
                msg_err ("cannot write reply to the control socket: %s",
@@ -1439,14 +1497,14 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
 
        if (ctx->backend) {
                /* Close backend and reopen it one more time */
-               rspamd_fuzzy_backend_sqlite_close (ctx->backend);
+               rspamd_fuzzy_backend_close (ctx->backend);
        }
 
        memset (&rep, 0, sizeof (rep));
        rep.type = RSPAMD_CONTROL_RELOAD;
 
-       if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile,
-                       TRUE,
+       if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+                       worker->cf->options,
                        &err)) == NULL) {
                msg_err ("cannot open backend after reload: %e", err);
                g_error_free (err);
@@ -1456,6 +1514,10 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
                rep.reply.reload.status = 0;
        }
 
+       if (ctx->backend && worker->index == 0) {
+               rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
+       }
+
        if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
                msg_err ("cannot write reply to the control socket: %s",
                                strerror (errno));
@@ -1644,7 +1706,7 @@ rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main,
                rep.reply.fuzzy_stat.status = 0;
 
                memcpy (rep.reply.fuzzy_stat.storage_id,
-                               rspamd_fuzzy_sqlite_backend_id (ctx->backend),
+                               rspamd_fuzzy_backend_id (ctx->backend),
                                sizeof (rep.reply.fuzzy_stat.storage_id));
 
                obj = rspamd_fuzzy_stat_to_ucl (ctx, TRUE);
@@ -1901,7 +1963,6 @@ init_fuzzy (struct rspamd_config *cfg)
        ctx->magic = rspamd_fuzzy_storage_magic;
        ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
        ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
-       ctx->expire = DEFAULT_EXPIRE;
        ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
        ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
                        NULL, fuzzy_key_dtor);
@@ -2108,7 +2169,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
        GList *cur;
        struct rspamd_worker_listen_socket *ls;
        struct event *accept_events;
-       gdouble next_check;
 
        ctx->peer_fd = rep_fd;
 
@@ -2157,14 +2217,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
                event_base_set (ctx->ev_base, &ctx->peer_ev);
                event_add (&ctx->peer_ev, NULL);
                ctx->updates_pending = g_queue_new ();
-
-               /* Timer event */
-               evtimer_set (&tev, sync_callback, worker);
-               event_base_set (ctx->ev_base, &tev);
-               /* Plan event with jitter */
-               next_check = rspamd_time_jitter (ctx->sync_timeout, 0);
-               double_to_tv (next_check, &tmv);
-               evtimer_add (&tev, &tmv);
        }
 }
 
@@ -2188,13 +2240,14 @@ start_fuzzy (struct rspamd_worker *worker)
        /*
         * Open DB and perform VACUUM
         */
-       if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile, TRUE, &err)) == NULL) {
+       if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+                       worker->cf->options, &err)) == NULL) {
                msg_err ("cannot open backend: %e", err);
                g_error_free (err);
                exit (EXIT_SUCCESS);
        }
 
-       ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend);
+       rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
 
        if (ctx->keypair_cache_size > 0) {
                /* Create keypairs cache */
@@ -2202,7 +2255,7 @@ start_fuzzy (struct rspamd_worker *worker)
        }
 
        if (worker->index == 0) {
-               rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
+               rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout);
        }
 
        if (ctx->mirrors && ctx->mirrors->len != 0) {
@@ -2260,12 +2313,7 @@ start_fuzzy (struct rspamd_worker *worker)
        event_base_loop (ctx->ev_base, 0);
        rspamd_worker_block_signals ();
 
-       if (worker->index == 0) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
-               rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE);
-       }
-
-       rspamd_fuzzy_backend_sqlite_close (ctx->backend);
+       rspamd_fuzzy_backend_close (ctx->backend);
        rspamd_log_close (worker->srv->logger);
 
        if (ctx->peer_fd != -1) {
index 3ddee3ff4e4455ddc79188a1c08663679eef37e0..eb2f37ecb26f1c93e10831afdfb636bfc46d82da 100644 (file)
@@ -16,7 +16,6 @@
 #include "config.h"
 #include "rspamd.h"
 #include "libutil/map.h"
-#include "fuzzy_storage.h"
 #include "lua/lua_common.h"
 #include "libserver/worker_util.h"
 #include "libserver/rspamd_control.h"