summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-06 15:38:40 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-06 15:38:40 +0100
commit4aff515297321488a4383d7d660ffba4faa3e421 (patch)
tree2e18e596a22e9347b3afcb0babbafced22007018 /src
parentce904384d6711d0d04b43e051c597a11d74ffc36 (diff)
downloadrspamd-4aff515297321488a4383d7d660ffba4faa3e421.tar.gz
rspamd-4aff515297321488a4383d7d660ffba4faa3e421.zip
[Feature] Split main connection from mirrored connections
Diffstat (limited to 'src')
-rw-r--r--src/rspamd_proxy.c85
1 files changed, 47 insertions, 38 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 626cfdee1..ef85728e3 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -103,22 +103,30 @@ struct rspamd_proxy_ctx {
lua_State *lua_state;
};
-struct rspamd_proxy_session {
- rspamd_mempool_t *pool;
- struct rspamd_proxy_ctx *ctx;
- struct event_base *ev_base;
+enum rspamd_backend_flags {
+ RSPAMD_BACKEND_REPLIED = 1 << 0,
+ RSPAMD_BACKEND_CLOSED = 1 << 1,
+};
+
+struct rspamd_proxy_backend_connection {
struct rspamd_cryptobox_keypair *local_key;
struct rspamd_cryptobox_pubkey *remote_key;
struct upstream *up;
+ struct rspamd_http_connection *backend_conn;
+ gint backend_sock;
+ enum rspamd_backend_flags flags;
+};
+
+struct rspamd_proxy_session {
+ rspamd_mempool_t *pool;
+ struct rspamd_proxy_ctx *ctx;
rspamd_inet_addr_t *client_addr;
struct rspamd_http_connection *client_conn;
- struct rspamd_http_connection *backend_conn;
- struct rspamd_dns_resolver *resolver;
gpointer map;
gsize map_len;
gint client_sock;
- gint backend_sock;
- gboolean replied;
+ struct rspamd_proxy_backend_connection *master_conn;
+ GPtrArray *mirror_conns;
ref_entry_t ref;
};
@@ -436,20 +444,22 @@ init_rspamd_proxy (struct rspamd_config *cfg)
static void
proxy_session_dtor (struct rspamd_proxy_session *session)
{
- rspamd_inet_address_destroy (session->client_addr);
-
- if (session->backend_conn) {
- rspamd_http_connection_unref (session->backend_conn);
- }
- if (session->client_conn) {
- rspamd_http_connection_unref (session->client_conn);
+ if (session->master_conn) {
+ if (session->master_conn->backend_conn) {
+ rspamd_http_connection_unref (session->master_conn->backend_conn);
+ }
+ close (session->master_conn->backend_sock);
}
if (session->map && session->map_len) {
munmap (session->map, session->map_len);
}
- close (session->backend_sock);
+ if (session->client_conn) {
+ rspamd_http_connection_unref (session->client_conn);
+ }
+
+ rspamd_inet_address_destroy (session->client_addr);
close (session->client_sock);
rspamd_mempool_delete (session->pool);
g_slice_free1 (sizeof (*session), session);
@@ -551,7 +561,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code)
reply->code = code;
rspamd_http_connection_write_message (session->client_conn,
reply, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ev_base);
+ &session->ctx->io_tv, session->ctx->ev_base);
}
static void
@@ -560,9 +570,9 @@ proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
struct rspamd_proxy_session *session = conn->ud;
msg_info_session ("abnormally closing connection from backend: %s, error: %s",
- rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
+ rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
err->message);
- rspamd_http_connection_reset (session->backend_conn);
+ rspamd_http_connection_reset (session->master_conn->backend_conn);
/* Terminate session immediately */
proxy_client_write_error (session, err->code);
}
@@ -573,13 +583,13 @@ proxy_backend_finish_handler (struct rspamd_http_connection *conn,
{
struct rspamd_proxy_session *session = conn->ud;
- rspamd_http_connection_steal_msg (session->backend_conn);
+ rspamd_http_connection_steal_msg (session->master_conn->backend_conn);
rspamd_http_message_remove_header (msg, "Content-Length");
rspamd_http_message_remove_header (msg, "Key");
- rspamd_http_connection_reset (session->backend_conn);
+ rspamd_http_connection_reset (session->master_conn->backend_conn);
rspamd_http_connection_write_message (session->client_conn,
msg, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ev_base);
+ &session->ctx->io_tv, session->ctx->ev_base);
return 0;
}
@@ -604,7 +614,9 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
const rspamd_ftok_t *host;
gchar hostbuf[512];
- if (!session->replied) {
+ if (!session->master_conn) {
+ session->master_conn = rspamd_mempool_alloc0 (session->pool,
+ sizeof (*session->master_conn));
host = rspamd_http_message_find_header (msg, "Host");
if (host == NULL) {
@@ -625,20 +637,21 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
goto err;
}
else {
- session->up = rspamd_upstream_get (backend->u,
+ session->master_conn->up = rspamd_upstream_get (backend->u,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
- if (session->up == NULL) {
+ if (session->master_conn->up == NULL) {
msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default");
goto err;
}
- session->backend_sock = rspamd_inet_address_connect (
- rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
+ session->master_conn->backend_sock = rspamd_inet_address_connect (
+ rspamd_upstream_addr (session->master_conn->up),
+ SOCK_STREAM, TRUE);
- if (session->backend_sock == -1) {
+ if (session->master_conn->backend_sock == -1) {
msg_err_session ("cannot connect upstream for %s", host ? hostbuf : "default");
- rspamd_upstream_fail (session->up);
+ rspamd_upstream_fail (session->master_conn->up);
goto err;
}
@@ -651,7 +664,7 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
rspamd_http_message_remove_header (msg, "Key");
rspamd_http_connection_reset (session->client_conn);
- session->backend_conn = rspamd_http_connection_new (
+ session->master_conn->backend_conn = rspamd_http_connection_new (
NULL,
proxy_backend_error_handler,
proxy_backend_finish_handler,
@@ -659,14 +672,13 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
RSPAMD_HTTP_CLIENT,
session->ctx->keys_cache);
- rspamd_http_connection_set_key (session->backend_conn,
+ rspamd_http_connection_set_key (session->master_conn->backend_conn,
session->ctx->local_key);
msg->peer_key = rspamd_pubkey_ref (backend->key);
- session->replied = TRUE;
- rspamd_http_connection_write_message (session->backend_conn,
- msg, NULL, NULL, session, session->backend_sock,
- &session->ctx->io_tv, session->ev_base);
+ rspamd_http_connection_write_message (session->master_conn->backend_conn,
+ msg, NULL, NULL, session, session->master_conn->backend_sock,
+ &session->ctx->io_tv, session->ctx->ev_base);
}
}
else {
@@ -676,7 +688,6 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
return 0;
err:
- session->replied = TRUE;
proxy_client_write_error (session, 404);
return 0;
@@ -708,7 +719,6 @@ proxy_accept_socket (gint fd, short what, void *arg)
session->client_sock = nfd;
session->client_addr = addr;
- session->resolver = ctx->resolver;
session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "proxy");
session->client_conn = rspamd_http_connection_new (
NULL,
@@ -717,7 +727,6 @@ proxy_accept_socket (gint fd, short what, void *arg)
0,
RSPAMD_HTTP_SERVER,
ctx->keys_cache);
- session->ev_base = ctx->ev_base;
session->ctx = ctx;
if (ctx->key) {