From 88c07077e25bbd57c441c49cd667678d2492693c Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 18 Jul 2016 17:50:54 +0100 Subject: [PATCH] [Feature] Implement retransmits for master connection --- src/rspamd_proxy.c | 229 ++++++++++++++++++++++++++++----------------- 1 file changed, 145 insertions(+), 84 deletions(-) diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index dcf492a24..a726f8f39 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -33,6 +33,7 @@ /* Rotate keys each minute by default */ #define DEFAULT_ROTATION_TIME 60.0 +#define DEFAULT_RETRIES 5 #define msg_err_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ session->pool->tag.tagname, session->pool->tag.uid, \ @@ -115,6 +116,8 @@ struct rspamd_proxy_ctx { lua_State *lua_state; /* Array of callback functions called on end of scan to compare results */ GArray *cmp_refs; + /* Maximum count for retries */ + guint max_retries; }; enum rspamd_backend_flags { @@ -149,13 +152,17 @@ struct rspamd_proxy_session { gpointer map; gpointer shmem_ref; struct rspamd_proxy_backend_connection *master_conn; + struct rspamd_http_message *client_message; GPtrArray *mirror_conns; gsize map_len; gint client_sock; gboolean is_spamc; + gint retries; ref_entry_t ref; }; +static gboolean proxy_send_master_message (struct rspamd_proxy_session *session); + static GQuark rspamd_proxy_quark (void) { @@ -621,6 +628,7 @@ init_rspamd_proxy (struct rspamd_config *cfg) ctx->cfg = cfg; ctx->lua_state = cfg->lua_state; ctx->cmp_refs = g_array_new (FALSE, FALSE, sizeof (gint)); + ctx->max_retries = DEFAULT_RETRIES; rspamd_rcl_register_worker_option (cfg, type, @@ -675,6 +683,14 @@ init_rspamd_proxy (struct rspamd_config *cfg) 0, RSPAMD_CL_FLAG_MULTIPLE, "Compare script to be executed"); + rspamd_rcl_register_worker_option (cfg, + type, + "timeout", + rspamd_rcl_parse_struct_integer, + ctx, + G_STRUCT_OFFSET (struct rspamd_proxy_ctx, max_retries), + RSPAMD_CL_FLAG_UINT, + "Maximum number of retries for master connection"); return ctx; } @@ -846,6 +862,7 @@ proxy_session_dtor (struct rspamd_proxy_session *session) g_ptr_array_free (session->mirror_conns, TRUE); rspamd_http_message_shmem_unref (session->shmem_ref); + rspamd_http_message_unref (session->client_message); rspamd_inet_address_destroy (session->client_addr); close (session->client_sock); rspamd_mempool_delete (session->pool); @@ -1030,7 +1047,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session) continue; } - msg = rspamd_http_connection_copy_msg (session->client_conn); + msg = rspamd_http_connection_copy_msg (session->client_message); if (msg == NULL) { msg_err_session ("cannot copy message to send to a mirror %s: %s", @@ -1107,9 +1124,21 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError msg_info_session ("abnormally closing connection from backend: %s, error: %s", rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)), err->message); - /* Terminate session immediately */ - proxy_client_write_error (session, err->code, err->message); + session->retries ++; proxy_backend_close_connection (session->master_conn); + + if (session->ctx->max_retries && + session->retries > session->ctx->max_retries) { + msg_err_session ("cannot connect to upstream, maximum retries " + "has been reached: %d", session->retries); + /* Terminate session immediately */ + proxy_client_write_error (session, err->code, err->message); + } + else { + if (!proxy_send_master_message (session)) { + proxy_client_write_error (session, err->code, err->message); + } + } } static gint @@ -1154,6 +1183,110 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn, return 0; } +static gboolean +proxy_send_master_message (struct rspamd_proxy_session *session) +{ + struct rspamd_http_message *msg; + struct rspamd_http_upstream *backend = NULL; + const rspamd_ftok_t *host; + gchar hostbuf[512]; + + host = rspamd_http_message_find_header (session->client_message, "Host"); + + if (host == NULL) { + backend = session->ctx->default_upstream; + } + else { + rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf))); + backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf); + + if (backend == NULL) { + backend = session->ctx->default_upstream; + } + } + + if (backend == NULL) { + /* No backend */ + msg_err_session ("cannot find upstream for %s", host ? hostbuf : "default"); + goto err; + } + else { + retry: + if (session->ctx->max_retries && + session->retries > session->ctx->max_retries) { + msg_err_session ("cannot connect to upstream, maximum retries " + "has been reached: %d", session->retries); + goto err; + } + + session->master_conn->up = rspamd_upstream_get (backend->u, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + session->master_conn->io_tv = &backend->io_tv; + + if (session->master_conn->up == NULL) { + msg_err_session ("cannot select upstream for %s", + host ? hostbuf : "default"); + goto err; + } + + session->master_conn->backend_sock = rspamd_inet_address_connect ( + rspamd_upstream_addr (session->master_conn->up), + SOCK_STREAM, TRUE); + + if (session->master_conn->backend_sock == -1) { + msg_err_session ("cannot connect upstream: %s(%s)", + host ? hostbuf : "default", + rspamd_inet_address_to_string (rspamd_upstream_addr ( + session->master_conn->up))); + rspamd_upstream_fail (session->master_conn->up); + session->retries ++; + goto retry; + } + + session->master_conn->backend_conn = rspamd_http_connection_new ( + NULL, + proxy_backend_master_error_handler, + proxy_backend_master_finish_handler, + RSPAMD_HTTP_CLIENT_SIMPLE, + RSPAMD_HTTP_CLIENT, + session->ctx->keys_cache, + NULL); + session->master_conn->parser_from_ref = backend->parser_from_ref; + session->master_conn->parser_to_ref = backend->parser_to_ref; + + msg = rspamd_http_connection_copy_msg (session->client_message); + rspamd_http_connection_set_key (session->master_conn->backend_conn, + session->ctx->local_key); + msg->peer_key = rspamd_pubkey_ref (backend->key); + + if (backend->local || + rspamd_inet_address_is_local ( + rspamd_upstream_addr (session->master_conn->up))) { + rspamd_http_connection_write_message_shared ( + session->master_conn->backend_conn, + msg, NULL, NULL, session->master_conn, + session->master_conn->backend_sock, + session->master_conn->io_tv, session->ctx->ev_base); + } + else { + rspamd_http_connection_write_message ( + session->master_conn->backend_conn, + msg, NULL, NULL, session->master_conn, + session->master_conn->backend_sock, + session->master_conn->io_tv, session->ctx->ev_base); + } + } + + return TRUE; + +err: + rspamd_http_connection_steal_msg (session->client_conn); + rspamd_http_connection_reset (session->client_conn); + proxy_client_write_error (session, 404, "Backend not found"); + + return FALSE; +} + static void proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err) { @@ -1171,16 +1304,12 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) { struct rspamd_proxy_session *session = conn->ud; - struct rspamd_http_upstream *backend = NULL; - const rspamd_ftok_t *host; - gchar hostbuf[512]; if (!session->master_conn) { session->master_conn = rspamd_mempool_alloc0 (session->pool, sizeof (*session->master_conn)); session->master_conn->s = session; session->master_conn->name = "master"; - host = rspamd_http_message_find_header (msg, "Host"); /* Reset spamc legacy */ if (msg->method >= HTTP_SYMBOLS) { @@ -1193,88 +1322,20 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn, msg->url = rspamd_fstring_append (msg->url, "/check", strlen ("/check")); } - if (host == NULL) { - backend = session->ctx->default_upstream; - } - else { - rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf))); - backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf); - - if (backend == NULL) { - backend = session->ctx->default_upstream; - } - } - - if (backend == NULL) { - /* No backend */ - msg_err_session ("cannot find upstream for %s", host ? hostbuf : "default"); + if (!proxy_check_file (msg, session)) { goto err; } - else { - session->master_conn->up = rspamd_upstream_get (backend->u, - RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); - session->master_conn->io_tv = &backend->io_tv; - if (session->master_conn->up == NULL) { - msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default"); - goto err; - } - - session->master_conn->backend_sock = rspamd_inet_address_connect ( - rspamd_upstream_addr (session->master_conn->up), - SOCK_STREAM, TRUE); - - if (session->master_conn->backend_sock == -1) { - msg_err_session ("cannot connect upstream: %s(%s)", - host ? hostbuf : "default", - rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up))); - rspamd_upstream_fail (session->master_conn->up); - goto err; - } + session->client_message = rspamd_http_connection_steal_msg ( + session->client_conn); + rspamd_http_message_remove_header (msg, "Content-Length"); + rspamd_http_message_remove_header (msg, "Key"); - if (!proxy_check_file (msg, session)) { - goto err; - } + proxy_open_mirror_connections (session); + rspamd_http_connection_reset (session->client_conn); + session->shmem_ref = rspamd_http_message_shmem_ref (session->client_message); - proxy_open_mirror_connections (session); - rspamd_http_connection_steal_msg (session->client_conn); - rspamd_http_message_remove_header (msg, "Content-Length"); - rspamd_http_message_remove_header (msg, "Key"); - rspamd_http_connection_reset (session->client_conn); - session->shmem_ref = rspamd_http_message_shmem_ref (msg); - - session->master_conn->backend_conn = rspamd_http_connection_new ( - NULL, - proxy_backend_master_error_handler, - proxy_backend_master_finish_handler, - RSPAMD_HTTP_CLIENT_SIMPLE, - RSPAMD_HTTP_CLIENT, - session->ctx->keys_cache, - NULL); - session->master_conn->parser_from_ref = backend->parser_from_ref; - session->master_conn->parser_to_ref = backend->parser_to_ref; - - rspamd_http_connection_set_key (session->master_conn->backend_conn, - session->ctx->local_key); - msg->peer_key = rspamd_pubkey_ref (backend->key); - - if (backend->local || - rspamd_inet_address_is_local ( - rspamd_upstream_addr (session->master_conn->up))) { - rspamd_http_connection_write_message_shared ( - session->master_conn->backend_conn, - msg, NULL, NULL, session->master_conn, - session->master_conn->backend_sock, - session->master_conn->io_tv, session->ctx->ev_base); - } - else { - rspamd_http_connection_write_message ( - session->master_conn->backend_conn, - msg, NULL, NULL, session->master_conn, - session->master_conn->backend_sock, - session->master_conn->io_tv, session->ctx->ev_base); - } - } + proxy_send_master_message (session); } else { msg_info_session ("finished master connection"); -- 2.39.5