lua_State *lua_state;
};
-struct rspamd_proxy_session {
- rspamd_mempool_t *pool;
- struct rspamd_proxy_ctx *ctx;
- struct event_base *ev_base;
+enum rspamd_backend_flags {
+ RSPAMD_BACKEND_REPLIED = 1 << 0,
+ RSPAMD_BACKEND_CLOSED = 1 << 1,
+};
+
+struct rspamd_proxy_backend_connection {
struct rspamd_cryptobox_keypair *local_key;
struct rspamd_cryptobox_pubkey *remote_key;
struct upstream *up;
+ struct rspamd_http_connection *backend_conn;
+ gint backend_sock;
+ enum rspamd_backend_flags flags;
+};
+
+struct rspamd_proxy_session {
+ rspamd_mempool_t *pool;
+ struct rspamd_proxy_ctx *ctx;
rspamd_inet_addr_t *client_addr;
struct rspamd_http_connection *client_conn;
- struct rspamd_http_connection *backend_conn;
- struct rspamd_dns_resolver *resolver;
gpointer map;
gsize map_len;
gint client_sock;
- gint backend_sock;
- gboolean replied;
+ struct rspamd_proxy_backend_connection *master_conn;
+ GPtrArray *mirror_conns;
ref_entry_t ref;
};
static void
proxy_session_dtor (struct rspamd_proxy_session *session)
{
- rspamd_inet_address_destroy (session->client_addr);
-
- if (session->backend_conn) {
- rspamd_http_connection_unref (session->backend_conn);
- }
- if (session->client_conn) {
- rspamd_http_connection_unref (session->client_conn);
+ if (session->master_conn) {
+ if (session->master_conn->backend_conn) {
+ rspamd_http_connection_unref (session->master_conn->backend_conn);
+ }
+ close (session->master_conn->backend_sock);
}
if (session->map && session->map_len) {
munmap (session->map, session->map_len);
}
- close (session->backend_sock);
+ if (session->client_conn) {
+ rspamd_http_connection_unref (session->client_conn);
+ }
+
+ rspamd_inet_address_destroy (session->client_addr);
close (session->client_sock);
rspamd_mempool_delete (session->pool);
g_slice_free1 (sizeof (*session), session);
reply->code = code;
rspamd_http_connection_write_message (session->client_conn,
reply, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ev_base);
+ &session->ctx->io_tv, session->ctx->ev_base);
}
static void
struct rspamd_proxy_session *session = conn->ud;
msg_info_session ("abnormally closing connection from backend: %s, error: %s",
- rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
+ rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
err->message);
- rspamd_http_connection_reset (session->backend_conn);
+ rspamd_http_connection_reset (session->master_conn->backend_conn);
/* Terminate session immediately */
proxy_client_write_error (session, err->code);
}
{
struct rspamd_proxy_session *session = conn->ud;
- rspamd_http_connection_steal_msg (session->backend_conn);
+ rspamd_http_connection_steal_msg (session->master_conn->backend_conn);
rspamd_http_message_remove_header (msg, "Content-Length");
rspamd_http_message_remove_header (msg, "Key");
- rspamd_http_connection_reset (session->backend_conn);
+ rspamd_http_connection_reset (session->master_conn->backend_conn);
rspamd_http_connection_write_message (session->client_conn,
msg, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ev_base);
+ &session->ctx->io_tv, session->ctx->ev_base);
return 0;
}
const rspamd_ftok_t *host;
gchar hostbuf[512];
- if (!session->replied) {
+ if (!session->master_conn) {
+ session->master_conn = rspamd_mempool_alloc0 (session->pool,
+ sizeof (*session->master_conn));
host = rspamd_http_message_find_header (msg, "Host");
if (host == NULL) {
goto err;
}
else {
- session->up = rspamd_upstream_get (backend->u,
+ session->master_conn->up = rspamd_upstream_get (backend->u,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
- if (session->up == NULL) {
+ if (session->master_conn->up == NULL) {
msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default");
goto err;
}
- session->backend_sock = rspamd_inet_address_connect (
- rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
+ session->master_conn->backend_sock = rspamd_inet_address_connect (
+ rspamd_upstream_addr (session->master_conn->up),
+ SOCK_STREAM, TRUE);
- if (session->backend_sock == -1) {
+ if (session->master_conn->backend_sock == -1) {
msg_err_session ("cannot connect upstream for %s", host ? hostbuf : "default");
- rspamd_upstream_fail (session->up);
+ rspamd_upstream_fail (session->master_conn->up);
goto err;
}
rspamd_http_message_remove_header (msg, "Key");
rspamd_http_connection_reset (session->client_conn);
- session->backend_conn = rspamd_http_connection_new (
+ session->master_conn->backend_conn = rspamd_http_connection_new (
NULL,
proxy_backend_error_handler,
proxy_backend_finish_handler,
RSPAMD_HTTP_CLIENT,
session->ctx->keys_cache);
- rspamd_http_connection_set_key (session->backend_conn,
+ rspamd_http_connection_set_key (session->master_conn->backend_conn,
session->ctx->local_key);
msg->peer_key = rspamd_pubkey_ref (backend->key);
- session->replied = TRUE;
- rspamd_http_connection_write_message (session->backend_conn,
- msg, NULL, NULL, session, session->backend_sock,
- &session->ctx->io_tv, session->ev_base);
+ rspamd_http_connection_write_message (session->master_conn->backend_conn,
+ msg, NULL, NULL, session, session->master_conn->backend_sock,
+ &session->ctx->io_tv, session->ctx->ev_base);
}
}
else {
return 0;
err:
- session->replied = TRUE;
proxy_client_write_error (session, 404);
return 0;
session->client_sock = nfd;
session->client_addr = addr;
- session->resolver = ctx->resolver;
session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "proxy");
session->client_conn = rspamd_http_connection_new (
NULL,
0,
RSPAMD_HTTP_SERVER,
ctx->keys_cache);
- session->ev_base = ctx->ev_base;
session->ctx = ctx;
if (ctx->key) {