]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Rework and simplify fuzzy storage, remove mirroring
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 19 Jun 2019 13:13:39 +0000 (14:13 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
src/controller.c
src/fuzzy_storage.c

index bf74f03a3fd2a35f5087a54a19e8aa1cac9d22e6..8510879450b034f9c870a983c4e49e63e0455aa1 100644 (file)
@@ -105,12 +105,8 @@ INIT_LOG_MODULE(controller)
 #define COLOR_REJECT "#CB4B4B"
 #define COLOR_TOTAL "#9440ED"
 
-const struct timeval rrd_update_time = {
-               .tv_sec = 1,
-               .tv_usec = 0
-};
-
-const guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL;
+const static ev_tstamp rrd_update_time = 1.0;
+const static guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL;
 
 extern void fuzzy_stat_command (struct rspamd_task *task);
 
@@ -132,7 +128,7 @@ worker_t controller_worker = {
 struct rspamd_controller_worker_ctx {
        guint64 magic;
        /* Events base */
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        /* DNS resolver */
        struct rspamd_dns_resolver *resolver;
        /* Config */
@@ -153,7 +149,7 @@ struct rspamd_controller_worker_ctx {
        struct rspamd_http_context *http_ctx;
        struct rspamd_http_connection_router *http;
        /* Server's start time */
-       time_t start_time;
+       ev_tstamp start_time;
        /* Main server */
        struct rspamd_main *srv;
        /* SSL cert */
@@ -182,9 +178,9 @@ struct rspamd_controller_worker_ctx {
        /* Local keypair */
        gpointer key;
 
-       struct event *rrd_event;
+       ev_timer rrd_event;
        struct rspamd_rrd_file *rrd;
-       struct event save_stats_event;
+       ev_timer save_stats_event;
        struct rspamd_lang_detector *lang_det;
        gdouble task_timeout;
 };
@@ -1525,7 +1521,7 @@ rspamd_controller_handle_lua_history (lua_State *L,
 
                        if (lua_isfunction (L, -1)) {
                                task = rspamd_task_new (session->ctx->worker, session->cfg,
-                                               session->pool, ctx->lang_det, ctx->ev_base);
+                                               session->pool, ctx->lang_det, ctx->event_loop);
 
                                task->resolver = ctx->resolver;
                                task->s = rspamd_session_create (session->pool,
@@ -1822,7 +1818,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent,
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->ev_base);
+                       ctx->lang_det, ctx->event_loop);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -2004,7 +2000,7 @@ rspamd_controller_handle_learn_common (
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       session->ctx->lang_det, ctx->ev_base);
+                       session->ctx->lang_det, ctx->event_loop);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -2102,7 +2098,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->ev_base);
+                       ctx->lang_det, ctx->event_loop);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -2133,7 +2129,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
 
                event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
                                task);
-               event_base_set (ctx->ev_base, &task->timeout_ev);
+               event_base_set (ctx->event_loop, &task->timeout_ev);
                double_to_tv (ctx->task_timeout, &task_tv);
                event_add (&task->timeout_ev, &task_tv);
        }
@@ -2600,7 +2596,7 @@ rspamd_controller_handle_stat_common (
        ctx = session->ctx;
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->ev_base);
+                       ctx->lang_det, ctx->event_loop);
        task->resolver = ctx->resolver;
        cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata));
        cbdata->conn_ent = conn_ent;
@@ -3002,7 +2998,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->ev_base);
+                       ctx->lang_det, ctx->event_loop);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -3487,7 +3483,7 @@ lua_csession_get_ev_base (lua_State *L)
                s = c->ud;
                pbase = lua_newuserdata (L, sizeof (struct ev_loop *));
                rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
-               *pbase = s->ctx->ev_base;
+               *pbase = s->ctx->event_loop;
        }
        else {
                return luaL_error (L, "invalid arguments");
@@ -3702,7 +3698,7 @@ start_controller_worker (struct rspamd_worker *worker)
        const guint save_stats_interval = 60 * 1000; /* 1 minute */
        gpointer m;
 
-       ctx->ev_base = rspamd_prepare_worker (worker,
+       ctx->event_loop = rspamd_prepare_worker (worker,
                        "controller",
                        rspamd_controller_accept_socket);
        msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -3752,7 +3748,7 @@ start_controller_worker (struct rspamd_worker *worker)
                if (ctx->rrd) {
                        ctx->rrd_event = g_malloc0 (sizeof (*ctx->rrd_event));
                        evtimer_set (ctx->rrd_event, rspamd_controller_rrd_update, ctx);
-                       event_base_set (ctx->ev_base, ctx->rrd_event);
+                       event_base_set (ctx->event_loop, ctx->rrd_event);
                        event_add (ctx->rrd_event, &rrd_update_time);
                }
                else if (rrd_err) {
@@ -3773,7 +3769,7 @@ start_controller_worker (struct rspamd_worker *worker)
                        "password");
 
        /* Accept event */
-       ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+       ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
                        ctx->cfg->ups_ctx);
        ctx->http = rspamd_http_router_new (rspamd_controller_error_handler,
                        rspamd_controller_finish_handler, &ctx->io_tv,
@@ -3889,40 +3885,40 @@ start_controller_worker (struct rspamd_worker *worker)
                        rspamd_controller_handle_unknown);
 
        ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-                       ctx->ev_base,
+                       ctx->event_loop,
                        worker->srv->cfg);
 
        rspamd_upstreams_library_config (worker->srv->cfg, worker->srv->cfg->ups_ctx,
-                       ctx->ev_base, ctx->resolver->r);
-       rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+                       ctx->event_loop, ctx->resolver->r);
+       rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
                        worker);
-       rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
+       rspamd_stat_init (worker->srv->cfg, ctx->event_loop);
 
        if (worker->index == 0) {
                if (!ctx->cfg->disable_monitored) {
-                       rspamd_worker_init_monitored (worker, ctx->ev_base, ctx->resolver);
+                       rspamd_worker_init_monitored (worker, ctx->event_loop, ctx->resolver);
                }
 
-               rspamd_map_watch (worker->srv->cfg, ctx->ev_base,
+               rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
                                ctx->resolver, worker, TRUE);
 
                /* Schedule periodic stats saving, see #1823 */
                event_set (&ctx->save_stats_event, -1, EV_PERSIST,
                                rspamd_controller_stats_save_periodic,
                                ctx);
-               event_base_set (ctx->ev_base, &ctx->save_stats_event);
+               event_base_set (ctx->event_loop, &ctx->save_stats_event);
                msec_to_tv (save_stats_interval, &stv);
                evtimer_add (&ctx->save_stats_event, &stv);
        }
        else {
-               rspamd_map_watch (worker->srv->cfg, ctx->ev_base,
+               rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
                                ctx->resolver, worker, FALSE);
        }
 
-       rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base, worker);
+       rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
 
        /* Start event loop */
-       event_base_loop (ctx->ev_base, 0);
+       event_base_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
        rspamd_stat_close ();
index 86a4230de55f0bf7b44dd73b6994a07d5b3fd801..4565be874e594cbb3dfa6225b6ed0532e4c2430f 100644 (file)
@@ -36,8 +36,7 @@
 #include "libserver/rspamd_control.h"
 #include "libutil/hash.h"
 #include "libutil/map_private.h"
-#include "libutil/http_private.h"
-#include "libutil/http_router.h"
+#include "contrib/uthash/utlist.h"
 #include "unix-std.h"
 
 #include <math.h>
