]> source.dussan.org Git - rspamd.git/commitdiff
Allow multiple addresses per upstream.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 16:04:33 +0000 (16:04 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 16:04:33 +0000 (16:04 +0000)
src/libutil/addr.c
src/libutil/addr.h
src/libutil/upstream.c

index 362ab399482f9c257d20c4e5aa83fb5a5888c927..6aeed21e9319214fc84932474dd7f1d17e898cea 100644 (file)
@@ -185,11 +185,17 @@ rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type,
 
 gboolean
 rspamd_parse_host_port_priority_strv (gchar **tokens,
-       rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port)
+       rspamd_inet_addr_t **addr,
+       guint *max_addrs,
+       guint *priority,
+       gchar **name,
+       guint default_port)
 {
        gchar *err_str, portbuf[8];
        const gchar *cur_tok, *cur_port;
-       struct addrinfo hints, *res;
+       struct addrinfo hints, *res, *cur;
+       rspamd_inet_addr_t *cur_addr;
+       guint addr_cnt;
        guint port_parsed, priority_parsed, saved_errno = errno;
        gint r;
 
@@ -265,9 +271,29 @@ rspamd_parse_host_port_priority_strv (gchar **tokens,
        }
 
        if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) {
-               memcpy (&addr->addr, res->ai_addr,
-                       MIN (sizeof (addr->addr), res->ai_addrlen));
-               addr->af = res->ai_family;
+               /* Now copy up to max_addrs of addresses */
+               addr_cnt = 0;
+               cur = res;
+               while (cur && addr_cnt < *max_addrs) {
+                       cur = cur->ai_next;
+                       addr_cnt ++;
+               }
+
+               *addr = g_new (rspamd_inet_addr_t, addr_cnt);
+
+               cur = res;
+               addr_cnt = 0;
+               while (cur && addr_cnt < *max_addrs) {
+                       cur_addr = &(*addr)[addr_cnt];
+                       memcpy (&cur_addr->addr, cur->ai_addr,
+                                       MIN (sizeof (cur_addr->addr), cur->ai_addrlen));
+                       cur_addr->af = cur->ai_family;
+                       cur = cur->ai_next;
+                       addr_cnt ++;
+               }
+
+               *max_addrs = addr_cnt;
+
                freeaddrinfo (res);
        }
        else {
@@ -292,7 +318,8 @@ err:
 gboolean
 rspamd_parse_host_port_priority (
        const gchar *str,
-       rspamd_inet_addr_t *addr,
+       rspamd_inet_addr_t **addr,
+       guint *max_addrs,
        guint *priority,
        gchar **name,
        guint default_port)
@@ -305,8 +332,8 @@ rspamd_parse_host_port_priority (
                return FALSE;
        }
 
-       ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, name,
-                       default_port);
+       ret = rspamd_parse_host_port_priority_strv (tokens, addr, max_addrs,
+                       priority, name, default_port);
 
        g_strfreev (tokens);
 
@@ -315,9 +342,11 @@ rspamd_parse_host_port_priority (
 
 gboolean
 rspamd_parse_host_port (const gchar *str,
-       rspamd_inet_addr_t *addr,
+       rspamd_inet_addr_t **addr,
+       guint *max_addrs,
        gchar **name,
        guint default_port)
 {
-       return rspamd_parse_host_port_priority (str, addr, NULL, name, default_port);
+       return rspamd_parse_host_port_priority (str, addr, max_addrs, NULL,
+                       name, default_port);
 }
index 4e3a936f8602431ffa4449cef49a91af56f15078..5989b740ba8a2a4b45b0e807e88908d49554fd4a 100644 (file)
@@ -96,7 +96,8 @@ gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr);
 gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr);
 
 gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
-       rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);
+       rspamd_inet_addr_t **addr, guint *max_addrs, guint *priority,
+       gchar **name, guint default_port);
 
 /**
  * Parse host[:port[:priority]] line
@@ -106,7 +107,8 @@ gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
  * @return TRUE if string was parsed
  */
 gboolean rspamd_parse_host_port_priority (const gchar *str,
-               rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);
+               rspamd_inet_addr_t **addr, guint *max_addrs,
+               guint *priority, gchar **name, guint default_port);
 
 /**
  * Parse host:port line
@@ -115,7 +117,8 @@ gboolean rspamd_parse_host_port_priority (const gchar *str,
  * @return TRUE if string was parsed
  */
 gboolean rspamd_parse_host_port (const gchar *str,
-       rspamd_inet_addr_t *addr, gchar **name, guint default_port);
+       rspamd_inet_addr_t **addr, guint *max_addrs,
+       gchar **name, guint default_port);
 
 
 #endif /* ADDR_H_ */
index 44a0455adce6f160f926a0cbd7f30121c264892e..55f44ec80e337e5a995ab40a040b8c3bcf918874 100644 (file)
 #include "ottery.h"
 #include "ref.h"
 #include "rdns.h"
+#include "utlist.h"
+
+struct upstream_inet_addr_entry {
+       rspamd_inet_addr_t addr;
+       struct upstream_inet_addr_entry *next;
+};
 
 struct upstream {
        guint weight;
        guint cur_weight;
        guint errors;
-       guint port;
        gint active_idx;
        gchar *name;
        struct event ev;
        struct timeval tv;
        gpointer ud;
        struct upstream_list *ls;
-       rspamd_inet_addr_t addr;
+
+       struct {
+               rspamd_inet_addr_t *addr;
+               guint count;
+               guint cur;
+       } addrs;
+
+       struct upstream_inet_addr_entry *new_addrs;
+
        ref_entry_t ref;
 };
 
@@ -58,6 +71,7 @@ const guint default_revive_time = 60;
 const guint default_error_time = 10;
 const gdouble default_dns_timeout = 1.0;
 const guint default_dns_retransmits = 2;
+const guint default_max_addresses = 1024;
 
 static void
 rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
@@ -73,22 +87,92 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
 {
        struct upstream *up = (struct upstream *)arg;
        struct rdns_reply_entry *entry;
+       struct upstream_inet_addr_entry *up_ent;
 
        if (reply->code == RDNS_RC_NOERROR) {
-
+               entry = reply->entries;
+
+               while (entry) {
+
+                       if (entry->type == RDNS_REQUEST_A) {
+                               up_ent = g_malloc (sizeof (*up_ent));
+
+                               up_ent->addr.addr.s4.sin_addr = entry->content.a.addr;
+                               up_ent->addr.af = AF_INET;
+                               up_ent->addr.slen = sizeof (up_ent->addr.addr.s4);
+                               LL_PREPEND (up->new_addrs, up_ent);
+                       }
+                       else if (entry->type == RDNS_REQUEST_AAAA) {
+                               up_ent = g_malloc (sizeof (*up_ent));
+
+                               memcpy (&up_ent->addr.addr.s6.sin6_addr,
+                                               &entry->content.aaa.addr, sizeof (struct in6_addr));
+                               up_ent->addr.af = AF_INET6;
+                               up_ent->addr.slen = sizeof (up_ent->addr.addr.s6);
+                               LL_PREPEND (up->new_addrs, up_ent);
+                       }
+                       entry = entry->next;
+               }
        }
 
        REF_RELEASE (up);
 }
 
 static void
