diff options
Diffstat (limited to 'src/rspamd_proxy.c')
-rw-r--r-- | src/rspamd_proxy.c | 551 |
1 files changed, 536 insertions, 15 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 694e87c12..195442f51 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,6 +85,12 @@ worker_t rspamd_proxy_worker = { RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER}; +enum rspamd_proxy_log_tag_type { + RSPAMD_PROXY_LOG_TAG_SESSION = 0, /* Use session mempool tag (default) */ + RSPAMD_PROXY_LOG_TAG_QUEUE_ID, /* Use Queue-ID from client message */ + RSPAMD_PROXY_LOG_TAG_NONE, /* Skip log tag passing */ +}; + struct rspamd_http_upstream { char *name; char *settings_id; @@ -96,6 +102,10 @@ struct rspamd_http_upstream { gboolean local; gboolean self_scan; gboolean compress; + gboolean ssl; + gboolean keepalive; /* Whether to use keepalive for this upstream */ + enum rspamd_proxy_log_tag_type log_tag_type; + ucl_object_t *extra_headers; }; struct rspamd_http_mirror { @@ -109,6 +119,10 @@ struct rspamd_http_mirror { int parser_to_ref; gboolean local; gboolean compress; + gboolean ssl; + gboolean keepalive; /* Whether to use keepalive for this mirror */ + enum rspamd_proxy_log_tag_type log_tag_type; + ucl_object_t *extra_headers; }; static const uint64_t rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL; @@ -161,6 +175,8 @@ struct rspamd_proxy_ctx { /* Language detector */ struct rspamd_lang_detector *lang_det; double task_timeout; + /* Default log tag type for worker */ + enum rspamd_proxy_log_tag_type log_tag_type; struct rspamd_main *srv; }; @@ -195,6 +211,11 @@ enum rspamd_proxy_legacy_support { LEGACY_SUPPORT_SPAMC }; +enum rspamd_proxy_session_flags { + RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE = 1 << 0, + RSPAMD_PROXY_SESSION_FLAG_CLIENT_SUPPORTS_COMPRESSION = 1 << 1, +}; + struct rspamd_proxy_session { struct rspamd_worker *worker; rspamd_mempool_t *pool; @@ -214,6 +235,7 @@ struct rspamd_proxy_session { enum rspamd_proxy_legacy_support legacy_support; int retries; ref_entry_t ref; + enum rspamd_proxy_session_flags flags; }; static gboolean proxy_send_master_message(struct rspamd_proxy_session *session); @@ -224,6 +246,77 @@ rspamd_proxy_quark(void) return g_quark_from_static_string("rspamd-proxy"); } +static enum rspamd_proxy_log_tag_type +rspamd_proxy_parse_log_tag_type(const char *str) +{ + if (str == NULL) { + return RSPAMD_PROXY_LOG_TAG_SESSION; + } + + if (g_ascii_strcasecmp(str, "session") == 0 || + g_ascii_strcasecmp(str, "session_tag") == 0) { + return RSPAMD_PROXY_LOG_TAG_SESSION; + } + else if (g_ascii_strcasecmp(str, "queue_id") == 0 || + g_ascii_strcasecmp(str, "queue-id") == 0) { + return RSPAMD_PROXY_LOG_TAG_QUEUE_ID; + } + else if (g_ascii_strcasecmp(str, "none") == 0 || + g_ascii_strcasecmp(str, "skip") == 0) { + return RSPAMD_PROXY_LOG_TAG_NONE; + } + + /* Default to session tag for unknown values */ + return RSPAMD_PROXY_LOG_TAG_SESSION; +} + +static void +rspamd_proxy_add_log_tag_header(struct rspamd_http_message *msg, + struct rspamd_proxy_session *session, + enum rspamd_proxy_log_tag_type log_tag_type) +{ + const rspamd_ftok_t *queue_id_hdr; + + switch (log_tag_type) { + case RSPAMD_PROXY_LOG_TAG_SESSION: + /* Use session mempool tag (current behavior) */ + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + break; + + case RSPAMD_PROXY_LOG_TAG_QUEUE_ID: + /* Try to extract Queue-ID from client message */ + if (session->client_message) { + queue_id_hdr = rspamd_http_message_find_header(session->client_message, QUEUE_ID_HEADER); + if (queue_id_hdr) { + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, + queue_id_hdr->begin, queue_id_hdr->len); + } + /* If no Queue-ID found, fall back to session tag */ + else { + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + } + } + else { + /* No client message, fall back to session tag */ + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + } + break; + + case RSPAMD_PROXY_LOG_TAG_NONE: + /* Skip adding log tag header */ + break; + + default: + /* Fall back to session tag for unknown types */ + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + break; + } +} + static gboolean rspamd_proxy_parse_lua_parser(lua_State *L, const ucl_object_t *obj, int *ref_from, int *ref_to, GError **err) @@ -392,6 +485,7 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->parser_from_ref = -1; up->parser_to_ref = -1; up->timeout = ctx->timeout; + up->log_tag_type = ctx->log_tag_type; /* Inherit from worker default */ elt = ucl_object_lookup(obj, "key"); if (elt != NULL) { @@ -420,6 +514,21 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->compress = TRUE; } + elt = ucl_object_lookup(obj, "ssl"); + if (elt && ucl_object_toboolean(elt)) { + up->ssl = TRUE; + } + + elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL); + if (elt && ucl_object_toboolean(elt)) { + up->keepalive = TRUE; + } + + elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL); + if (elt && ucl_object_toboolean(elt)) { + up->keepalive = TRUE; + } + elt = ucl_object_lookup(obj, "hosts"); if (elt == NULL && !up->self_scan) { @@ -469,6 +578,27 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->settings_id = rspamd_mempool_strdup(pool, ucl_object_tostring(elt)); } + elt = ucl_object_lookup(obj, "extra_headers"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + up->extra_headers = ucl_object_ref(elt); + rspamd_mempool_add_destructor(pool, + (rspamd_mempool_destruct_t) ucl_object_unref, + up->extra_headers); + } + + elt = ucl_object_lookup(obj, "extra_headers"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + up->extra_headers = ucl_object_ref(elt); + rspamd_mempool_add_destructor(pool, + (rspamd_mempool_destruct_t) ucl_object_unref, + up->extra_headers); + } + + elt = ucl_object_lookup_any(obj, "log_tag", "log_tag_type", NULL); + if (elt && ucl_object_type(elt) == UCL_STRING) { + up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt)); + } + /* * Accept lua function here in form * fun :: String -> UCL @@ -568,6 +698,7 @@ rspamd_proxy_parse_mirror(rspamd_mempool_t *pool, up->parser_to_ref = -1; up->parser_from_ref = -1; up->timeout = ctx->timeout; + up->log_tag_type = ctx->log_tag_type; /* Inherit from worker default */ elt = ucl_object_lookup(obj, "key"); if (elt != NULL) { @@ -648,6 +779,11 @@ rspamd_proxy_parse_mirror(rspamd_mempool_t *pool, up->settings_id = rspamd_mempool_strdup(pool, ucl_object_tostring(elt)); } + elt = ucl_object_lookup_any(obj, "log_tag", "log_tag_type", NULL); + if (elt && ucl_object_type(elt) == UCL_STRING) { + up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt)); + } + g_ptr_array_add(ctx->mirrors, up); return TRUE; @@ -747,6 +883,29 @@ err: return FALSE; } +static gboolean +rspamd_proxy_parse_log_tag_worker_option(rspamd_mempool_t *pool, + const ucl_object_t *obj, + gpointer ud, + struct rspamd_rcl_section *section, + GError **err) +{ + struct rspamd_proxy_ctx *ctx; + struct rspamd_rcl_struct_parser *pd = ud; + + ctx = pd->user_struct; + + if (ucl_object_type(obj) != UCL_STRING) { + g_set_error(err, rspamd_proxy_quark(), 100, + "log_tag_type option must be a string"); + return FALSE; + } + + ctx->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(obj)); + + return TRUE; +} + gpointer init_rspamd_proxy(struct rspamd_config *cfg) { @@ -772,6 +931,7 @@ init_rspamd_proxy(struct rspamd_config *cfg) (rspamd_mempool_destruct_t) rspamd_array_free_hard, ctx->cmp_refs); ctx->max_retries = DEFAULT_RETRIES; ctx->spam_header = RSPAMD_MILTER_SPAM_HEADER; + ctx->log_tag_type = RSPAMD_PROXY_LOG_TAG_SESSION; /* Default to session tag */ rspamd_rcl_register_worker_option(cfg, type, @@ -895,6 +1055,16 @@ init_rspamd_proxy(struct rspamd_config *cfg) 0, "Use custom tempfail message"); + /* We need a custom parser for log_tag_type as it's an enum */ + rspamd_rcl_register_worker_option(cfg, + type, + "log_tag_type", + rspamd_proxy_parse_log_tag_worker_option, + ctx, + 0, + 0, + "Log tag type: session (default), queue_id, or none"); + return ctx; } @@ -905,7 +1075,11 @@ proxy_backend_close_connection(struct rspamd_proxy_backend_connection *conn) if (conn->backend_conn) { rspamd_http_connection_reset(conn->backend_conn); rspamd_http_connection_unref(conn->backend_conn); - close(conn->backend_sock); + + if (!(conn->s && (conn->s->flags & RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE))) { + /* Only close socket if we're not using keepalive */ + close(conn->backend_sock); + } } conn->flags |= RSPAMD_BACKEND_CLOSED; @@ -970,7 +1144,7 @@ proxy_backend_parse_results(struct rspamd_proxy_session *session, RSPAMD_FTOK_ASSIGN(&json_ct, "application/json"); if (ct && rspamd_ftok_casecmp(ct, &json_ct) == 0) { - parser = ucl_parser_new(0); + parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); if (!ucl_parser_add_chunk(parser, in, inlen)) { char *encoded; @@ -1166,6 +1340,7 @@ proxy_request_compress(struct rspamd_http_message *msg) ZSTD_freeCCtx(zctx); rspamd_http_message_set_body_from_fstring_steal(msg, body); rspamd_http_message_add_header(msg, COMPRESSION_HEADER, "zstd"); + rspamd_http_message_add_header(msg, CONTENT_ENCODING_HEADER, "zstd"); } } @@ -1226,6 +1401,7 @@ proxy_request_decompress(struct rspamd_http_message *msg) ZSTD_freeDStream(zstream); rspamd_http_message_set_body_from_fstring_steal(msg, body); rspamd_http_message_remove_header(msg, COMPRESSION_HEADER); + rspamd_http_message_remove_header(msg, CONTENT_ENCODING_HEADER); } } @@ -1247,6 +1423,7 @@ proxy_session_refresh(struct rspamd_proxy_session *session) nsession->client_sock = session->client_sock; session->client_sock = -1; nsession->mirror_conns = g_ptr_array_sized_new(nsession->ctx->mirrors->len); + nsession->flags = session->flags; REF_INIT_RETAIN(nsession, proxy_session_dtor); @@ -1384,6 +1561,8 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn, struct rspamd_proxy_backend_connection *bk_conn = conn->ud; struct rspamd_proxy_session *session; const rspamd_ftok_t *orig_ct; + const rspamd_ftok_t *conn_hdr; + gboolean is_keepalive = FALSE; session = bk_conn->s; @@ -1403,6 +1582,36 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn, bk_conn->name, msg->code); rspamd_upstream_ok(bk_conn->up); + /* Check if we can use keepalive */ + conn_hdr = rspamd_http_message_find_header(msg, "Connection"); + if (conn_hdr) { + if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len, + "keep-alive", 10) != -1) { + is_keepalive = TRUE; + } + } + + if (is_keepalive && (session->flags & RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE) && + bk_conn->up && session->ctx->http_ctx) { + /* Store connection in keepalive pool */ + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_prepare_keepalive(session->ctx->http_ctx, + conn, rspamd_upstream_addr_cur(bk_conn->up), + up_name, FALSE); + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + msg_debug_session("pushed mirror connection to %s to keepalive pool", + bk_conn->name); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + REF_RELEASE(bk_conn->s); + return 0; + } + } + proxy_backend_close_connection(bk_conn); REF_RELEASE(bk_conn->s); @@ -1418,6 +1627,7 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) struct rspamd_proxy_backend_connection *bk_conn; struct rspamd_http_message *msg; GError *err = NULL; + const rspamd_inet_addr_t *keepalive_addr; coin = rspamd_random_double(); @@ -1429,6 +1639,157 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) continue; } + /* Check if we can use keepalive for this mirror */ + if (m->keepalive && session->ctx->http_ctx) { + const char *up_name = NULL; + unsigned int port = 0; + + /* Try to find a keepalive connection */ + if (m->u) { + struct upstream *up = rspamd_upstream_get(m->u, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + if (up) { + up_name = rspamd_upstream_name(up); + port = rspamd_inet_address_get_port(rspamd_upstream_addr_cur(up)); + } + } + + if (up_name) { + keepalive_addr = rspamd_http_context_has_keepalive( + session->ctx->http_ctx, up_name, port, m->ssl); + + if (keepalive_addr) { + /* We found a keepalive connection, use it */ + struct rspamd_http_connection *conn; + + conn = rspamd_http_context_check_keepalive( + session->ctx->http_ctx, + (rspamd_inet_addr_t *) keepalive_addr, + up_name, + m->ssl); + + if (conn) { + /* We have a keepalive connection, set it up */ + bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn)); + bk_conn->s = session; + bk_conn->name = m->name; + bk_conn->timeout = m->timeout; + bk_conn->parser_from_ref = m->parser_from_ref; + bk_conn->parser_to_ref = m->parser_to_ref; + bk_conn->backend_conn = conn; + bk_conn->backend_sock = conn->fd; + + msg = rspamd_http_connection_copy_msg(session->client_message, &err); + + if (msg == NULL) { + msg_err_session("cannot copy message to send to a mirror %s: %e", + m->name, err); + if (err) { + g_error_free(err); + } + continue; + } + + if (up_name) { + rspamd_http_message_remove_header(msg, "Host"); + rspamd_http_message_add_header(msg, "Host", up_name); + } + rspamd_http_message_remove_header(msg, "Connection"); + rspamd_http_message_add_header(msg, "Connection", "keep-alive"); + + if (msg->url->len == 0) { + msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check")); + } + + if (m->settings_id != NULL) { + rspamd_http_message_remove_header(msg, "Settings-ID"); + rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id); + } + + /* Add extra headers if specified */ + if (m->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Add log tag header based on mirror's configuration */ + rspamd_proxy_add_log_tag_header(msg, session, m->log_tag_type); + + /* Set handlers for the connection */ + conn->error_handler = proxy_backend_mirror_error_handler; + conn->finish_handler = proxy_backend_mirror_finish_handler; + conn->ud = bk_conn; + + if (m->key) { + msg->peer_key = rspamd_pubkey_ref(m->key); + } + + if (m->local || rspamd_inet_address_is_local(keepalive_addr)) { + if (session->fname) { + rspamd_http_message_add_header(msg, "File", session->fname); + } + + msg->method = HTTP_GET; + rspamd_http_connection_write_message_shared(conn, + msg, up_name, + NULL, bk_conn, + bk_conn->timeout); + } + else { + if (session->fname) { + msg->flags &= ~RSPAMD_HTTP_FLAG_SHMEM; + rspamd_http_message_set_body(msg, session->map, session->map_len); + } + + msg->method = HTTP_POST; + + if (m->compress) { + proxy_request_compress(msg); + + if (session->client_milter_conn) { + rspamd_http_message_add_header(msg, "Content-Type", + "application/octet-stream"); + } + } + else { + if (session->client_milter_conn) { + rspamd_http_message_add_header(msg, "Content-Type", + "text/plain"); + } + } + + rspamd_http_connection_write_message(conn, + msg, up_name, NULL, bk_conn, + bk_conn->timeout); + } + + g_ptr_array_add(session->mirror_conns, bk_conn); + REF_RETAIN(session); + msg_info_session("send request to %s (using keepalive)", m->name); + + /* + * We have found the existing keepalive connection, so we can + * process another mirror + */ + continue; + } + } + } + } + + /* Non-keepalive connection */ + bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn)); bk_conn->s = session; @@ -1472,7 +1833,9 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) rspamd_http_message_remove_header(msg, "Host"); rspamd_http_message_add_header(msg, "Host", up_name); } - rspamd_http_message_add_header(msg, "Connection", "close"); + rspamd_http_message_remove_header(msg, "Connection"); + rspamd_http_message_add_header(msg, "Connection", + m->keepalive ? "keep-alive" : "close"); if (msg->url->len == 0) { msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check")); @@ -1483,12 +1846,38 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id); } + /* Add extra headers if specified */ + if (m->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Add log tag header based on mirror's configuration */ + rspamd_proxy_add_log_tag_header(msg, session, m->log_tag_type); + + unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; + + if (m->ssl) { + http_opts |= RSPAMD_HTTP_CLIENT_SSL; + } + bk_conn->backend_conn = rspamd_http_connection_new_client_socket( session->ctx->http_ctx, NULL, proxy_backend_mirror_error_handler, proxy_backend_mirror_finish_handler, - RSPAMD_HTTP_CLIENT_SIMPLE, + http_opts, bk_conn->backend_sock); if (m->key) { @@ -1600,8 +1989,9 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError * session->retries++; msg_info_session("abnormally closing connection from backend: %s, error: %e," " retries left: %d", - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->master_conn->up)), + session->master_conn->up ? rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->master_conn->up)) + : "self-scan", err, session->ctx->max_retries - session->retries); rspamd_upstream_fail(bk_conn->up, FALSE, err ? err->message : "unknown"); @@ -1632,8 +2022,9 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError * else { msg_info_session("retry connection to: %s" " retries left: %d", - rspamd_inet_address_to_string( - rspamd_upstream_addr_cur(session->master_conn->up)), + session->master_conn->up ? rspamd_inet_address_to_string( + rspamd_upstream_addr_cur(session->master_conn->up)) + : "self-scan", session->ctx->max_retries - session->retries); } } @@ -1647,7 +2038,9 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, struct rspamd_proxy_session *session, *nsession; rspamd_fstring_t *reply; const rspamd_ftok_t *orig_ct; + const rspamd_ftok_t *conn_hdr; goffset body_offset = -1; + gboolean is_keepalive = FALSE; session = bk_conn->s; rspamd_http_connection_steal_msg(session->master_conn->backend_conn); @@ -1663,6 +2056,16 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_http_message_remove_header(msg, "Server"); rspamd_http_message_remove_header(msg, "Key"); orig_ct = rspamd_http_message_find_header(msg, "Content-Type"); + + /* Check if we can use keepalive */ + conn_hdr = rspamd_http_message_find_header(msg, "Connection"); + if (conn_hdr) { + if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len, + "keep-alive", 10) != -1) { + is_keepalive = TRUE; + } + } + rspamd_http_connection_reset(session->master_conn->backend_conn); if (!proxy_backend_parse_results(session, bk_conn, session->ctx->lua_state, @@ -1695,6 +2098,22 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_upstream_ok(bk_conn->up); + /* Handle keepalive for master connection */ + if (is_keepalive && (session->flags & RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE) && + bk_conn->up && session->ctx->http_ctx) { + /* Store connection in keepalive pool */ + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_prepare_keepalive(session->ctx->http_ctx, + conn, rspamd_upstream_addr_cur(bk_conn->up), + up_name, FALSE); + + /* We'll push to keepalive pool after we're done with the response */ + msg_debug_session("will push master connection to %s to keepalive pool", + up_name); + } + } + if (session->client_milter_conn) { nsession = proxy_session_refresh(session); @@ -1708,6 +2127,20 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_milter_send_task_results(nsession->client_milter_conn, session->master_conn->results, NULL, 0); } + + /* Push to keepalive if needed */ + if (is_keepalive && (session->flags & RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE) && + bk_conn->up && session->ctx->http_ctx) { + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + } + } + REF_RELEASE(session); rspamd_http_message_free(msg); } @@ -1720,9 +2153,32 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_http_message_remove_header(msg, "Content-Type"); } + /* Clear any compression headers from backend response */ + rspamd_http_message_remove_header(msg, COMPRESSION_HEADER); + rspamd_http_message_remove_header(msg, CONTENT_ENCODING_HEADER); + + /* Compress response only if client supports compression and it's not a milter session */ + if (!session->client_milter_conn && + (session->flags & RSPAMD_PROXY_SESSION_FLAG_CLIENT_SUPPORTS_COMPRESSION)) { + proxy_request_compress(msg); + } + rspamd_http_connection_write_message(session->client_conn, msg, NULL, passed_ct, session, bk_conn->timeout); + + /* Push to keepalive if needed */ + if (is_keepalive && (session->flags & RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE) && + bk_conn->up && session->ctx->http_ctx) { + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + } + } } return 0; @@ -1982,6 +2438,14 @@ proxy_send_master_message(struct rspamd_proxy_session *session) /* Remove the original `Connection` header */ rspamd_http_message_remove_header(session->client_message, "Connection"); + /* Set keepalive flag based on backend configuration */ + if (backend && backend->keepalive) { + session->flags |= RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE; + } + else { + session->flags &= ~RSPAMD_PROXY_SESSION_FLAG_USE_KEEPALIVE; + } + if (backend == NULL) { /* No backend */ msg_err_session("cannot find upstream for %s", host ? hostbuf : "default"); @@ -2063,14 +2527,21 @@ proxy_send_master_message(struct rspamd_proxy_session *session) if (up_name) { rspamd_http_message_add_header(msg, "Host", up_name); } - rspamd_http_message_add_header(msg, "Connection", "close"); + rspamd_http_message_add_header(msg, "Connection", + backend->keepalive ? "keep-alive" : "close"); + + unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; + + if (backend->ssl) { + http_opts |= RSPAMD_HTTP_CLIENT_SSL; + } session->master_conn->backend_conn = rspamd_http_connection_new_client_socket( session->ctx->http_ctx, NULL, proxy_backend_master_error_handler, proxy_backend_master_finish_handler, - RSPAMD_HTTP_CLIENT_SIMPLE, + http_opts, session->master_conn->backend_sock); session->master_conn->flags &= ~RSPAMD_BACKEND_CLOSED; session->master_conn->parser_from_ref = backend->parser_from_ref; @@ -2086,6 +2557,26 @@ proxy_send_master_message(struct rspamd_proxy_session *session) backend->settings_id); } + /* Add extra headers if specified */ + if (backend->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(backend->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Add log tag header based on backend's configuration */ + rspamd_proxy_add_log_tag_header(msg, session, backend->log_tag_type); + if (backend->local || rspamd_inet_address_is_local( rspamd_upstream_addr_cur( @@ -2201,13 +2692,40 @@ proxy_client_finish_handler(struct rspamd_http_connection *conn, session->client_message = rspamd_http_connection_steal_msg( session->client_conn); session->shmem_ref = rspamd_http_message_shmem_ref(session->client_message); + + /* Check if client supports compression */ + const rspamd_ftok_t *compression_hdr = rspamd_http_message_find_header(session->client_message, COMPRESSION_HEADER); + const rspamd_ftok_t *accept_encoding = rspamd_http_message_find_header(session->client_message, "Accept-Encoding"); + gboolean client_supports_compression = FALSE; + + /* Rule 1: If request had Compression: zstd header, client supports compression */ + if (compression_hdr) { + rspamd_ftok_t zstd_tok; + zstd_tok.begin = "zstd"; + zstd_tok.len = 4; + + if (rspamd_ftok_casecmp(compression_hdr, &zstd_tok) == 0) { + client_supports_compression = TRUE; + } + } + + /* Rule 2: If client has Accept-Encoding: zstd header, client supports compression */ + if (!client_supports_compression && accept_encoding && + rspamd_substring_search_caseless(accept_encoding->begin, accept_encoding->len, "zstd", 4) != -1) { + client_supports_compression = TRUE; + } + + if (client_supports_compression) { + session->flags |= RSPAMD_PROXY_SESSION_FLAG_CLIENT_SUPPORTS_COMPRESSION; + } + else { + session->flags &= ~RSPAMD_PROXY_SESSION_FLAG_CLIENT_SUPPORTS_COMPRESSION; + } rspamd_http_message_remove_header(msg, "Content-Length"); rspamd_http_message_remove_header(msg, "Transfer-Encoding"); rspamd_http_message_remove_header(msg, "Keep-Alive"); rspamd_http_message_remove_header(msg, "Connection"); rspamd_http_message_remove_header(msg, "Key"); - rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, - strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); proxy_open_mirror_connections(session); rspamd_http_connection_reset(session->client_conn); @@ -2216,8 +2734,9 @@ proxy_client_finish_handler(struct rspamd_http_connection *conn, } else { msg_info_session("finished master connection to %s; HTTP code: %d", - rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->master_conn->up)), + session->master_conn->up ? rspamd_inet_address_to_string_pretty( + rspamd_upstream_addr_cur(session->master_conn->up)) + : "self-scan", msg->code); proxy_backend_close_connection(session->master_conn); REF_RELEASE(session); @@ -2264,6 +2783,8 @@ proxy_milter_finish_handler(int fd, session->master_conn->name = "master"; session->client_message = msg; + /* Milter protocol doesn't support compression, so no need to set compression flag */ + proxy_open_mirror_connections(session); proxy_send_master_message(session); } |