diff options
Diffstat (limited to 'src/rspamd_proxy.c')
-rw-r--r-- | src/rspamd_proxy.c | 491 |
1 files changed, 479 insertions, 12 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index b7decbbcc..77d2336b2 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -1,5 +1,5 @@ /* - * Copyright 2024 Vsevolod Stakhov + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,6 +85,12 @@ worker_t rspamd_proxy_worker = { RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ RSPAMD_WORKER_VER}; +enum rspamd_proxy_log_tag_type { + RSPAMD_PROXY_LOG_TAG_SESSION = 0, /* Use session mempool tag (default) */ + RSPAMD_PROXY_LOG_TAG_QUEUE_ID, /* Use Queue-ID from client message */ + RSPAMD_PROXY_LOG_TAG_NONE, /* Skip log tag passing */ +}; + struct rspamd_http_upstream { char *name; char *settings_id; @@ -96,6 +102,10 @@ struct rspamd_http_upstream { gboolean local; gboolean self_scan; gboolean compress; + gboolean ssl; + gboolean keepalive; /* Whether to use keepalive for this upstream */ + enum rspamd_proxy_log_tag_type log_tag_type; + ucl_object_t *extra_headers; }; struct rspamd_http_mirror { @@ -109,6 +119,10 @@ struct rspamd_http_mirror { int parser_to_ref; gboolean local; gboolean compress; + gboolean ssl; + gboolean keepalive; /* Whether to use keepalive for this mirror */ + enum rspamd_proxy_log_tag_type log_tag_type; + ucl_object_t *extra_headers; }; static const uint64_t rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL; @@ -161,6 +175,8 @@ struct rspamd_proxy_ctx { /* Language detector */ struct rspamd_lang_detector *lang_det; double task_timeout; + /* Default log tag type for worker */ + enum rspamd_proxy_log_tag_type log_tag_type; struct rspamd_main *srv; }; @@ -214,6 +230,7 @@ struct rspamd_proxy_session { enum rspamd_proxy_legacy_support legacy_support; int retries; ref_entry_t ref; + gboolean use_keepalive; /* Whether to use keepalive for this session */ }; static gboolean proxy_send_master_message(struct rspamd_proxy_session *session); @@ -224,6 +241,77 @@ rspamd_proxy_quark(void) return g_quark_from_static_string("rspamd-proxy"); } +static enum rspamd_proxy_log_tag_type +rspamd_proxy_parse_log_tag_type(const char *str) +{ + if (str == NULL) { + return RSPAMD_PROXY_LOG_TAG_SESSION; + } + + if (g_ascii_strcasecmp(str, "session") == 0 || + g_ascii_strcasecmp(str, "session_tag") == 0) { + return RSPAMD_PROXY_LOG_TAG_SESSION; + } + else if (g_ascii_strcasecmp(str, "queue_id") == 0 || + g_ascii_strcasecmp(str, "queue-id") == 0) { + return RSPAMD_PROXY_LOG_TAG_QUEUE_ID; + } + else if (g_ascii_strcasecmp(str, "none") == 0 || + g_ascii_strcasecmp(str, "skip") == 0) { + return RSPAMD_PROXY_LOG_TAG_NONE; + } + + /* Default to session tag for unknown values */ + return RSPAMD_PROXY_LOG_TAG_SESSION; +} + +static void +rspamd_proxy_add_log_tag_header(struct rspamd_http_message *msg, + struct rspamd_proxy_session *session, + enum rspamd_proxy_log_tag_type log_tag_type) +{ + const rspamd_ftok_t *queue_id_hdr; + + switch (log_tag_type) { + case RSPAMD_PROXY_LOG_TAG_SESSION: + /* Use session mempool tag (current behavior) */ + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + break; + + case RSPAMD_PROXY_LOG_TAG_QUEUE_ID: + /* Try to extract Queue-ID from client message */ + if (session->client_message) { + queue_id_hdr = rspamd_http_message_find_header(session->client_message, QUEUE_ID_HEADER); + if (queue_id_hdr) { + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, + queue_id_hdr->begin, queue_id_hdr->len); + } + /* If no Queue-ID found, fall back to session tag */ + else { + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + } + } + else { + /* No client message, fall back to session tag */ + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + } + break; + + case RSPAMD_PROXY_LOG_TAG_NONE: + /* Skip adding log tag header */ + break; + + default: + /* Fall back to session tag for unknown types */ + rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, + strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); + break; + } +} + static gboolean rspamd_proxy_parse_lua_parser(lua_State *L, const ucl_object_t *obj, int *ref_from, int *ref_to, GError **err) @@ -392,6 +480,7 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->parser_from_ref = -1; up->parser_to_ref = -1; up->timeout = ctx->timeout; + up->log_tag_type = ctx->log_tag_type; /* Inherit from worker default */ elt = ucl_object_lookup(obj, "key"); if (elt != NULL) { @@ -420,6 +509,21 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->compress = TRUE; } + elt = ucl_object_lookup(obj, "ssl"); + if (elt && ucl_object_toboolean(elt)) { + up->ssl = TRUE; + } + + elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL); + if (elt && ucl_object_toboolean(elt)) { + up->keepalive = TRUE; + } + + elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL); + if (elt && ucl_object_toboolean(elt)) { + up->keepalive = TRUE; + } + elt = ucl_object_lookup(obj, "hosts"); if (elt == NULL && !up->self_scan) { @@ -469,6 +573,27 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->settings_id = rspamd_mempool_strdup(pool, ucl_object_tostring(elt)); } + elt = ucl_object_lookup(obj, "extra_headers"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + up->extra_headers = ucl_object_ref(elt); + rspamd_mempool_add_destructor(pool, + (rspamd_mempool_destruct_t) ucl_object_unref, + up->extra_headers); + } + + elt = ucl_object_lookup(obj, "extra_headers"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + up->extra_headers = ucl_object_ref(elt); + rspamd_mempool_add_destructor(pool, + (rspamd_mempool_destruct_t) ucl_object_unref, + up->extra_headers); + } + + elt = ucl_object_lookup_any(obj, "log_tag", "log_tag_type", NULL); + if (elt && ucl_object_type(elt) == UCL_STRING) { + up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt)); + } + /* * Accept lua function here in form * fun :: String -> UCL @@ -568,6 +693,7 @@ rspamd_proxy_parse_mirror(rspamd_mempool_t *pool, up->parser_to_ref = -1; up->parser_from_ref = -1; up->timeout = ctx->timeout; + up->log_tag_type = ctx->log_tag_type; /* Inherit from worker default */ elt = ucl_object_lookup(obj, "key"); if (elt != NULL) { @@ -648,6 +774,11 @@ rspamd_proxy_parse_mirror(rspamd_mempool_t *pool, up->settings_id = rspamd_mempool_strdup(pool, ucl_object_tostring(elt)); } + elt = ucl_object_lookup_any(obj, "log_tag", "log_tag_type", NULL); + if (elt && ucl_object_type(elt) == UCL_STRING) { + up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt)); + } + g_ptr_array_add(ctx->mirrors, up); return TRUE; @@ -747,6 +878,29 @@ err: return FALSE; } +static gboolean +rspamd_proxy_parse_log_tag_worker_option(rspamd_mempool_t *pool, + const ucl_object_t *obj, + gpointer ud, + struct rspamd_rcl_section *section, + GError **err) +{ + struct rspamd_proxy_ctx *ctx; + struct rspamd_rcl_struct_parser *pd = ud; + + ctx = pd->user_struct; + + if (ucl_object_type(obj) != UCL_STRING) { + g_set_error(err, rspamd_proxy_quark(), 100, + "log_tag_type option must be a string"); + return FALSE; + } + + ctx->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(obj)); + + return TRUE; +} + gpointer init_rspamd_proxy(struct rspamd_config *cfg) { @@ -772,6 +926,7 @@ init_rspamd_proxy(struct rspamd_config *cfg) (rspamd_mempool_destruct_t) rspamd_array_free_hard, ctx->cmp_refs); ctx->max_retries = DEFAULT_RETRIES; ctx->spam_header = RSPAMD_MILTER_SPAM_HEADER; + ctx->log_tag_type = RSPAMD_PROXY_LOG_TAG_SESSION; /* Default to session tag */ rspamd_rcl_register_worker_option(cfg, type, @@ -895,6 +1050,16 @@ init_rspamd_proxy(struct rspamd_config *cfg) 0, "Use custom tempfail message"); + /* We need a custom parser for log_tag_type as it's an enum */ + rspamd_rcl_register_worker_option(cfg, + type, + "log_tag_type", + rspamd_proxy_parse_log_tag_worker_option, + ctx, + 0, + 0, + "Log tag type: session (default), queue_id, or none"); + return ctx; } @@ -905,7 +1070,11 @@ proxy_backend_close_connection(struct rspamd_proxy_backend_connection *conn) if (conn->backend_conn) { rspamd_http_connection_reset(conn->backend_conn); rspamd_http_connection_unref(conn->backend_conn); - close(conn->backend_sock); + + if (!(conn->s && conn->s->use_keepalive)) { + /* Only close socket if we're not using keepalive */ + close(conn->backend_sock); + } } conn->flags |= RSPAMD_BACKEND_CLOSED; @@ -970,7 +1139,7 @@ proxy_backend_parse_results(struct rspamd_proxy_session *session, RSPAMD_FTOK_ASSIGN(&json_ct, "application/json"); if (ct && rspamd_ftok_casecmp(ct, &json_ct) == 0) { - parser = ucl_parser_new(0); + parser = ucl_parser_new(UCL_PARSER_SAFE_FLAGS); if (!ucl_parser_add_chunk(parser, in, inlen)) { char *encoded; @@ -1384,6 +1553,8 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn, struct rspamd_proxy_backend_connection *bk_conn = conn->ud; struct rspamd_proxy_session *session; const rspamd_ftok_t *orig_ct; + const rspamd_ftok_t *conn_hdr; + gboolean is_keepalive = FALSE; session = bk_conn->s; @@ -1403,6 +1574,36 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn, bk_conn->name, msg->code); rspamd_upstream_ok(bk_conn->up); + /* Check if we can use keepalive */ + conn_hdr = rspamd_http_message_find_header(msg, "Connection"); + if (conn_hdr) { + if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len, + "keep-alive", 10) != -1) { + is_keepalive = TRUE; + } + } + + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + /* Store connection in keepalive pool */ + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_prepare_keepalive(session->ctx->http_ctx, + conn, rspamd_upstream_addr_cur(bk_conn->up), + up_name, FALSE); + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + msg_debug_session("pushed mirror connection to %s to keepalive pool", + bk_conn->name); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + REF_RELEASE(bk_conn->s); + return 0; + } + } + proxy_backend_close_connection(bk_conn); REF_RELEASE(bk_conn->s); @@ -1418,6 +1619,7 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) struct rspamd_proxy_backend_connection *bk_conn; struct rspamd_http_message *msg; GError *err = NULL; + const rspamd_inet_addr_t *keepalive_addr; coin = rspamd_random_double(); @@ -1429,6 +1631,157 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) continue; } + /* Check if we can use keepalive for this mirror */ + if (m->keepalive && session->ctx->http_ctx) { + const char *up_name = NULL; + unsigned int port = 0; + + /* Try to find a keepalive connection */ + if (m->u) { + struct upstream *up = rspamd_upstream_get(m->u, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + if (up) { + up_name = rspamd_upstream_name(up); + port = rspamd_inet_address_get_port(rspamd_upstream_addr_cur(up)); + } + } + + if (up_name) { + keepalive_addr = rspamd_http_context_has_keepalive( + session->ctx->http_ctx, up_name, port, m->ssl); + + if (keepalive_addr) { + /* We found a keepalive connection, use it */ + struct rspamd_http_connection *conn; + + conn = rspamd_http_context_check_keepalive( + session->ctx->http_ctx, + (rspamd_inet_addr_t *) keepalive_addr, + up_name, + m->ssl); + + if (conn) { + /* We have a keepalive connection, set it up */ + bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn)); + bk_conn->s = session; + bk_conn->name = m->name; + bk_conn->timeout = m->timeout; + bk_conn->parser_from_ref = m->parser_from_ref; + bk_conn->parser_to_ref = m->parser_to_ref; + bk_conn->backend_conn = conn; + bk_conn->backend_sock = conn->fd; + + msg = rspamd_http_connection_copy_msg(session->client_message, &err); + + if (msg == NULL) { + msg_err_session("cannot copy message to send to a mirror %s: %e", + m->name, err); + if (err) { + g_error_free(err); + } + continue; + } + + if (up_name) { + rspamd_http_message_remove_header(msg, "Host"); + rspamd_http_message_add_header(msg, "Host", up_name); + } + rspamd_http_message_remove_header(msg, "Connection"); + rspamd_http_message_add_header(msg, "Connection", "keep-alive"); + + if (msg->url->len == 0) { + msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check")); + } + + if (m->settings_id != NULL) { + rspamd_http_message_remove_header(msg, "Settings-ID"); + rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id); + } + + /* Add extra headers if specified */ + if (m->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Add log tag header based on mirror's configuration */ + rspamd_proxy_add_log_tag_header(msg, session, m->log_tag_type); + + /* Set handlers for the connection */ + conn->error_handler = proxy_backend_mirror_error_handler; + conn->finish_handler = proxy_backend_mirror_finish_handler; + conn->ud = bk_conn; + + if (m->key) { + msg->peer_key = rspamd_pubkey_ref(m->key); + } + + if (m->local || rspamd_inet_address_is_local(keepalive_addr)) { + if (session->fname) { + rspamd_http_message_add_header(msg, "File", session->fname); + } + + msg->method = HTTP_GET; + rspamd_http_connection_write_message_shared(conn, + msg, up_name, + NULL, bk_conn, + bk_conn->timeout); + } + else { + if (session->fname) { + msg->flags &= ~RSPAMD_HTTP_FLAG_SHMEM; + rspamd_http_message_set_body(msg, session->map, session->map_len); + } + + msg->method = HTTP_POST; + + if (m->compress) { + proxy_request_compress(msg); + + if (session->client_milter_conn) { + rspamd_http_message_add_header(msg, "Content-Type", + "application/octet-stream"); + } + } + else { + if (session->client_milter_conn) { + rspamd_http_message_add_header(msg, "Content-Type", + "text/plain"); + } + } + + rspamd_http_connection_write_message(conn, + msg, up_name, NULL, bk_conn, + bk_conn->timeout); + } + + g_ptr_array_add(session->mirror_conns, bk_conn); + REF_RETAIN(session); + msg_info_session("send request to %s (using keepalive)", m->name); + + /* + * We have found the existing keepalive connection, so we can + * process another mirror + */ + continue; + } + } + } + } + + /* Non-keepalive connection */ + bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn)); bk_conn->s = session; @@ -1472,7 +1825,9 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) rspamd_http_message_remove_header(msg, "Host"); rspamd_http_message_add_header(msg, "Host", up_name); } - rspamd_http_message_add_header(msg, "Connection", "close"); + rspamd_http_message_remove_header(msg, "Connection"); + rspamd_http_message_add_header(msg, "Connection", + m->keepalive ? "keep-alive" : "close"); if (msg->url->len == 0) { msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check")); @@ -1483,12 +1838,38 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id); } + /* Add extra headers if specified */ + if (m->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Add log tag header based on mirror's configuration */ + rspamd_proxy_add_log_tag_header(msg, session, m->log_tag_type); + + unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; + + if (m->ssl) { + http_opts |= RSPAMD_HTTP_CLIENT_SSL; + } + bk_conn->backend_conn = rspamd_http_connection_new_client_socket( session->ctx->http_ctx, NULL, proxy_backend_mirror_error_handler, proxy_backend_mirror_finish_handler, - RSPAMD_HTTP_CLIENT_SIMPLE, + http_opts, bk_conn->backend_sock); if (m->key) { @@ -1601,7 +1982,8 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError * msg_info_session("abnormally closing connection from backend: %s, error: %e," " retries left: %d", session->master_conn->up ? rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->master_conn->up)) : "self-scan", + rspamd_upstream_addr_cur(session->master_conn->up)) + : "self-scan", err, session->ctx->max_retries - session->retries); rspamd_upstream_fail(bk_conn->up, FALSE, err ? err->message : "unknown"); @@ -1633,7 +2015,8 @@ proxy_backend_master_error_handler(struct rspamd_http_connection *conn, GError * msg_info_session("retry connection to: %s" " retries left: %d", session->master_conn->up ? rspamd_inet_address_to_string( - rspamd_upstream_addr_cur(session->master_conn->up)) : "self-scan", + rspamd_upstream_addr_cur(session->master_conn->up)) + : "self-scan", session->ctx->max_retries - session->retries); } } @@ -1647,7 +2030,9 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, struct rspamd_proxy_session *session, *nsession; rspamd_fstring_t *reply; const rspamd_ftok_t *orig_ct; + const rspamd_ftok_t *conn_hdr; goffset body_offset = -1; + gboolean is_keepalive = FALSE; session = bk_conn->s; rspamd_http_connection_steal_msg(session->master_conn->backend_conn); @@ -1663,6 +2048,16 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_http_message_remove_header(msg, "Server"); rspamd_http_message_remove_header(msg, "Key"); orig_ct = rspamd_http_message_find_header(msg, "Content-Type"); + + /* Check if we can use keepalive */ + conn_hdr = rspamd_http_message_find_header(msg, "Connection"); + if (conn_hdr) { + if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len, + "keep-alive", 10) != -1) { + is_keepalive = TRUE; + } + } + rspamd_http_connection_reset(session->master_conn->backend_conn); if (!proxy_backend_parse_results(session, bk_conn, session->ctx->lua_state, @@ -1695,6 +2090,22 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_upstream_ok(bk_conn->up); + /* Handle keepalive for master connection */ + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + /* Store connection in keepalive pool */ + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_prepare_keepalive(session->ctx->http_ctx, + conn, rspamd_upstream_addr_cur(bk_conn->up), + up_name, FALSE); + + /* We'll push to keepalive pool after we're done with the response */ + msg_debug_session("will push master connection to %s to keepalive pool", + up_name); + } + } + if (session->client_milter_conn) { nsession = proxy_session_refresh(session); @@ -1708,6 +2119,20 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_milter_send_task_results(nsession->client_milter_conn, session->master_conn->results, NULL, 0); } + + /* Push to keepalive if needed */ + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + } + } + REF_RELEASE(session); rspamd_http_message_free(msg); } @@ -1723,6 +2148,19 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_http_connection_write_message(session->client_conn, msg, NULL, passed_ct, session, bk_conn->timeout); + + /* Push to keepalive if needed */ + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + } + } } return 0; @@ -1982,6 +2420,9 @@ proxy_send_master_message(struct rspamd_proxy_session *session) /* Remove the original `Connection` header */ rspamd_http_message_remove_header(session->client_message, "Connection"); + /* Set keepalive flag based on backend configuration */ + session->use_keepalive = backend ? backend->keepalive : FALSE; + if (backend == NULL) { /* No backend */ msg_err_session("cannot find upstream for %s", host ? hostbuf : "default"); @@ -2063,14 +2504,21 @@ proxy_send_master_message(struct rspamd_proxy_session *session) if (up_name) { rspamd_http_message_add_header(msg, "Host", up_name); } - rspamd_http_message_add_header(msg, "Connection", "close"); + rspamd_http_message_add_header(msg, "Connection", + backend->keepalive ? "keep-alive" : "close"); + + unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; + + if (backend->ssl) { + http_opts |= RSPAMD_HTTP_CLIENT_SSL; + } session->master_conn->backend_conn = rspamd_http_connection_new_client_socket( session->ctx->http_ctx, NULL, proxy_backend_master_error_handler, proxy_backend_master_finish_handler, - RSPAMD_HTTP_CLIENT_SIMPLE, + http_opts, session->master_conn->backend_sock); session->master_conn->flags &= ~RSPAMD_BACKEND_CLOSED; session->master_conn->parser_from_ref = backend->parser_from_ref; @@ -2086,6 +2534,26 @@ proxy_send_master_message(struct rspamd_proxy_session *session) backend->settings_id); } + /* Add extra headers if specified */ + if (backend->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(backend->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Add log tag header based on backend's configuration */ + rspamd_proxy_add_log_tag_header(msg, session, backend->log_tag_type); + if (backend->local || rspamd_inet_address_is_local( rspamd_upstream_addr_cur( @@ -2206,8 +2674,6 @@ proxy_client_finish_handler(struct rspamd_http_connection *conn, rspamd_http_message_remove_header(msg, "Keep-Alive"); rspamd_http_message_remove_header(msg, "Connection"); rspamd_http_message_remove_header(msg, "Key"); - rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, - strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); proxy_open_mirror_connections(session); rspamd_http_connection_reset(session->client_conn); @@ -2217,7 +2683,8 @@ proxy_client_finish_handler(struct rspamd_http_connection *conn, else { msg_info_session("finished master connection to %s; HTTP code: %d", session->master_conn->up ? rspamd_inet_address_to_string_pretty( - rspamd_upstream_addr_cur(session->master_conn->up)) : "self-scan", + rspamd_upstream_addr_cur(session->master_conn->up)) + : "self-scan", msg->code); proxy_backend_close_connection(session->master_conn); REF_RELEASE(session); |