]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Split main connection from mirrored connections
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 14:38:40 +0000 (15:38 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 14:38:40 +0000 (15:38 +0100)
src/rspamd_proxy.c

index 626cfdee1fba35de22fb08beac6756a4c6adc249..ef85728e32310a5229b7f763a30cbaf861471c83 100644 (file)
@@ -103,22 +103,30 @@ struct rspamd_proxy_ctx {
        lua_State *lua_state;
 };
 
-struct rspamd_proxy_session {
-       rspamd_mempool_t *pool;
-       struct rspamd_proxy_ctx *ctx;
-       struct event_base *ev_base;
+enum rspamd_backend_flags {
+       RSPAMD_BACKEND_REPLIED = 1 << 0,
+       RSPAMD_BACKEND_CLOSED = 1 << 1,
+};
+
+struct rspamd_proxy_backend_connection {
        struct rspamd_cryptobox_keypair *local_key;
        struct rspamd_cryptobox_pubkey *remote_key;
        struct upstream *up;
+       struct rspamd_http_connection *backend_conn;
+       gint backend_sock;
+       enum rspamd_backend_flags flags;
+};
+
+struct rspamd_proxy_session {
+       rspamd_mempool_t *pool;
+       struct rspamd_proxy_ctx *ctx;
        rspamd_inet_addr_t *client_addr;
        struct rspamd_http_connection *client_conn;
-       struct rspamd_http_connection *backend_conn;
-       struct rspamd_dns_resolver *resolver;
        gpointer map;
        gsize map_len;
        gint client_sock;
-       gint backend_sock;
-       gboolean replied;
+       struct rspamd_proxy_backend_connection *master_conn;
+       GPtrArray *mirror_conns;
        ref_entry_t ref;
 };
 
@@ -436,20 +444,22 @@ init_rspamd_proxy (struct rspamd_config *cfg)
 static void
 proxy_session_dtor (struct rspamd_proxy_session *session)
 {
-       rspamd_inet_address_destroy (session->client_addr);
-
-       if (session->backend_conn) {
-               rspamd_http_connection_unref (session->backend_conn);
-       }
-       if (session->client_conn) {
-               rspamd_http_connection_unref (session->client_conn);
+       if (session->master_conn) {
+               if (session->master_conn->backend_conn) {
+                       rspamd_http_connection_unref (session->master_conn->backend_conn);
+               }
+               close (session->master_conn->backend_sock);
        }
 
        if (session->map && session->map_len) {
                munmap (session->map, session->map_len);
        }
 
-       close (session->backend_sock);
+       if (session->client_conn) {
+               rspamd_http_connection_unref (session->client_conn);
+       }
+
+       rspamd_inet_address_destroy (session->client_addr);
        close (session->client_sock);
        rspamd_mempool_delete (session->pool);
        g_slice_free1 (sizeof (*session), session);
@@ -551,7 +561,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code)
        reply->code = code;
        rspamd_http_connection_write_message (session->client_conn,
                        reply, NULL, NULL, session, session->client_sock,
-                       &session->ctx->io_tv, session->ev_base);
+                       &session->ctx->io_tv, session->ctx->ev_base);
 }
 
 static void
@@ -560,9 +570,9 @@ proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
        struct rspamd_proxy_session *session = conn->ud;
 
        msg_info_session ("abnormally closing connection from backend: %s, error: %s",
-               rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
+               rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
                err->message);
-       rspamd_http_connection_reset (session->backend_conn);
+       rspamd_http_connection_reset (session->master_conn->backend_conn);
        /* Terminate session immediately */
        proxy_client_write_error (session, err->code);
 }
