diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-05-06 15:38:40 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-05-06 15:38:40 +0100 |
commit | 4aff515297321488a4383d7d660ffba4faa3e421 (patch) | |
tree | 2e18e596a22e9347b3afcb0babbafced22007018 /src | |
parent | ce904384d6711d0d04b43e051c597a11d74ffc36 (diff) | |
download | rspamd-4aff515297321488a4383d7d660ffba4faa3e421.tar.gz rspamd-4aff515297321488a4383d7d660ffba4faa3e421.zip |
[Feature] Split main connection from mirrored connections
Diffstat (limited to 'src')
-rw-r--r-- | src/rspamd_proxy.c | 85 |
1 files changed, 47 insertions, 38 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 626cfdee1..ef85728e3 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -103,22 +103,30 @@ struct rspamd_proxy_ctx { lua_State *lua_state; }; -struct rspamd_proxy_session { - rspamd_mempool_t *pool; - struct rspamd_proxy_ctx *ctx; - struct event_base *ev_base; +enum rspamd_backend_flags { + RSPAMD_BACKEND_REPLIED = 1 << 0, + RSPAMD_BACKEND_CLOSED = 1 << 1, +}; + +struct rspamd_proxy_backend_connection { struct rspamd_cryptobox_keypair *local_key; struct rspamd_cryptobox_pubkey *remote_key; struct upstream *up; + struct rspamd_http_connection *backend_conn; + gint backend_sock; + enum rspamd_backend_flags flags; +}; + +struct rspamd_proxy_session { + rspamd_mempool_t *pool; + struct rspamd_proxy_ctx *ctx; rspamd_inet_addr_t *client_addr; struct rspamd_http_connection *client_conn; - struct rspamd_http_connection *backend_conn; - struct rspamd_dns_resolver *resolver; gpointer map; gsize map_len; gint client_sock; - gint backend_sock; - gboolean replied; + struct rspamd_proxy_backend_connection *master_conn; + GPtrArray *mirror_conns; ref_entry_t ref; }; @@ -436,20 +444,22 @@ init_rspamd_proxy (struct rspamd_config *cfg) static void proxy_session_dtor (struct rspamd_proxy_session *session) { - rspamd_inet_address_destroy (session->client_addr); - - if (session->backend_conn) { - rspamd_http_connection_unref (session->backend_conn); - } - if (session->client_conn) { - rspamd_http_connection_unref (session->client_conn); + if (session->master_conn) { + if (session->master_conn->backend_conn) { + rspamd_http_connection_unref (session->master_conn->backend_conn); + } + close (session->master_conn->backend_sock); } if (session->map && session->map_len) { munmap (session->map, session->map_len); } - close (session->backend_sock); + if (session->client_conn) { + rspamd_http_connection_unref (session->client_conn); + } + + rspamd_inet_address_destroy (session->client_addr); close (session->client_sock); rspamd_mempool_delete (session->pool); g_slice_free1 (sizeof (*session), session); @@ -551,7 +561,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code) reply->code = code; rspamd_http_connection_write_message (session->client_conn, reply, NULL, NULL, session, session->client_sock, - &session->ctx->io_tv, session->ev_base); + &session->ctx->io_tv, session->ctx->ev_base); } static void @@ -560,9 +570,9 @@ proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err) struct rspamd_proxy_session *session = conn->ud; msg_info_session ("abnormally closing connection from backend: %s, error: %s", - rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)), + rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)), err->message); - rspamd_http_connection_reset (session->backend_conn); + rspamd_http_connection_reset (session->master_conn->backend_conn); /* Terminate session immediately */ proxy_client_write_error (session, err->code); } @@ -573,13 +583,13 @@ proxy_backend_finish_handler (struct rspamd_http_connection *conn, { struct rspamd_proxy_session *session = conn->ud; - rspamd_http_connection_steal_msg (session->backend_conn); + rspamd_http_connection_steal_msg (session->master_conn->backend_conn); rspamd_http_message_remove_header (msg, "Content-Length"); rspamd_http_message_remove_header (msg, "Key"); - rspamd_http_connection_reset (session->backend_conn); + rspamd_http_connection_reset (session->master_conn->backend_conn); rspamd_http_connection_write_message (session->client_conn, msg, NULL, NULL, session, session->client_sock, - &session->ctx->io_tv, session->ev_base); + &session->ctx->io_tv, session->ctx->ev_base); return 0; } @@ -604,7 +614,9 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, const rspamd_ftok_t *host; gchar hostbuf[512]; - if (!session->replied) { + if (!session->master_conn) { + session->master_conn = rspamd_mempool_alloc0 (session->pool, + sizeof (*session->master_conn)); host = rspamd_http_message_find_header (msg, "Host"); if (host == NULL) { @@ -625,20 +637,21 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, goto err; } else { - session->up = rspamd_upstream_get (backend->u, + session->master_conn->up = rspamd_upstream_get (backend->u, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); - if (session->up == NULL) { + if (session->master_conn->up == NULL) { msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default"); goto err; } - session->backend_sock = rspamd_inet_address_connect ( - rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE); + session->master_conn->backend_sock = rspamd_inet_address_connect ( + rspamd_upstream_addr (session->master_conn->up), + SOCK_STREAM, TRUE); - if (session->backend_sock == -1) { + if (session->master_conn->backend_sock == -1) { msg_err_session ("cannot connect upstream for %s", host ? hostbuf : "default"); - rspamd_upstream_fail (session->up); + rspamd_upstream_fail (session->master_conn->up); goto err; } @@ -651,7 +664,7 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, rspamd_http_message_remove_header (msg, "Key"); rspamd_http_connection_reset (session->client_conn); - session->backend_conn = rspamd_http_connection_new ( + session->master_conn->backend_conn = rspamd_http_connection_new ( NULL, proxy_backend_error_handler, proxy_backend_finish_handler, @@ -659,14 +672,13 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, RSPAMD_HTTP_CLIENT, session->ctx->keys_cache); - rspamd_http_connection_set_key (session->backend_conn, + rspamd_http_connection_set_key (session->master_conn->backend_conn, session->ctx->local_key); msg->peer_key = rspamd_pubkey_ref (backend->key); - session->replied = TRUE; - rspamd_http_connection_write_message (session->backend_conn, - msg, NULL, NULL, session, session->backend_sock, - &session->ctx->io_tv, session->ev_base); + rspamd_http_connection_write_message (session->master_conn->backend_conn, + msg, NULL, NULL, session, session->master_conn->backend_sock, + &session->ctx->io_tv, session->ctx->ev_base); } } else { @@ -676,7 +688,6 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, return 0; err: - session->replied = TRUE; proxy_client_write_error (session, 404); return 0; @@ -708,7 +719,6 @@ proxy_accept_socket (gint fd, short what, void *arg) session->client_sock = nfd; session->client_addr = addr; - session->resolver = ctx->resolver; session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "proxy"); session->client_conn = rspamd_http_connection_new ( NULL, @@ -717,7 +727,6 @@ proxy_accept_socket (gint fd, short what, void *arg) 0, RSPAMD_HTTP_SERVER, ctx->keys_cache); - session->ev_base = ctx->ev_base; session->ctx = ctx; if (ctx->key) { |