summaryrefslogtreecommitdiffstats
path: root/src/rspamd_proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rspamd_proxy.c')
-rw-r--r--src/rspamd_proxy.c126
1 files changed, 125 insertions, 1 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index afbbbbe87..4bf8d2e18 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -460,6 +460,8 @@ proxy_backend_close_connection (struct rspamd_proxy_backend_connection *conn)
if (conn->results) {
ucl_object_unref (conn->results);
}
+
+ conn->flags |= RSPAMD_BACKEND_CLOSED;
}
static gboolean
@@ -520,6 +522,9 @@ proxy_backend_parse_results (struct rspamd_proxy_session *session,
static void
proxy_session_dtor (struct rspamd_proxy_session *session)
{
+ guint i;
+ struct rspamd_proxy_backend_connection *conn;
+
if (session->master_conn) {
proxy_backend_close_connection (session->master_conn);
}
@@ -533,6 +538,15 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
rspamd_http_connection_unref (session->client_conn);
}
+ for (i = 0; i < session->mirror_conns->len; i ++) {
+ conn = g_ptr_array_index (session->mirror_conns, i);
+
+ if (!(conn->flags & RSPAMD_BACKEND_CLOSED)) {
+ proxy_backend_close_connection (conn);
+ }
+ }
+
+ g_ptr_array_free (session->mirror_conns, TRUE);
rspamd_inet_address_destroy (session->client_addr);
close (session->client_sock);
rspamd_mempool_delete (session->pool);
@@ -627,6 +641,109 @@ proxy_check_file (struct rspamd_http_message *msg,
}
static void
+proxy_backend_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct rspamd_proxy_backend_connection *bk_conn = conn->ud;
+ struct rspamd_proxy_session *session;
+
+ session = bk_conn->s;
+ msg_info_session ("abnormally closing connection from backend: %s, error: %s",
+ rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
+ err->message);
+
+ proxy_backend_close_connection (bk_conn);
+}
+
+static gint
+proxy_backend_mirror_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct rspamd_proxy_backend_connection *bk_conn = conn->ud;
+ struct rspamd_proxy_session *session;
+
+ session = bk_conn->s;
+
+ if (!proxy_backend_parse_results (session, bk_conn, session->ctx->lua_state,
+ -1, msg->body->str, msg->body->len)) {
+ msg_warn_session ("cannot parse results from the mirror backend %s",
+ bk_conn->name);
+ }
+
+ proxy_backend_close_connection (bk_conn);
+
+ return 0;
+}
+
+static void
+proxy_open_mirror_connections (struct rspamd_proxy_session *session)
+{
+ gdouble coin;
+ struct rspamd_http_mirror *m;
+ guint i;
+ struct rspamd_proxy_backend_connection *bk_conn;
+ struct rspamd_http_message *msg;
+
+ coin = rspamd_random_double ();
+
+ for (i = 0; i < session->ctx->mirrors->len; i ++) {
+ m = g_ptr_array_index (session->ctx->mirrors, i);
+
+ if (m->prob < coin) {
+ /* No luck */
+ continue;
+ }
+
+ bk_conn = rspamd_mempool_alloc0 (session->pool,
+ sizeof (*bk_conn));
+ bk_conn->s = session;
+ bk_conn->name = m->name;
+
+
+ bk_conn->up = rspamd_upstream_get (m->u,
+ RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+
+ if (bk_conn->up == NULL) {
+ msg_err_session ("cannot select upstream for %s", m->name);
+ continue;
+ }
+
+ bk_conn->backend_sock = rspamd_inet_address_connect (
+ rspamd_upstream_addr (bk_conn->up),
+ SOCK_STREAM, TRUE);
+
+ if (bk_conn->backend_sock == -1) {
+ msg_err_session ("cannot connect upstream for %s", m->name);
+ rspamd_upstream_fail (bk_conn->up);
+ continue;
+ }
+
+ msg = rspamd_http_connection_copy_msg (session->client_conn);
+ rspamd_http_message_remove_header (msg, "Content-Length");
+ rspamd_http_message_remove_header (msg, "Key");
+
+ bk_conn->backend_conn = rspamd_http_connection_new (
+ NULL,
+ proxy_backend_mirror_error_handler,
+ proxy_backend_mirror_finish_handler,
+ RSPAMD_HTTP_CLIENT_SIMPLE,
+ RSPAMD_HTTP_CLIENT,
+ session->ctx->keys_cache);
+
+ rspamd_http_connection_set_key (bk_conn->backend_conn,
+ session->ctx->local_key);
+ msg->peer_key = rspamd_pubkey_ref (m->key);
+
+ rspamd_http_connection_write_message (bk_conn->backend_conn,
+ msg, NULL, NULL, bk_conn,
+ bk_conn->backend_sock,
+ &session->ctx->io_tv, session->ctx->ev_base);
+
+ g_ptr_array_add (session->mirror_conns, bk_conn);
+ msg_info_session ("send request to %s", m->name);
+ }
+}
+
+static void
proxy_client_write_error (struct rspamd_proxy_session *session, gint code)
{
struct rspamd_http_message *reply;
@@ -648,7 +765,6 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
msg_info_session ("abnormally closing connection from backend: %s, error: %s",
rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
err->message);
- rspamd_http_connection_reset (session->master_conn->backend_conn);
/* Terminate session immediately */
proxy_client_write_error (session, err->code);
}
@@ -669,6 +785,11 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
msg, NULL, NULL, session, session->client_sock,
&session->ctx->io_tv, session->ctx->ev_base);
+ if (!proxy_backend_parse_results (session, bk_conn, session->ctx->lua_state,
+ -1, msg->body->str, msg->body->len)) {
+ msg_warn_session ("cannot parse results from the master backend");
+ }
+
return 0;
}
@@ -680,6 +801,8 @@ proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
msg_info_session ("abnormally closing connection from: %s, error: %s",
rspamd_inet_address_to_string (session->client_addr), err->message);
/* Terminate session immediately */
+ proxy_backend_close_connection (session->master_conn);
+ session->master_conn = NULL;
REF_RELEASE (session);
}
@@ -739,6 +862,7 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
goto err;
}
+ proxy_open_mirror_connections (session);
rspamd_http_connection_steal_msg (session->client_conn);
rspamd_http_message_remove_header (msg, "Content-Length");
rspamd_http_message_remove_header (msg, "Key");