@@ -573,13 +583,13 @@ proxy_backend_finish_handler (struct rspamd_http_connection *conn,
 {
        struct rspamd_proxy_session *session = conn->ud;
 
-       rspamd_http_connection_steal_msg (session->backend_conn);
+       rspamd_http_connection_steal_msg (session->master_conn->backend_conn);
        rspamd_http_message_remove_header (msg, "Content-Length");
        rspamd_http_message_remove_header (msg, "Key");
-       rspamd_http_connection_reset (session->backend_conn);
+       rspamd_http_connection_reset (session->master_conn->backend_conn);
        rspamd_http_connection_write_message (session->client_conn,
                msg, NULL, NULL, session, session->client_sock,
-               &session->ctx->io_tv, session->ev_base);
+               &session->ctx->io_tv, session->ctx->ev_base);
 
        return 0;
 }
@@ -604,7 +614,9 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
        const rspamd_ftok_t *host;
        gchar hostbuf[512];
 
-       if (!session->replied) {
+       if (!session->master_conn) {
+               session->master_conn = rspamd_mempool_alloc0 (session->pool,
+                               sizeof (*session->master_conn));
                host = rspamd_http_message_find_header (msg, "Host");
 
                if (host == NULL) {
@@ -625,20 +637,21 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
                        goto err;
                }
                else {
-                       session->up = rspamd_upstream_get (backend->u,
+                       session->master_conn->up = rspamd_upstream_get (backend->u,
                                        RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
 
-                       if (session->up == NULL) {
+                       if (session->master_conn->up == NULL) {
                                msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default");
                                goto err;
                        }
 
-                       session->backend_sock = rspamd_inet_address_connect (
-                                       rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
+                       session->master_conn->backend_sock = rspamd_inet_address_connect (
+                                       rspamd_upstream_addr (session->master_conn->up),
+                                       SOCK_STREAM, TRUE);
 
-                       if (session->backend_sock == -1) {
+                       if (session->master_conn->backend_sock == -1) {
                                msg_err_session ("cannot connect upstream for %s", host ? hostbuf : "default");
-                               rspamd_upstream_fail (session->up);
+                               rspamd_upstream_fail (session->master_conn->up);
                                goto err;
                        }
 
@@ -651,7 +664,7 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
                        rspamd_http_message_remove_header (msg, "Key");
                        rspamd_http_connection_reset (session->client_conn);
 
-                       session->backend_conn = rspamd_http_connection_new (
+                       session->master_conn->backend_conn = rspamd_http_connection_new (
                                        NULL,
                                        proxy_backend_error_handler,
                                        proxy_backend_finish_handler,
@@ -659,14 +672,13 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
                                        RSPAMD_HTTP_CLIENT,
                                        session->ctx->keys_cache);
 
-                       rspamd_http_connection_set_key (session->backend_conn,
+                       rspamd_http_connection_set_key (session->master_conn->backend_conn,
                                        session->ctx->local_key);
                        msg->peer_key = rspamd_pubkey_ref (backend->key);
-                       session->replied = TRUE;
 
-                       rspamd_http_connection_write_message (session->backend_conn,
-                               msg, NULL, NULL, session, session->backend_sock,
-                               &session->ctx->io_tv, session->ev_base);
+                       rspamd_http_connection_write_message (session->master_conn->backend_conn,
+                               msg, NULL, NULL, session, session->master_conn->backend_sock,
+                               &session->ctx->io_tv, session->ctx->ev_base);
                }
        }
        else {
@@ -676,7 +688,6 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
        return 0;
 
 err:
-       session->replied = TRUE;
        proxy_client_write_error (session, 404);
 
        return 0;
@@ -708,7 +719,6 @@ proxy_accept_socket (gint fd, short what, void *arg)
        session->client_sock = nfd;
        session->client_addr = addr;
 
-       session->resolver = ctx->resolver;
        session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "proxy");
        session->client_conn = rspamd_http_connection_new (
                NULL,
@@ -717,7 +727,6 @@ proxy_accept_socket (gint fd, short what, void *arg)
                0,
                RSPAMD_HTTP_SERVER,
                ctx->keys_cache);
-       session->ev_base = ctx->ev_base;
        session->ctx = ctx;
 
        if (ctx->key) {