aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libserver/http/http_connection.c2
-rw-r--r--src/rspamd_proxy.c262
2 files changed, 258 insertions, 6 deletions
diff --git a/src/libserver/http/http_connection.c b/src/libserver/http/http_connection.c
index baf37a385..d94f9835e 100644
--- a/src/libserver/http/http_connection.c
+++ b/src/libserver/http/http_connection.c
@@ -2633,4 +2633,4 @@ void rspamd_http_connection_disable_encryption(struct rspamd_http_connection *co
priv->peer_key = NULL;
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
-} \ No newline at end of file
+}
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index c8c3d5a71..8e69298e6 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -97,6 +97,7 @@ struct rspamd_http_upstream {
gboolean self_scan;
gboolean compress;
gboolean ssl;
+ gboolean keepalive; /* Whether to use keepalive for this upstream */
ucl_object_t *extra_headers;
};
@@ -112,6 +113,7 @@ struct rspamd_http_mirror {
gboolean local;
gboolean compress;
gboolean ssl;
+ gboolean keepalive; /* Whether to use keepalive for this mirror */
ucl_object_t *extra_headers;
};
@@ -218,6 +220,7 @@ struct rspamd_proxy_session {
enum rspamd_proxy_legacy_support legacy_support;
int retries;
ref_entry_t ref;
+ gboolean use_keepalive; /* Whether to use keepalive for this session */
};
static gboolean proxy_send_master_message(struct rspamd_proxy_session *session);
@@ -429,9 +432,14 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool,
up->ssl = TRUE;
}
- elt = ucl_object_lookup(obj, "ssl");
+ elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL);
if (elt && ucl_object_toboolean(elt)) {
- up->ssl = TRUE;
+ up->keepalive = TRUE;
+ }
+
+ elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL);
+ if (elt && ucl_object_toboolean(elt)) {
+ up->keepalive = TRUE;
}
elt = ucl_object_lookup(obj, "hosts");
@@ -935,7 +943,11 @@ proxy_backend_close_connection(struct rspamd_proxy_backend_connection *conn)
if (conn->backend_conn) {
rspamd_http_connection_reset(conn->backend_conn);
rspamd_http_connection_unref(conn->backend_conn);
- close(conn->backend_sock);
+
+ if (!(conn->s && conn->s->use_keepalive)) {
+ /* Only close socket if we're not using keepalive */
+ close(conn->backend_sock);
+ }
}
conn->flags |= RSPAMD_BACKEND_CLOSED;
@@ -1414,6 +1426,8 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn,
struct rspamd_proxy_backend_connection *bk_conn = conn->ud;
struct rspamd_proxy_session *session;
const rspamd_ftok_t *orig_ct;
+ const rspamd_ftok_t *conn_hdr;
+ gboolean is_keepalive = FALSE;
session = bk_conn->s;
@@ -1433,6 +1447,36 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn,
bk_conn->name, msg->code);
rspamd_upstream_ok(bk_conn->up);
+ /* Check if we can use keepalive */
+ conn_hdr = rspamd_http_message_find_header(msg, "Connection");
+ if (conn_hdr) {
+ if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len,
+ "keep-alive", 10) != -1) {
+ is_keepalive = TRUE;
+ }
+ }
+
+ if (is_keepalive && session->use_keepalive &&
+ bk_conn->up && session->ctx->http_ctx) {
+ /* Store connection in keepalive pool */
+ const char *up_name = rspamd_upstream_name(bk_conn->up);
+ if (up_name) {
+ rspamd_http_context_prepare_keepalive(session->ctx->http_ctx,
+ conn, rspamd_upstream_addr_cur(bk_conn->up),
+ up_name, FALSE);
+ rspamd_http_context_push_keepalive(session->ctx->http_ctx,
+ conn, msg, session->ctx->event_loop);
+
+ msg_debug_session("pushed mirror connection to %s to keepalive pool",
+ bk_conn->name);
+
+ /* Mark connection as closed without actually closing it */
+ bk_conn->flags |= RSPAMD_BACKEND_CLOSED;
+ REF_RELEASE(bk_conn->s);
+ return 0;
+ }
+ }
+
proxy_backend_close_connection(bk_conn);
REF_RELEASE(bk_conn->s);
@@ -1448,6 +1492,7 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session)
struct rspamd_proxy_backend_connection *bk_conn;
struct rspamd_http_message *msg;
GError *err = NULL;
+ const rspamd_inet_addr_t *keepalive_addr;
coin = rspamd_random_double();
@@ -1459,6 +1504,153 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session)
continue;
}
+ /* Check if we can use keepalive for this mirror */
+ if (m->keepalive && session->ctx->http_ctx) {
+ const char *up_name = NULL;
+ unsigned int port = 0;
+
+ /* Try to find a keepalive connection */
+ if (m->u) {
+ struct upstream *up = rspamd_upstream_get(m->u,
+ RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+ if (up) {
+ up_name = rspamd_upstream_name(up);
+ port = rspamd_inet_address_get_port(rspamd_upstream_addr_cur(up));
+ }
+ }
+
+ if (up_name) {
+ keepalive_addr = rspamd_http_context_has_keepalive(
+ session->ctx->http_ctx, up_name, port, m->ssl);
+
+ if (keepalive_addr) {
+ /* We found a keepalive connection, use it */
+ struct rspamd_http_connection *conn;
+
+ conn = rspamd_http_context_check_keepalive(
+ session->ctx->http_ctx,
+ (rspamd_inet_addr_t *) keepalive_addr,
+ up_name,
+ m->ssl);
+
+ if (conn) {
+ /* We have a keepalive connection, set it up */
+ bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn));
+ bk_conn->s = session;
+ bk_conn->name = m->name;
+ bk_conn->timeout = m->timeout;
+ bk_conn->parser_from_ref = m->parser_from_ref;
+ bk_conn->parser_to_ref = m->parser_to_ref;
+ bk_conn->backend_conn = conn;
+ bk_conn->backend_sock = conn->fd;
+
+ msg = rspamd_http_connection_copy_msg(session->client_message, &err);
+
+ if (msg == NULL) {
+ msg_err_session("cannot copy message to send to a mirror %s: %e",
+ m->name, err);
+ if (err) {
+ g_error_free(err);
+ }
+ continue;
+ }
+
+ if (up_name) {
+ rspamd_http_message_remove_header(msg, "Host");
+ rspamd_http_message_add_header(msg, "Host", up_name);
+ }
+ rspamd_http_message_add_header(msg, "Connection", "keep-alive");
+
+ if (msg->url->len == 0) {
+ msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check"));
+ }
+
+ if (m->settings_id != NULL) {
+ rspamd_http_message_remove_header(msg, "Settings-ID");
+ rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id);
+ }
+
+ /* Add extra headers if specified */
+ if (m->extra_headers != NULL) {
+ ucl_object_iter_t it = NULL;
+ const ucl_object_t *cur;
+ const char *key, *value;
+
+ while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) {
+ key = ucl_object_key(cur);
+ value = ucl_object_tostring(cur);
+
+ if (key != NULL && value != NULL) {
+ rspamd_http_message_remove_header(msg, key);
+ rspamd_http_message_add_header(msg, key, value);
+ }
+ }
+ }
+
+ /* Set handlers for the connection */
+ conn->error_handler = proxy_backend_mirror_error_handler;
+ conn->finish_handler = proxy_backend_mirror_finish_handler;
+ conn->ud = bk_conn;
+
+ if (m->key) {
+ msg->peer_key = rspamd_pubkey_ref(m->key);
+ }
+
+ if (m->local || rspamd_inet_address_is_local(keepalive_addr)) {
+ if (session->fname) {
+ rspamd_http_message_add_header(msg, "File", session->fname);
+ }
+
+ msg->method = HTTP_GET;
+ rspamd_http_connection_write_message_shared(conn,
+ msg, up_name,
+ NULL, bk_conn,
+ bk_conn->timeout);
+ }
+ else {
+ if (session->fname) {
+ msg->flags &= ~RSPAMD_HTTP_FLAG_SHMEM;
+ rspamd_http_message_set_body(msg, session->map, session->map_len);
+ }
+
+ msg->method = HTTP_POST;
+
+ if (m->compress) {
+ proxy_request_compress(msg);
+
+ if (session->client_milter_conn) {
+ rspamd_http_message_add_header(msg, "Content-Type",
+ "application/octet-stream");
+ }
+ }
+ else {
+ if (session->client_milter_conn) {
+ rspamd_http_message_add_header(msg, "Content-Type",
+ "text/plain");
+ }
+ }
+
+ rspamd_http_connection_write_message(conn,
+ msg, up_name, NULL, bk_conn,
+ bk_conn->timeout);
+ }
+
+ g_ptr_array_add(session->mirror_conns, bk_conn);
+ REF_RETAIN(session);
+ msg_info_session("send request to %s (using keepalive)", m->name);
+
+ /*
+ * We have found the existing keepalive connection, so we can
+ * process another mirror
+ */
+ continue;
+ }
+ }
+ }
+ }
+
+ /* Non-keepalive connection */
+
bk_conn = rspamd_mempool_alloc0(session->pool,
sizeof(*bk_conn));
bk_conn->s = session;
@@ -1502,7 +1694,8 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session)
rspamd_http_message_remove_header(msg, "Host");
rspamd_http_message_add_header(msg, "Host", up_name);
}
- rspamd_http_message_add_header(msg, "Connection", "close");
+ rspamd_http_message_add_header(msg, "Connection",
+ m->keepalive ? "keep-alive" : "close");
if (msg->url->len == 0) {
msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check"));
@@ -1702,7 +1895,9 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
struct rspamd_proxy_session *session, *nsession;
rspamd_fstring_t *reply;
const rspamd_ftok_t *orig_ct;
+ const rspamd_ftok_t *conn_hdr;
goffset body_offset = -1;
+ gboolean is_keepalive = FALSE;
session = bk_conn->s;
rspamd_http_connection_steal_msg(session->master_conn->backend_conn);
@@ -1718,6 +1913,16 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
rspamd_http_message_remove_header(msg, "Server");
rspamd_http_message_remove_header(msg, "Key");
orig_ct = rspamd_http_message_find_header(msg, "Content-Type");
+
+ /* Check if we can use keepalive */
+ conn_hdr = rspamd_http_message_find_header(msg, "Connection");
+ if (conn_hdr) {
+ if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len,
+ "keep-alive", 10) != -1) {
+ is_keepalive = TRUE;
+ }
+ }
+
rspamd_http_connection_reset(session->master_conn->backend_conn);
if (!proxy_backend_parse_results(session, bk_conn, session->ctx->lua_state,
@@ -1750,6 +1955,22 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
rspamd_upstream_ok(bk_conn->up);
+ /* Handle keepalive for master connection */
+ if (is_keepalive && session->use_keepalive &&
+ bk_conn->up && session->ctx->http_ctx) {
+ /* Store connection in keepalive pool */
+ const char *up_name = rspamd_upstream_name(bk_conn->up);
+ if (up_name) {
+ rspamd_http_context_prepare_keepalive(session->ctx->http_ctx,
+ conn, rspamd_upstream_addr_cur(bk_conn->up),
+ up_name, FALSE);
+
+ /* We'll push to keepalive pool after we're done with the response */
+ msg_debug_session("will push master connection to %s to keepalive pool",
+ up_name);
+ }
+ }
+
if (session->client_milter_conn) {
nsession = proxy_session_refresh(session);
@@ -1763,6 +1984,20 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
rspamd_milter_send_task_results(nsession->client_milter_conn,
session->master_conn->results, NULL, 0);
}
+
+ /* Push to keepalive if needed */
+ if (is_keepalive && session->use_keepalive &&
+ bk_conn->up && session->ctx->http_ctx) {
+ const char *up_name = rspamd_upstream_name(bk_conn->up);
+ if (up_name) {
+ rspamd_http_context_push_keepalive(session->ctx->http_ctx,
+ conn, msg, session->ctx->event_loop);
+
+ /* Mark connection as closed without actually closing it */
+ bk_conn->flags |= RSPAMD_BACKEND_CLOSED;
+ }
+ }
+
REF_RELEASE(session);
rspamd_http_message_free(msg);
}
@@ -1778,6 +2013,19 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
rspamd_http_connection_write_message(session->client_conn,
msg, NULL, passed_ct, session,
bk_conn->timeout);
+
+ /* Push to keepalive if needed */
+ if (is_keepalive && session->use_keepalive &&
+ bk_conn->up && session->ctx->http_ctx) {
+ const char *up_name = rspamd_upstream_name(bk_conn->up);
+ if (up_name) {
+ rspamd_http_context_push_keepalive(session->ctx->http_ctx,
+ conn, msg, session->ctx->event_loop);
+
+ /* Mark connection as closed without actually closing it */
+ bk_conn->flags |= RSPAMD_BACKEND_CLOSED;
+ }
+ }
}
return 0;
@@ -2037,6 +2285,9 @@ proxy_send_master_message(struct rspamd_proxy_session *session)
/* Remove the original `Connection` header */
rspamd_http_message_remove_header(session->client_message, "Connection");
+ /* Set keepalive flag based on backend configuration */
+ session->use_keepalive = backend ? backend->keepalive : FALSE;
+
if (backend == NULL) {
/* No backend */
msg_err_session("cannot find upstream for %s", host ? hostbuf : "default");
@@ -2118,7 +2369,8 @@ proxy_send_master_message(struct rspamd_proxy_session *session)
if (up_name) {
rspamd_http_message_add_header(msg, "Host", up_name);
}
- rspamd_http_message_add_header(msg, "Connection", "close");
+ rspamd_http_message_add_header(msg, "Connection",
+ backend->keepalive ? "keep-alive" : "close");
unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE;