From cbde625fb24020aa9038947f865cd41e91687553 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 28 Oct 2014 16:04:33 +0000 Subject: [PATCH] Allow multiple addresses per upstream. --- src/libutil/addr.c | 49 ++++++++++++---- src/libutil/addr.h | 9 ++- src/libutil/upstream.c | 127 +++++++++++++++++++++++++++++++++++------ 3 files changed, 153 insertions(+), 32 deletions(-) diff --git a/src/libutil/addr.c b/src/libutil/addr.c index 362ab3994..6aeed21e9 100644 --- a/src/libutil/addr.c +++ b/src/libutil/addr.c @@ -185,11 +185,17 @@ rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type, gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, - rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port) + rspamd_inet_addr_t **addr, + guint *max_addrs, + guint *priority, + gchar **name, + guint default_port) { gchar *err_str, portbuf[8]; const gchar *cur_tok, *cur_port; - struct addrinfo hints, *res; + struct addrinfo hints, *res, *cur; + rspamd_inet_addr_t *cur_addr; + guint addr_cnt; guint port_parsed, priority_parsed, saved_errno = errno; gint r; @@ -265,9 +271,29 @@ rspamd_parse_host_port_priority_strv (gchar **tokens, } if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) { - memcpy (&addr->addr, res->ai_addr, - MIN (sizeof (addr->addr), res->ai_addrlen)); - addr->af = res->ai_family; + /* Now copy up to max_addrs of addresses */ + addr_cnt = 0; + cur = res; + while (cur && addr_cnt < *max_addrs) { + cur = cur->ai_next; + addr_cnt ++; + } + + *addr = g_new (rspamd_inet_addr_t, addr_cnt); + + cur = res; + addr_cnt = 0; + while (cur && addr_cnt < *max_addrs) { + cur_addr = &(*addr)[addr_cnt]; + memcpy (&cur_addr->addr, cur->ai_addr, + MIN (sizeof (cur_addr->addr), cur->ai_addrlen)); + cur_addr->af = cur->ai_family; + cur = cur->ai_next; + addr_cnt ++; + } + + *max_addrs = addr_cnt; + freeaddrinfo (res); } else { @@ -292,7 +318,8 @@ err: gboolean rspamd_parse_host_port_priority ( const gchar *str, - rspamd_inet_addr_t *addr, + rspamd_inet_addr_t **addr, + guint *max_addrs, guint *priority, gchar **name, guint default_port) @@ -305,8 +332,8 @@ rspamd_parse_host_port_priority ( return FALSE; } - ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, name, - default_port); + ret = rspamd_parse_host_port_priority_strv (tokens, addr, max_addrs, + priority, name, default_port); g_strfreev (tokens); @@ -315,9 +342,11 @@ rspamd_parse_host_port_priority ( gboolean rspamd_parse_host_port (const gchar *str, - rspamd_inet_addr_t *addr, + rspamd_inet_addr_t **addr, + guint *max_addrs, gchar **name, guint default_port) { - return rspamd_parse_host_port_priority (str, addr, NULL, name, default_port); + return rspamd_parse_host_port_priority (str, addr, max_addrs, NULL, + name, default_port); } diff --git a/src/libutil/addr.h b/src/libutil/addr.h index 4e3a936f8..5989b740b 100644 --- a/src/libutil/addr.h +++ b/src/libutil/addr.h @@ -96,7 +96,8 @@ gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr); gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr); gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, - rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port); + rspamd_inet_addr_t **addr, guint *max_addrs, guint *priority, + gchar **name, guint default_port); /** * Parse host[:port[:priority]] line @@ -106,7 +107,8 @@ gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, * @return TRUE if string was parsed */ gboolean rspamd_parse_host_port_priority (const gchar *str, - rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port); + rspamd_inet_addr_t **addr, guint *max_addrs, + guint *priority, gchar **name, guint default_port); /** * Parse host:port line @@ -115,7 +117,8 @@ gboolean rspamd_parse_host_port_priority (const gchar *str, * @return TRUE if string was parsed */ gboolean rspamd_parse_host_port (const gchar *str, - rspamd_inet_addr_t *addr, gchar **name, guint default_port); + rspamd_inet_addr_t **addr, guint *max_addrs, + gchar **name, guint default_port); #endif /* ADDR_H_ */ diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 44a0455ad..55f44ec80 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -27,19 +27,32 @@ #include "ottery.h" #include "ref.h" #include "rdns.h" +#include "utlist.h" + +struct upstream_inet_addr_entry { + rspamd_inet_addr_t addr; + struct upstream_inet_addr_entry *next; +}; struct upstream { guint weight; guint cur_weight; guint errors; - guint port; gint active_idx; gchar *name; struct event ev; struct timeval tv; gpointer ud; struct upstream_list *ls; - rspamd_inet_addr_t addr; + + struct { + rspamd_inet_addr_t *addr; + guint count; + guint cur; + } addrs; + + struct upstream_inet_addr_entry *new_addrs; + ref_entry_t ref; }; @@ -58,6 +71,7 @@ const guint default_revive_time = 60; const guint default_error_time = 10; const gdouble default_dns_timeout = 1.0; const guint default_dns_retransmits = 2; +const guint default_max_addresses = 1024; static void rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) @@ -73,22 +87,92 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) { struct upstream *up = (struct upstream *)arg; struct rdns_reply_entry *entry; + struct upstream_inet_addr_entry *up_ent; if (reply->code == RDNS_RC_NOERROR) { - + entry = reply->entries; + + while (entry) { + + if (entry->type == RDNS_REQUEST_A) { + up_ent = g_malloc (sizeof (*up_ent)); + + up_ent->addr.addr.s4.sin_addr = entry->content.a.addr; + up_ent->addr.af = AF_INET; + up_ent->addr.slen = sizeof (up_ent->addr.addr.s4); + LL_PREPEND (up->new_addrs, up_ent); + } + else if (entry->type == RDNS_REQUEST_AAAA) { + up_ent = g_malloc (sizeof (*up_ent)); + + memcpy (&up_ent->addr.addr.s6.sin6_addr, + &entry->content.aaa.addr, sizeof (struct in6_addr)); + up_ent->addr.af = AF_INET6; + up_ent->addr.slen = sizeof (up_ent->addr.addr.s6); + LL_PREPEND (up->new_addrs, up_ent); + } + entry = entry->next; + } } REF_RELEASE (up); } static void -rspamd_revive_cb (int fd, short what, void *arg) +rspamd_upstream_update_addrs (struct upstream *up) +{ + guint16 port; + guint addr_cnt; + struct upstream_inet_addr_entry *cur, *tmp; + rspamd_inet_addr_t *new_addrs, *old; + + /* + * We need first of all get the saved port, since DNS gives us no + * idea about what port has been used previously + */ + if (up->addrs.count > 0 && up->new_addrs) { + port = rspamd_inet_address_get_port (&up->addrs.addr[0]); + + /* Now calculate new addrs count */ + addr_cnt = 0; + LL_FOREACH (up->new_addrs, cur) { + addr_cnt ++; + } + new_addrs = g_new (rspamd_inet_addr_t, addr_cnt); + + /* Copy addrs back */ + addr_cnt = 0; + LL_FOREACH (up->new_addrs, cur) { + memcpy (&new_addrs[addr_cnt], cur, sizeof (rspamd_inet_addr_t)); + rspamd_inet_address_set_port (&new_addrs[addr_cnt], port); + addr_cnt ++; + } + + old = up->addrs.addr; + up->addrs.cur = 0; + up->addrs.count = addr_cnt; + up->addrs.addr = new_addrs; + g_free (old); + } + + LL_FOREACH_SAFE (up->new_addrs, cur, tmp) { + g_free (cur); + } + up->new_addrs = NULL; +} + +static void +rspamd_upstream_revive_cb (int fd, short what, void *arg) { struct upstream *up = (struct upstream *)arg; event_del (&up->ev); if (up->ls) { rspamd_upstream_set_active (up->ls, up); + + if (up->new_addrs) { + rspamd_upstream_update_addrs (up); + } } REF_RELEASE (up); @@ -97,28 +181,24 @@ rspamd_revive_cb (int fd, short what, void *arg) static void rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) { - gint query_type = -1; - rspamd_mutex_lock (ls->lock); g_ptr_array_remove_index (ls->alive, up->active_idx); up->active_idx = -1; - /* Resolve name of the upstream one more time */ - if (up->addr.af == AF_INET) { - query_type = RDNS_REQUEST_A; - } - else if (up->addr.af == AF_INET6) { - query_type = RDNS_REQUEST_AAAA; - } - if (query_type != -1) { + /* Resolve name of the upstream one more time */ + if (up->name[0] != '/') { REF_RETAIN (up); rdns_make_request_full (res, rspamd_upstream_dns_cb, up, default_dns_timeout, default_dns_retransmits, RDNS_REQUEST_A, up->name); + REF_RETAIN (up); + rdns_make_request_full (res, rspamd_upstream_dns_cb, up, + default_dns_timeout, default_dns_retransmits, + RDNS_REQUEST_AAAA, up->name); } REF_RETAIN (up); - evtimer_set (&up->ev, rspamd_revive_cb, up); + evtimer_set (&up->ev, rspamd_upstream_revive_cb, up); event_base_set (ev_base, &up->ev); up->tv.tv_sec = default_revive_time; up->tv.tv_usec = 0; @@ -179,7 +259,7 @@ rspamd_upstreams_create (void) ls = g_slice_alloc (sizeof (*ls)); ls->hash_seed = ottery_rand_unsigned (); - ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list)); + ls->ups = g_ptr_array_new (); ls->alive = g_ptr_array_new (); ls->lock = rspamd_mutex_new (); @@ -189,6 +269,14 @@ rspamd_upstreams_create (void) static void rspamd_upstream_dtor (struct upstream *up) { + struct upstream_inet_addr_entry *cur, *tmp; + + if (up->new_addrs) { + LL_FOREACH_SAFE(up->new_addrs, cur, tmp) { + g_free (cur); + } + } + g_free (up->name); g_slice_free1 (sizeof (*up), up); } @@ -196,7 +284,7 @@ rspamd_upstream_dtor (struct upstream *up) rspamd_inet_addr_t* rspamd_upstream_addr (struct upstream *up) { - return &up->addr; + return &up->addrs.addr[up->addrs.cur++ % up->addrs.count]; } gboolean @@ -207,7 +295,9 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, up = g_slice_alloc0 (sizeof (*up)); - if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight, + up->addrs.count = default_max_addresses; + if (!rspamd_parse_host_port_priority (str, &up->addrs.addr, + &up->addrs.count, &up->weight, &up->name, def_port)) { g_slice_free1 (sizeof (*up), up); return FALSE; @@ -216,7 +306,6 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, g_ptr_array_add (ups->ups, up); up->ud = data; up->cur_weight = up->weight; - up->port = rspamd_inet_address_get_port (&up->addr); REF_INIT_RETAIN (up, rspamd_upstream_dtor); rspamd_upstream_set_active (ups, up); -- 2.39.5