-rspamd_revive_cb (int fd, short what, void *arg)
+rspamd_upstream_update_addrs (struct upstream *up)
+{
+       guint16 port;
+       guint addr_cnt;
+       struct upstream_inet_addr_entry *cur, *tmp;
+       rspamd_inet_addr_t *new_addrs, *old;
+
+       /*
+        * We need first of all get the saved port, since DNS gives us no
+        * idea about what port has been used previously
+        */
+       if (up->addrs.count > 0 && up->new_addrs) {
+               port = rspamd_inet_address_get_port (&up->addrs.addr[0]);
+
+               /* Now calculate new addrs count */
+               addr_cnt = 0;
+               LL_FOREACH (up->new_addrs, cur) {
+                       addr_cnt ++;
+               }
+               new_addrs = g_new (rspamd_inet_addr_t, addr_cnt);
+
+               /* Copy addrs back */
+               addr_cnt = 0;
+               LL_FOREACH (up->new_addrs, cur) {
+                       memcpy (&new_addrs[addr_cnt], cur, sizeof (rspamd_inet_addr_t));
+                       rspamd_inet_address_set_port (&new_addrs[addr_cnt], port);
+                       addr_cnt ++;
+               }
+
+               old = up->addrs.addr;
+               up->addrs.cur = 0;
+               up->addrs.count = addr_cnt;
+               up->addrs.addr = new_addrs;
+               g_free (old);
+       }
+
+       LL_FOREACH_SAFE (up->new_addrs, cur, tmp) {
+               g_free (cur);
+       }
+       up->new_addrs = NULL;
+}
+
+static void
+rspamd_upstream_revive_cb (int fd, short what, void *arg)
 {
        struct upstream *up = (struct upstream *)arg;
 
        event_del (&up->ev);
        if (up->ls) {
                rspamd_upstream_set_active (up->ls, up);
+
+               if (up->new_addrs) {
+                       rspamd_upstream_update_addrs (up);
+               }
        }
 
        REF_RELEASE (up);
@@ -97,28 +181,24 @@ rspamd_revive_cb (int fd, short what, void *arg)
 static void
 rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 {
-       gint query_type = -1;
-
        rspamd_mutex_lock (ls->lock);
        g_ptr_array_remove_index (ls->alive, up->active_idx);
        up->active_idx = -1;
-       /* Resolve name of the upstream one more time */
-       if (up->addr.af == AF_INET) {
-               query_type = RDNS_REQUEST_A;
-       }
-       else if (up->addr.af == AF_INET6) {
-               query_type = RDNS_REQUEST_AAAA;
-       }
 
-       if (query_type != -1) {
+       /* Resolve name of the upstream one more time */
+       if (up->name[0] != '/') {
                REF_RETAIN (up);
                rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
                        default_dns_timeout, default_dns_retransmits,
                        RDNS_REQUEST_A, up->name);
+               REF_RETAIN (up);
+               rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+                       default_dns_timeout, default_dns_retransmits,
+                       RDNS_REQUEST_AAAA, up->name);
        }
 
        REF_RETAIN (up);
-       evtimer_set (&up->ev, rspamd_revive_cb, up);
+       evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
        event_base_set (ev_base, &up->ev);
        up->tv.tv_sec = default_revive_time;
        up->tv.tv_usec = 0;
@@ -179,7 +259,7 @@ rspamd_upstreams_create (void)
 
        ls = g_slice_alloc (sizeof (*ls));
        ls->hash_seed = ottery_rand_unsigned ();
-       ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list));
+       ls->ups = g_ptr_array_new ();
        ls->alive = g_ptr_array_new ();
        ls->lock = rspamd_mutex_new ();
 
@@ -189,6 +269,14 @@ rspamd_upstreams_create (void)
 static void
 rspamd_upstream_dtor (struct upstream *up)
 {
+       struct upstream_inet_addr_entry *cur, *tmp;
+
+       if (up->new_addrs) {
+               LL_FOREACH_SAFE(up->new_addrs, cur, tmp) {
+                       g_free (cur);
+               }
+       }
+
        g_free (up->name);
        g_slice_free1 (sizeof (*up), up);
 }
@@ -196,7 +284,7 @@ rspamd_upstream_dtor (struct upstream *up)
 rspamd_inet_addr_t*
 rspamd_upstream_addr (struct upstream *up)
 {
-       return &up->addr;
+       return &up->addrs.addr[up->addrs.cur++ % up->addrs.count];
 }
 
 gboolean
@@ -207,7 +295,9 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
 
        up = g_slice_alloc0 (sizeof (*up));
 
-       if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight,
+       up->addrs.count = default_max_addresses;
+       if (!rspamd_parse_host_port_priority (str, &up->addrs.addr,
+                       &up->addrs.count, &up->weight,
                        &up->name, def_port)) {
                g_slice_free1 (sizeof (*up), up);
                return FALSE;
@@ -216,7 +306,6 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
        g_ptr_array_add (ups->ups, up);
        up->ud = data;
        up->cur_weight = up->weight;
-       up->port = rspamd_inet_address_get_port (&up->addr);
        REF_INIT_RETAIN (up, rspamd_upstream_dtor);
 
        rspamd_upstream_set_active (ups, up);