summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-12-03 18:51:21 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-12-03 18:51:21 +0000
commit9b9aa6efc85d4a71af2a287bbcdd4fccea5255be (patch)
tree84d6f747d5f6a67d9f67b9857adbc5cc92a83653
parent5126e935664c63e1430c812dd7c05f0bea5ffc27 (diff)
downloadrspamd-9b9aa6efc85d4a71af2a287bbcdd4fccea5255be.tar.gz
rspamd-9b9aa6efc85d4a71af2a287bbcdd4fccea5255be.zip
Rework upstreams library
Now each address has its own errors count, so rspamd will prefer upstream addrs with no errors to addrs with errors. This might help to resolve issues on systems where ipv6 does not work.
-rw-r--r--src/libutil/upstream.c106
-rw-r--r--test/rspamd_upstream_test.c58
2 files changed, 111 insertions, 53 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 9eecb85cf..9044d0633 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -36,6 +36,11 @@ struct upstream_inet_addr_entry {
struct upstream_inet_addr_entry *next;
};
+struct upstream_addr_elt {
+ rspamd_inet_addr_t *addr;
+ guint errors;
+};
+
struct upstream {
guint weight;
guint cur_weight;
@@ -51,7 +56,7 @@ struct upstream {
struct upstream_ctx *ctx;
struct {
- GPtrArray *addr;
+ GPtrArray *addr; /* struct upstream_addr_elt */
guint cur;
} addrs;
@@ -192,12 +197,12 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
static gint
rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
{
- const rspamd_inet_addr_t **ip1 = (const rspamd_inet_addr_t **)a,
- **ip2 = (const rspamd_inet_addr_t **)b;
+ const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
+ **ip2 = (const struct upstream_addr_elt **)b;
gint w1, w2;
- w1 = rspamd_upstream_af_to_weight (*ip1);
- w2 = rspamd_upstream_af_to_weight (*ip2);
+ w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
+ w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
return w2 - w1;
}
@@ -212,12 +217,22 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
}
static void
+rspamd_upstream_addr_elt_dtor (gpointer a)
+{
+ struct upstream_addr_elt *elt = a;
+
+ rspamd_inet_address_destroy (elt->addr);
+ g_slice_free1 (sizeof (*elt), elt);
+}
+
+static void
rspamd_upstream_update_addrs (struct upstream *up)
{
guint16 port;
guint addr_cnt;
struct upstream_inet_addr_entry *cur, *tmp;
GPtrArray *new_addrs;
+ struct upstream_addr_elt *addr_elt;
/*
* We need first of all get the saved port, since DNS gives us no
@@ -226,8 +241,8 @@ rspamd_upstream_update_addrs (struct upstream *up)
rspamd_mutex_lock (up->lock);
if (up->addrs.addr->len > 0 && up->new_addrs) {
- port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr,
- 0));
+ addr_elt = g_ptr_array_index (up->addrs.addr, 0);
+ port = rspamd_inet_address_get_port (addr_elt->addr);
/* Free old addresses */
g_ptr_array_free (up->addrs.addr, TRUE);
@@ -237,13 +252,15 @@ rspamd_upstream_update_addrs (struct upstream *up)
LL_FOREACH (up->new_addrs, cur) {
addr_cnt++;
}
- new_addrs = g_ptr_array_new_full (addr_cnt,
- (GDestroyNotify) rspamd_inet_address_destroy);
+ new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor);
/* Copy addrs back */
LL_FOREACH (up->new_addrs, cur) {
rspamd_inet_address_set_port (cur->addr, port);
- g_ptr_array_add (new_addrs, cur->addr);
+ addr_elt = g_slice_alloc (sizeof (*addr_elt));
+ addr_elt->addr = cur->addr;
+ addr_elt->errors = 0;
+ g_ptr_array_add (new_addrs, addr_elt);
}
up->addrs.cur = 0;
@@ -363,6 +380,7 @@ rspamd_upstream_fail (struct upstream *up)
struct timeval tv;
gdouble error_rate, max_error_rate;
gint msec_last, msec_cur;
+ struct upstream_addr_elt *addr_elt;
gettimeofday (&tv, NULL);
@@ -392,17 +410,31 @@ rspamd_upstream_fail (struct upstream *up)
}
}
}
+
+ /* Also increase count of errors for this specific address */
+ if (up->addrs.addr) {
+ addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ addr_elt->errors ++;
+ }
+
rspamd_mutex_unlock (up->lock);
}
void
rspamd_upstream_ok (struct upstream *up)
{
+ struct upstream_addr_elt *addr_elt;
+
rspamd_mutex_lock (up->lock);
if (up->errors > 0 && up->active_idx != -1) {
/* We touch upstream if and only if it is active */
up->errors = 0;
rspamd_upstream_set_active (up->ls, up);
+
+ if (up->addrs.addr) {
+ addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ addr_elt->errors = 0;
+ }
}
rspamd_mutex_unlock (up->lock);
@@ -470,30 +502,18 @@ rspamd_upstream_dtor (struct upstream *up)
rspamd_inet_addr_t*
rspamd_upstream_addr (struct upstream *up)
{
- gint idx, next_idx, w1, w2;
- /*
- * We know that addresses are sorted in the way that ipv4 addresses come
- * first. Therefore, we select only ipv4 addresses if they exist, since
- * many systems now has poorly supported ipv6
- */
- idx = up->addrs.cur;
- next_idx = (idx + 1) % up->addrs.addr->len;
- w1 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr, idx));
- w2 = rspamd_upstream_af_to_weight (g_ptr_array_index (up->addrs.addr,
- next_idx));
-
- /*
- * We don't care about the exact priorities, but we prefer ipv4/unix
- * addresses before any ipv6 addresses
- */
- if (!w1 || w2) {
+ guint idx, next_idx;
+ struct upstream_addr_elt *e1, *e2;
+
+ do {
+ idx = up->addrs.cur;
+ next_idx = (idx + 1) % up->addrs.addr->len;
+ e1 = g_ptr_array_index (up->addrs.addr, idx);
+ e2 = g_ptr_array_index (up->addrs.addr, next_idx);
up->addrs.cur = next_idx;
- }
- else {
- up->addrs.cur = 0;
- }
+ } while (e2->errors > e1->errors);
- return g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ return e2->addr;
}
const gchar*
@@ -507,15 +527,26 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
const gchar *str, guint16 def_port, void *data)
{
struct upstream *up;
+ GPtrArray *addrs;
+ guint i;
+ rspamd_inet_addr_t *addr;
up = g_slice_alloc0 (sizeof (*up));
- if (!rspamd_parse_host_port_priority (str, &up->addrs.addr,
+ if (!rspamd_parse_host_port_priority (str, &addrs,
&up->weight,
&up->name, def_port, NULL)) {
g_slice_free1 (sizeof (*up), up);
return FALSE;
}
+ else {
+ for (i = 0; i < addrs->len; i ++) {
+ addr = g_ptr_array_index (addrs, i);
+ rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr));
+ }
+
+ g_ptr_array_free (addrs, TRUE);
+ }
g_ptr_array_add (ups->ups, up);
up->ud = data;
@@ -537,10 +568,17 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
gboolean
rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr)
{
+ struct upstream_addr_elt *elt;
/*
* XXX: slow and inefficient
*/
- g_ptr_array_add (up->addrs.addr, addr);
+ if (up->addrs.addr == NULL) {
+ up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor);
+ }
+
+ elt = g_slice_alloc0 (sizeof (*elt));
+ elt->addr = addr;
+ g_ptr_array_add (up->addrs.addr, elt);
g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
return TRUE;
diff --git a/test/rspamd_upstream_test.c b/test/rspamd_upstream_test.c
index a446cc05d..61dcd02e9 100644
--- a/test/rspamd_upstream_test.c
+++ b/test/rspamd_upstream_test.c
@@ -84,6 +84,45 @@ rspamd_upstream_test_func (void)
resolver = dns_resolver_init (NULL, ev_base, cfg);
rspamd_upstreams_library_config (cfg, cfg->ups_ctx, ev_base, resolver->r);
+ /*
+ * Test v4/v6 priorities
+ */
+ nls = rspamd_upstreams_create (cfg->ups_ctx);
+ g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL));
+ up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0);
+ rspamd_parse_inet_address (&paddr, "127.0.0.2", 0);
+ g_assert (rspamd_upstream_add_addr (up, paddr));
+ rspamd_parse_inet_address (&paddr, "::1", 0);
+ g_assert (rspamd_upstream_add_addr (up, paddr));
+ /* Rewind to start */
+ addr = rspamd_upstream_addr (up);
+ addr = rspamd_upstream_addr (up);
+ /* cur should be zero here */
+ addr = rspamd_upstream_addr (up);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
+ /* Test errors with IPv6 */
+ rspamd_upstream_fail (up);
+ /* Now we should have merely IPv4 addresses in rotation */
+ addr = rspamd_upstream_addr (up);
+ for (i = 0; i < 256; i++) {
+ next_addr = rspamd_upstream_addr (up);
+ g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
+ g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
+ g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);
+ addr = next_addr;
+ }
+ rspamd_upstreams_destroy (nls);
+
ls = rspamd_upstreams_create (cfg->ups_ctx);
g_assert (rspamd_upstreams_parse_line (ls, test_upstream_list, 443, NULL));
g_assert (rspamd_upstreams_count (ls) == 3);
@@ -128,25 +167,6 @@ rspamd_upstream_test_func (void)
rspamd_upstreams_destroy (nls);
- /*
- * Test v4/v6 priorities
- */
- nls = rspamd_upstreams_create (cfg->ups_ctx);
- g_assert (rspamd_upstreams_add_upstream (nls, "127.0.0.1", 0, NULL));
- up = rspamd_upstream_get (nls, RSPAMD_UPSTREAM_RANDOM, NULL, 0);
- rspamd_parse_inet_address (&paddr, "127.0.0.2", 0);
- g_assert (rspamd_upstream_add_addr (up, paddr));
- rspamd_parse_inet_address (&paddr, "::1", 0);
- g_assert (rspamd_upstream_add_addr (up, paddr));
- addr = rspamd_upstream_addr (up);
- for (i = 0; i < 256; i ++) {
- next_addr = rspamd_upstream_addr (up);
- g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
- g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
- g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);
- addr = next_addr;
- }
- rspamd_upstreams_destroy (nls);
/* Upstream fail test */
evtimer_set (&ev, rspamd_upstream_timeout_handler, resolver);