From 9b9aa6efc85d4a71af2a287bbcdd4fccea5255be Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 3 Dec 2015 18:51:21 +0000 Subject: [PATCH] Rework upstreams library Now each address has its own errors count, so rspamd will prefer upstream addrs with no errors to addrs with errors. This might help to resolve issues on systems where ipv6 does not work. --- src/libutil/upstream.c | 106 ++++++++++++++++++++++++------------ test/rspamd_upstream_test.c | 58 +++++++++++++------- 2 files changed, 111 insertions(+), 53 deletions(-) diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 9eecb85cf..9044d0633 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -36,6 +36,11 @@ struct upstream_inet_addr_entry { struct upstream_inet_addr_entry *next; }; +struct upstream_addr_elt { + rspamd_inet_addr_t *addr; + guint errors; +}; + struct upstream { guint weight; guint cur_weight; @@ -51,7 +56,7 @@ struct upstream { struct upstream_ctx *ctx; struct { - GPtrArray *addr; + GPtrArray *addr; /* struct upstream_addr_elt */ guint cur; } addrs; @@ -192,12 +197,12 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr) static gint rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) { - const rspamd_inet_addr_t **ip1 = (const rspamd_inet_addr_t **)a, - **ip2 = (const rspamd_inet_addr_t **)b; + const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a, + **ip2 = (const struct upstream_addr_elt **)b; gint w1, w2; - w1 = rspamd_upstream_af_to_weight (*ip1); - w2 = rspamd_upstream_af_to_weight (*ip2); + w1 = rspamd_upstream_af_to_weight ((*ip1)->addr); + w2 = rspamd_upstream_af_to_weight ((*ip2)->addr); return w2 - w1; } @@ -211,6 +216,15 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) rspamd_mutex_unlock (ls->lock); } +static void +rspamd_upstream_addr_elt_dtor (gpointer a) +{ + struct upstream_addr_elt *elt = a; + + rspamd_inet_address_destroy (elt->addr); + g_slice_free1 (sizeof (*elt), elt); +} + static void rspamd_upstream_update_addrs (struct upstream *up) { @@ -218,6 +232,7 @@ rspamd_upstream_update_addrs (struct upstream *up) guint addr_cnt; struct upstream_inet_addr_entry *cur, *tmp; GPtrArray *new_addrs; + struct upstream_addr_elt *addr_elt; /* * We need first of all get the saved port, since DNS gives us no @@ -226,8 +241,8 @@ rspamd_upstream_update_addrs (struct upstream *up) rspamd_mutex_lock (up->lock); if (up->addrs.addr->len > 0 && up->new_addrs) { - port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr, - 0)); + addr_elt = g_ptr_array_index (up->addrs.addr, 0); + port = rspamd_inet_address_get_port (addr_elt->addr); /* Free old addresses */ g_ptr_array_free (up->addrs.addr, TRUE); @@ -237,13 +252,15 @@ rspamd_upstream_update_addrs (struct upstream *up) LL_FOREACH (up->new_addrs, cur) { addr_cnt++; } - new_addrs = g_ptr_array_new_full (addr_cnt, - (GDestroyNotify) rspamd_inet_address_destroy); + new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor); /* Copy addrs back */ LL_FOREACH (up->new_addrs, cur) { rspamd_inet_address_set_port (cur->addr, port); - g_ptr_array_add (new_addrs, cur->addr); + addr_elt = g_slice_alloc (sizeof (*addr_elt)); + addr_elt->addr = cur->addr; + addr_elt->errors = 0; + g_ptr_array_add (new_addrs, addr_elt); } up->addrs.cur = 0; @@ -363,6 +380,7 @@ rspamd_upstream_fail (struct upstream *up) struct timeval tv; gdouble error_rate, max_error_rate; gint msec_last, msec_cur; + struct upstream_addr_elt *addr_elt; gettimeofday (&tv, NULL); @@ -392,17 +410,31 @@ rspamd_upstream_fail (struct upstream *up) } } } + + /* Also increase count of errors for this specific address */ + if (up->addrs.addr) { + addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); + addr_elt->errors ++; + } + rspamd_mutex_unlock (up->lock); } void rspamd_upstream_ok (struct upstream *up) { + struct upstream_addr_elt *addr_elt; + rspamd_mutex_lock (up->lock); if (up->errors > 0 && up->active_idx != -1) { /* We touch upstream if and only if it is active */ up->errors = 0; rspamd_upstream_set_active (up->ls, up); + + if (up->addrs.addr) { + addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); + addr_elt->errors = 0; + } } rspamd_mutex_unlock (up->lock); @@ -470,30 +502,18 @@ rspamd_upstream_dtor (struct upstream *up) rspamd_inet_addr_t* rspamd_upstream_addr (struct upstream *up) { - gint idx, next_idx, w1, w2; - /* - * We know that addresses are sorted in the way that ipv4 addresses come - * first. Therefore, we select only ipv4 addresses if they exist, since - * many systems now has poorly supported ipv6 - */ - idx = up->addrs.cur; - next_idx = (idx + 1) % up->addrs.addr->len; - w1 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, idx)); - w2 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, - next_idx)); - - /* - * We don't care about the exact priorities, but we prefer ipv4/unix - * addresses before any ipv6 addresses - */ - if (!w1 || w2) { + guint idx, next_idx; + struct upstream_addr_elt *e1, *e2; + + do { + idx = up->addrs.cur; + next_idx = (idx + 1) % up->addrs.addr->len; + e1 = g_ptr_array_index (up->addrs.addr, idx); + e2 = g_ptr_array_index (up->addrs.addr, next_idx); up->addrs.cur = next_idx; - } - else { - up->addrs.cur = 0; - } + } while (e2->errors > e1->errors); - return g_ptr_array_index (up->addrs.addr, up->addrs.cur); + return e2->addr; } const gchar* @@ -507,15 +527,26 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, guint16 def_port, void *data) { struct upstream *up; + GPtrArray *addrs; + guint i; + rspamd_inet_addr_t *addr; up = g_slice_alloc0 (sizeof (*up)); - if (!rspamd_parse_host_port_priority (str, &up->addrs.addr, + if (!rspamd_parse_host_port_priority (str, &addrs, &up->weight, &up->name, def_port, NULL)) { g_slice_free1 (sizeof (*up), up); return FALSE; } + else { + for (i = 0; i < addrs->len; i ++) { + addr = g_ptr_array_index (addrs, i); + rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr)); + } + + g_ptr_array_free (addrs, TRUE); + } g_ptr_array_add (ups->ups, up); up->ud = data; @@ -537,10 +568,17 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, gboolean rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr) { + struct upstream_addr_elt *elt; /* * XXX: slow and inefficient */ - g_ptr_array_add (up->addrs.addr, addr); + if (up->addrs.addr == NULL) { + up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor); + } + + elt = g_slice_alloc0 (sizeof (*elt)); + elt->addr = addr; + g_ptr_array_add (up->addrs.addr, elt); g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func); return TRUE; diff --git a/test/rspamd_upstream_test.c b/test/rspamd_upstream_test.c index a446cc05d..61dcd02e9 100644 --- a/test/rspamd_upstream_test.c +++ b/test/rspamd_upstream_test.c @@ -84,6 +84,45 @@ rspamd_upstream_test_func (void) resolver = dns_resolver_init (NULL, ev_base, cfg); rspamd_upstreams_library_config (cfg, cfg->ups_ctx, ev_base, resolver->r); + /* + * Test v4/v6 priorities + */ + nls = rspamd_upstreams_create (cfg->ups_ctx); + g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL)); + up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0); + rspamd_parse_inet_address (&paddr, "127.0.0.2", 0); + g_assert (rspamd_upstream_add_addr (up, paddr)); + rspamd_parse_inet_address (&paddr, "::1", 0); + g_assert (rspamd_upstream_add_addr (up, paddr)); + /* Rewind to start */ + addr = rspamd_upstream_addr (up); + addr = rspamd_upstream_addr (up); + /* cur should be zero here */ + addr = rspamd_upstream_addr (up); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (addr) == AF_INET); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6); + /* Test errors with IPv6 */ + rspamd_upstream_fail (up); + /* Now we should have merely IPv4 addresses in rotation */ + addr = rspamd_upstream_addr (up); + for (i = 0; i < 256; i++) { + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (addr) == AF_INET); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + g_assert (rspamd_inet_address_compare (addr, next_addr) != 0); + addr = next_addr; + } + rspamd_upstreams_destroy (nls); + ls = rspamd_upstreams_create (cfg->ups_ctx); g_assert (rspamd_upstreams_parse_line (ls, test_upstream_list, 443, NULL)); g_assert (rspamd_upstreams_count (ls) == 3); @@ -128,25 +167,6 @@ rspamd_upstream_test_func (void) rspamd_upstreams_destroy (nls); - /* - * Test v4/v6 priorities - */ - nls = rspamd_upstreams_create (cfg->ups_ctx); - g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL)); - up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0); - rspamd_parse_inet_address (&paddr, "127.0.0.2", 0); - g_assert (rspamd_upstream_add_addr (up, paddr)); - rspamd_parse_inet_address (&paddr, "::1", 0); - g_assert (rspamd_upstream_add_addr (up, paddr)); - addr = rspamd_upstream_addr (up); - for (i = 0; i < 256; i ++) { - next_addr = rspamd_upstream_addr (up); - g_assert (rspamd_inet_address_get_af (addr) == AF_INET); - g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); - g_assert (rspamd_inet_address_compare (addr, next_addr) != 0); - addr = next_addr; - } - rspamd_upstreams_destroy (nls); /* Upstream fail test */ evtimer_set (&ev, rspamd_upstream_timeout_handler, resolver); -- 2.39.5