From: Vsevolod Stakhov Date: Sat, 14 May 2016 12:38:10 +0000 (+0100) Subject: [Feature] Preliminary implementation of fuzzy master/slave updates X-Git-Tag: 1.3.0~473 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=d46a62b2cb8d30ef53ac04fa94cea1dc21fa40c1;p=rspamd.git [Feature] Preliminary implementation of fuzzy master/slave updates --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index a1881aff3..d8f82104f 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -40,7 +40,7 @@ /* Resync value in seconds */ #define DEFAULT_SYNC_TIMEOUT 60.0 #define DEFAULT_KEYPAIR_CACHE_SIZE 512 - +#define DEFAULT_MASTER_TIMEOUT 10.0 #define INVALID_NODE_TIME (guint64) - 1 @@ -53,7 +53,7 @@ worker_t fuzzy_worker = { init_fuzzy, /* Init function */ start_fuzzy, /* Start function */ RSPAMD_WORKER_HAS_SOCKET, - RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */ + RSPAMD_WORKER_SOCKET_UDP|RSPAMD_WORKER_SOCKET_TCP, /* Both socket */ RSPAMD_WORKER_VER /* Version info */ }; @@ -93,7 +93,12 @@ struct rspamd_fuzzy_storage_ctx { gdouble expire; gdouble sync_timeout; radix_compressed_t *update_ips; + radix_compressed_t *master_ips; + struct rspamd_cryptobox_keypair *master_key; + struct timeval master_io_tv; + gdouble master_timeout; gchar *update_map; + gchar *masters_map; guint keypair_cache_size; struct event_base *ev_base; gint peer_fd; @@ -160,6 +165,12 @@ struct fuzzy_key { struct fuzzy_key_stat *stat; }; +struct fuzzy_master_update_session { + struct rspamd_http_connection *conn; + struct rspamd_fuzzy_storage_ctx *ctx; + rspamd_inet_addr_t *addr; +}; + static void rspamd_fuzzy_write_reply (struct fuzzy_session *session); static gboolean @@ -703,6 +714,102 @@ fuzzy_session_destroy (gpointer d) g_slice_free1 (sizeof (*session), session); } +static void +rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session) +{ + if (session) { + rspamd_http_connection_unref (session->conn); + rspamd_inet_address_destroy (session->addr); + g_slice_free1 (sizeof (*session), session); + } +} + +static void +rspamd_fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct fuzzy_master_update_session *session = conn->ud; + + msg_err ("abnormally closing connection from: %s, error: %e", + rspamd_inet_address_to_string (session->addr), err); + /* Terminate session immediately */ + rspamd_fuzzy_mirror_session_destroy (session); +} + +static gint +rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct fuzzy_master_update_session *session = conn->ud; + + /* TODO: implement updates */ + rspamd_fuzzy_mirror_session_destroy (session); + + return 0; +} + +static void +accept_fuzzy_mirror_socket (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + rspamd_inet_addr_t *addr; + gint nfd; + struct rspamd_http_connection *http_conn; + struct rspamd_fuzzy_storage_ctx *ctx; + struct fuzzy_master_update_session *session; + + if ((nfd = + rspamd_accept_from_socket (fd, &addr)) == -1) { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + return; + } + + ctx = worker->ctx; + + if (!ctx->master_ips) { + msg_err ("deny update request from %s as no masters defined", + rspamd_inet_address_to_string (addr)); + rspamd_inet_address_destroy (addr); + close (nfd); + + return; + } + else if (radix_find_compressed_addr (ctx->master_ips, addr) == RADIX_NO_VALUE) { + msg_err ("deny update request from %s", + rspamd_inet_address_to_string (addr)); + rspamd_inet_address_destroy (addr); + close (nfd); + + return; + } + + session = g_slice_alloc0 (sizeof (*session)); + http_conn = rspamd_http_connection_new ( + NULL, + rspamd_fuzzy_mirror_error_handler, + rspamd_fuzzy_mirror_finish_handler, + 0, + RSPAMD_HTTP_SERVER, + ctx->keypair_cache); + + if (ctx->master_key) { + rspamd_http_connection_set_key (http_conn, ctx->master_key); + } + + session->ctx = ctx; + session->conn = http_conn; + session->addr = addr; + + rspamd_http_connection_read_message (http_conn, + session, + nfd, + &ctx->master_io_tv, + ctx->ev_base); +} + /* * Accept new connection and construct task */ @@ -1211,6 +1318,7 @@ init_fuzzy (struct rspamd_config *cfg) ctx->magic = rspamd_fuzzy_storage_magic; ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT; + ctx->master_timeout = DEFAULT_MASTER_TIMEOUT; ctx->expire = DEFAULT_EXPIRE; ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE; ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal, @@ -1315,6 +1423,32 @@ init_fuzzy (struct rspamd_config *cfg) 0, "Allow encrypted requests only (and forbid all unknown keys or plaintext requests)"); + rspamd_rcl_register_worker_option (cfg, + type, + "master_timeout", + rspamd_rcl_parse_struct_time, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_timeout), + RSPAMD_CL_FLAG_TIME_FLOAT, + "Master protocol IO timeout"); + + rspamd_rcl_register_worker_option (cfg, + type, + "master_key", + rspamd_rcl_parse_struct_keypair, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key), + 0, + "Encryption key for master/slave updates"); + + rspamd_rcl_register_worker_option (cfg, + type, + "masters", + rspamd_rcl_parse_struct_string, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, masters_map), + 0, + "Allow master/slave updates from the following IP addresses"); return ctx; } @@ -1371,13 +1505,25 @@ fuzzy_peer_rep (struct rspamd_worker *worker, ls = cur->data; if (ls->fd != -1) { - accept_event = g_slice_alloc0 (sizeof (struct event)); - event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, - accept_fuzzy_socket, worker); - event_base_set (ctx->ev_base, accept_event); - event_add (accept_event, NULL); - worker->accept_events = g_list_prepend (worker->accept_events, - accept_event); + if (ls->type == RSPAMD_WORKER_SOCKET_UDP) { + accept_event = g_slice_alloc0 (sizeof (struct event)); + event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, + accept_fuzzy_socket, worker); + event_base_set (ctx->ev_base, accept_event); + event_add (accept_event, NULL); + worker->accept_events = g_list_prepend (worker->accept_events, + accept_event); + } + else if (worker->index == 0) { + /* We allow TCP listeners only for a update worker */ + accept_event = g_slice_alloc0 (sizeof (struct event)); + event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, + accept_fuzzy_mirror_socket, worker); + event_base_set (ctx->ev_base, accept_event); + event_add (accept_event, NULL); + worker->accept_events = g_list_prepend (worker->accept_events, + accept_event); + } } cur = g_list_next (cur); @@ -1415,6 +1561,7 @@ start_fuzzy (struct rspamd_worker *worker) "fuzzy", NULL); ctx->peer_fd = -1; + double_to_tv (ctx->master_timeout, &ctx->master_io_tv); /* * Open DB and perform VACUUM @@ -1443,7 +1590,7 @@ start_fuzzy (struct rspamd_worker *worker) rspamd_fuzzy_storage_stat, ctx); rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, rspamd_fuzzy_storage_sync, ctx); - /* Create radix tree */ + /* Create radix trees */ if (ctx->update_map != NULL) { if (!rspamd_map_is_map (ctx->update_map)) { if (!radix_add_generic_iplist (ctx->update_map, @@ -1460,6 +1607,22 @@ start_fuzzy (struct rspamd_worker *worker) } } + if (ctx->masters_map != NULL) { + if (!rspamd_map_is_map (ctx->masters_map)) { + if (!radix_add_generic_iplist (ctx->masters_map, + &ctx->master_ips)) { + msg_warn ("cannot load or parse ip list from '%s'", + ctx->masters_map); + } + } + else { + rspamd_map_add (worker->srv->cfg, ctx->masters_map, + "Allow fuzzy master/slave updates from specified addresses", + rspamd_radix_read, rspamd_radix_fin, + (void **)&ctx->master_ips); + + } + } /* Maps events */ ctx->resolver = dns_resolver_init (worker->srv->logger, diff --git a/src/worker.c b/src/worker.c index d3e3fc4c8..ac104f7f0 100644 --- a/src/worker.c +++ b/src/worker.c @@ -320,9 +320,9 @@ accept_socket (gint fd, short what, void *arg) rspamd_http_connection_read_message (task->http_conn, task, - nfd, - &ctx->io_tv, - ctx->ev_base); + nfd, + &ctx->io_tv, + ctx->ev_base); } #ifdef WITH_HYPERSCAN