@@ -132,7 +131,7 @@ static const guint64 rspamd_fuzzy_storage_magic = 0x291a3253eb1b3ea5ULL;
 struct rspamd_fuzzy_storage_ctx {
        guint64 magic;
        /* Events base */
-       struct ev_loop *ev_base;
+       struct ev_loop *event_loop;
        /* DNS resolver */
        struct rspamd_dns_resolver *resolver;
        /* Config */
@@ -146,34 +145,21 @@ struct rspamd_fuzzy_storage_ctx {
        struct rspamd_radix_map_helper *blocked_ips;
        struct rspamd_radix_map_helper *ratelimit_whitelist;
 
-       struct rspamd_cryptobox_keypair *sync_keypair;
-       struct rspamd_cryptobox_pubkey *master_key;
-       struct timeval master_io_tv;
-       gdouble master_timeout;
-       GPtrArray *mirrors;
        const ucl_object_t *update_map;
-       const ucl_object_t *masters_map;
        const ucl_object_t *blocked_map;
        const ucl_object_t *ratelimit_whitelist_map;
 
-       GHashTable *master_flags;
        guint keypair_cache_size;
-       gint peer_fd;
-       struct event peer_ev;
-       struct event stat_ev;
-       struct timeval stat_tv;
+       ev_timer stat_ev;
+       ev_io peer_ev;
+       ev_tstamp stat_timeout;
 
        /* Local keypair */
        struct rspamd_cryptobox_keypair *default_keypair; /* Bad clash, need for parse keypair */
        struct fuzzy_key *default_key;
        GHashTable *keys;
        gboolean encrypted_only;
-       gboolean collection_mode;
        gboolean read_only;
-       struct rspamd_cryptobox_keypair *collection_keypair;
-       struct rspamd_cryptobox_pubkey *collection_sign_key;
-       gchar *collection_id_file;
-       struct rspamd_http_context *http_ctx;
        struct rspamd_keypair_cache *keypair_cache;
        rspamd_lru_hash_t *errors_ips;
        rspamd_lru_hash_t *ratelimit_buckets;
@@ -181,7 +167,8 @@ struct rspamd_fuzzy_storage_ctx {
        GArray *updates_pending;
        guint updates_failed;
        guint updates_maxfail;
-       guint32 collection_id;
+       /* Used to send data between workers */
+       gint peer_fd;
 
        /* Ratelimits */
        guint leaky_bucket_ttl;
@@ -192,7 +179,6 @@ struct rspamd_fuzzy_storage_ctx {
        gdouble leaky_bucket_rate;
 
        struct rspamd_worker *worker;
-       struct rspamd_http_connection_router *collection_rt;
        const ucl_object_t *skip_map;
        struct rspamd_hash_map_helper *skip_hashes;
        guchar cookie[COOKIE_SIZE];
@@ -224,14 +210,14 @@ struct fuzzy_session {
        enum fuzzy_cmd_type cmd_type;
        gint fd;
        guint64 time;
-       struct event io;
+       struct ev_io io;
        ref_entry_t ref;
        struct fuzzy_key_stat *key_stat;
        guchar nm[rspamd_cryptobox_MAX_NMBYTES];
 };
 
 struct fuzzy_peer_request {
-       struct event io_ev;
+       ev_io io_ev;
        struct fuzzy_peer_cmd cmd;
 };
 
@@ -241,19 +227,13 @@ struct fuzzy_key {
        struct fuzzy_key_stat *stat;
 };
 
-struct fuzzy_master_update_session {
-       const gchar *name;
-       gchar uid[16];
-       struct rspamd_http_connection *conn;
-       struct rspamd_http_message *msg;
+struct rspamd_updates_cbdata {
+       GArray *updates_pending;
        struct rspamd_fuzzy_storage_ctx *ctx;
-       const gchar *src;
-       gchar *psrc;
-       rspamd_inet_addr_t *addr;
-       gboolean replied;
-       gint sock;
+       gchar *source;
 };
 
+
 static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
 
 static gboolean
@@ -261,8 +241,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
 {
        rspamd_inet_addr_t *masked;
        struct rspamd_leaky_bucket_elt *elt;
-       struct timeval tv;
-       gdouble now;
+       ev_tstamp now;
 
        if (session->ctx->ratelimit_whitelist != NULL) {
                if (rspamd_match_radix_map_addr (session->ctx->ratelimit_whitelist,
@@ -289,15 +268,9 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
                                MIN (MAX (session->ctx->leaky_bucket_mask * 4, 64), 128));
        }
 
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
-       event_base_gettimeofday_cached (session->ctx->ev_base, &tv);
-#else
-       gettimeofday (&tv, NULL);
-#endif
-
-       now = tv_to_double (&tv);
+       now = ev_now (session->ctx->event_loop);
        elt = rspamd_lru_hash_lookup (session->ctx->ratelimit_buckets, masked,
-                       tv.tv_sec);
+                       now);
 
        if (elt) {
                gboolean ratelimited = FALSE;
@@ -348,7 +321,7 @@ rspamd_fuzzy_check_ratelimit (struct fuzzy_session *session)
                rspamd_lru_hash_insert (session->ctx->ratelimit_buckets,
                                masked,
                                elt,
-                               tv.tv_sec,
+                               now,
                                session->ctx->leaky_bucket_ttl);
        }
 
@@ -424,15 +397,6 @@ fuzzy_count_callback (guint64 count, void *ud)
        ctx->stat.fuzzy_hashes = count;
 }
 
-struct fuzzy_slave_connection {
-       struct rspamd_cryptobox_keypair *local_key;
-       struct rspamd_cryptobox_pubkey *remote_key;
-       struct upstream *up;
-       struct rspamd_http_connection *http_conn;
-       struct rspamd_fuzzy_mirror *mirror;
-       gint sock;
-};
-
 static void
 fuzzy_rl_bucket_free (gpointer p)
 {
@@ -443,227 +407,31 @@ fuzzy_rl_bucket_free (gpointer p)
 }
 
 static void
-fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
-{
-       if (conn) {
-               if (conn->http_conn) {
-                       rspamd_http_connection_reset (conn->http_conn);
-                       rspamd_http_connection_unref (conn->http_conn);
-               }
-
-               close (conn->sock);
-
-               g_free (conn);
-       }
-}
-
-struct rspamd_fuzzy_updates_cbdata {
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       struct rspamd_http_message *msg;
-       struct fuzzy_slave_connection *conn;
-       struct rspamd_fuzzy_mirror *m;
-       GArray *updates_pending;
-};
-
-static void
-fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
-{
-       struct rspamd_fuzzy_updates_cbdata *cbdata = ud;
-       struct fuzzy_peer_cmd *io_cmd;
-       guint32 rev32 = rev64, len;
-       const gchar *p;
-       rspamd_fstring_t *reply;
-       struct fuzzy_slave_connection *conn;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       struct rspamd_http_message *msg;
-       struct rspamd_fuzzy_mirror *m;
-       struct timeval tv;
-       guint i;
-
-       conn = cbdata->conn;
-       ctx = cbdata->ctx;
-       msg = cbdata->msg;
-       m = cbdata->m;
-
-       rev32 = GUINT32_TO_LE (rev32);
-       len = sizeof (guint32) * 2; /* revision + last chunk */
-
-       for (i = 0; i < cbdata->updates_pending->len; i ++) {
-               io_cmd = &g_array_index (cbdata->updates_pending,
-                               struct fuzzy_peer_cmd, i);
-
-               if (io_cmd->is_shingle) {
-                       len += sizeof (guint32) + sizeof (guint32) +
-                                       sizeof (struct rspamd_fuzzy_shingle_cmd);
-               }
-               else {
-                       len += sizeof (guint32) + sizeof (guint32) +
-                                       sizeof (struct rspamd_fuzzy_cmd);
-               }
-       }
-
-       reply = rspamd_fstring_sized_new (len);
-       reply = rspamd_fstring_append (reply, (const char *)&rev32,
-                       sizeof (rev32));
-
-       for (i = 0; i < cbdata->updates_pending->len; i ++) {
-               io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i);
-
-               if (io_cmd->is_shingle) {
-                       len = sizeof (guint32) +
-                                       sizeof (struct rspamd_fuzzy_shingle_cmd);
-               }
-               else {
-                       len = sizeof (guint32) +
-                                       sizeof (struct rspamd_fuzzy_cmd);
-               }
-
-               p = (const char *)io_cmd;
-               len = GUINT32_TO_LE (len);
-               reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
-               reply = rspamd_fstring_append (reply, p, len);
-       }
-
-       /* Last chunk */
-       len = 0;
-       reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len));
-       rspamd_http_message_set_body_from_fstring_steal (msg, reply);
-       double_to_tv (ctx->sync_timeout, &tv);
-       rspamd_http_connection_write_message (conn->http_conn,
-                       msg, NULL, NULL, conn,
-                       &tv);
-       msg_info ("send update request to %s", m->name);
-
-       g_array_free (cbdata->updates_pending, TRUE);
-       g_free (cbdata);
-}
-
-static void
-fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
-                                                         struct fuzzy_slave_connection *conn,
-                                                         struct rspamd_fuzzy_storage_ctx *ctx,
-                                                         struct rspamd_http_message *msg,
-                                                         GArray *updates)
-{
-
-       struct rspamd_fuzzy_updates_cbdata *cbdata;
-
-       cbdata = g_malloc (sizeof (*cbdata));
-       cbdata->ctx = ctx;
-       cbdata->msg = msg;
-       cbdata->conn = conn;
-       cbdata->m = m;
-       /* Copy queue */
-       cbdata->updates_pending = g_array_sized_new (FALSE, FALSE,
-                       sizeof (struct fuzzy_peer_cmd), updates->len);
-       g_array_append_vals (cbdata->updates_pending, updates->data, updates->len);
-       rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
-                       fuzzy_mirror_updates_version_cb, cbdata);
-}
-
-static void
-fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
-       struct fuzzy_slave_connection *bk_conn = conn->ud;
-       msg_info ("abnormally closing connection from backend: %s:%s, "
-                       "error: %e",
-                       bk_conn->mirror->name,
-                       rspamd_inet_address_to_string (rspamd_upstream_addr_cur (bk_conn->up)),
-                       err);
-
-       fuzzy_mirror_close_connection (bk_conn);
-}
-
-static gint
-fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
-       struct rspamd_http_message *msg)
+fuzzy_stat_count_callback (guint64 count, void *ud)
 {
-       struct fuzzy_slave_connection *bk_conn = conn->ud;
-
-       msg_info ("finished mirror connection to %s", bk_conn->mirror->name);
-       fuzzy_mirror_close_connection (bk_conn);
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
 
-       return 0;
+       ev_timer_again (ctx->event_loop, &ctx->stat_ev);
+       ctx->stat.fuzzy_hashes = count;
 }
 
 static void
-rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
-               struct rspamd_fuzzy_mirror *m, GArray *updates)
+rspamd_fuzzy_stat_callback (EV_P_ ev_timer *w, int revents)
 {
-       struct fuzzy_slave_connection *conn;
-       struct rspamd_http_message *msg;
-
-       conn = g_malloc0 (sizeof (*conn));
-       conn->up = rspamd_upstream_get (m->u,
-                       RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0);
-       conn->mirror = m;
-
-       if (conn->up == NULL) {
-               g_free (conn);
-               msg_err ("cannot select upstream for %s", m->name);
-               return;
-       }
-
-       conn->sock = rspamd_inet_address_connect (
-                       rspamd_upstream_addr_next (conn->up),
-                       SOCK_STREAM, TRUE);
-
-       if (conn->sock == -1) {
-               g_free (conn);
-               msg_err ("cannot connect upstream for %s", m->name);
-               rspamd_upstream_fail (conn->up, TRUE);
-               return;
-       }
-
-       msg = rspamd_http_new_message (HTTP_REQUEST);
-       rspamd_printf_fstring (&msg->url, "/update_v1/%s", m->name);
-
-       conn->http_conn = rspamd_http_connection_new_client_socket (
-                       ctx->http_ctx,
-                       NULL,
-                       fuzzy_mirror_error_handler,
-                       fuzzy_mirror_finish_handler,
-                       RSPAMD_HTTP_CLIENT_SIMPLE,
-                       conn->sock);
-
-       rspamd_http_connection_set_key (conn->http_conn,
-                       ctx->sync_keypair);
-       msg->peer_key = rspamd_pubkey_ref (m->key);
-       fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates);
+       struct rspamd_fuzzy_storage_ctx *ctx =
+                       (struct rspamd_fuzzy_storage_ctx *)w->data;
+       rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx);
 }
 
-struct rspamd_updates_cbdata {
-       GArray *updates_pending;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       gchar *source;
-};
 
 static void
 fuzzy_update_version_callback (guint64 ver, void *ud)
 {
        msg_info ("updated fuzzy storage from %s: version: %d",
-               (const char *)ud, (gint)ver);
+                       (const char *)ud, (gint)ver);
        g_free (ud);
 }
 
-static void
-fuzzy_stat_count_callback (guint64 count, void *ud)
-{
-       struct rspamd_fuzzy_storage_ctx *ctx = ud;
-
-       event_add (&ctx->stat_ev, &ctx->stat_tv);
-       ctx->stat.fuzzy_hashes = count;
-}
-
-static void
-rspamd_fuzzy_stat_callback (gint fd, gshort what, gpointer ud)
-{
-       struct rspamd_fuzzy_storage_ctx *ctx = ud;
-
-       event_del (&ctx->stat_ev);
-       rspamd_fuzzy_backend_count (ctx->backend, fuzzy_stat_count_callback, ctx);
-}
-
 static void
 rspamd_fuzzy_updates_cb (gboolean success,
                                                 guint nadded,
@@ -673,8 +441,6 @@ rspamd_fuzzy_updates_cb (gboolean success,
                                                 void *ud)
 {
        struct rspamd_updates_cbdata *cbdata = ud;
-       struct rspamd_fuzzy_mirror *m;
-       guint i;
        struct rspamd_fuzzy_storage_ctx *ctx;
        const gchar *source;
 
@@ -684,15 +450,6 @@ rspamd_fuzzy_updates_cb (gboolean success,
        if (success) {
                rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
 
-               if (ctx->updates_pending->len > 0) {
-                       for (i = 0; i < ctx->mirrors->len; i ++) {
-                               m = g_ptr_array_index (ctx->mirrors, i);
-
-                               rspamd_fuzzy_send_update_mirror (ctx, m,
-                                               cbdata->updates_pending);
-                       }
-               }
-
                msg_info ("successfully updated fuzzy storage: %d updates in queue; "
                                  "%d pending currently; "
                                  "%d added, %d deleted, %d extended, %d duplicates",
@@ -727,12 +484,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
 
        if (ctx->worker->wanna_die) {
                /* Plan exit */
-               struct timeval tv;
-
-               tv.tv_sec = 0;
-               tv.tv_usec = 0;
-
-               event_base_loopexit (ctx->ev_base, &tv);
+               ev_break (ctx->event_loop, EVBREAK_ALL);
        }
 
        g_array_free (cbdata->updates_pending, TRUE);
@@ -762,9 +514,9 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
 }
 
 static void
-rspamd_fuzzy_reply_io (gint fd, gshort what, gpointer d)
+rspamd_fuzzy_reply_io (EV_P_ ev_io *w, int revents)
 {
-       struct fuzzy_session *session = d;
+       struct fuzzy_session *session = (struct fuzzy_session *)w->data;
 
        rspamd_fuzzy_write_reply (session);
        REF_RELEASE (session);
@@ -807,10 +559,9 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
                if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
                        /* Grab reference to avoid early destruction */
                        REF_RETAIN (session);
-                       event_set (&session->io, session->fd, EV_WRITE,
-                                       rspamd_fuzzy_reply_io, session);
-                       event_base_set (session->ctx->ev_base, &session->io);
-                       event_add (&session->io, NULL);
+                       session->io.data = session;
+                       ev_io_init (&session->io, rspamd_fuzzy_reply_io, session->fd, EV_WRITE);
+                       ev_io_start (session->ctx->event_loop, &session->io);
                }
                else {
                        msg_err ("error while writing reply: %s", strerror (errno));
@@ -818,22 +569,6 @@ rspamd_fuzzy_write_reply (struct fuzzy_session *session)
        }
 }
 
-static void
-fuzzy_peer_send_io (gint fd, gshort what, gpointer d)
-{
-       struct fuzzy_peer_request *up_req = d;
-       gssize r;
-
-       r = write (fd, &up_req->cmd, sizeof (up_req->cmd));
-
-       if (r != sizeof (up_req->cmd)) {
-               msg_err ("cannot send update request to the peer: %s", strerror (errno));
-       }
-
-       event_del (&up_req->io_ev);
-       g_free (up_req);
-}
-
 static void
 rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx,
                enum rspamd_fuzzy_epoch epoch,
@@ -945,6 +680,22 @@ rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd,
        rspamd_fuzzy_write_reply (session);
 }
 
+static void
+fuzzy_peer_send_io (EV_P_ ev_io *w, int revents)
+{
+       struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *)w->data;
+       gssize r;
+
+       r = write (w->fd, &up_req->cmd, sizeof (up_req->cmd));
+
+       if (r != sizeof (up_req->cmd)) {
+               msg_err ("cannot send update request to the peer: %s", strerror (errno));
+       }
+
+       ev_io_stop (EV_A_ w);
+       g_free (up_req);
+}
+
 static void
 rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
 {
@@ -984,7 +735,7 @@ rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
                struct fuzzy_peer_cmd up_cmd;
                struct fuzzy_peer_request *up_req;
 
-               if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
+               if (session->worker->index == 0) {
                        /* Just add to the queue */
                        memset (&up_cmd, 0, sizeof (up_cmd));
                        up_cmd.is_shingle = is_shingle;
@@ -1017,10 +768,10 @@ rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud)
                                                sizeof (up_req->cmd.cmd.shingle.sgl));
                        }
 
-                       event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE,
-                                       fuzzy_peer_send_io, up_req);
-                       event_base_set (session->ctx->ev_base, &up_req->io_ev);
-                       event_add (&up_req->io_ev, NULL);
+                       up_req->io_ev.data = up_req;
+                       ev_io_init (&up_req->io_ev, fuzzy_peer_send_io,
+                                       session->ctx->peer_fd, EV_WRITE);
+                       ev_io_start (session->ctx->event_loop, &up_req->io_ev);
                }
        }
 
@@ -1103,17 +854,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
 
        if (cmd->cmd == FUZZY_CHECK) {
                if (rspamd_fuzzy_check_client (session, FALSE)) {
-                       if (G_UNLIKELY (session->ctx->collection_mode)) {
-                               result.v1.prob = 0;
-                               result.v1.value = 500;
-                               result.v1.flag = 0;
-                               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted,
-                                               is_shingle);
-                       } else {
-                               REF_RETAIN (session);
-                               rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
-                                               rspamd_fuzzy_check_callback, session);
-                       }
+                       REF_RETAIN (session);
+                       rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
+                                       rspamd_fuzzy_check_callback, session);
                }
                else {
                        result.v1.value = 403;
@@ -1123,18 +866,10 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
                }
        }
        else if (cmd->cmd == FUZZY_STAT) {
-               if (G_UNLIKELY (session->ctx->collection_mode)) {
-                       result.v1.prob = 0;
-                       result.v1.value = 500;
-                       result.v1.flag = 0;
-                       rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
-               }
-               else {
-                       result.v1.prob = 1.0;
-                       result.v1.value = 0;
-                       result.v1.flag = session->ctx->stat.fuzzy_hashes;
-                       rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
-               }
+               result.v1.prob = 1.0;
+               result.v1.value = 0;
+               result.v1.flag = session->ctx->stat.fuzzy_hashes;
+               rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
        }
        else {
                if (rspamd_fuzzy_check_client (session, TRUE)) {
@@ -1169,10 +904,10 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
                                                (gpointer)&up_req->cmd.cmd.shingle :
                                                (gpointer)&up_req->cmd.cmd.normal;
                                memcpy (ptr, cmd, up_len);
-                               event_set (&up_req->io_ev, session->ctx->peer_fd, EV_WRITE,
-                                               fuzzy_peer_send_io, up_req);
-                               event_base_set (session->ctx->ev_base, &up_req->io_ev);
-                               event_add (&up_req->io_ev, NULL);
+                               up_req->io_ev.data = up_req;
+                               ev_io_init (&up_req->io_ev, fuzzy_peer_send_io,
+                                               session->ctx->peer_fd, EV_WRITE);
+                               ev_io_start (session->ctx->event_loop, &up_req->io_ev);
                        }
 
                        result.v1.value = 0;
@@ -1378,746 +1113,126 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
        return TRUE;
 }
 
