aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libutil/upstream.c106
-rw-r--r--test/rspamd_upstream_test.c58
2 files changed, 111 insertions, 53 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 9eecb85cf..9044d0633 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -36,6 +36,11 @@ struct upstream_inet_addr_entry {
struct upstream_inet_addr_entry *next;
};
+struct upstream_addr_elt {
+ rspamd_inet_addr_t *addr;
+ guint errors;
+};
+
struct upstream {
guint weight;
guint cur_weight;
@@ -51,7 +56,7 @@ struct upstream {
struct upstream_ctx *ctx;
struct {
- GPtrArray *addr;
+ GPtrArray *addr; /* struct upstream_addr_elt */
guint cur;
} addrs;
@@ -192,12 +197,12 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
static gint
rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
{
- const rspamd_inet_addr_t **ip1 = (const rspamd_inet_addr_t **)a,
- **ip2 = (const rspamd_inet_addr_t **)b;
+ const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
+ **ip2 = (const struct upstream_addr_elt **)b;
gint w1, w2;
- w1 = rspamd_upstream_af_to_weight (*ip1);
- w2 = rspamd_upstream_af_to_weight (*ip2);
+ w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
+ w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
return w2 - w1;
}
@@ -212,12 +217,22 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
}
static void
+rspamd_upstream_addr_elt_dtor (gpointer a)
+{
+ struct upstream_addr_elt *elt = a;
+
+ rspamd_inet_address_destroy (elt->addr);
+ g_slice_free1 (sizeof (*elt), elt);
+}
+
+static void
rspamd_upstream_update_addrs (struct upstream *up)
{
guint16 port;
guint addr_cnt;
struct upstream_inet_addr_entry *cur, *tmp;
GPtrArray *new_addrs;
+ struct upstream_addr_elt *addr_elt;
/*
* We need first of all get the saved port, since DNS gives us no
@@ -226,8 +241,8 @@ rspamd_upstream_update_addrs (struct upstream *up)
rspamd_mutex_lock (up->lock);
if (up->addrs.addr->len > 0 && up->new_addrs) {
- port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr,
- 0));
+ addr_elt = g_ptr_array_index (up->addrs.addr, 0);
+ port = rspamd_inet_address_get_port (addr_elt->addr);
/* Free old addresses */
g_ptr_array_free (up->addrs.addr, TRUE);
@@ -237,13 +252,15 @@ rspamd_upstream_update_addrs (struct upstream *up)
LL_FOREACH (up->new_addrs, cur) {
addr_cnt++;
}
- new_addrs = g_ptr_array_new_full (addr_cnt,
- (GDestroyNotify) rspamd_inet_address_destroy);
+ new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor);
/* Copy addrs back */
LL_FOREACH (up->new_addrs, cur) {
rspamd_inet_address_set_port (cur->addr, port);
- g_ptr_array_add (new_addrs, cur->addr);
+ addr_elt = g_slice_alloc (sizeof (*addr_elt));
+ addr_elt->addr = cur->addr;
+ addr_elt->errors = 0;
+ g_ptr_array_add (new_addrs, addr_elt);
}
up->addrs.cur = 0;
@@ -363,6 +380,7 @@ rspamd_upstream_fail (struct upstream *up)
struct timeval tv;
gdouble error_rate, max_error_rate;
gint msec_last, msec_cur;
+ struct upstream_addr_elt *addr_elt;
gettimeofday (&tv, NULL);
@@ -392,17 +410,31 @@ rspamd_upstream_fail (struct upstream *up)
}
}
}
+
+ /* Also increase count of errors for this specific address */
+ if (up->addrs.addr) {
+ addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ addr_elt->errors ++;
+ }
+
rspamd_mutex_unlock (up->lock);
}
void
rspamd_upstream_ok (struct upstream *up)
{
+ struct upstream_addr_elt *addr_elt;
+
rspamd_mutex_lock (up->lock);
if (up->errors > 0 && up->active_idx != -1) {
/* We touch upstream if and only if it is active */
up->errors = 0;
rspamd_upstream_set_active (up->ls, up);
+
+ if (up->addrs.addr) {
+ addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ addr_elt->errors = 0;
+ }
}
rspamd_mutex_unlock (up->lock);
@@ -470,30 +502,18 @@ rspamd_upstream_dtor (struct upstream *up)
rspamd_inet_addr_t*
rspamd_upstream_addr (struct upstream *up)
{
- gint idx, next_idx, w1, w2;
- /*
- * We know that addresses are sorted in the way that ipv4 addresses come
- * first. Therefore, we select only ipv4 addresses if they exist, since
- * many systems now has poorly supported ipv6
- */
- idx = up->addrs.cur;
- next_idx = (idx + 1) % up->addrs.addr->len;
- w1 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, idx));
- w2 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr,
- next_idx));
-
- /*
- * We don't care about the exact priorities, but we prefer ipv4/unix
- * addresses before any ipv6 addresses
- */
- if (!w1 || w2) {
+ guint idx, next_idx;
+ struct upstream_addr_elt *e1, *e2;
+
+ do {
+ idx = up->addrs.cur;
+ next_idx = (idx + 1) % up->addrs.addr->len;
+ e1 = g_ptr_array_index (up->addrs.addr, idx);
+ e2 = g_ptr_array_index (up->addrs.addr, next_idx);
up->addrs.cur = next_idx;
- }
- else {
- up->addrs.cur = 0;
- }
+ } while (e2->errors > e1->errors);
- return g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ return e2->addr;
}
const gchar*
@@ -507,15 +527,26 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
const gchar *str, guint16 def_port, void *data)
{
struct upstream *up;
+ GPtrArray *addrs;
+ guint i;
+ rspamd_inet_addr_t *addr;
up = g_slice_alloc0 (sizeof (*up));
- if (!rspamd_parse_host_port_priority (str, &up->addrs.addr,
+ if (!rspamd_parse_host_port_priority (str, &addrs,
&up->weight,
&up->name, def_port, NULL)) {
g_slice_free1 (sizeof (*up), up);
return FALSE;
}
+ else {
+ for (i = 0; i < addrs->len; i ++) {
+ addr = g_ptr_array_index (addrs, i);
+ rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr));
+ }
+
+ g_ptr_array_free (addrs, TRUE);
+ }
g_ptr_array_add (ups->ups, up);
up->ud = data;
@@ -537,10 +568,17 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
gboolean
rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr)
{
+ struct upstream_addr_elt *elt;
/*
* XXX: slow and inefficient
*/
- g_ptr_array_add (up->addrs.addr, addr);
+ if (up->addrs.addr == NULL) {
+ up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor);
+ }
+
+ elt = g_slice_alloc0 (sizeof (*elt));
+ elt->addr = addr;
+ g_ptr_array_add (up->addrs.addr, elt);
g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
return TRUE;
diff --git a/test/rspamd_upstream_test.c b/test/rspamd_upstream_test.c
index a446cc05d..61dcd02e9 100644
--- a/test/rspamd_upstream_test.c
+++ b/test/rspamd_upstream_test.c
@@ -84,6 +84,45 @@ rspamd_upstream_test_func (void)
resolver = dns_resolver_init (NULL, ev_base, cfg);
rspamd_upstreams_library_config (cfg, cfg->ups_ctx, ev_base, resolver->r);
+ /*
+ * Test v4/v6 priorities
+ */
+ nls = rspamd_upstreams_create (cfg->ups_ctx);
+ g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL));
+ up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0);
+ rspamd_parse_inet_address (&paddr, "127.0.0.2", 0);
+ g_assert (rspamd_upstream_add_addr (up, paddr));
+ rspamd_parse_inet_address (&paddr, "::1", 0);
+ g_assert (rspamd_upstream_add_addr (up, paddr));
+ /* Rewind to start */
+ addr = rspamd_upstream_addr (up);
+ addr = rspamd_upstream_addr (up);
+ /* cur should be zero here */
+ addr = rspamd_upstream_addr (up);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
+ /* Test errors with IPv6 */
+ rspamd_upstream_fail (up);
+ /* Now we should have merely IPv4 addresses in rotation */
+ addr = rspamd_upstream_addr (up);
+ for (i = 0; i < 256; i++) {
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);
+ addr = next_addr;
+ }
+ rspamd_upstreams_destroy (nls);
+
ls = rspamd_upstreams_create (cfg->ups_ctx);
g_assert (rspamd_upstreams_parse_line (ls, test_upstream_list, 443, NULL));
g_assert (rspamd_upstreams_count (ls) == 3);
@@ -128,25 +167,6 @@ rspamd_upstream_test_func (void)
rspamd_upstreams_destroy (nls);
- /*
- * Test v4/v6 priorities
- */
- nls = rspamd_upstreams_create (cfg->ups_ctx);
- g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL));
- up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0);
- rspamd_parse_inet_address (&paddr, "127.0.0.2", 0);
- g_assert (rspamd_upstream_add_addr (up, paddr));
- rspamd_parse_inet_address (&paddr, "::1", 0);
- g_assert (rspamd_upstream_add_addr (up, paddr));
- addr = rspamd_upstream_addr (up);
- for (i = 0; i < 256; i ++) {
- next_addr = rspamd_upstream_addr (up);
- g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
- g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
- g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);
- addr = next_addr;
- }
- rspamd_upstreams_destroy (nls);
/* Upstream fail test */
evtimer_set (&ev, rspamd_upstream_timeout_handler, resolver);