diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2017-01-26 14:35:10 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2017-01-26 15:50:43 +0000 |
commit | 3e1e0ffe8fe4b054b61008a7c6e93470ba578224 (patch) | |
tree | 0a8abb31eb3dc22d62990b77a766962b9a9d3048 /src/fuzzy_storage.c | |
parent | 6713205e121e37bdbbf38faa852ad9ea9a9a225e (diff) | |
download | rspamd-3e1e0ffe8fe4b054b61008a7c6e93470ba578224.tar.gz rspamd-3e1e0ffe8fe4b054b61008a7c6e93470ba578224.zip |
[Feature] Start collection only mode implementation for fuzzy storage
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 402 |
1 files changed, 360 insertions, 42 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 9738c3f63..d1cb2053a 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -41,6 +41,7 @@ #define DEFAULT_KEYPAIR_CACHE_SIZE 512 #define DEFAULT_MASTER_TIMEOUT 10.0 #define DEFAULT_UPDATES_MAXFAIL 3 +#define COOKIE_SIZE 128 static const gchar *local_db_name = "local"; @@ -49,15 +50,31 @@ static const gchar *local_db_name = "local"; G_STRFUNC, \ __VA_ARGS__) #define msg_warn_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ - session->name, session->uid, \ + session->name, session->uid, \ G_STRFUNC, \ __VA_ARGS__) #define msg_info_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ - session->name, session->uid, \ + session->name, session->uid, \ G_STRFUNC, \ __VA_ARGS__) #define msg_debug_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ - session->name, session->uid, \ + session->name, session->uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_err_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + "fuzzy_collection", session->uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + "fuzzy_collection", session->uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + "fuzzy_collection", session->uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_fuzzy_collection(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ + "fuzzy_collection", session->uid, \ G_STRFUNC, \ __VA_ARGS__) @@ -132,6 +149,9 @@ struct rspamd_fuzzy_storage_ctx { struct fuzzy_key *default_key; GHashTable *keys; gboolean encrypted_only; + gboolean collection_mode; + struct rspamd_cryptobox_keypair *collection_keypair; + struct rspamd_cryptobox_pubkey *collection_sign_key; struct rspamd_keypair_cache *keypair_cache; rspamd_lru_hash_t *errors_ips; struct rspamd_fuzzy_backend *backend; @@ -141,6 +161,8 @@ struct rspamd_fuzzy_storage_ctx { struct rspamd_dns_resolver *resolver; struct rspamd_config *cfg; struct rspamd_worker *worker; + struct rspamd_http_connection_router *collection_rt; + guchar cookie[COOKIE_SIZE]; }; enum fuzzy_cmd_type { @@ -816,16 +838,33 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) } result.flag = cmd->flag; + if (cmd->cmd == FUZZY_CHECK) { - REF_RETAIN (session); - rspamd_fuzzy_backend_check (session->ctx->backend, cmd, - rspamd_fuzzy_check_callback, session); + if (G_UNLIKELY (session->ctx->collection_mode)) { + result.prob = 0; + result.value = 500; + result.flag = 0; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + } + else { + REF_RETAIN (session); + rspamd_fuzzy_backend_check (session->ctx->backend, cmd, + rspamd_fuzzy_check_callback, session); + } } else if (cmd->cmd == FUZZY_STAT) { - result.prob = 1.0; - result.value = 0; - result.flag = session->ctx->stat.fuzzy_hashes; - rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + if (G_UNLIKELY (session->ctx->collection_mode)) { + result.prob = 0; + result.value = 500; + result.flag = 0; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + } + else { + result.prob = 1.0; + result.value = 0; + result.flag = session->ctx->stat.fuzzy_hashes; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + } } else { if (rspamd_fuzzy_check_client (session)) { @@ -1347,6 +1386,210 @@ end: return 0; } +struct rspamd_fuzzy_collection_session { + struct rspamd_fuzzy_storage_ctx *ctx; + struct rspamd_worker *worker; + rspamd_inet_addr_t *from_addr; + guchar uid[16]; +}; + +static void +rspamd_fuzzy_collection_error_handler (struct rspamd_http_connection_entry *conn_ent, + GError *err) +{ + struct rspamd_fuzzy_collection_session *session = conn_ent->ud; + + msg_err_fuzzy_collection ("http error occurred: %s", err->message); +} + +static void +rspamd_fuzzy_collection_finish_handler (struct rspamd_http_connection_entry *conn_ent) +{ + struct rspamd_fuzzy_collection_session *session = conn_ent->ud; + + + rspamd_inet_address_destroy (session->from_addr); + + + g_slice_free1 (sizeof (struct rspamd_fuzzy_collection_session), session); +} + +void +rspamd_fuzzy_collection_send_error (struct rspamd_http_connection_entry *entry, + gint code, const gchar *error_msg, ...) +{ + struct rspamd_http_message *msg; + va_list args; + rspamd_fstring_t *reply; + + msg = rspamd_http_new_message (HTTP_RESPONSE); + + va_start (args, error_msg); + msg->status = rspamd_fstring_new (); + rspamd_vprintf_fstring (&msg->status, error_msg, args); + va_end (args); + + msg->date = time (NULL); + msg->code = code; + reply = rspamd_fstring_sized_new (msg->status->len + 16); + rspamd_printf_fstring (&reply, "%V", msg->status); + rspamd_http_message_set_body_from_fstring_steal (msg, reply); + rspamd_http_connection_reset (entry->conn); + rspamd_http_router_insert_headers (entry->rt, msg); + rspamd_http_connection_write_message (entry->conn, + msg, + NULL, + "text/plain", + entry, + entry->conn->fd, + entry->rt->ptv, + entry->rt->ev_base); + entry->is_reply = TRUE; +} + +/* + * Note: this function steals fstring + */ +void +rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry, + rspamd_fstring_t *fstr) +{ + struct rspamd_http_message *msg; + + msg = rspamd_http_new_message (HTTP_RESPONSE); + msg->status = rspamd_fstring_new_init ("OK", 2); + msg->date = time (NULL); + msg->code = 200; + rspamd_http_message_set_body_from_fstring_steal (msg, fstr); + rspamd_http_connection_reset (entry->conn); + rspamd_http_router_insert_headers (entry->rt, msg); + rspamd_http_connection_write_message (entry->conn, + msg, + NULL, + "application/octet-stream", + entry, + entry->conn->fd, + entry->rt->ptv, + entry->rt->ev_base); + entry->is_reply = TRUE; +} + +static int +rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent, + struct rspamd_http_message *msg) +{ + struct rspamd_fuzzy_collection_session *session = conn_ent->ud; + rspamd_fstring_t *cookie; + + cookie = rspamd_fstring_new_init (session->ctx->cookie, + sizeof (session->ctx->cookie)); + rspamd_fuzzy_collection_send_fstring (conn_ent, cookie); + + return 0; +} + +static int +rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent, + struct rspamd_http_message *msg) +{ + struct rspamd_fuzzy_collection_session *session = conn_ent->ud; + rspamd_fstring_t *cookie; + + cookie = rspamd_fstring_new_init (session->ctx->cookie, + sizeof (session->ctx->cookie)); + rspamd_fuzzy_collection_send_fstring (conn_ent, cookie); + + return 0; +} + + +static void +accept_fuzzy_collection_socket (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + rspamd_inet_addr_t *addr; + gint nfd; + struct rspamd_fuzzy_storage_ctx *ctx; + struct rspamd_fuzzy_collection_session *session; + + if ((nfd = + rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + return; + } + + ctx = worker->ctx; + + if (!ctx->collection_keypair) { + msg_err ("deny request from %s, as no local keypair is specified", + rspamd_inet_address_to_string (addr)); + rspamd_inet_address_destroy (addr); + close (nfd); + + return; + } + + session = g_slice_alloc0 (sizeof (*session)); + session->ctx = ctx; + session->worker = worker; + rspamd_random_hex (session->uid, sizeof (session->uid) - 1); + session->uid[sizeof (session->uid) - 1] = '\0'; + session->from_addr = addr; + rspamd_http_router_handle_socket (ctx->collection_rt, nfd, session); + msg_info_fuzzy_collection ("accepted connection from %s port %d, session ptr: %p", + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr), + session); +} + +static void +rspamd_fuzzy_collection_periodic (gint fd, gshort what, gpointer ud) +{ + struct rspamd_fuzzy_storage_ctx *ctx = ud; + GList *cur; + struct fuzzy_peer_cmd *io_cmd; + + if (++ctx->updates_failed > ctx->updates_maxfail) { + msg_err ("cannot store more data in workqueue, discard " + "%ud updates after %d missed collection points", + g_queue_get_length (ctx->updates_pending), + ctx->updates_maxfail); + ctx->updates_failed = 0; + cur = ctx->updates_pending->head; + + while (cur) { + io_cmd = cur->data; + g_slice_free1 (sizeof (*io_cmd), io_cmd); + cur = g_list_next (cur); + } + + g_queue_clear (ctx->updates_pending); + /* Regenerate cookie */ + ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); + } + else { + msg_err ("fuzzy data has not been collected in time, " + "%ud updates are still pending, %d updates left", + g_queue_get_length (ctx->updates_pending), + ctx->updates_maxfail - ctx->updates_failed); + } + + if (ctx->worker->wanna_die) { + /* Plan exit */ + struct timeval tv; + + tv.tv_sec = 0; + tv.tv_usec = 0; + + event_base_loopexit (ctx->ev_base, &tv); + } +} + + static void accept_fuzzy_mirror_socket (gint fd, short what, void *arg) { @@ -2205,6 +2448,30 @@ init_fuzzy (struct rspamd_config *cfg) G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, updates_maxfail), RSPAMD_CL_FLAG_UINT, "Maximum number of updates to be failed before discarding"); + rspamd_rcl_register_worker_option (cfg, + type, + "collection_only", + rspamd_rcl_parse_struct_boolean, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_mode), + 0, + "Start fuzzy in collection only mode"); + rspamd_rcl_register_worker_option (cfg, + type, + "collection_signkey", + rspamd_rcl_parse_struct_pubkey, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_sign_key), + RSPAMD_CL_FLAG_SIGNKEY, + "Accept only signed requests with the specified key"); + rspamd_rcl_register_worker_option (cfg, + type, + "collection_keypair", + rspamd_rcl_parse_struct_keypair, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_keypair), + 0, + "Use the specified keypair to encrypt collection protocol"); return ctx; } @@ -2272,8 +2539,15 @@ fuzzy_peer_rep (struct rspamd_worker *worker, else if (worker->index == 0) { /* We allow TCP listeners only for a update worker */ accept_events = g_slice_alloc0 (sizeof (struct event) * 2); - event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, - accept_fuzzy_mirror_socket, worker); + + if (ctx->collection_mode) { + event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, + accept_fuzzy_collection_socket, worker); + } + else { + event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, + accept_fuzzy_mirror_socket, worker); + } event_base_set (ctx->ev_base, &accept_events[0]); event_add (&accept_events[0], NULL); worker->accept_events = g_list_prepend (worker->accept_events, @@ -2318,34 +2592,80 @@ start_fuzzy (struct rspamd_worker *worker) worker->srv->cfg); rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx, ctx->ev_base, ctx->resolver->r); - - /* - * Open DB and perform VACUUM - */ - if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, - worker->cf->options, cfg, &err)) == NULL) { - msg_err ("cannot open backend: %e", err); - g_error_free (err); - exit (EXIT_SUCCESS); - } - - rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - if (ctx->keypair_cache_size > 0) { /* Create keypairs cache */ ctx->keypair_cache = rspamd_keypair_cache_new (ctx->keypair_cache_size); } - if (worker->index == 0) { - ctx->updates_pending = g_queue_new (); - rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout, - rspamd_fuzzy_storage_periodic_callback, ctx); + if (!ctx->collection_mode) { + /* + * Open DB and perform VACUUM + */ + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, + worker->cf->options, cfg, &err)) == NULL) { + msg_err ("cannot open backend: %e", err); + g_error_free (err); + exit (EXIT_SUCCESS); + } + + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); + + + if (worker->index == 0) { + ctx->updates_pending = g_queue_new (); + rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout, + rspamd_fuzzy_storage_periodic_callback, ctx); + } + + double_to_tv (ctx->sync_timeout, &ctx->stat_tv); + event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx); + event_base_set (ctx->ev_base, &ctx->stat_ev); + event_add (&ctx->stat_ev, &ctx->stat_tv); + + /* Register custom reload and stat commands for the control socket */ + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD, + rspamd_fuzzy_storage_reload, ctx); + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT, + rspamd_fuzzy_storage_stat, ctx); + rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, + rspamd_fuzzy_storage_sync, ctx); } + else { + /* + * In collection mode we do a different thing: + * we collect fuzzy hashes in the updates queue and ignore all read commands + */ + if (worker->index == 0) { + ctx->updates_pending = g_queue_new (); + double_to_tv (ctx->sync_timeout, &ctx->stat_tv); + event_set (&ctx->stat_ev, -1, EV_TIMEOUT|EV_PERSIST, + rspamd_fuzzy_collection_periodic, ctx); + event_base_set (ctx->ev_base, &ctx->stat_ev); + event_add (&ctx->stat_ev, &ctx->stat_tv); + + ctx->collection_rt = rspamd_http_router_new ( + rspamd_fuzzy_collection_error_handler, + rspamd_fuzzy_collection_finish_handler, + &ctx->stat_tv, + ctx->ev_base, + NULL, ctx->keypair_cache); + + if (ctx->collection_keypair) { + rspamd_http_router_set_key (ctx->collection_rt, + ctx->collection_keypair); + } - double_to_tv (ctx->sync_timeout, &ctx->stat_tv); - event_set (&ctx->stat_ev, -1, EV_TIMEOUT, rspamd_fuzzy_stat_callback, ctx); - event_base_set (ctx->ev_base, &ctx->stat_ev); - event_add (&ctx->stat_ev, &ctx->stat_tv); + /* Generate new cookie */ + ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie)); + /* Register paths */ + rspamd_http_router_add_path (ctx->collection_rt, + "/cookie", + rspamd_fuzzy_collection_cookie_handler); + rspamd_http_router_add_path (ctx->collection_rt, + "/data", + rspamd_fuzzy_collection_data_handler); + } + } if (ctx->mirrors && ctx->mirrors->len != 0) { if (ctx->sync_keypair == NULL) { @@ -2361,14 +2681,6 @@ start_fuzzy (struct rspamd_worker *worker) } } - /* Register custom reload and stat commands for the control socket */ - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RELOAD, - rspamd_fuzzy_storage_reload, ctx); - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_STAT, - rspamd_fuzzy_storage_stat, ctx); - rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, - rspamd_fuzzy_storage_sync, ctx); - /* Create radix trees */ if (ctx->update_map != NULL) { rspamd_config_radix_from_ucl (worker->srv->cfg, ctx->update_map, @@ -2407,7 +2719,13 @@ start_fuzzy (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); } - rspamd_fuzzy_backend_close (ctx->backend); + if (!ctx->collection_mode) { + rspamd_fuzzy_backend_close (ctx->backend); + } + else if (worker->index == 0) { + rspamd_http_router_free (ctx->collection_rt); + } + rspamd_log_close (worker->srv->logger); if (ctx->peer_fd != -1) { |