diff options
-rw-r--r-- | src/libutil/upstream.c | 106 | ||||
-rw-r--r-- | test/rspamd_upstream_test.c | 58 |
2 files changed, 111 insertions, 53 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 9eecb85cf..9044d0633 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -36,6 +36,11 @@ struct upstream_inet_addr_entry { struct upstream_inet_addr_entry *next; }; +struct upstream_addr_elt { + rspamd_inet_addr_t *addr; + guint errors; +}; + struct upstream { guint weight; guint cur_weight; @@ -51,7 +56,7 @@ struct upstream { struct upstream_ctx *ctx; struct { - GPtrArray *addr; + GPtrArray *addr; /* struct upstream_addr_elt */ guint cur; } addrs; @@ -192,12 +197,12 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr) static gint rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) { - const rspamd_inet_addr_t **ip1 = (const rspamd_inet_addr_t **)a, - **ip2 = (const rspamd_inet_addr_t **)b; + const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a, + **ip2 = (const struct upstream_addr_elt **)b; gint w1, w2; - w1 = rspamd_upstream_af_to_weight (*ip1); - w2 = rspamd_upstream_af_to_weight (*ip2); + w1 = rspamd_upstream_af_to_weight ((*ip1)->addr); + w2 = rspamd_upstream_af_to_weight ((*ip2)->addr); return w2 - w1; } @@ -212,12 +217,22 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) } static void +rspamd_upstream_addr_elt_dtor (gpointer a) +{ + struct upstream_addr_elt *elt = a; + + rspamd_inet_address_destroy (elt->addr); + g_slice_free1 (sizeof (*elt), elt); +} + +static void rspamd_upstream_update_addrs (struct upstream *up) { guint16 port; guint addr_cnt; struct upstream_inet_addr_entry *cur, *tmp; GPtrArray *new_addrs; + struct upstream_addr_elt *addr_elt; /* * We need first of all get the saved port, since DNS gives us no @@ -226,8 +241,8 @@ rspamd_upstream_update_addrs (struct upstream *up) rspamd_mutex_lock (up->lock); if (up->addrs.addr->len > 0 && up->new_addrs) { - port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr, - 0)); + addr_elt = g_ptr_array_index (up->addrs.addr, 0); + port = rspamd_inet_address_get_port (addr_elt->addr); /* Free old addresses */ g_ptr_array_free (up->addrs.addr, TRUE); @@ -237,13 +252,15 @@ rspamd_upstream_update_addrs (struct upstream *up) LL_FOREACH (up->new_addrs, cur) { addr_cnt++; } - new_addrs = g_ptr_array_new_full (addr_cnt, - (GDestroyNotify) rspamd_inet_address_destroy); + new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor); /* Copy addrs back */ LL_FOREACH (up->new_addrs, cur) { rspamd_inet_address_set_port (cur->addr, port); - g_ptr_array_add (new_addrs, cur->addr); + addr_elt = g_slice_alloc (sizeof (*addr_elt)); + addr_elt->addr = cur->addr; + addr_elt->errors = 0; + g_ptr_array_add (new_addrs, addr_elt); } up->addrs.cur = 0; @@ -363,6 +380,7 @@ rspamd_upstream_fail (struct upstream *up) struct timeval tv; gdouble error_rate, max_error_rate; gint msec_last, msec_cur; + struct upstream_addr_elt *addr_elt; gettimeofday (&tv, NULL); @@ -392,17 +410,31 @@ rspamd_upstream_fail (struct upstream *up) } } } + + /* Also increase count of errors for this specific address */ + if (up->addrs.addr) { + addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); + addr_elt->errors ++; + } + rspamd_mutex_unlock (up->lock); } void rspamd_upstream_ok (struct upstream *up) { + struct upstream_addr_elt *addr_elt; + rspamd_mutex_lock (up->lock); if (up->errors > 0 && up->active_idx != -1) { /* We touch upstream if and only if it is active */ up->errors = 0; rspamd_upstream_set_active (up->ls, up); + + if (up->addrs.addr) { + addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); + addr_elt->errors = 0; + } } rspamd_mutex_unlock (up->lock); @@ -470,30 +502,18 @@ rspamd_upstream_dtor (struct upstream *up) rspamd_inet_addr_t* rspamd_upstream_addr (struct upstream *up) { - gint idx, next_idx, w1, w2; - /* - * We know that addresses are sorted in the way that ipv4 addresses come - * first. Therefore, we select only ipv4 addresses if they exist, since - * many systems now has poorly supported ipv6 - */ - idx = up->addrs.cur; - next_idx = (idx + 1) % up->addrs.addr->len; - w1 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, idx)); - w2 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, - next_idx)); - - /* - * We don't care about the exact priorities, but we prefer ipv4/unix - * addresses before any ipv6 addresses - */ - if (!w1 || w2) { + guint idx, next_idx; + struct upstream_addr_elt *e1, *e2; + + do { + idx = up->addrs.cur; + next_idx = (idx + 1) % up->addrs.addr->len; + e1 = g_ptr_array_index (up->addrs.addr, idx); + e2 = g_ptr_array_index (up->addrs.addr, next_idx); up->addrs.cur = next_idx; - } - else { - up->addrs.cur = 0; - } + } while (e2->errors > e1->errors); - return g_ptr_array_index (up->addrs.addr, up->addrs.cur); + return e2->addr; } const gchar* @@ -507,15 +527,26 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, guint16 def_port, void *data) { struct upstream *up; + GPtrArray *addrs; + guint i; + rspamd_inet_addr_t *addr; up = g_slice_alloc0 (sizeof (*up)); - if (!rspamd_parse_host_port_priority (str, &up->addrs.addr, + if (!rspamd_parse_host_port_priority (str, &addrs, &up->weight, &up->name, def_port, NULL)) { g_slice_free1 (sizeof (*up), up); return FALSE; } + else { + for (i = 0; i < addrs->len; i ++) { + addr = g_ptr_array_index (addrs, i); + rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr)); + } + + g_ptr_array_free (addrs, TRUE); + } g_ptr_array_add (ups->ups, up); up->ud = data; @@ -537,10 +568,17 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, gboolean rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr) { + struct upstream_addr_elt *elt; /* * XXX: slow and inefficient */ - g_ptr_array_add (up->addrs.addr, addr); + if (up->addrs.addr == NULL) { + up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor); + } + + elt = g_slice_alloc0 (sizeof (*elt)); + elt->addr = addr; + g_ptr_array_add (up->addrs.addr, elt); g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func); return TRUE; diff --git a/test/rspamd_upstream_test.c b/test/rspamd_upstream_test.c index a446cc05d..61dcd02e9 100644 --- a/test/rspamd_upstream_test.c +++ b/test/rspamd_upstream_test.c @@ -84,6 +84,45 @@ rspamd_upstream_test_func (void) resolver = dns_resolver_init (NULL, ev_base, cfg); rspamd_upstreams_library_config (cfg, cfg->ups_ctx, ev_base, resolver->r); + /* + * Test v4/v6 priorities + */ + nls = rspamd_upstreams_create (cfg->ups_ctx); + g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL)); + up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0); + rspamd_parse_inet_address (&paddr, "127.0.0.2", 0); + g_assert (rspamd_upstream_add_addr (up, paddr)); + rspamd_parse_inet_address (&paddr, "::1", 0); + g_assert (rspamd_upstream_add_addr (up, paddr)); + /* Rewind to start */ + addr = rspamd_upstream_addr (up); + addr = rspamd_upstream_addr (up); + /* cur should be zero here */ + addr = rspamd_upstream_addr (up); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (addr) == AF_INET); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6); + /* Test errors with IPv6 */ + rspamd_upstream_fail (up); + /* Now we should have merely IPv4 addresses in rotation */ + addr = rspamd_upstream_addr (up); + for (i = 0; i < 256; i++) { + next_addr = rspamd_upstream_addr (up); + g_assert (rspamd_inet_address_get_af (addr) == AF_INET); + g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); + g_assert (rspamd_inet_address_compare (addr, next_addr) != 0); + addr = next_addr; + } + rspamd_upstreams_destroy (nls); + ls = rspamd_upstreams_create (cfg->ups_ctx); g_assert (rspamd_upstreams_parse_line (ls, test_upstream_list, 443, NULL)); g_assert (rspamd_upstreams_count (ls) == 3); @@ -128,25 +167,6 @@ rspamd_upstream_test_func (void) rspamd_upstreams_destroy (nls); - /* - * Test v4/v6 priorities - */ - nls = rspamd_upstreams_create (cfg->ups_ctx); - g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL)); - up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0); - rspamd_parse_inet_address (&paddr, "127.0.0.2", 0); - g_assert (rspamd_upstream_add_addr (up, paddr)); - rspamd_parse_inet_address (&paddr, "::1", 0); - g_assert (rspamd_upstream_add_addr (up, paddr)); - addr = rspamd_upstream_addr (up); - for (i = 0; i < 256; i ++) { - next_addr = rspamd_upstream_addr (up); - g_assert (rspamd_inet_address_get_af (addr) == AF_INET); - g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET); - g_assert (rspamd_inet_address_compare (addr, next_addr) != 0); - addr = next_addr; - } - rspamd_upstreams_destroy (nls); /* Upstream fail test */ evtimer_set (&ev, rspamd_upstream_timeout_handler, resolver); |