diff options
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 174 |
1 files changed, 172 insertions, 2 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index db30fc6f8..2f7ba0696 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -225,6 +225,162 @@ fuzzy_key_dtor (gpointer p) g_slice_free1 (sizeof (*key), key); } +struct fuzzy_slave_connection { + struct rspamd_cryptobox_keypair *local_key; + struct rspamd_cryptobox_pubkey *remote_key; + struct upstream *up; + struct rspamd_http_connection *http_conn; + struct rspamd_fuzzy_mirror *mirror; + gint sock; +}; + +static void +fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn) +{ + if (conn) { + if (conn->http_conn) { + rspamd_http_connection_reset (conn->http_conn); + rspamd_http_connection_unref (conn->http_conn); + } + + close (conn->sock); + + g_slice_free1 (sizeof (*conn), conn); + } +} + +static void +fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, + struct rspamd_http_message *msg) +{ + GList *cur; + struct fuzzy_peer_cmd *io_cmd; + gsize len; + guint32 rev; + const gchar *p; + + rev = rspamd_fuzzy_backend_version (ctx->backend); + rev = GUINT32_TO_LE (rev); + len = sizeof (guint32) * 2; /* revision + last chunk */ + + for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { + io_cmd = cur->data; + + if (io_cmd->is_shingle) { + len += sizeof (guint32) + sizeof (gboolean) + + sizeof (struct rspamd_fuzzy_shingle_cmd); + } + else { + len += sizeof (guint32) + sizeof (gboolean) + + sizeof (struct rspamd_fuzzy_cmd); + } + } + + msg->body = rspamd_fstring_sized_new (len); + msg->body = rspamd_fstring_append (msg->body, (const char *)&rev, + sizeof (rev)); + + for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { + io_cmd = cur->data; + + if (io_cmd->is_shingle) { + len = sizeof (gboolean) + + sizeof (struct rspamd_fuzzy_shingle_cmd); + } + else { + len = sizeof (gboolean) + + sizeof (struct rspamd_fuzzy_cmd); + } + + p = (const char *)io_cmd; + msg->body = rspamd_fstring_append (msg->body, (const char *)&len, + sizeof (len)); + msg->body = rspamd_fstring_append (msg->body, p, len); + } + + /* Last chunk */ + len = 0; + msg->body = rspamd_fstring_append (msg->body, (const char *)&len, + sizeof (len)); +} + +static void +fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct fuzzy_slave_connection *bk_conn = conn->ud; + msg_info ("abnormally closing connection from backend: %s:%s, " + "error: %e", + bk_conn->mirror->name, + rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)), + err); + + fuzzy_mirror_close_connection (bk_conn); +} + +static gint +fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct fuzzy_slave_connection *bk_conn = conn->ud; + + msg_info ("finished mirror connection to %s", bk_conn->mirror->name); + fuzzy_mirror_close_connection (bk_conn); + + return 0; +} + +static void +rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, + struct rspamd_fuzzy_mirror *m) +{ + struct fuzzy_slave_connection *conn; + struct rspamd_http_message *msg; + struct timeval tv; + + conn = g_slice_alloc0 (sizeof (*conn)); + conn->up = rspamd_upstream_get (m->u, + RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0); + conn->mirror = m; + + if (conn->up == NULL) { + msg_err ("cannot select upstream for %s", m->name); + return; + } + + conn->sock = rspamd_inet_address_connect ( + rspamd_upstream_addr (conn->up), + SOCK_STREAM, TRUE); + + if (conn->sock == -1) { + msg_err ("cannot connect upstream for %s", m->name); + rspamd_upstream_fail (conn->up); + return; + } + + msg = rspamd_http_new_message (HTTP_REQUEST); + rspamd_printf_fstring (&msg->url, "/update_v1"); + + conn->http_conn = rspamd_http_connection_new ( + NULL, + fuzzy_mirror_error_handler, + fuzzy_mirror_finish_handler, + RSPAMD_HTTP_CLIENT_SIMPLE, + RSPAMD_HTTP_CLIENT, + ctx->keypair_cache); + + rspamd_http_connection_set_key (conn->http_conn, + ctx->sync_keypair); + msg->peer_key = rspamd_pubkey_ref (m->key); + double_to_tv (ctx->sync_timeout, &tv); + fuzzy_mirror_updates_to_http (ctx, msg); + + rspamd_http_connection_write_message (conn->http_conn, + msg, NULL, NULL, conn, + conn->sock, + &tv, ctx->ev_base); + msg_info ("send update request to %s", m->name); +} + static void rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) { @@ -232,13 +388,15 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) struct fuzzy_peer_cmd *io_cmd; struct rspamd_fuzzy_cmd *cmd; gpointer ptr; - guint nupdates = 0; + struct rspamd_fuzzy_mirror *m; + guint nupdates = 0, i; time_t now = time (NULL); if (ctx->updates_pending && g_queue_get_length (ctx->updates_pending) > 0 && rspamd_fuzzy_backend_prepare_update (ctx->backend)) { cur = ctx->updates_pending->head; + while (cur) { io_cmd = cur->data; @@ -264,6 +422,14 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx) if (rspamd_fuzzy_backend_finish_update (ctx->backend)) { ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend); + + for (i = 0; i < ctx->mirrors->len; i ++) { + m = g_ptr_array_index (ctx->mirrors, i); + + rspamd_fuzzy_send_update_mirror (ctx, m); + } + + /* Clear updates */ cur = ctx->updates_pending->head; while (cur) { @@ -722,7 +888,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, { const guchar *p; gsize remain; - guint32 revision, our_rev, len; + guint32 revision, our_rev, len, cnt = 0; struct fuzzy_peer_cmd cmd, *pcmd; enum { read_len = 0, @@ -825,6 +991,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, remain -= len; len = 0; state = read_len; + cnt ++; break; case finish_processing: /* Do nothing */ @@ -840,6 +1007,8 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, } rspamd_fuzzy_process_updates_queue (session->ctx); + msg_info ("processed updates from the master, %ud operations processed," + " revision: %ud", cnt, revision); err: if (updates) { @@ -1595,6 +1764,7 @@ init_fuzzy (struct rspamd_config *cfg) (GDestroyNotify) rspamd_inet_address_destroy, g_free, rspamd_inet_address_hash, rspamd_inet_address_equal); ctx->cfg = cfg; + ctx->mirrors = g_ptr_array_new (); rspamd_rcl_register_worker_option (cfg, type, |