+
 static void
-rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
-               struct rspamd_http_message *msg, guint our_rev)
+fuzzy_session_destroy (gpointer d)
 {
-       const guchar *p;
-       gsize remain;
-       gint32 revision;
-       guint32 len = 0, cnt = 0;
-       struct fuzzy_peer_cmd cmd;
-       enum {
-               read_len = 0,
-               read_data,
-               finish_processing
-       } state = read_len;
-
-       gpointer flag_ptr;
+       struct fuzzy_session *session = d;
 
-       /*
-        * Message format:
-        * <uint32_le> - revision
-        * <uint32_le> - size of the next element
-        * <data> - command data
-        * ...
-        * <0> - end of data
-        * ... - ignored
-        */
-       p = rspamd_http_message_get_body (msg, &remain);
+       rspamd_inet_address_free (session->addr);
+       rspamd_explicit_memzero (session->nm, sizeof (session->nm));
+       session->worker->nconns--;
+       g_free (session);
+}
 
-       if (p && remain >= sizeof (gint32) * 2) {
-               memcpy (&revision, p, sizeof (gint32));
-               revision = GINT32_TO_LE (revision);
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_fuzzy_socket (EV_P_ ev_io *w, int revents)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
+       struct fuzzy_session *session;
+       rspamd_inet_addr_t *addr;
+       gssize r;
+       guint8 buf[512];
+       guint64 *nerrors;
 
-               if (revision <= our_rev) {
-                       msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, "
-                                       "refusing update",
-                                       revision, our_rev);
+       /* Got some data */
+       if (revents == EV_READ) {
 
-                       return;
-               }
-               else if (revision - our_rev > 1) {
-                       msg_warn_fuzzy_update ("remote revision: %d is newer more than one revision "
-                                       "than ours: %d, cold sync is recommended",
-                                                               revision, our_rev);
-               }
+               for (;;) {
+                       worker->nconns++;
 
-               remain -= sizeof (gint32);
-               p += sizeof (gint32);
-       }
-       else {
-               msg_err_fuzzy_update ("short update message, not processing");
-               goto err;
-       }
+                       r = rspamd_inet_address_recvfrom (w->fd,
+                                       buf,
+                                       sizeof (buf),
+                                       0,
+                                       &addr);
 
-       while (remain > 0) {
-               switch (state) {
-               case read_len:
-                       if (remain < sizeof (guint32)) {
-                               msg_err_fuzzy_update ("short update message while reading "
-                                               "length, not processing");
-                               goto err;
-                       }
+                       if (r == -1) {
+                               if (errno == EINTR) {
+                                       continue;
+                               }
+                               else if (errno == EAGAIN || errno == EWOULDBLOCK) {
 
-                       memcpy (&len, p, sizeof (guint32));
-                       len = GUINT32_TO_LE (len);
-                       remain -= sizeof (guint32);
-                       p += sizeof (guint32);
+                                       return;
+                               }
 
-                       if (len == 0) {
-                               remain = 0;
-                               state = finish_processing;
-                       }
-                       else {
-                               state = read_data;
-                       }
-                       break;
-               case read_data:
-                       if (remain < len) {
-                               msg_err_fuzzy_update ("short update message while reading data, "
-                                               "not processing"
-                                               " (%zd is available, %d is required)", remain, len);
+                               msg_err ("got error while reading from socket: %d, %s",
+                                               errno,
+                                               strerror (errno));
                                return;
                        }
 
-                       if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32) ||
-                                       len > sizeof (cmd)) {
-                               /* Bad size command */
-                               msg_err_fuzzy_update ("incorrect element size: %d, at least "
-                                               "%d expected", len,
-                                               (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32)));
-                               goto err;
-                       }
+                       session = g_malloc0 (sizeof (*session));
+                       REF_INIT_RETAIN (session, fuzzy_session_destroy);
+                       session->worker = worker;
+                       session->fd = w->fd;
+                       session->ctx = worker->ctx;
+                       session->time = (guint64) time (NULL);
+                       session->addr = addr;
 
-                       memcpy (&cmd, p, len);
-                       if (cmd.is_shingle && len != sizeof (cmd)) {
-                               /* Short command */
-                               msg_err_fuzzy_update ("incorrect element size: %d, at least "
-                                               "%d expected", len,
-                                               (gint)(sizeof (cmd)));
-                               goto err;
+                       if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
+                               /* Check shingles count sanity */
+                               rspamd_fuzzy_process_command (session);
                        }
+                       else {
+                               /* Discard input */
+                               session->ctx->stat.invalid_requests ++;
+                               msg_debug ("invalid fuzzy command of size %z received", r);
+
+                               nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
+                                               addr, -1);
 
-                       if (cmd.is_shingle) {
-                               if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags,
-                                               GUINT_TO_POINTER (cmd.cmd.shingle.basic.flag))) != NULL) {
-                                       cmd.cmd.shingle.basic.flag = GPOINTER_TO_UINT (flag_ptr);
+                               if (nerrors == NULL) {
+                                       nerrors = g_malloc (sizeof (*nerrors));
+                                       *nerrors = 1;
+                                       rspamd_lru_hash_insert (session->ctx->errors_ips,
+                                                       rspamd_inet_address_copy (addr),
+                                                       nerrors, -1, -1);
                                }
-                       }
-                       else {
-                               if ((flag_ptr = g_hash_table_lookup (session->ctx->master_flags,
-                                               GUINT_TO_POINTER (cmd.cmd.normal.flag))) != NULL) {
-                                       cmd.cmd.normal.flag = GPOINTER_TO_UINT (flag_ptr);
+                               else {
+                                       *nerrors = *nerrors + 1;
                                }
                        }
 
-                       g_array_append_val (session->ctx->updates_pending, cmd);
-
-                       p += len;
-                       remain -= len;
-                       len = 0;
-                       state = read_len;
-                       cnt ++;
-                       break;
-               case finish_processing:
-                       /* Do nothing */
-                       remain = 0;
-                       break;
+                       REF_RELEASE (session);
                }
        }
-
-
-       rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE);
-       msg_info_fuzzy_update ("processed updates from the master %s, "
-                       "%ud operations processed,"
-                       " revision: %d (local revision: %d)",
-                       rspamd_inet_address_to_string (session->addr),
-                       cnt, revision, our_rev);
-
-err:
-       return;
 }
 
-
-static void
-fuzzy_session_destroy (gpointer d)
+static gboolean
+rspamd_fuzzy_storage_periodic_callback (void *ud)
 {
-       struct fuzzy_session *session = d;
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
 
-       rspamd_inet_address_free (session->addr);
-       rspamd_explicit_memzero (session->nm, sizeof (session->nm));
-       session->worker->nconns--;
-       g_free (session);
-}
+       if (ctx->updates_pending->len > 0) {
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
 
-static void
-rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session)
-{
-       if (session) {
-               rspamd_http_connection_reset (session->conn);
-               rspamd_http_connection_unref (session->conn);
-               rspamd_inet_address_free (session->addr);
-               close (session->sock);
-
-               if (session->psrc) {
-                       g_free (session->psrc);
-               }
-               g_free (session);
+               return TRUE;
        }
-}
-
-static void
-rspamd_fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
-       struct fuzzy_master_update_session *session = conn->ud;
 
-       msg_err_fuzzy_update ("abnormally closing connection from: %s, error: %e",
-               rspamd_inet_address_to_string (session->addr), err);
-       /* Terminate session immediately */
-       rspamd_fuzzy_mirror_session_destroy (session);
+       return FALSE;
 }
 
-static void
-rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session,
-               guint code, const gchar *str)
+static gboolean
+rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
+               struct rspamd_worker *worker, gint fd,
+               gint attached_fd,
+               struct rspamd_control_command *cmd,
+               gpointer ud)
 {
-       struct rspamd_http_message *msg;
-
-       msg = rspamd_http_new_message (HTTP_RESPONSE);
-       msg->url = rspamd_fstring_new_init (str, strlen (str));
-       msg->code = code;
-       session->replied = TRUE;
+       struct rspamd_fuzzy_storage_ctx *ctx = ud;
+       struct rspamd_control_reply rep;
 
-       rspamd_http_connection_reset (session->conn);
-       rspamd_http_connection_write_message (session->conn, msg, NULL, "text/plain",
-                       session, &session->ctx->master_io_tv);
-}
+       rep.reply.fuzzy_sync.status = 0;
 
