aboutsummaryrefslogtreecommitdiffstats
path: root/src/rspamd_proxy.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-07-18 17:50:54 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-07-18 17:51:26 +0100
commit88c07077e25bbd57c441c49cd667678d2492693c (patch)
tree7b0c7d299962268ef15c8055abcf2ef7f85d5c3b /src/rspamd_proxy.c
parentc2d4fc63c8c7834570c855bc7547f3437c6af6b1 (diff)
downloadrspamd-88c07077e25bbd57c441c49cd667678d2492693c.tar.gz
rspamd-88c07077e25bbd57c441c49cd667678d2492693c.zip
[Feature] Implement retransmits for master connection
Diffstat (limited to 'src/rspamd_proxy.c')
-rw-r--r--src/rspamd_proxy.c229
1 files changed, 145 insertions, 84 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index dcf492a24..a726f8f39 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -33,6 +33,7 @@
/* Rotate keys each minute by default */
#define DEFAULT_ROTATION_TIME 60.0
+#define DEFAULT_RETRIES 5
#define msg_err_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
session->pool->tag.tagname, session->pool->tag.uid, \
@@ -115,6 +116,8 @@ struct rspamd_proxy_ctx {
lua_State *lua_state;
/* Array of callback functions called on end of scan to compare results */
GArray *cmp_refs;
+ /* Maximum count for retries */
+ guint max_retries;
};
enum rspamd_backend_flags {
@@ -149,13 +152,17 @@ struct rspamd_proxy_session {
gpointer map;
gpointer shmem_ref;
struct rspamd_proxy_backend_connection *master_conn;
+ struct rspamd_http_message *client_message;
GPtrArray *mirror_conns;
gsize map_len;
gint client_sock;
gboolean is_spamc;
+ gint retries;
ref_entry_t ref;
};
+static gboolean proxy_send_master_message (struct rspamd_proxy_session *session);
+
static GQuark
rspamd_proxy_quark (void)
{
@@ -621,6 +628,7 @@ init_rspamd_proxy (struct rspamd_config *cfg)
ctx->cfg = cfg;
ctx->lua_state = cfg->lua_state;
ctx->cmp_refs = g_array_new (FALSE, FALSE, sizeof (gint));
+ ctx->max_retries = DEFAULT_RETRIES;
rspamd_rcl_register_worker_option (cfg,
type,
@@ -675,6 +683,14 @@ init_rspamd_proxy (struct rspamd_config *cfg)
0,
RSPAMD_CL_FLAG_MULTIPLE,
"Compare script to be executed");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "timeout",
+ rspamd_rcl_parse_struct_integer,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_proxy_ctx, max_retries),
+ RSPAMD_CL_FLAG_UINT,
+ "Maximum number of retries for master connection");
return ctx;
}
@@ -846,6 +862,7 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
g_ptr_array_free (session->mirror_conns, TRUE);
rspamd_http_message_shmem_unref (session->shmem_ref);
+ rspamd_http_message_unref (session->client_message);
rspamd_inet_address_destroy (session->client_addr);
close (session->client_sock);
rspamd_mempool_delete (session->pool);
@@ -1030,7 +1047,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
continue;
}
- msg = rspamd_http_connection_copy_msg (session->client_conn);
+ msg = rspamd_http_connection_copy_msg (session->client_message);
if (msg == NULL) {
msg_err_session ("cannot copy message to send to a mirror %s: %s",
@@ -1107,9 +1124,21 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
msg_info_session ("abnormally closing connection from backend: %s, error: %s",
rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
err->message);
- /* Terminate session immediately */
- proxy_client_write_error (session, err->code, err->message);
+ session->retries ++;
proxy_backend_close_connection (session->master_conn);
+
+ if (session->ctx->max_retries &&
+ session->retries > session->ctx->max_retries) {
+ msg_err_session ("cannot connect to upstream, maximum retries "
+ "has been reached: %d", session->retries);
+ /* Terminate session immediately */
+ proxy_client_write_error (session, err->code, err->message);
+ }
+ else {
+ if (!proxy_send_master_message (session)) {
+ proxy_client_write_error (session, err->code, err->message);
+ }
+ }
}
static gint
@@ -1154,6 +1183,110 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
return 0;
}
+static gboolean
+proxy_send_master_message (struct rspamd_proxy_session *session)
+{
+ struct rspamd_http_message *msg;
+ struct rspamd_http_upstream *backend = NULL;
+ const rspamd_ftok_t *host;
+ gchar hostbuf[512];
+
+ host = rspamd_http_message_find_header (session->client_message, "Host");
+
+ if (host == NULL) {
+ backend = session->ctx->default_upstream;
+ }
+ else {
+ rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
+ backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
+
+ if (backend == NULL) {
+ backend = session->ctx->default_upstream;
+ }
+ }
+
+ if (backend == NULL) {
+ /* No backend */
+ msg_err_session ("cannot find upstream for %s", host ? hostbuf : "default");
+ goto err;
+ }
+ else {
+ retry:
+ if (session->ctx->max_retries &&
+ session->retries > session->ctx->max_retries) {
+ msg_err_session ("cannot connect to upstream, maximum retries "
+ "has been reached: %d", session->retries);
+ goto err;
+ }
+
+ session->master_conn->up = rspamd_upstream_get (backend->u,
+ RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+ session->master_conn->io_tv = &backend->io_tv;
+
+ if (session->master_conn->up == NULL) {
+ msg_err_session ("cannot select upstream for %s",
+ host ? hostbuf : "default");
+ goto err;
+ }
+
+ session->master_conn->backend_sock = rspamd_inet_address_connect (
+ rspamd_upstream_addr (session->master_conn->up),
+ SOCK_STREAM, TRUE);
+
+ if (session->master_conn->backend_sock == -1) {
+ msg_err_session ("cannot connect upstream: %s(%s)",
+ host ? hostbuf : "default",
+ rspamd_inet_address_to_string (rspamd_upstream_addr (
+ session->master_conn->up)));
+ rspamd_upstream_fail (session->master_conn->up);
+ session->retries ++;
+ goto retry;
+ }
+
+ session->master_conn->backend_conn = rspamd_http_connection_new (
+ NULL,
+ proxy_backend_master_error_handler,
+ proxy_backend_master_finish_handler,
+ RSPAMD_HTTP_CLIENT_SIMPLE,
+ RSPAMD_HTTP_CLIENT,
+ session->ctx->keys_cache,
+ NULL);
+ session->master_conn->parser_from_ref = backend->parser_from_ref;
+ session->master_conn->parser_to_ref = backend->parser_to_ref;
+
+ msg = rspamd_http_connection_copy_msg (session->client_message);
+ rspamd_http_connection_set_key (session->master_conn->backend_conn,
+ session->ctx->local_key);
+ msg->peer_key = rspamd_pubkey_ref (backend->key);
+
+ if (backend->local ||
+ rspamd_inet_address_is_local (
+ rspamd_upstream_addr (session->master_conn->up))) {
+ rspamd_http_connection_write_message_shared (
+ session->master_conn->backend_conn,
+ msg, NULL, NULL, session->master_conn,
+ session->master_conn->backend_sock,
+ session->master_conn->io_tv, session->ctx->ev_base);
+ }
+ else {
+ rspamd_http_connection_write_message (
+ session->master_conn->backend_conn,
+ msg, NULL, NULL, session->master_conn,
+ session->master_conn->backend_sock,
+ session->master_conn->io_tv, session->ctx->ev_base);
+ }
+ }
+
+ return TRUE;
+
+err:
+ rspamd_http_connection_steal_msg (session->client_conn);
+ rspamd_http_connection_reset (session->client_conn);
+ proxy_client_write_error (session, 404, "Backend not found");
+
+ return FALSE;
+}
+
static void
proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
{
@@ -1171,16 +1304,12 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
struct rspamd_proxy_session *session = conn->ud;
- struct rspamd_http_upstream *backend = NULL;
- const rspamd_ftok_t *host;
- gchar hostbuf[512];
if (!session->master_conn) {
session->master_conn = rspamd_mempool_alloc0 (session->pool,
sizeof (*session->master_conn));
session->master_conn->s = session;
session->master_conn->name = "master";
- host = rspamd_http_message_find_header (msg, "Host");
/* Reset spamc legacy */
if (msg->method >= HTTP_SYMBOLS) {
@@ -1193,88 +1322,20 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
msg->url = rspamd_fstring_append (msg->url, "/check", strlen ("/check"));
}
- if (host == NULL) {
- backend = session->ctx->default_upstream;
- }
- else {
- rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
- backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
-
- if (backend == NULL) {
- backend = session->ctx->default_upstream;
- }
- }
-
- if (backend == NULL) {
- /* No backend */
- msg_err_session ("cannot find upstream for %s", host ? hostbuf : "default");
+ if (!proxy_check_file (msg, session)) {
goto err;
}
- else {
- session->master_conn->up = rspamd_upstream_get (backend->u,
- RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
- session->master_conn->io_tv = &backend->io_tv;
- if (session->master_conn->up == NULL) {
- msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default");
- goto err;
- }
-
- session->master_conn->backend_sock = rspamd_inet_address_connect (
- rspamd_upstream_addr (session->master_conn->up),
- SOCK_STREAM, TRUE);
-
- if (session->master_conn->backend_sock == -1) {
- msg_err_session ("cannot connect upstream: %s(%s)",
- host ? hostbuf : "default",
- rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)));
- rspamd_upstream_fail (session->master_conn->up);
- goto err;
- }
+ session->client_message = rspamd_http_connection_steal_msg (
+ session->client_conn);
+ rspamd_http_message_remove_header (msg, "Content-Length");
+ rspamd_http_message_remove_header (msg, "Key");
- if (!proxy_check_file (msg, session)) {
- goto err;
- }
+ proxy_open_mirror_connections (session);
+ rspamd_http_connection_reset (session->client_conn);
+ session->shmem_ref = rspamd_http_message_shmem_ref (session->client_message);
- proxy_open_mirror_connections (session);
- rspamd_http_connection_steal_msg (session->client_conn);
- rspamd_http_message_remove_header (msg, "Content-Length");
- rspamd_http_message_remove_header (msg, "Key");
- rspamd_http_connection_reset (session->client_conn);
- session->shmem_ref = rspamd_http_message_shmem_ref (msg);
-
- session->master_conn->backend_conn = rspamd_http_connection_new (
- NULL,
- proxy_backend_master_error_handler,
- proxy_backend_master_finish_handler,
- RSPAMD_HTTP_CLIENT_SIMPLE,
- RSPAMD_HTTP_CLIENT,
- session->ctx->keys_cache,
- NULL);
- session->master_conn->parser_from_ref = backend->parser_from_ref;
- session->master_conn->parser_to_ref = backend->parser_to_ref;
-
- rspamd_http_connection_set_key (session->master_conn->backend_conn,
- session->ctx->local_key);
- msg->peer_key = rspamd_pubkey_ref (backend->key);
-
- if (backend->local ||
- rspamd_inet_address_is_local (
- rspamd_upstream_addr (session->master_conn->up))) {
- rspamd_http_connection_write_message_shared (
- session->master_conn->backend_conn,
- msg, NULL, NULL, session->master_conn,
- session->master_conn->backend_sock,
- session->master_conn->io_tv, session->ctx->ev_base);
- }
- else {
- rspamd_http_connection_write_message (
- session->master_conn->backend_conn,
- msg, NULL, NULL, session->master_conn,
- session->master_conn->backend_sock,
- session->master_conn->io_tv, session->ctx->ev_base);
- }
- }
+ proxy_send_master_message (session);
}
else {
msg_info_session ("finished master connection");