path: root/src/fuzzy_storage.c
diff options
Diffstat (limited to 'src/fuzzy_storage.c')
1 files changed, 360 insertions, 42 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 9738c3f63..d1cb2053a 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -41,6 +41,7 @@
+#define COOKIE_SIZE 128
static const gchar *local_db_name = "local";
@@ -49,15 +50,31 @@ static const gchar *local_db_name = "local";
#define msg_warn_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
- session->name, session->uid, \
+ session->name, session->uid, \
#define msg_info_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
- session->name, session->uid, \
+ session->name, session->uid, \
#define msg_debug_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
- session->name, session->uid, \
+ session->name, session->uid, \
+ __VA_ARGS__)
+#define msg_err_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ "fuzzy_collection", session->uid, \
+ __VA_ARGS__)
+#define msg_warn_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+ "fuzzy_collection", session->uid, \
+ __VA_ARGS__)
+#define msg_info_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+ "fuzzy_collection", session->uid, \
+ __VA_ARGS__)
+#define msg_debug_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
+ "fuzzy_collection", session->uid, \
@@ -132,6 +149,9 @@ struct rspamd_fuzzy_storage_ctx {
struct fuzzy_key *default_key;
GHashTable *keys;
gboolean encrypted_only;
+ gboolean collection_mode;
+ struct rspamd_cryptobox_keypair *collection_keypair;
+ struct rspamd_cryptobox_pubkey *collection_sign_key;
struct rspamd_keypair_cache *keypair_cache;
rspamd_lru_hash_t *errors_ips;
struct rspamd_fuzzy_backend *backend;
@@ -141,6 +161,8 @@ struct rspamd_fuzzy_storage_ctx {
struct rspamd_dns_resolver *resolver;
struct rspamd_config *cfg;
struct rspamd_worker *worker;
+ struct rspamd_http_connection_router *collection_rt;
+ guchar cookie[COOKIE_SIZE];
enum fuzzy_cmd_type {
@@ -816,16 +838,33 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session)
result.flag = cmd->flag;
if (cmd->cmd == FUZZY_CHECK) {
- REF_RETAIN (session);
- rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
- rspamd_fuzzy_check_callback, session);
+ if (G_UNLIKELY (session->ctx->collection_mode)) {
+ result.prob = 0;
+ result.value = 500;
+ result.flag = 0;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ }
+ else {
+ REF_RETAIN (session);
+ rspamd_fuzzy_backend_check (session->ctx->backend, cmd,
+ rspamd_fuzzy_check_callback, session);
+ }
else if (cmd->cmd == FUZZY_STAT) {
- result.prob = 1.0;
- result.value = 0;
- result.flag = session->ctx->stat.fuzzy_hashes;
- rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ if (G_UNLIKELY (session->ctx->collection_mode)) {
+ result.prob = 0;
+ result.value = 500;
+ result.flag = 0;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ }
+ else {
+ result.prob = 1.0;
+ result.value = 0;
+ result.flag = session->ctx->stat.fuzzy_hashes;
+ rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle);
+ }
else {
if (rspamd_fuzzy_check_client (session)) {
@@ -1347,6 +1386,210 @@ end:
return 0;
+struct rspamd_fuzzy_collection_session {
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct rspamd_worker *worker;
+ rspamd_inet_addr_t *from_addr;
+ guchar uid[16];
+static void
+rspamd_fuzzy_collection_error_handler (struct rspamd_http_connection_entry *conn_ent,
+ GError *err)
+ struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+ msg_err_fuzzy_collection ("http error occurred: %s", err->message);
+static void
+rspamd_fuzzy_collection_finish_handler (struct rspamd_http_connection_entry *conn_ent)
+ struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+ rspamd_inet_address_destroy (session->from_addr);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_collection_session), session);
+rspamd_fuzzy_collection_send_error (struct rspamd_http_connection_entry *entry,
+ gint code, const gchar *error_msg, ...)
+ struct rspamd_http_message *msg;
+ va_list args;
+ rspamd_fstring_t *reply;
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
+ va_start (args, error_msg);
+ msg->status = rspamd_fstring_new ();
+ rspamd_vprintf_fstring (&msg->status, error_msg, args);
+ va_end (args);
+ msg->date = time (NULL);
+ msg->code = code;
+ reply = rspamd_fstring_sized_new (msg->status->len + 16);
+ rspamd_printf_fstring (&reply, "%V", msg->status);
+ rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+ rspamd_http_connection_reset (entry->conn);
+ rspamd_http_router_insert_headers (entry->rt, msg);
+ rspamd_http_connection_write_message (entry->conn,
+ msg,
+ "text/plain",
+ entry,
+ entry->conn->fd,
+ entry->rt->ptv,
+ entry->rt->ev_base);
+ entry->is_reply = TRUE;
+ * Note: this function steals fstring
+ */
+rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry,
+ rspamd_fstring_t *fstr)
+ struct rspamd_http_message *msg;
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
+ msg->status = rspamd_fstring_new_init ("OK", 2);
+ msg->date = time (NULL);
+ msg->code = 200;
+ rspamd_http_message_set_body_from_fstring_steal (msg, fstr);
+ rspamd_http_connection_reset (entry->conn);
+ rspamd_http_router_insert_headers (entry->rt, msg);
+ rspamd_http_connection_write_message (entry->conn,
+ msg,
+ "application/octet-stream",
+ entry,
+ entry->conn->fd,
+ entry->rt->ptv,
+ entry->rt->ev_base);
+ entry->is_reply = TRUE;
+static int
+rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent,
+ struct rspamd_http_message *msg)
+ struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+ rspamd_fstring_t *cookie;
+ cookie = rspamd_fstring_new_init (session->ctx->cookie,
+ sizeof (session->ctx->cookie));
+ rspamd_fuzzy_collection_send_fstring (conn_ent, cookie);
+ return 0;
+static int
+rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent,
+ struct rspamd_http_message *msg)
+ struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
+ rspamd_fstring_t *cookie;
+ cookie = rspamd_fstring_new_init (session->ctx->cookie,
+ sizeof (session->ctx->cookie));
+ rspamd_fuzzy_collection_send_fstring (conn_ent, cookie);
+ return 0;
+static void
+accept_fuzzy_collection_socket (gint fd, short what, void *arg)
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ rspamd_inet_addr_t *addr;
+ gint nfd;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct rspamd_fuzzy_collection_session *session;
+ if ((nfd =
+ rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ msg_warn ("accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ return;
+ }
+ ctx = worker->ctx;
+ if (!ctx->collection_keypair) {
+ msg_err ("deny request from %s, as no local keypair is specified",
+ rspamd_inet_address_to_string (addr));
+ rspamd_inet_address_destroy (addr);
+ close (nfd);
+ return;
+ }
+ session = g_slice_alloc0 (sizeof (*session));
+ session->ctx = ctx;
+ session->worker = worker;
+ rspamd_random_hex (session->uid, sizeof (session->uid) - 1);
+ session->uid[sizeof (session->uid) - 1] = '\0';
+ session->from_addr = addr;
+ rspamd_http_router_handle_socket (ctx->collection_rt, nfd, session);
+ msg_info_fuzzy_collection ("accepted connection from %s port %d, session ptr: %p",
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr),
+ session);
+static void
+rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud)
+ struct rspamd_fuzzy_storage_ctx *ctx = ud;
+ GList *cur;
+ struct fuzzy_peer_cmd *io_cmd;
+ if (++ctx->updates_failed > ctx->updates_maxfail) {
+ msg_err ("cannot store more data in workqueue, discard "
+ "%ud updates after %d missed collection points",
+ g_queue_get_length (ctx->updates_pending),
+ ctx->updates_maxfail);
+ ctx->updates_failed = 0;
+ cur = ctx->updates_pending->head;
+ while (cur) {
+ io_cmd = cur->data;
+ g_slice_free1 (sizeof (*io_cmd), io_cmd);
+ cur = g_list_next (cur);
+ }
+ g_queue_clear (ctx->updates_pending);
+ /* Regenerate cookie */
+ ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
+ }
+ else {
+ msg_err ("fuzzy data has not been collected in time, "
+ "%ud updates are still pending, %d updates left",
+ g_queue_get_length (ctx->updates_pending),
+ ctx->updates_maxfail - ctx->updates_failed);
+ }
+ if (ctx->worker->wanna_die) {
+ /* Plan exit */
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ event_base_loopexit (ctx->ev_base, &tv);
+ }
static void
accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
@@ -2205,6 +2448,30 @@ init_fuzzy (struct rspamd_config *cfg)
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, updates_maxfail),
"Maximum number of updates to be failed before discarding");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "collection_only",
+ rspamd_rcl_parse_struct_boolean,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_mode),
+ 0,
+ "Start fuzzy in collection only mode");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "collection_signkey",
+ rspamd_rcl_parse_struct_pubkey,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_sign_key),
+ "Accept only signed requests with the specified key");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "collection_keypair",
+ rspamd_rcl_parse_struct_keypair,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_keypair),
+ 0,
+ "Use the specified keypair to encrypt collection protocol");
return ctx;
@@ -2272,8 +2539,15 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
else if (worker->index == 0) {
/* We allow TCP listeners only for a update worker */
accept_events = g_slice_alloc0 (sizeof (struct event) * 2);
- event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
- accept_fuzzy_mirror_socket, worker);
+ if (ctx->collection_mode) {
+ event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
+ accept_fuzzy_collection_socket, worker);
+ }
+ else {
+ event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
+ accept_fuzzy_mirror_socket, worker);
+ }
event_base_set (ctx->ev_base, &accept_events[0]);
event_add (&accept_events[0], NULL);
worker->accept_events = g_list_prepend (worker->accept_events,
@@ -2318,34 +2592,80 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->ev_base, ctx->resolver->r);
- /*
- * Open DB and perform VACUUM
- */
- if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
- worker->cf->options, cfg, &err)) == NULL) {
- msg_err ("cannot open backend: %e", err);
- g_error_free (err);
- exit (EXIT_SUCCESS);
- }
- rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
if (ctx->keypair_cache_size > 0) {
/* Create keypairs cache */
ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size);
- if (worker->index == 0) {
- ctx->updates_pending = g_queue_new ();
- rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
- rspamd_fuzzy_storage_periodic_callback, ctx);
+ if (!ctx->collection_mode) {
+ /*
+ * Open DB and perform VACUUM
+ */
+ if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base,
+ worker->cf->options, cfg, &err)) == NULL) {
+ msg_err ("cannot open backend: %e", err);
+ g_error_free (err);
+ exit (EXIT_SUCCESS);
+ }
+ rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx);
+ if (worker->index == 0) {
+ ctx->updates_pending = g_queue_new ();
+ rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
+ rspamd_fuzzy_storage_periodic_callback, ctx);
+ }
+ double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
+ event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx);
+ event_base_set (ctx->ev_base, &ctx->stat_ev);
+ event_add (&ctx->stat_ev, &ctx->stat_tv);
+ /* Register custom reload and stat commands for the control socket */
+ rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
+ rspamd_fuzzy_storage_reload, ctx);
+ rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
+ rspamd_fuzzy_storage_stat, ctx);
+ rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
+ rspamd_fuzzy_storage_sync, ctx);
+ else {
+ /*
+ * In collection mode we do a different thing:
+ * we collect fuzzy hashes in the updates queue and ignore all read commands
+ */
+ if (worker->index == 0) {
+ ctx->updates_pending = g_queue_new ();
+ double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
+ event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST,
+ rspamd_fuzzy_collection_periodic, ctx);
+ event_base_set (ctx->ev_base, &ctx->stat_ev);
+ event_add (&ctx->stat_ev, &ctx->stat_tv);
+ ctx->collection_rt = rspamd_http_router_new (
+ rspamd_fuzzy_collection_error_handler,
+ rspamd_fuzzy_collection_finish_handler,
+ &ctx->stat_tv,
+ ctx->ev_base,
+ NULL, ctx->keypair_cache);
+ if (ctx->collection_keypair) {
+ rspamd_http_router_set_key (ctx->collection_rt,
+ ctx->collection_keypair);
+ }
- double_to_tv (ctx->sync_timeout, &ctx->stat_tv);
- event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx);
- event_base_set (ctx->ev_base, &ctx->stat_ev);
- event_add (&ctx->stat_ev, &ctx->stat_tv);
+ /* Generate new cookie */
+ ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
+ /* Register paths */
+ rspamd_http_router_add_path (ctx->collection_rt,
+ "/cookie",
+ rspamd_fuzzy_collection_cookie_handler);
+ rspamd_http_router_add_path (ctx->collection_rt,
+ "/data",
+ rspamd_fuzzy_collection_data_handler);
+ }
+ }
if (ctx->mirrors && ctx->mirrors->len != 0) {
if (ctx->sync_keypair == NULL) {
@@ -2361,14 +2681,6 @@ start_fuzzy (struct rspamd_worker *worker)
- /* Register custom reload and stat commands for the control socket */
- rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD,
- rspamd_fuzzy_storage_reload, ctx);
- rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT,
- rspamd_fuzzy_storage_stat, ctx);
- rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
- rspamd_fuzzy_storage_sync, ctx);
/* Create radix trees */
if (ctx->update_map != NULL) {
rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->update_map,
@@ -2407,7 +2719,13 @@ start_fuzzy (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
- rspamd_fuzzy_backend_close (ctx->backend);
+ if (!ctx->collection_mode) {
+ rspamd_fuzzy_backend_close (ctx->backend);
+ }
+ else if (worker->index == 0) {
+ rspamd_http_router_free (ctx->collection_rt);
+ }
rspamd_log_close (worker->srv->logger);
if (ctx->peer_fd != -1) {