-static void
-rspamd_fuzzy_update_version_callback (guint64 version, void *ud)
-{
-       struct fuzzy_master_update_session *session = ud;
-
-       rspamd_fuzzy_mirror_process_update (session, session->msg, version);
-       rspamd_fuzzy_mirror_send_reply (session, 200, "OK");
-}
-
-static gint
-rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
-       struct rspamd_http_message *msg)
-{
-       struct fuzzy_master_update_session *session = conn->ud;
-       const struct rspamd_cryptobox_pubkey *rk;
-       const gchar *err_str = NULL;
-       gchar *psrc;
-       const gchar *src = NULL;
-       gsize remain;
-
-       if (session->replied) {
-               rspamd_fuzzy_mirror_session_destroy (session);
-
-               return 0;
-       }
-
-       /* Check key */
-       if (!rspamd_http_connection_is_encrypted (conn)) {
-               msg_err_fuzzy_update ("refuse unencrypted update from: %s",
-                               rspamd_inet_address_to_string (session->addr));
-               err_str = "Unencrypted update is not allowed";
-               goto end;
-       }
-       else {
-
-               if (session->ctx->master_key) {
-                       rk = rspamd_http_connection_get_peer_key (conn);
-                       g_assert (rk != NULL);
-
-                       if (!rspamd_pubkey_equal (rk, session->ctx->master_key)) {
-                               msg_err_fuzzy_update ("refuse unknown pubkey update from: %s",
-                                               rspamd_inet_address_to_string (session->addr));
-                               err_str = "Unknown pubkey";
-                               goto end;
-                       }
-               }
-               else {
-                       msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s",
-                                       rspamd_inet_address_to_string (session->addr));
-               }
-               if (!rspamd_http_message_get_body (msg, NULL) || !msg->url
-                               || msg->url->len == 0) {
-                       msg_err_fuzzy_update ("empty update message, not processing");
-                       err_str = "Empty update";
-
-                       goto end;
-               }
-
-               /* Detect source from url: /update_v1/<source>, so we look for the last '/' */
-               remain = msg->url->len;
-               psrc = rspamd_fstringdup (msg->url);
-               src = psrc;
-
-               while (remain--) {
-                       if (src[remain] == '/') {
-                               src = &src[remain + 1];
-                               break;
-                       }
-               }
-
-               session->src = src;
-               session->psrc = psrc;
-               session->msg = msg;
-               rspamd_fuzzy_backend_version (session->ctx->backend, src,
-                               rspamd_fuzzy_update_version_callback, session);
-
-               return 0;
-       }
-
-end:
-       rspamd_fuzzy_mirror_send_reply (session, 403, err_str);
-
-       return 0;
-}
-
-struct rspamd_fuzzy_collection_session {
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       struct rspamd_worker *worker;
-       rspamd_inet_addr_t *from_addr;
-       guchar uid[16];
-};
-
-static void
-rspamd_fuzzy_collection_error_handler (struct rspamd_http_connection_entry *conn_ent,
-       GError *err)
-{
-       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
-
-       msg_err_fuzzy_collection ("http error occurred: %s", err->message);
-}
-
-static void
-rspamd_fuzzy_collection_finish_handler (struct rspamd_http_connection_entry *conn_ent)
-{
-       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
-
-
-       rspamd_inet_address_free (session->from_addr);
-       g_free (session);
-}
-
-void
-rspamd_fuzzy_collection_send_error (struct rspamd_http_connection_entry *entry,
-       gint code, const gchar *error_msg, ...)
-{
-       struct rspamd_http_message *msg;
-       va_list args;
-       rspamd_fstring_t *reply;
-
-       msg = rspamd_http_new_message (HTTP_RESPONSE);
-
-       va_start (args, error_msg);
-       msg->status = rspamd_fstring_new ();
-       rspamd_vprintf_fstring (&msg->status, error_msg, args);
-       va_end (args);
-
-       msg->date = time (NULL);
-       msg->code = code;
-       reply = rspamd_fstring_sized_new (msg->status->len + 16);
-       rspamd_printf_fstring (&reply, "%V", msg->status);
-       rspamd_http_message_set_body_from_fstring_steal (msg, reply);
-       rspamd_http_connection_reset (entry->conn);
-       rspamd_http_router_insert_headers (entry->rt, msg);
-       rspamd_http_connection_write_message (entry->conn,
-               msg,
-               NULL,
-               "text/plain",
-               entry,
-               entry->rt->ptv);
-       entry->is_reply = TRUE;
-}
-
-/*
- * Note: this function steals fstring
- */
-void
-rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry,
-       rspamd_fstring_t *fstr)
-{
-       struct rspamd_http_message *msg;
-
-       msg = rspamd_http_new_message (HTTP_RESPONSE);
-       msg->status = rspamd_fstring_new_init ("OK", 2);
-       msg->date = time (NULL);
-       msg->code = 200;
-       rspamd_http_message_set_body_from_fstring_steal (msg, fstr);
-       rspamd_http_connection_reset (entry->conn);
-       rspamd_http_router_insert_headers (entry->rt, msg);
-       rspamd_http_connection_write_message (entry->conn,
-               msg,
-               NULL,
-               "application/octet-stream",
-               entry,
-               entry->rt->ptv);
-       entry->is_reply = TRUE;
-}
-
-static int
-rspamd_fuzzy_collection_cookie (struct rspamd_http_connection_entry *conn_ent,
-       struct rspamd_http_message *msg)
-{
-       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
-       rspamd_fstring_t *cookie;
-
-       cookie = rspamd_fstring_new_init (session->ctx->cookie,
-                       sizeof (session->ctx->cookie));
-       rspamd_fuzzy_collection_send_fstring (conn_ent, cookie);
-
-       return 0;
-}
-
-static int
-rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent,
-       struct rspamd_http_message *msg)
-{
-       struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
-       const rspamd_ftok_t *sign_header;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       guint i;
-       struct fuzzy_peer_cmd *io_cmd;
-       rspamd_fstring_t *reply;
-       GError *err = NULL;
-       guchar *decoded_signature;
-       gsize dec_len;
-       guint32 cmdlen, nupdates = 0;
-
-       sign_header = rspamd_http_message_find_header (msg, "Signature");
-
-       if (sign_header == NULL) {
-               rspamd_fuzzy_collection_send_error (conn_ent, 403, "Missing signature");
-
-               return 0;
-       }
-
-       ctx = session->ctx;
-
-       if (ctx->collection_sign_key == NULL) {
-               rspamd_fuzzy_collection_send_error (conn_ent, 500, "Misconfigured signature key");
-
-               return 0;
-       }
-
-       decoded_signature = g_malloc (sign_header->len * 2 + 1);
-       dec_len = rspamd_decode_hex_buf (sign_header->begin, sign_header->len,
-                       decoded_signature, sign_header->len * 2 + 1);
-
-       if (dec_len == -1 || !rspamd_keypair_verify (ctx->collection_sign_key,
-                       ctx->cookie, sizeof (ctx->cookie),
-                       decoded_signature, dec_len, &err)) {
-               if (err) {
-                       rspamd_fuzzy_collection_send_error (conn_ent, 403, "Signature verification error: %e",
-                                       err);
-                       g_error_free (err);
-               }
-               else {
-                       rspamd_fuzzy_collection_send_error (conn_ent, 403, "Signature verification error");
-               }
-
-               g_free (decoded_signature);
-
-               return 0;
-       }
-
-       g_free (decoded_signature);
-
-       /* Generate new cookie */
-       ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
-
-       /* Send&Clear updates */
-       reply = rspamd_fstring_sized_new (8192);
-       /*
-        * Message format:
-        * <uint32_le> - revision
-        * <uint32_le> - size of the next element
-        * <data> - command data
-        * ...
-        * <0> - end of data
-        * ... - ignored
-        */
-       reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id,
-                                       sizeof (ctx->collection_id));
-
-       for (i = 0; i < ctx->updates_pending->len; i ++) {
-               io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
-
-               if (io_cmd->is_shingle) {
-                       cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32);
-
-               }
-               else {
-                       cmdlen = sizeof (io_cmd->cmd.normal) + sizeof (guint32);
-               }
-
-               cmdlen = GUINT32_TO_LE (cmdlen);
-               reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
-                               sizeof (cmdlen));
-               reply = rspamd_fstring_append (reply, (const gchar *)io_cmd,
-                               cmdlen);
-               nupdates ++;
-       }
-
-       msg_info_fuzzy_collection ("collection %d done, send %d updates",
-                       ctx->collection_id, nupdates);
-       /* Last command */
-       cmdlen = 0;
-       reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
-                       sizeof (cmdlen));
-
-       ctx->updates_pending->len = 0;
-       /* Clear failed attempts counter */
-       ctx->updates_failed = 0;
-       ctx->collection_id ++;
-       rspamd_fuzzy_collection_send_fstring (conn_ent, reply);
-
-       return 0;
-}
-
-
-static void
-accept_fuzzy_collection_socket (gint fd, short what, void *arg)
-{
-       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       rspamd_inet_addr_t *addr;
-       gint nfd;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       struct rspamd_fuzzy_collection_session *session;
-
-       if ((nfd =
-                       rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
-               msg_warn ("accept failed: %s", strerror (errno));
-               return;
-       }
-       /* Check for EAGAIN */
-       if (nfd == 0) {
-               return;
-       }
-
-       ctx = worker->ctx;
-
-       if (!ctx->collection_keypair) {
-               msg_err ("deny request from %s, as no local keypair is specified",
-                               rspamd_inet_address_to_string (addr));
-               rspamd_inet_address_free (addr);
-               close (nfd);
-
-               return;
-       }
-
-       session = g_malloc0 (sizeof (*session));
-       session->ctx = ctx;
-       session->worker = worker;
-       rspamd_random_hex (session->uid, sizeof (session->uid) - 1);
-       session->uid[sizeof (session->uid) - 1] = '\0';
-       session->from_addr = addr;
-       rspamd_http_router_handle_socket (ctx->collection_rt, nfd, session);
-       msg_info_fuzzy_collection ("accepted connection from %s port %d, session ptr: %p",
-                       rspamd_inet_address_to_string (addr),
-                       rspamd_inet_address_get_port (addr),
-                       session);
-}
-
-static void
-rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud)
-{
-       struct rspamd_fuzzy_storage_ctx *ctx = ud;
-
-       if (++ctx->updates_failed > ctx->updates_maxfail) {
-               msg_err ("cannot store more data in workqueue, discard "
-                               "%ud updates after %d missed collection points",
-                               ctx->updates_pending->len,
-                               ctx->updates_maxfail);
-               ctx->updates_failed = 0;
-               ctx->updates_pending->len = 0;
-               /* Regenerate cookie */
-               ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
-       }
-       else {
-               msg_err ("fuzzy data has not been collected in time, "
-                               "%ud updates are still pending, %d updates left",
-                               ctx->updates_pending->len,
-                               ctx->updates_maxfail - ctx->updates_failed);
-       }
-
-       if (ctx->worker->wanna_die) {
-               /* Plan exit */
-               struct timeval tv;
-
-               tv.tv_sec = 0;
-               tv.tv_usec = 0;
-
-               event_base_loopexit (ctx->ev_base, &tv);
-       }
-}
-
-
-static void
-accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
-{
-       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       rspamd_inet_addr_t *addr;
-       gint nfd;
-       struct rspamd_http_connection *http_conn;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       struct fuzzy_master_update_session *session;
-
-       if ((nfd =
-                       rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
-               msg_warn ("accept failed: %s", strerror (errno));
-               return;
-       }
-       /* Check for EAGAIN */
-       if (nfd == 0) {
-               return;
-       }
-
-       ctx = worker->ctx;
-
-       if (!ctx->master_ips) {
-               msg_err ("deny update request from %s as no masters defined",
-                               rspamd_inet_address_to_string (addr));
-               rspamd_inet_address_free (addr);
-               close (nfd);
-
-               return;
-       }
-       else if (rspamd_match_radix_map_addr (ctx->master_ips, addr) == NULL) {
-               msg_err ("deny update request from %s",
-                               rspamd_inet_address_to_string (addr));
-               rspamd_inet_address_free (addr);
-               close (nfd);
-
-               return;
-       }
-
-       if (!ctx->sync_keypair) {
-               msg_err ("deny update request from %s, as no local keypair is specified",
-                               rspamd_inet_address_to_string (addr));
-               rspamd_inet_address_free (addr);
-               close (nfd);
-
-               return;
-       }
-
-       session = g_malloc0 (sizeof (*session));
-       session->name = rspamd_inet_address_to_string (addr);
-       rspamd_random_hex (session->uid, sizeof (session->uid) - 1);
-       session->uid[sizeof (session->uid) - 1] = '\0';
-       http_conn = rspamd_http_connection_new_server (
-                       ctx->http_ctx,
-                       nfd,
-                       NULL,
-                       rspamd_fuzzy_mirror_error_handler,
-                       rspamd_fuzzy_mirror_finish_handler,
-                       0);
-
-       rspamd_http_connection_set_key (http_conn, ctx->sync_keypair);
-       session->ctx = ctx;
-       session->conn = http_conn;
-       session->addr = addr;
-       session->sock = nfd;
-
-       rspamd_http_connection_read_message (http_conn,
-                       session,
-                       &ctx->master_io_tv);
-}
-
-/*
- * Accept new connection and construct task
- */
-static void
-accept_fuzzy_socket (gint fd, short what, void *arg)
-{
-       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
-       struct fuzzy_session *session;
-       rspamd_inet_addr_t *addr;
-       gssize r;
-       guint8 buf[512];
-       guint64 *nerrors;
-
-       /* Got some data */
-       if (what == EV_READ) {
-
-               for (;;) {
-                       worker->nconns++;
-
-                       r = rspamd_inet_address_recvfrom (fd,
-                                       buf,
-                                       sizeof (buf),
-                                       0,
-                                       &addr);
-
-                       if (r == -1) {
-                               if (errno == EINTR) {
-                                       continue;
-                               }
-                               else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-
-                                       return;
-                               }
-
-                               msg_err ("got error while reading from socket: %d, %s",
-                                               errno,
-                                               strerror (errno));
-                               return;
-                       }
-
-                       session = g_malloc0 (sizeof (*session));
-                       REF_INIT_RETAIN (session, fuzzy_session_destroy);
-                       session->worker = worker;
-                       session->fd = fd;
-                       session->ctx = worker->ctx;
-                       session->time = (guint64) time (NULL);
-                       session->addr = addr;
-
-                       if (rspamd_fuzzy_cmd_from_wire (buf, r, session)) {
-                               /* Check shingles count sanity */
-                               rspamd_fuzzy_process_command (session);
-                       }
-                       else {
-                               /* Discard input */
-                               session->ctx->stat.invalid_requests ++;
-                               msg_debug ("invalid fuzzy command of size %z received", r);
-
-                               nerrors = rspamd_lru_hash_lookup (session->ctx->errors_ips,
-                                               addr, -1);
-
-                               if (nerrors == NULL) {
-                                       nerrors = g_malloc (sizeof (*nerrors));
-                                       *nerrors = 1;
-                                       rspamd_lru_hash_insert (session->ctx->errors_ips,
-                                                       rspamd_inet_address_copy (addr),
-                                                       nerrors, -1, -1);
-                               }
-                               else {
-                                       *nerrors = *nerrors + 1;
-                               }
-                       }
-
-                       REF_RELEASE (session);
-               }
-       }
-}
-
-static gboolean
-rspamd_fuzzy_storage_periodic_callback (void *ud)
-{
-       struct rspamd_fuzzy_storage_ctx *ctx = ud;
-
-       if (ctx->updates_pending->len > 0) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
-
-               return TRUE;
-       }
-
-       return FALSE;
-}
-
-static gboolean
-rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
-               struct rspamd_worker *worker, gint fd,
-               gint attached_fd,
-               struct rspamd_control_command *cmd,
-               gpointer ud)
-{
-       struct rspamd_fuzzy_storage_ctx *ctx = ud;
-       struct rspamd_control_reply rep;
-
-       rep.reply.fuzzy_sync.status = 0;
-
-       if (ctx->backend && worker->index == 0) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
-               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
-                               rspamd_fuzzy_storage_periodic_callback, ctx);
-       }
+       if (ctx->backend && worker->index == 0) {
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
+               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+                               rspamd_fuzzy_storage_periodic_callback, ctx);
+       }
 
        if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
                msg_err ("cannot write reply to the control socket: %s",
@@ -2148,7 +1263,7 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
        memset (&rep, 0, sizeof (rep));
        rep.type = RSPAMD_CONTROL_RELOAD;
 
-       if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+       if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop,
                        worker->cf->options, rspamd_main->cfg,
                        &err)) == NULL) {
                msg_err ("cannot open backend after reload: %e", err);
@@ -2390,120 +1505,6 @@ rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main,
        return TRUE;
 }
 
-static gboolean
-fuzzy_storage_parse_mirror (rspamd_mempool_t *pool,
-       const ucl_object_t *obj,
-       gpointer ud,
-       struct rspamd_rcl_section *section,
-       GError **err)
-{
-       const ucl_object_t *elt;
-       struct rspamd_fuzzy_mirror *up = NULL;
-       struct rspamd_rcl_struct_parser *pd = ud;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-
-       ctx = pd->user_struct;
-
-       if (ucl_object_type (obj) != UCL_OBJECT) {
-               g_set_error (err, g_quark_try_string ("fuzzy"), 100,
-                               "mirror/slave option must be an object");
-
-               return FALSE;
-       }
-
-       elt = ucl_object_lookup (obj, "name");
-       if (elt == NULL) {
-               g_set_error (err, g_quark_try_string ("fuzzy"), 100,
-                               "mirror option must have some name definition");
-
-               return FALSE;
-       }
-
-       up = g_malloc0 (sizeof (*up));
-       up->name = g_strdup (ucl_object_tostring (elt));
-
-       elt = ucl_object_lookup (obj, "key");
-       if (elt != NULL) {
-               up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
-                               RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
-       }
-
-       if (up->key == NULL) {
-               g_set_error (err, g_quark_try_string ("fuzzy"), 100,
-                               "cannot read mirror key");
-
-               goto err;
-       }
-
-       elt = ucl_object_lookup (obj, "hosts");
-
-       if (elt == NULL) {
-               g_set_error (err, g_quark_try_string ("fuzzy"), 100,
-                               "mirror option must have some hosts definition");
-
-               goto err;
-       }
-
-       up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
-       if (!rspamd_upstreams_from_ucl (up->u, elt, 11335, NULL)) {
-               g_set_error (err,  g_quark_try_string ("fuzzy"), 100,
-                               "mirror has bad hosts definition");
-
-               goto err;
-       }
-
-       g_ptr_array_add (ctx->mirrors, up);
-
-       return TRUE;
-
-err:
-       g_free (up->name);
-       rspamd_upstreams_destroy (up->u);
-
-       if (up->key) {
-               rspamd_pubkey_unref (up->key);
-       }
-
-       g_free (up);
-
-       return FALSE;
-}
-
-static gboolean
-fuzzy_storage_parse_master_flags (rspamd_mempool_t *pool,
-       const ucl_object_t *obj,
-       gpointer ud,
-       struct rspamd_rcl_section *section,
-       GError **err)
-{
-       const ucl_object_t *cur;
-       struct rspamd_rcl_struct_parser *pd = ud;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-       ucl_object_iter_t it = NULL;
-       gulong remote_flag;
-       gint64 local_flag;
-
-       ctx = pd->user_struct;
-
-       if (ucl_object_type (obj) != UCL_OBJECT) {
-               g_set_error (err, g_quark_try_string ("fuzzy"), 100,
-                               "master_flags option must be an object");
-
-               return FALSE;
-       }
-
-       while ((cur = ucl_iterate_object (obj, &it, true)) != NULL) {
-               if (rspamd_strtoul (cur->key, cur->keylen, &remote_flag) &&
-                               ucl_object_toint_safe (cur, (int64_t *)&local_flag)) {
-                       g_hash_table_insert (ctx->master_flags, GUINT_TO_POINTER (remote_flag),
-                                       GUINT_TO_POINTER (local_flag));
-               }
-       }
-
-       return TRUE;
-}
-
-
 static gboolean
 fuzzy_parse_keypair (rspamd_mempool_t *pool,
                const ucl_object_t *obj,
@@ -2599,26 +1600,18 @@ init_fuzzy (struct rspamd_config *cfg)
 
        ctx->magic = rspamd_fuzzy_storage_magic;
        ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
-       ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
        ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
        ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
                        NULL, fuzzy_key_dtor);
        rspamd_mempool_add_destructor (cfg->cfg_pool,
                        (rspamd_mempool_destruct_t)g_hash_table_unref, ctx->keys);
