|
|
@@ -40,7 +40,7 @@ |
|
|
|
/* Resync value in seconds */ |
|
|
|
#define DEFAULT_SYNC_TIMEOUT 60.0 |
|
|
|
#define DEFAULT_KEYPAIR_CACHE_SIZE 512 |
|
|
|
|
|
|
|
#define DEFAULT_MASTER_TIMEOUT 10.0 |
|
|
|
|
|
|
|
#define INVALID_NODE_TIME (guint64) - 1 |
|
|
|
|
|
|
@@ -53,7 +53,7 @@ worker_t fuzzy_worker = { |
|
|
|
init_fuzzy, /* Init function */ |
|
|
|
start_fuzzy, /* Start function */ |
|
|
|
RSPAMD_WORKER_HAS_SOCKET, |
|
|
|
RSPAMD_WORKER_SOCKET_UDP, /* UDP socket */ |
|
|
|
RSPAMD_WORKER_SOCKET_UDP|RSPAMD_WORKER_SOCKET_TCP, /* Both socket */ |
|
|
|
RSPAMD_WORKER_VER /* Version info */ |
|
|
|
}; |
|
|
|
|
|
|
@@ -93,7 +93,12 @@ struct rspamd_fuzzy_storage_ctx { |
|
|
|
gdouble expire; |
|
|
|
gdouble sync_timeout; |
|
|
|
radix_compressed_t *update_ips; |
|
|
|
radix_compressed_t *master_ips; |
|
|
|
struct rspamd_cryptobox_keypair *master_key; |
|
|
|
struct timeval master_io_tv; |
|
|
|
gdouble master_timeout; |
|
|
|
gchar *update_map; |
|
|
|
gchar *masters_map; |
|
|
|
guint keypair_cache_size; |
|
|
|
struct event_base *ev_base; |
|
|
|
gint peer_fd; |
|
|
@@ -160,6 +165,12 @@ struct fuzzy_key { |
|
|
|
struct fuzzy_key_stat *stat; |
|
|
|
}; |
|
|
|
|
|
|
|
struct fuzzy_master_update_session { |
|
|
|
struct rspamd_http_connection *conn; |
|
|
|
struct rspamd_fuzzy_storage_ctx *ctx; |
|
|
|
rspamd_inet_addr_t *addr; |
|
|
|
}; |
|
|
|
|
|
|
|
static void rspamd_fuzzy_write_reply (struct fuzzy_session *session); |
|
|
|
|
|
|
|
static gboolean |
|
|
@@ -703,6 +714,102 @@ fuzzy_session_destroy (gpointer d) |
|
|
|
g_slice_free1 (sizeof (*session), session); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session) |
|
|
|
{ |
|
|
|
if (session) { |
|
|
|
rspamd_http_connection_unref (session->conn); |
|
|
|
rspamd_inet_address_destroy (session->addr); |
|
|
|
g_slice_free1 (sizeof (*session), session); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err) |
|
|
|
{ |
|
|
|
struct fuzzy_master_update_session *session = conn->ud; |
|
|
|
|
|
|
|
msg_err ("abnormally closing connection from: %s, error: %e", |
|
|
|
rspamd_inet_address_to_string (session->addr), err); |
|
|
|
/* Terminate session immediately */ |
|
|
|
rspamd_fuzzy_mirror_session_destroy (session); |
|
|
|
} |
|
|
|
|
|
|
|
static gint |
|
|
|
rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, |
|
|
|
struct rspamd_http_message *msg) |
|
|
|
{ |
|
|
|
struct fuzzy_master_update_session *session = conn->ud; |
|
|
|
|
|
|
|
/* TODO: implement updates */ |
|
|
|
rspamd_fuzzy_mirror_session_destroy (session); |
|
|
|
|
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
accept_fuzzy_mirror_socket (gint fd, short what, void *arg) |
|
|
|
{ |
|
|
|
struct rspamd_worker *worker = (struct rspamd_worker *)arg; |
|
|
|
rspamd_inet_addr_t *addr; |
|
|
|
gint nfd; |
|
|
|
struct rspamd_http_connection *http_conn; |
|
|
|
struct rspamd_fuzzy_storage_ctx *ctx; |
|
|
|
struct fuzzy_master_update_session *session; |
|
|
|
|
|
|
|
if ((nfd = |
|
|
|
rspamd_accept_from_socket (fd, &addr)) == -1) { |
|
|
|
msg_warn ("accept failed: %s", strerror (errno)); |
|
|
|
return; |
|
|
|
} |
|
|
|
/* Check for EAGAIN */ |
|
|
|
if (nfd == 0) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
ctx = worker->ctx; |
|
|
|
|
|
|
|
if (!ctx->master_ips) { |
|
|
|
msg_err ("deny update request from %s as no masters defined", |
|
|
|
rspamd_inet_address_to_string (addr)); |
|
|
|
rspamd_inet_address_destroy (addr); |
|
|
|
close (nfd); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
else if (radix_find_compressed_addr (ctx->master_ips, addr) == RADIX_NO_VALUE) { |
|
|
|
msg_err ("deny update request from %s", |
|
|
|
rspamd_inet_address_to_string (addr)); |
|
|
|
rspamd_inet_address_destroy (addr); |
|
|
|
close (nfd); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
session = g_slice_alloc0 (sizeof (*session)); |
|
|
|
http_conn = rspamd_http_connection_new ( |
|
|
|
NULL, |
|
|
|
rspamd_fuzzy_mirror_error_handler, |
|
|
|
rspamd_fuzzy_mirror_finish_handler, |
|
|
|
0, |
|
|
|
RSPAMD_HTTP_SERVER, |
|
|
|
ctx->keypair_cache); |
|
|
|
|
|
|
|
if (ctx->master_key) { |
|
|
|
rspamd_http_connection_set_key (http_conn, ctx->master_key); |
|
|
|
} |
|
|
|
|
|
|
|
session->ctx = ctx; |
|
|
|
session->conn = http_conn; |
|
|
|
session->addr = addr; |
|
|
|
|
|
|
|
rspamd_http_connection_read_message (http_conn, |
|
|
|
session, |
|
|
|
nfd, |
|
|
|
&ctx->master_io_tv, |
|
|
|
ctx->ev_base); |
|
|
|
} |
|
|
|
|
|
|
|
/* |
|
|
|
* Accept new connection and construct task |
|
|
|
*/ |
|
|
@@ -1211,6 +1318,7 @@ init_fuzzy (struct rspamd_config *cfg) |
|
|
|
|
|
|
|
ctx->magic = rspamd_fuzzy_storage_magic; |
|
|
|
ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT; |
|
|
|
ctx->master_timeout = DEFAULT_MASTER_TIMEOUT; |
|
|
|
ctx->expire = DEFAULT_EXPIRE; |
|
|
|
ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE; |
|
|
|
ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal, |
|
|
@@ -1315,6 +1423,32 @@ init_fuzzy (struct rspamd_config *cfg) |
|
|
|
0, |
|
|
|
"Allow encrypted requests only (and forbid all unknown keys or plaintext requests)"); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, |
|
|
|
type, |
|
|
|
"master_timeout", |
|
|
|
rspamd_rcl_parse_struct_time, |
|
|
|
ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_timeout), |
|
|
|
RSPAMD_CL_FLAG_TIME_FLOAT, |
|
|
|
"Master protocol IO timeout"); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, |
|
|
|
type, |
|
|
|
"master_key", |
|
|
|
rspamd_rcl_parse_struct_keypair, |
|
|
|
ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key), |
|
|
|
0, |
|
|
|
"Encryption key for master/slave updates"); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, |
|
|
|
type, |
|
|
|
"masters", |
|
|
|
rspamd_rcl_parse_struct_string, |
|
|
|
ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, masters_map), |
|
|
|
0, |
|
|
|
"Allow master/slave updates from the following IP addresses"); |
|
|
|
|
|
|
|
return ctx; |
|
|
|
} |
|
|
@@ -1371,13 +1505,25 @@ fuzzy_peer_rep (struct rspamd_worker *worker, |
|
|
|
ls = cur->data; |
|
|
|
|
|
|
|
if (ls->fd != -1) { |
|
|
|
accept_event = g_slice_alloc0 (sizeof (struct event)); |
|
|
|
event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, |
|
|
|
accept_fuzzy_socket, worker); |
|
|
|
event_base_set (ctx->ev_base, accept_event); |
|
|
|
event_add (accept_event, NULL); |
|
|
|
worker->accept_events = g_list_prepend (worker->accept_events, |
|
|
|
accept_event); |
|
|
|
if (ls->type == RSPAMD_WORKER_SOCKET_UDP) { |
|
|
|
accept_event = g_slice_alloc0 (sizeof (struct event)); |
|
|
|
event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, |
|
|
|
accept_fuzzy_socket, worker); |
|
|
|
event_base_set (ctx->ev_base, accept_event); |
|
|
|
event_add (accept_event, NULL); |
|
|
|
worker->accept_events = g_list_prepend (worker->accept_events, |
|
|
|
accept_event); |
|
|
|
} |
|
|
|
else if (worker->index == 0) { |
|
|
|
/* We allow TCP listeners only for a update worker */ |
|
|
|
accept_event = g_slice_alloc0 (sizeof (struct event)); |
|
|
|
event_set (accept_event, ls->fd, EV_READ | EV_PERSIST, |
|
|
|
accept_fuzzy_mirror_socket, worker); |
|
|
|
event_base_set (ctx->ev_base, accept_event); |
|
|
|
event_add (accept_event, NULL); |
|
|
|
worker->accept_events = g_list_prepend (worker->accept_events, |
|
|
|
accept_event); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
cur = g_list_next (cur); |
|
|
@@ -1415,6 +1561,7 @@ start_fuzzy (struct rspamd_worker *worker) |
|
|
|
"fuzzy", |
|
|
|
NULL); |
|
|
|
ctx->peer_fd = -1; |
|
|
|
double_to_tv (ctx->master_timeout, &ctx->master_io_tv); |
|
|
|
|
|
|
|
/* |
|
|
|
* Open DB and perform VACUUM |
|
|
@@ -1443,7 +1590,7 @@ start_fuzzy (struct rspamd_worker *worker) |
|
|
|
rspamd_fuzzy_storage_stat, ctx); |
|
|
|
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC, |
|
|
|
rspamd_fuzzy_storage_sync, ctx); |
|
|
|
/* Create radix tree */ |
|
|
|
/* Create radix trees */ |
|
|
|
if (ctx->update_map != NULL) { |
|
|
|
if (!rspamd_map_is_map (ctx->update_map)) { |
|
|
|
if (!radix_add_generic_iplist (ctx->update_map, |
|
|
@@ -1460,6 +1607,22 @@ start_fuzzy (struct rspamd_worker *worker) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
if (ctx->masters_map != NULL) { |
|
|
|
if (!rspamd_map_is_map (ctx->masters_map)) { |
|
|
|
if (!radix_add_generic_iplist (ctx->masters_map, |
|
|
|
&ctx->master_ips)) { |
|
|
|
msg_warn ("cannot load or parse ip list from '%s'", |
|
|
|
ctx->masters_map); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
rspamd_map_add (worker->srv->cfg, ctx->masters_map, |
|
|
|
"Allow fuzzy master/slave updates from specified addresses", |
|
|
|
rspamd_radix_read, rspamd_radix_fin, |
|
|
|
(void **)&ctx->master_ips); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* Maps events */ |
|
|
|
ctx->resolver = dns_resolver_init (worker->srv->logger, |