diff options
-rw-r--r-- | src/libserver/http/http_connection.c | 2 | ||||
-rw-r--r-- | src/rspamd_proxy.c | 262 |
2 files changed, 258 insertions, 6 deletions
diff --git a/src/libserver/http/http_connection.c b/src/libserver/http/http_connection.c index baf37a385..d94f9835e 100644 --- a/src/libserver/http/http_connection.c +++ b/src/libserver/http/http_connection.c @@ -2633,4 +2633,4 @@ void rspamd_http_connection_disable_encryption(struct rspamd_http_connection *co priv->peer_key = NULL; priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_ENCRYPTED; } -}
\ No newline at end of file +} diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index c8c3d5a71..8e69298e6 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -97,6 +97,7 @@ struct rspamd_http_upstream { gboolean self_scan; gboolean compress; gboolean ssl; + gboolean keepalive; /* Whether to use keepalive for this upstream */ ucl_object_t *extra_headers; }; @@ -112,6 +113,7 @@ struct rspamd_http_mirror { gboolean local; gboolean compress; gboolean ssl; + gboolean keepalive; /* Whether to use keepalive for this mirror */ ucl_object_t *extra_headers; }; @@ -218,6 +220,7 @@ struct rspamd_proxy_session { enum rspamd_proxy_legacy_support legacy_support; int retries; ref_entry_t ref; + gboolean use_keepalive; /* Whether to use keepalive for this session */ }; static gboolean proxy_send_master_message(struct rspamd_proxy_session *session); @@ -429,9 +432,14 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, up->ssl = TRUE; } - elt = ucl_object_lookup(obj, "ssl"); + elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL); if (elt && ucl_object_toboolean(elt)) { - up->ssl = TRUE; + up->keepalive = TRUE; + } + + elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL); + if (elt && ucl_object_toboolean(elt)) { + up->keepalive = TRUE; } elt = ucl_object_lookup(obj, "hosts"); @@ -935,7 +943,11 @@ proxy_backend_close_connection(struct rspamd_proxy_backend_connection *conn) if (conn->backend_conn) { rspamd_http_connection_reset(conn->backend_conn); rspamd_http_connection_unref(conn->backend_conn); - close(conn->backend_sock); + + if (!(conn->s && conn->s->use_keepalive)) { + /* Only close socket if we're not using keepalive */ + close(conn->backend_sock); + } } conn->flags |= RSPAMD_BACKEND_CLOSED; @@ -1414,6 +1426,8 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn, struct rspamd_proxy_backend_connection *bk_conn = conn->ud; struct rspamd_proxy_session *session; const rspamd_ftok_t *orig_ct; + const rspamd_ftok_t *conn_hdr; + gboolean is_keepalive = FALSE; session = bk_conn->s; @@ -1433,6 +1447,36 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn, bk_conn->name, msg->code); rspamd_upstream_ok(bk_conn->up); + /* Check if we can use keepalive */ + conn_hdr = rspamd_http_message_find_header(msg, "Connection"); + if (conn_hdr) { + if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len, + "keep-alive", 10) != -1) { + is_keepalive = TRUE; + } + } + + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + /* Store connection in keepalive pool */ + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_prepare_keepalive(session->ctx->http_ctx, + conn, rspamd_upstream_addr_cur(bk_conn->up), + up_name, FALSE); + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + msg_debug_session("pushed mirror connection to %s to keepalive pool", + bk_conn->name); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + REF_RELEASE(bk_conn->s); + return 0; + } + } + proxy_backend_close_connection(bk_conn); REF_RELEASE(bk_conn->s); @@ -1448,6 +1492,7 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) struct rspamd_proxy_backend_connection *bk_conn; struct rspamd_http_message *msg; GError *err = NULL; + const rspamd_inet_addr_t *keepalive_addr; coin = rspamd_random_double(); @@ -1459,6 +1504,153 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) continue; } + /* Check if we can use keepalive for this mirror */ + if (m->keepalive && session->ctx->http_ctx) { + const char *up_name = NULL; + unsigned int port = 0; + + /* Try to find a keepalive connection */ + if (m->u) { + struct upstream *up = rspamd_upstream_get(m->u, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + if (up) { + up_name = rspamd_upstream_name(up); + port = rspamd_inet_address_get_port(rspamd_upstream_addr_cur(up)); + } + } + + if (up_name) { + keepalive_addr = rspamd_http_context_has_keepalive( + session->ctx->http_ctx, up_name, port, m->ssl); + + if (keepalive_addr) { + /* We found a keepalive connection, use it */ + struct rspamd_http_connection *conn; + + conn = rspamd_http_context_check_keepalive( + session->ctx->http_ctx, + (rspamd_inet_addr_t *) keepalive_addr, + up_name, + m->ssl); + + if (conn) { + /* We have a keepalive connection, set it up */ + bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn)); + bk_conn->s = session; + bk_conn->name = m->name; + bk_conn->timeout = m->timeout; + bk_conn->parser_from_ref = m->parser_from_ref; + bk_conn->parser_to_ref = m->parser_to_ref; + bk_conn->backend_conn = conn; + bk_conn->backend_sock = conn->fd; + + msg = rspamd_http_connection_copy_msg(session->client_message, &err); + + if (msg == NULL) { + msg_err_session("cannot copy message to send to a mirror %s: %e", + m->name, err); + if (err) { + g_error_free(err); + } + continue; + } + + if (up_name) { + rspamd_http_message_remove_header(msg, "Host"); + rspamd_http_message_add_header(msg, "Host", up_name); + } + rspamd_http_message_add_header(msg, "Connection", "keep-alive"); + + if (msg->url->len == 0) { + msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check")); + } + + if (m->settings_id != NULL) { + rspamd_http_message_remove_header(msg, "Settings-ID"); + rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id); + } + + /* Add extra headers if specified */ + if (m->extra_headers != NULL) { + ucl_object_iter_t it = NULL; + const ucl_object_t *cur; + const char *key, *value; + + while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) { + key = ucl_object_key(cur); + value = ucl_object_tostring(cur); + + if (key != NULL && value != NULL) { + rspamd_http_message_remove_header(msg, key); + rspamd_http_message_add_header(msg, key, value); + } + } + } + + /* Set handlers for the connection */ + conn->error_handler = proxy_backend_mirror_error_handler; + conn->finish_handler = proxy_backend_mirror_finish_handler; + conn->ud = bk_conn; + + if (m->key) { + msg->peer_key = rspamd_pubkey_ref(m->key); + } + + if (m->local || rspamd_inet_address_is_local(keepalive_addr)) { + if (session->fname) { + rspamd_http_message_add_header(msg, "File", session->fname); + } + + msg->method = HTTP_GET; + rspamd_http_connection_write_message_shared(conn, + msg, up_name, + NULL, bk_conn, + bk_conn->timeout); + } + else { + if (session->fname) { + msg->flags &= ~RSPAMD_HTTP_FLAG_SHMEM; + rspamd_http_message_set_body(msg, session->map, session->map_len); + } + + msg->method = HTTP_POST; + + if (m->compress) { + proxy_request_compress(msg); + + if (session->client_milter_conn) { + rspamd_http_message_add_header(msg, "Content-Type", + "application/octet-stream"); + } + } + else { + if (session->client_milter_conn) { + rspamd_http_message_add_header(msg, "Content-Type", + "text/plain"); + } + } + + rspamd_http_connection_write_message(conn, + msg, up_name, NULL, bk_conn, + bk_conn->timeout); + } + + g_ptr_array_add(session->mirror_conns, bk_conn); + REF_RETAIN(session); + msg_info_session("send request to %s (using keepalive)", m->name); + + /* + * We have found the existing keepalive connection, so we can + * process another mirror + */ + continue; + } + } + } + } + + /* Non-keepalive connection */ + bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn)); bk_conn->s = session; @@ -1502,7 +1694,8 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) rspamd_http_message_remove_header(msg, "Host"); rspamd_http_message_add_header(msg, "Host", up_name); } - rspamd_http_message_add_header(msg, "Connection", "close"); + rspamd_http_message_add_header(msg, "Connection", + m->keepalive ? "keep-alive" : "close"); if (msg->url->len == 0) { msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check")); @@ -1702,7 +1895,9 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, struct rspamd_proxy_session *session, *nsession; rspamd_fstring_t *reply; const rspamd_ftok_t *orig_ct; + const rspamd_ftok_t *conn_hdr; goffset body_offset = -1; + gboolean is_keepalive = FALSE; session = bk_conn->s; rspamd_http_connection_steal_msg(session->master_conn->backend_conn); @@ -1718,6 +1913,16 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_http_message_remove_header(msg, "Server"); rspamd_http_message_remove_header(msg, "Key"); orig_ct = rspamd_http_message_find_header(msg, "Content-Type"); + + /* Check if we can use keepalive */ + conn_hdr = rspamd_http_message_find_header(msg, "Connection"); + if (conn_hdr) { + if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len, + "keep-alive", 10) != -1) { + is_keepalive = TRUE; + } + } + rspamd_http_connection_reset(session->master_conn->backend_conn); if (!proxy_backend_parse_results(session, bk_conn, session->ctx->lua_state, @@ -1750,6 +1955,22 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_upstream_ok(bk_conn->up); + /* Handle keepalive for master connection */ + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + /* Store connection in keepalive pool */ + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_prepare_keepalive(session->ctx->http_ctx, + conn, rspamd_upstream_addr_cur(bk_conn->up), + up_name, FALSE); + + /* We'll push to keepalive pool after we're done with the response */ + msg_debug_session("will push master connection to %s to keepalive pool", + up_name); + } + } + if (session->client_milter_conn) { nsession = proxy_session_refresh(session); @@ -1763,6 +1984,20 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_milter_send_task_results(nsession->client_milter_conn, session->master_conn->results, NULL, 0); } + + /* Push to keepalive if needed */ + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + } + } + REF_RELEASE(session); rspamd_http_message_free(msg); } @@ -1778,6 +2013,19 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn, rspamd_http_connection_write_message(session->client_conn, msg, NULL, passed_ct, session, bk_conn->timeout); + + /* Push to keepalive if needed */ + if (is_keepalive && session->use_keepalive && + bk_conn->up && session->ctx->http_ctx) { + const char *up_name = rspamd_upstream_name(bk_conn->up); + if (up_name) { + rspamd_http_context_push_keepalive(session->ctx->http_ctx, + conn, msg, session->ctx->event_loop); + + /* Mark connection as closed without actually closing it */ + bk_conn->flags |= RSPAMD_BACKEND_CLOSED; + } + } } return 0; @@ -2037,6 +2285,9 @@ proxy_send_master_message(struct rspamd_proxy_session *session) /* Remove the original `Connection` header */ rspamd_http_message_remove_header(session->client_message, "Connection"); + /* Set keepalive flag based on backend configuration */ + session->use_keepalive = backend ? backend->keepalive : FALSE; + if (backend == NULL) { /* No backend */ msg_err_session("cannot find upstream for %s", host ? hostbuf : "default"); @@ -2118,7 +2369,8 @@ proxy_send_master_message(struct rspamd_proxy_session *session) if (up_name) { rspamd_http_message_add_header(msg, "Host", up_name); } - rspamd_http_message_add_header(msg, "Connection", "close"); + rspamd_http_message_add_header(msg, "Connection", + backend->keepalive ? "keep-alive" : "close"); unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; |