-       ctx->master_flags = g_hash_table_new (g_direct_hash, g_direct_equal);
-       rspamd_mempool_add_destructor (cfg->cfg_pool,
-                       (rspamd_mempool_destruct_t)g_hash_table_unref, ctx->master_flags);
        ctx->errors_ips = rspamd_lru_hash_new_full (1024,
                        (GDestroyNotify) rspamd_inet_address_free, g_free,
                        rspamd_inet_address_hash, rspamd_inet_address_equal);
        rspamd_mempool_add_destructor (cfg->cfg_pool,
                        (rspamd_mempool_destruct_t)rspamd_lru_hash_destroy, ctx->errors_ips);
        ctx->cfg = cfg;
-       ctx->mirrors = g_ptr_array_new ();
-       rspamd_mempool_add_destructor (cfg->cfg_pool,
-                       (rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors);
        ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
-       ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id";
        ctx->leaky_bucket_mask = DEFAULT_BUCKET_MASK;
        ctx->leaky_bucket_ttl = DEFAULT_BUCKET_TTL;
        ctx->max_buckets = DEFAULT_MAX_BUCKETS;
@@ -2694,32 +1687,6 @@ init_fuzzy (struct rspamd_config *cfg)
                        0,
                        "Work in read only mode");
 
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "master_timeout",
-                       rspamd_rcl_parse_struct_time,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_timeout),
-                       RSPAMD_CL_FLAG_TIME_FLOAT,
-                       "Master protocol IO timeout");
-
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "sync_keypair",
-                       rspamd_rcl_parse_struct_keypair,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, sync_keypair),
-                       0,
-                       "Encryption key for master/slave updates");
-
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "masters",
-                       rspamd_rcl_parse_struct_ucl,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, masters_map),
-                       0,
-                       "Allow master/slave updates from the following IP addresses");
 
        rspamd_rcl_register_worker_option (cfg,
                        type,
@@ -2730,41 +1697,7 @@ init_fuzzy (struct rspamd_config *cfg)
                        0,
                        "Block requests from specific networks");
 
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "master_key",
-                       rspamd_rcl_parse_struct_pubkey,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
-                       0,
-                       "Allow master/slave updates merely using the specified key");
-
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "mirror",
-                       fuzzy_storage_parse_mirror,
-                       ctx,
-                       0,
-                       RSPAMD_CL_FLAG_MULTIPLE,
-                       "List of slave hosts");
-
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "slave",
-                       fuzzy_storage_parse_mirror,
-                       ctx,
-                       0,
-                       RSPAMD_CL_FLAG_MULTIPLE,
-                       "List of slave hosts");
 
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "master_flags",
-                       fuzzy_storage_parse_master_flags,
-                       ctx,
-                       0,
-                       0,
-                       "Map of flags in form master_flags = { master_flag = local_flag; ... }; ");
        rspamd_rcl_register_worker_option (cfg,
                        type,
                        "updates_maxfail",
@@ -2773,38 +1706,6 @@ init_fuzzy (struct rspamd_config *cfg)
                        G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, updates_maxfail),
                        RSPAMD_CL_FLAG_UINT,
                        "Maximum number of updates to be failed before discarding");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "collection_only",
-                       rspamd_rcl_parse_struct_boolean,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_mode),
-                       0,
-                       "Start fuzzy in collection only mode");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "collection_signkey",
-                       rspamd_rcl_parse_struct_pubkey,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_sign_key),
-                       RSPAMD_CL_FLAG_SIGNKEY,
-                       "Accept only signed requests with the specified key");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "collection_keypair",
-                       rspamd_rcl_parse_struct_keypair,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_keypair),
-                       0,
-                       "Use the specified keypair to encrypt collection protocol");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "collection_id_file",
-                       rspamd_rcl_parse_struct_string,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_id_file),
-                       RSPAMD_CL_FLAG_STRING_PATH,
-                       "Store collection epoch in the desired file");
        rspamd_rcl_register_worker_option (cfg,
                        type,
                        "skip_hashes",
@@ -2877,17 +1778,18 @@ init_fuzzy (struct rspamd_config *cfg)
 }
 
 static void
-rspamd_fuzzy_peer_io (gint fd, gshort what, gpointer d)
+rspamd_fuzzy_peer_io (EV_P_ ev_io *w, int revents)
 {
        struct fuzzy_peer_cmd cmd;
-       struct rspamd_fuzzy_storage_ctx *ctx = d;
+       struct rspamd_fuzzy_storage_ctx *ctx =
+                       (struct rspamd_fuzzy_storage_ctx *)w->data;
        gssize r;
 
-       r = read (fd, &cmd, sizeof (cmd));
+       r = read (w->fd, &cmd, sizeof (cmd));
 
        if (r != sizeof (cmd)) {
                if (errno == EINTR) {
-                       rspamd_fuzzy_peer_io (fd, what, d);
+                       rspamd_fuzzy_peer_io (EV_A_ w, revents);
                        return;
                }
                if (errno != EAGAIN) {
@@ -2907,7 +1809,7 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
        struct rspamd_fuzzy_storage_ctx *ctx = ud;
        GList *cur;
        struct rspamd_worker_listen_socket *ls;
-       struct event *accept_events;
+       struct rspamd_worker_accept_event *ac_ev;
 
        ctx->peer_fd = rep_fd;
 
@@ -2931,30 +1833,17 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
                                        rspamd_inet_address_to_string_pretty (ls->addr));
 
                        if (ls->type == RSPAMD_WORKER_SOCKET_UDP) {
-                               accept_events = g_malloc0 (sizeof (struct event) * 2);
-                               event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
-                                               accept_fuzzy_socket, worker);
-                               event_base_set (ctx->ev_base, &accept_events[0]);
-                               event_add (&accept_events[0], NULL);
-                               worker->accept_events = g_list_prepend (worker->accept_events,
-                                               accept_events);
+                               ac_ev = g_malloc0 (sizeof (*ac_ev));
+                               ac_ev->accept_ev.data = worker;
+                               ac_ev->event_loop = ctx->event_loop;
+                               ev_io_init (&ac_ev->accept_ev, accept_fuzzy_socket, ls->fd,
+                                               EV_READ);
+                               ev_io_start (ctx->event_loop, &ac_ev->accept_ev);
+                               DL_APPEND (worker->accept_events, ac_ev);
                        }
-                       else if (worker->index == 0) {
+                       else  {
                                /* We allow TCP listeners only for a update worker */
-                               accept_events = g_malloc0 (sizeof (struct event) * 2);
-
-                               if (ctx->collection_mode) {
-                                       event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
-                                                       accept_fuzzy_collection_socket, worker);
-                               }
-                               else {
-                                       event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
-                                                       accept_fuzzy_mirror_socket, worker);
-                               }
-                               event_base_set (ctx->ev_base, &accept_events[0]);
-                               event_add (&accept_events[0], NULL);
-                               worker->accept_events = g_list_prepend (worker->accept_events,
-                                               accept_events);
+                               g_assert_not_reached ();
                        }
                }
 
@@ -2963,10 +1852,9 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
 
        if (worker->index == 0 && ctx->peer_fd != -1) {
                /* Listen for peer requests */
-               event_set (&ctx->peer_ev, ctx->peer_fd, EV_READ | EV_PERSIST,
-                               rspamd_fuzzy_peer_io, ctx);
-               event_base_set (ctx->ev_base, &ctx->peer_ev);
-               event_add (&ctx->peer_ev, NULL);
+               ctx->peer_ev.data = ctx;
+               ev_io_init (&ctx->peer_ev, rspamd_fuzzy_peer_io, ctx->peer_fd, EV_READ);
+               ev_io_start (ctx->event_loop, &ctx->peer_ev);
        }
 }
 
@@ -2981,140 +1869,53 @@ start_fuzzy (struct rspamd_worker *worker)
        struct rspamd_srv_command srv_cmd;
        struct rspamd_config *cfg = worker->srv->cfg;
 
