aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/fuzzy_storage.c174
1 files changed, 172 insertions, 2 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index db30fc6f8..2f7ba0696 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -225,6 +225,162 @@ fuzzy_key_dtor (gpointer p)
g_slice_free1 (sizeof (*key), key);
}
+struct fuzzy_slave_connection {
+ struct rspamd_cryptobox_keypair *local_key;
+ struct rspamd_cryptobox_pubkey *remote_key;
+ struct upstream *up;
+ struct rspamd_http_connection *http_conn;
+ struct rspamd_fuzzy_mirror *mirror;
+ gint sock;
+};
+
+static void
+fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
+{
+ if (conn) {
+ if (conn->http_conn) {
+ rspamd_http_connection_reset (conn->http_conn);
+ rspamd_http_connection_unref (conn->http_conn);
+ }
+
+ close (conn->sock);
+
+ g_slice_free1 (sizeof (*conn), conn);
+ }
+}
+
+static void
+fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
+ struct rspamd_http_message *msg)
+{
+ GList *cur;
+ struct fuzzy_peer_cmd *io_cmd;
+ gsize len;
+ guint32 rev;
+ const gchar *p;
+
+ rev = rspamd_fuzzy_backend_version (ctx->backend);
+ rev = GUINT32_TO_LE (rev);
+ len = sizeof (guint32) * 2; /* revision + last chunk */
+
+ for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
+ io_cmd = cur->data;
+
+ if (io_cmd->is_shingle) {
+ len += sizeof (guint32) + sizeof (gboolean) +
+ sizeof (struct rspamd_fuzzy_shingle_cmd);
+ }
+ else {
+ len += sizeof (guint32) + sizeof (gboolean) +
+ sizeof (struct rspamd_fuzzy_cmd);
+ }
+ }
+
+ msg->body = rspamd_fstring_sized_new (len);
+ msg->body = rspamd_fstring_append (msg->body, (const char *)&rev,
+ sizeof (rev));
+
+ for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
+ io_cmd = cur->data;
+
+ if (io_cmd->is_shingle) {
+ len = sizeof (gboolean) +
+ sizeof (struct rspamd_fuzzy_shingle_cmd);
+ }
+ else {
+ len = sizeof (gboolean) +
+ sizeof (struct rspamd_fuzzy_cmd);
+ }
+
+ p = (const char *)io_cmd;
+ msg->body = rspamd_fstring_append (msg->body, (const char *)&len,
+ sizeof (len));
+ msg->body = rspamd_fstring_append (msg->body, p, len);
+ }
+
+ /* Last chunk */
+ len = 0;
+ msg->body = rspamd_fstring_append (msg->body, (const char *)&len,
+ sizeof (len));
+}
+
+static void
+fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct fuzzy_slave_connection *bk_conn = conn->ud;
+ msg_info ("abnormally closing connection from backend: %s:%s, "
+ "error: %e",
+ bk_conn->mirror->name,
+ rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
+ err);
+
+ fuzzy_mirror_close_connection (bk_conn);
+}
+
+static gint
+fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct fuzzy_slave_connection *bk_conn = conn->ud;
+
+ msg_info ("finished mirror connection to %s", bk_conn->mirror->name);
+ fuzzy_mirror_close_connection (bk_conn);
+
+ return 0;
+}
+
+static void
+rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
+ struct rspamd_fuzzy_mirror *m)
+{
+ struct fuzzy_slave_connection *conn;
+ struct rspamd_http_message *msg;
+ struct timeval tv;
+
+ conn = g_slice_alloc0 (sizeof (*conn));
+ conn->up = rspamd_upstream_get (m->u,
+ RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0);
+ conn->mirror = m;
+
+ if (conn->up == NULL) {
+ msg_err ("cannot select upstream for %s", m->name);
+ return;
+ }
+
+ conn->sock = rspamd_inet_address_connect (
+ rspamd_upstream_addr (conn->up),
+ SOCK_STREAM, TRUE);
+
+ if (conn->sock == -1) {
+ msg_err ("cannot connect upstream for %s", m->name);
+ rspamd_upstream_fail (conn->up);
+ return;
+ }
+
+ msg = rspamd_http_new_message (HTTP_REQUEST);
+ rspamd_printf_fstring (&msg->url, "/update_v1");
+
+ conn->http_conn = rspamd_http_connection_new (
+ NULL,
+ fuzzy_mirror_error_handler,
+ fuzzy_mirror_finish_handler,
+ RSPAMD_HTTP_CLIENT_SIMPLE,
+ RSPAMD_HTTP_CLIENT,
+ ctx->keypair_cache);
+
+ rspamd_http_connection_set_key (conn->http_conn,
+ ctx->sync_keypair);
+ msg->peer_key = rspamd_pubkey_ref (m->key);
+ double_to_tv (ctx->sync_timeout, &tv);
+ fuzzy_mirror_updates_to_http (ctx, msg);
+
+ rspamd_http_connection_write_message (conn->http_conn,
+ msg, NULL, NULL, conn,
+ conn->sock,
+ &tv, ctx->ev_base);
+ msg_info ("send update request to %s", m->name);
+}
+
static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
{
@@ -232,13 +388,15 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
struct fuzzy_peer_cmd *io_cmd;
struct rspamd_fuzzy_cmd *cmd;
gpointer ptr;
- guint nupdates = 0;
+ struct rspamd_fuzzy_mirror *m;
+ guint nupdates = 0, i;
time_t now = time (NULL);
if (ctx->updates_pending &&
g_queue_get_length (ctx->updates_pending) > 0 &&
rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
cur = ctx->updates_pending->head;
+
while (cur) {
io_cmd = cur->data;
@@ -264,6 +422,14 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+
+ for (i = 0; i < ctx->mirrors->len; i ++) {
+ m = g_ptr_array_index (ctx->mirrors, i);
+
+ rspamd_fuzzy_send_update_mirror (ctx, m);
+ }
+
+ /* Clear updates */
cur = ctx->updates_pending->head;
while (cur) {
@@ -722,7 +888,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
{
const guchar *p;
gsize remain;
- guint32 revision, our_rev, len;
+ guint32 revision, our_rev, len, cnt = 0;
struct fuzzy_peer_cmd cmd, *pcmd;
enum {
read_len = 0,
@@ -825,6 +991,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
remain -= len;
len = 0;
state = read_len;
+ cnt ++;
break;
case finish_processing:
/* Do nothing */
@@ -840,6 +1007,8 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
}
rspamd_fuzzy_process_updates_queue (session->ctx);
+ msg_info ("processed updates from the master, %ud operations processed,"
+ " revision: %ud", cnt, revision);
err:
if (updates) {
@@ -1595,6 +1764,7 @@ init_fuzzy (struct rspamd_config *cfg)
(GDestroyNotify) rspamd_inet_address_destroy, g_free,
rspamd_inet_address_hash, rspamd_inet_address_equal);
ctx->cfg = cfg;
+ ctx->mirrors = g_ptr_array_new ();
rspamd_rcl_register_worker_option (cfg,
type,