]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement mirroring in rspamd proxy
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 15:51:10 +0000 (16:51 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 15:51:10 +0000 (16:51 +0100)
src/rspamd_proxy.c

index afbbbbe876b338666a373dbd13547a3f217825ba..4bf8d2e18bc477900d387ff58a0abf5b5ce33e18 100644 (file)
@@ -460,6 +460,8 @@ proxy_backend_close_connection (struct rspamd_proxy_backend_connection *conn)
        if (conn->results) {
                ucl_object_unref (conn->results);
        }
+
+       conn->flags |= RSPAMD_BACKEND_CLOSED;
 }
 
 static gboolean
@@ -520,6 +522,9 @@ proxy_backend_parse_results (struct rspamd_proxy_session *session,
 static void
 proxy_session_dtor (struct rspamd_proxy_session *session)
 {
+       guint i;
+       struct rspamd_proxy_backend_connection *conn;
+
        if (session->master_conn) {
                proxy_backend_close_connection (session->master_conn);
        }
@@ -533,6 +538,15 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
                rspamd_http_connection_unref (session->client_conn);
        }
 
+       for (i = 0; i < session->mirror_conns->len; i ++) {
+               conn = g_ptr_array_index (session->mirror_conns, i);
+
+               if (!(conn->flags & RSPAMD_BACKEND_CLOSED)) {
+                       proxy_backend_close_connection (conn);
+               }
+       }
+
+       g_ptr_array_free (session->mirror_conns, TRUE);
        rspamd_inet_address_destroy (session->client_addr);
        close (session->client_sock);
        rspamd_mempool_delete (session->pool);
@@ -626,6 +640,109 @@ proxy_check_file (struct rspamd_http_message *msg,
        return TRUE;
 }
 
+static void
+proxy_backend_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct rspamd_proxy_backend_connection *bk_conn = conn->ud;
+       struct rspamd_proxy_session *session;
+
+       session = bk_conn->s;
+       msg_info_session ("abnormally closing connection from backend: %s, error: %s",
+               rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
+               err->message);
+
+       proxy_backend_close_connection (bk_conn);
+}
+
+static gint
+proxy_backend_mirror_finish_handler (struct rspamd_http_connection *conn,
+       struct rspamd_http_message *msg)
+{
+       struct rspamd_proxy_backend_connection *bk_conn = conn->ud;
+       struct rspamd_proxy_session *session;
+
+       session = bk_conn->s;
+
+       if (!proxy_backend_parse_results (session, bk_conn, session->ctx->lua_state,
+                       -1, msg->body->str, msg->body->len)) {
+               msg_warn_session ("cannot parse results from the mirror backend %s",
+                               bk_conn->name);
+       }
+
+       proxy_backend_close_connection (bk_conn);
+
+       return 0;
+}
+
+static void
+proxy_open_mirror_connections (struct rspamd_proxy_session *session)
+{
+       gdouble coin;
+       struct rspamd_http_mirror *m;
+       guint i;
+       struct rspamd_proxy_backend_connection *bk_conn;
+       struct rspamd_http_message *msg;
+
+       coin = rspamd_random_double ();
+
+       for (i = 0; i < session->ctx->mirrors->len; i ++) {
+               m = g_ptr_array_index (session->ctx->mirrors, i);
+
+               if (m->prob < coin) {
+                       /* No luck */
+                       continue;
+               }
+
+               bk_conn = rspamd_mempool_alloc0 (session->pool,
+                               sizeof (*bk_conn));
+               bk_conn->s = session;
+               bk_conn->name = m->name;
+
+
+               bk_conn->up = rspamd_upstream_get (m->u,
+                               RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+
+               if (bk_conn->up == NULL) {
+                       msg_err_session ("cannot select upstream for %s", m->name);
+                       continue;
+               }
+
+               bk_conn->backend_sock = rspamd_inet_address_connect (
+                               rspamd_upstream_addr (bk_conn->up),
+                               SOCK_STREAM, TRUE);
+
+               if (bk_conn->backend_sock == -1) {
+                       msg_err_session ("cannot connect upstream for %s", m->name);
+                       rspamd_upstream_fail (bk_conn->up);
+                       continue;
+               }
+
+               msg = rspamd_http_connection_copy_msg (session->client_conn);
+               rspamd_http_message_remove_header (msg, "Content-Length");
+               rspamd_http_message_remove_header (msg, "Key");
+
+               bk_conn->backend_conn = rspamd_http_connection_new (
+                               NULL,
+                               proxy_backend_mirror_error_handler,
+                               proxy_backend_mirror_finish_handler,
+                               RSPAMD_HTTP_CLIENT_SIMPLE,
+                               RSPAMD_HTTP_CLIENT,
+                               session->ctx->keys_cache);
+
+               rspamd_http_connection_set_key (bk_conn->backend_conn,
+                               session->ctx->local_key);
+               msg->peer_key = rspamd_pubkey_ref (m->key);
+
+               rspamd_http_connection_write_message (bk_conn->backend_conn,
+                               msg, NULL, NULL, bk_conn,
+                               bk_conn->backend_sock,
+                               &session->ctx->io_tv, session->ctx->ev_base);
+
+               g_ptr_array_add (session->mirror_conns, bk_conn);
+               msg_info_session ("send request to %s", m->name);
+       }
+}
+
 static void
 proxy_client_write_error (struct rspamd_proxy_session *session, gint code)
 {
@@ -648,7 +765,6 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
        msg_info_session ("abnormally closing connection from backend: %s, error: %s",
                rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
                err->message);
-       rspamd_http_connection_reset (session->master_conn->backend_conn);
        /* Terminate session immediately */
        proxy_client_write_error (session, err->code);
 }
@@ -669,6 +785,11 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
                msg, NULL, NULL, session, session->client_sock,
                &session->ctx->io_tv, session->ctx->ev_base);
 
+       if (!proxy_backend_parse_results (session, bk_conn, session->ctx->lua_state,
+                       -1, msg->body->str, msg->body->len)) {
+               msg_warn_session ("cannot parse results from the master backend");
+       }
+
        return 0;
 }
 
@@ -680,6 +801,8 @@ proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
        msg_info_session ("abnormally closing connection from: %s, error: %s",
                rspamd_inet_address_to_string (session->client_addr), err->message);
        /* Terminate session immediately */
+       proxy_backend_close_connection (session->master_conn);
+       session->master_conn = NULL;
        REF_RELEASE (session);
 }
 
@@ -739,6 +862,7 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
                                goto err;
                        }
 
+                       proxy_open_mirror_connections (session);
                        rspamd_http_connection_steal_msg (session->client_conn);
                        rspamd_http_message_remove_header (msg, "Content-Length");
                        rspamd_http_message_remove_header (msg, "Key");