-       ctx->ev_base = rspamd_prepare_worker (worker,
+       ctx->event_loop = rspamd_prepare_worker (worker,
                        "fuzzy",
                        NULL);
        ctx->peer_fd = -1;
        ctx->worker = worker;
        ctx->cfg = worker->srv->cfg;
-       double_to_tv (ctx->master_timeout, &ctx->master_io_tv);
-
        ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-                       ctx->ev_base,
+                       ctx->event_loop,
                        worker->srv->cfg);
        rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
-                       ctx->ev_base, ctx->resolver->r);
+                       ctx->event_loop, ctx->resolver->r);
        if (ctx->keypair_cache_size > 0) {
                /* Create keypairs cache */
                ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size);
        }
 
-       ctx->http_ctx = rspamd_http_context_create (cfg, ctx->ev_base, ctx->cfg->ups_ctx);
 
-       if (!ctx->collection_mode) {
-               /*
-                * Open DB and perform VACUUM
-                */
-               if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
-                               worker->cf->options, cfg, &err)) == NULL) {
-                       msg_err ("cannot open backend: %e", err);
-                       if (err) {
-                               g_error_free (err);
-                       }
-                       exit (EXIT_SUCCESS);
-               }
-
-               rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
-
-
-               if (worker->index == 0) {
-                       ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
-                                       sizeof (struct fuzzy_peer_cmd), 1024);
-                       rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
-                                       rspamd_fuzzy_storage_periodic_callback, ctx);
+       if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->event_loop,
+                       worker->cf->options, cfg, &err)) == NULL) {
+               msg_err ("cannot open backend: %e", err);
+               if (err) {
+                       g_error_free (err);
                }
-
-               double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
-               event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx);
-               event_base_set (ctx->ev_base, &ctx->stat_ev);
-               event_add (&ctx->stat_ev, &ctx->stat_tv);
-
-               /* Register custom reload and stat commands for the control socket */
-               rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
-                               rspamd_fuzzy_storage_reload, ctx);
-               rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
-                               rspamd_fuzzy_storage_stat, ctx);
-               rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
-                               rspamd_fuzzy_storage_sync, ctx);
+               exit (EXIT_SUCCESS);
        }
-       else {
-               /*
-                * In collection mode we do a different thing:
-                * we collect fuzzy hashes in the updates queue and ignore all read commands
-                */
-               if (worker->index == 0) {
-                       ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
-                                       sizeof (struct fuzzy_peer_cmd), 1024);
-                       double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
-                       event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST,
-                                       rspamd_fuzzy_collection_periodic, ctx);
-                       event_base_set (ctx->ev_base, &ctx->stat_ev);
-                       event_add (&ctx->stat_ev, &ctx->stat_tv);
-
-                       ctx->collection_rt = rspamd_http_router_new (
-                                       rspamd_fuzzy_collection_error_handler,
-                                       rspamd_fuzzy_collection_finish_handler,
-                                       &ctx->stat_tv,
-                                       NULL,
-                                       ctx->http_ctx);
-
-                       if (ctx->collection_keypair) {
-                               rspamd_http_router_set_key (ctx->collection_rt,
-                                               ctx->collection_keypair);
-                       }
 
-                       /* Try to load collection id */
-                       if (ctx->collection_id_file) {
-                               gint fd;
+       rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
 
-                               fd = rspamd_file_xopen (ctx->collection_id_file, O_RDONLY, 0,
-                                               FALSE);
 
-                               if (fd == -1) {
-                                       if (errno != ENOENT) {
-                                               msg_err ("cannot open collection id from %s: %s",
-                                                               ctx->collection_id_file, strerror (errno));
-                                       }
-
-                                       ctx->collection_id = 0;
-                               }
-                               else {
-                                       if (read (fd, &ctx->collection_id,
-                                                       sizeof (ctx->collection_id)) == -1) {
-                                               msg_err ("cannot read collection id from %s: %s",
-                                                               ctx->collection_id_file, strerror (errno));
-                                               ctx->collection_id = 0;
-                                       }
-
-                                       close (fd);
-                               }
-                       }
-
-                       /* Generate new cookie */
-                       ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
-                       /* Register paths */
-                       rspamd_http_router_add_path (ctx->collection_rt,
-                                       "/cookie",
-                                       rspamd_fuzzy_collection_cookie);
-                       rspamd_http_router_add_path (ctx->collection_rt,
-                                       "/data",
-                                       rspamd_fuzzy_collection_data);
-               }
+       if (worker->index == 0) {
+               ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
+                               sizeof (struct fuzzy_peer_cmd), 1024);
+               rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+                               rspamd_fuzzy_storage_periodic_callback, ctx);
        }
 
-       if (ctx->mirrors && ctx->mirrors->len != 0) {
-               if (ctx->sync_keypair == NULL) {
-                       GString *pk_str = NULL;
-
-                       ctx->sync_keypair = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
-                                       RSPAMD_CRYPTOBOX_MODE_25519);
-                       pk_str = rspamd_keypair_print (ctx->sync_keypair,
-                                                               RSPAMD_KEYPAIR_COMPONENT_PK|RSPAMD_KEYPAIR_BASE32);
-                       msg_warn_config ("generating new temporary keypair for communicating"
-                                       " with slave hosts, pk is %s", pk_str->str);
-                       g_string_free (pk_str, TRUE);
-               }
-       }
+       ctx->stat_ev.data = ctx;
+       ev_timer_init (&ctx->stat_ev, rspamd_fuzzy_stat_callback, ctx->sync_timeout,
+                       ctx->sync_timeout);
+       ev_timer_start (ctx->event_loop, &ctx->stat_ev);
+       /* Register custom reload and stat commands for the control socket */
+       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
+                       rspamd_fuzzy_storage_reload, ctx);
+       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
+                       rspamd_fuzzy_storage_stat, ctx);
+       rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
+                       rspamd_fuzzy_storage_sync, ctx);
 
        /* Create radix trees */
        if (ctx->update_map != NULL) {
@@ -3123,12 +1924,6 @@ start_fuzzy (struct rspamd_worker *worker)
                                &ctx->update_ips, NULL);
        }
 
-       if (ctx->masters_map != NULL) {
-               rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->masters_map,
-                               "Allow fuzzy master/slave updates from specified addresses",
-                               &ctx->master_ips, NULL);
-       }
-
        if (ctx->skip_map != NULL) {
                struct rspamd_map *m;
 
@@ -3168,9 +1963,9 @@ start_fuzzy (struct rspamd_worker *worker)
 
        /* Maps events */
        ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
-                       ctx->ev_base,
+                       ctx->event_loop,
                        worker->srv->cfg);
-       rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+       rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
 
        /* Get peer pipe */
        memset (&srv_cmd, 0, sizeof (srv_cmd));
@@ -3180,64 +1975,35 @@ start_fuzzy (struct rspamd_worker *worker)
        memset (srv_cmd.cmd.spair.pair_id, 0, sizeof (srv_cmd.cmd.spair.pair_id));
        memcpy (srv_cmd.cmd.spair.pair_id, "fuzzy", sizeof ("fuzzy"));
 
-       rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, -1,
+       rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1,
                        fuzzy_peer_rep, ctx);
 
-       event_base_loop (ctx->ev_base, 0);
+       ev_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
-       if (worker->index == 0 && ctx->updates_pending->len > 0) {
-               if (!ctx->collection_mode) {
-                       rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
-                       event_base_loop (ctx->ev_base, 0);
+       if (ctx->peer_fd != -1) {
+               if (worker->index == 0) {
+                       ev_io_stop (ctx->event_loop, &ctx->peer_ev);
                }
+               close (ctx->peer_fd);
        }
 
-       if (!ctx->collection_mode) {
-               rspamd_fuzzy_backend_close (ctx->backend);
+       if (worker->index == 0 && ctx->updates_pending->len > 0) {
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
+               ev_loop (ctx->event_loop, 0);
        }
-       else if (worker->index == 0) {
-               gint fd;
-
-               rspamd_http_router_free (ctx->collection_rt);
 
-               /* Try to save collection id */
-               fd = rspamd_file_xopen (ctx->collection_id_file,
-                               O_WRONLY | O_CREAT | O_TRUNC, 00644, 0);
-
-               if (fd == -1) {
-                       msg_err ("cannot open collection id to store in %s: %s",
-                                       ctx->collection_id_file, strerror (errno));
-               }
-               else {
-                       if (write (fd, &ctx->collection_id,
-                                       sizeof (ctx->collection_id)) == -1) {
-                               msg_err ("cannot store collection id in %s: %s",
-                                               ctx->collection_id_file, strerror (errno));
-                       }
-
-                       close (fd);
-               }
-       }
+       rspamd_fuzzy_backend_close (ctx->backend);
 
        if (worker->index == 0) {
                g_array_free (ctx->updates_pending, TRUE);
        }
 
-       if (ctx->peer_fd != -1) {
-               if (worker->index == 0) {
-                       event_del (&ctx->peer_ev);
-               }
-               close (ctx->peer_fd);
-       }
-
        if (ctx->keypair_cache) {
                rspamd_keypair_cache_destroy (ctx->keypair_cache);
        }
 
-       struct rspamd_http_context *http_ctx = ctx->http_ctx;
        REF_RELEASE (ctx->cfg);
-       rspamd_http_context_free (http_ctx);
        rspamd_log_close (worker->srv->logger, TRUE);
 
        exit (EXIT_SUCCESS);