diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rspamd_proxy.c | 126 |
1 files changed, 125 insertions, 1 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index afbbbbe87..4bf8d2e18 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -460,6 +460,8 @@ proxy_backend_close_connection (struct rspamd_proxy_backend_connection *conn) if (conn->results) { ucl_object_unref (conn->results); } + + conn->flags |= RSPAMD_BACKEND_CLOSED; } static gboolean @@ -520,6 +522,9 @@ proxy_backend_parse_results (struct rspamd_proxy_session *session, static void proxy_session_dtor (struct rspamd_proxy_session *session) { + guint i; + struct rspamd_proxy_backend_connection *conn; + if (session->master_conn) { proxy_backend_close_connection (session->master_conn); } @@ -533,6 +538,15 @@ proxy_session_dtor (struct rspamd_proxy_session *session) rspamd_http_connection_unref (session->client_conn); } + for (i = 0; i < session->mirror_conns->len; i ++) { + conn = g_ptr_array_index (session->mirror_conns, i); + + if (!(conn->flags & RSPAMD_BACKEND_CLOSED)) { + proxy_backend_close_connection (conn); + } + } + + g_ptr_array_free (session->mirror_conns, TRUE); rspamd_inet_address_destroy (session->client_addr); close (session->client_sock); rspamd_mempool_delete (session->pool); @@ -627,6 +641,109 @@ proxy_check_file (struct rspamd_http_message *msg, } static void +proxy_backend_mirror_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct rspamd_proxy_backend_connection *bk_conn = conn->ud; + struct rspamd_proxy_session *session; + + session = bk_conn->s; + msg_info_session ("abnormally closing connection from backend: %s, error: %s", + rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)), + err->message); + + proxy_backend_close_connection (bk_conn); +} + +static gint +proxy_backend_mirror_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct rspamd_proxy_backend_connection *bk_conn = conn->ud; + struct rspamd_proxy_session *session; + + session = bk_conn->s; + + if (!proxy_backend_parse_results (session, bk_conn, session->ctx->lua_state, + -1, msg->body->str, msg->body->len)) { + msg_warn_session ("cannot parse results from the mirror backend %s", + bk_conn->name); + } + + proxy_backend_close_connection (bk_conn); + + return 0; +} + +static void +proxy_open_mirror_connections (struct rspamd_proxy_session *session) +{ + gdouble coin; + struct rspamd_http_mirror *m; + guint i; + struct rspamd_proxy_backend_connection *bk_conn; + struct rspamd_http_message *msg; + + coin = rspamd_random_double (); + + for (i = 0; i < session->ctx->mirrors->len; i ++) { + m = g_ptr_array_index (session->ctx->mirrors, i); + + if (m->prob < coin) { + /* No luck */ + continue; + } + + bk_conn = rspamd_mempool_alloc0 (session->pool, + sizeof (*bk_conn)); + bk_conn->s = session; + bk_conn->name = m->name; + + + bk_conn->up = rspamd_upstream_get (m->u, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + + if (bk_conn->up == NULL) { + msg_err_session ("cannot select upstream for %s", m->name); + continue; + } + + bk_conn->backend_sock = rspamd_inet_address_connect ( + rspamd_upstream_addr (bk_conn->up), + SOCK_STREAM, TRUE); + + if (bk_conn->backend_sock == -1) { + msg_err_session ("cannot connect upstream for %s", m->name); + rspamd_upstream_fail (bk_conn->up); + continue; + } + + msg = rspamd_http_connection_copy_msg (session->client_conn); + rspamd_http_message_remove_header (msg, "Content-Length"); + rspamd_http_message_remove_header (msg, "Key"); + + bk_conn->backend_conn = rspamd_http_connection_new ( + NULL, + proxy_backend_mirror_error_handler, + proxy_backend_mirror_finish_handler, + RSPAMD_HTTP_CLIENT_SIMPLE, + RSPAMD_HTTP_CLIENT, + session->ctx->keys_cache); + + rspamd_http_connection_set_key (bk_conn->backend_conn, + session->ctx->local_key); + msg->peer_key = rspamd_pubkey_ref (m->key); + + rspamd_http_connection_write_message (bk_conn->backend_conn, + msg, NULL, NULL, bk_conn, + bk_conn->backend_sock, + &session->ctx->io_tv, session->ctx->ev_base); + + g_ptr_array_add (session->mirror_conns, bk_conn); + msg_info_session ("send request to %s", m->name); + } +} + +static void proxy_client_write_error (struct rspamd_proxy_session *session, gint code) { struct rspamd_http_message *reply; @@ -648,7 +765,6 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError msg_info_session ("abnormally closing connection from backend: %s, error: %s", rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)), err->message); - rspamd_http_connection_reset (session->master_conn->backend_conn); /* Terminate session immediately */ proxy_client_write_error (session, err->code); } @@ -669,6 +785,11 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn, msg, NULL, NULL, session, session->client_sock, &session->ctx->io_tv, session->ctx->ev_base); + if (!proxy_backend_parse_results (session, bk_conn, session->ctx->lua_state, + -1, msg->body->str, msg->body->len)) { + msg_warn_session ("cannot parse results from the master backend"); + } + return 0; } @@ -680,6 +801,8 @@ proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err) msg_info_session ("abnormally closing connection from: %s, error: %s", rspamd_inet_address_to_string (session->client_addr), err->message); /* Terminate session immediately */ + proxy_backend_close_connection (session->master_conn); + session->master_conn = NULL; REF_RELEASE (session); } @@ -739,6 +862,7 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, goto err; } + proxy_open_mirror_connections (session); rspamd_http_connection_steal_msg (session->client_conn); rspamd_http_message_remove_header (msg, "Content-Length"); rspamd_http_message_remove_header (msg, "Key"); |