gboolean
rspamd_parse_host_port_priority_strv (gchar **tokens,
- rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port)
+ rspamd_inet_addr_t **addr,
+ guint *max_addrs,
+ guint *priority,
+ gchar **name,
+ guint default_port)
{
gchar *err_str, portbuf[8];
const gchar *cur_tok, *cur_port;
- struct addrinfo hints, *res;
+ struct addrinfo hints, *res, *cur;
+ rspamd_inet_addr_t *cur_addr;
+ guint addr_cnt;
guint port_parsed, priority_parsed, saved_errno = errno;
gint r;
}
if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) {
- memcpy (&addr->addr, res->ai_addr,
- MIN (sizeof (addr->addr), res->ai_addrlen));
- addr->af = res->ai_family;
+ /* Now copy up to max_addrs of addresses */
+ addr_cnt = 0;
+ cur = res;
+ while (cur && addr_cnt < *max_addrs) {
+ cur = cur->ai_next;
+ addr_cnt ++;
+ }
+
+ *addr = g_new (rspamd_inet_addr_t, addr_cnt);
+
+ cur = res;
+ addr_cnt = 0;
+ while (cur && addr_cnt < *max_addrs) {
+ cur_addr = &(*addr)[addr_cnt];
+ memcpy (&cur_addr->addr, cur->ai_addr,
+ MIN (sizeof (cur_addr->addr), cur->ai_addrlen));
+ cur_addr->af = cur->ai_family;
+ cur = cur->ai_next;
+ addr_cnt ++;
+ }
+
+ *max_addrs = addr_cnt;
+
freeaddrinfo (res);
}
else {
gboolean
rspamd_parse_host_port_priority (
const gchar *str,
- rspamd_inet_addr_t *addr,
+ rspamd_inet_addr_t **addr,
+ guint *max_addrs,
guint *priority,
gchar **name,
guint default_port)
return FALSE;
}
- ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, name,
- default_port);
+ ret = rspamd_parse_host_port_priority_strv (tokens, addr, max_addrs,
+ priority, name, default_port);
g_strfreev (tokens);
gboolean
rspamd_parse_host_port (const gchar *str,
- rspamd_inet_addr_t *addr,
+ rspamd_inet_addr_t **addr,
+ guint *max_addrs,
gchar **name,
guint default_port)
{
- return rspamd_parse_host_port_priority (str, addr, NULL, name, default_port);
+ return rspamd_parse_host_port_priority (str, addr, max_addrs, NULL,
+ name, default_port);
}
gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr);
gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
- rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);
+ rspamd_inet_addr_t **addr, guint *max_addrs, guint *priority,
+ gchar **name, guint default_port);
/**
* Parse host[:port[:priority]] line
* @return TRUE if string was parsed
*/
gboolean rspamd_parse_host_port_priority (const gchar *str,
- rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);
+ rspamd_inet_addr_t **addr, guint *max_addrs,
+ guint *priority, gchar **name, guint default_port);
/**
* Parse host:port line
* @return TRUE if string was parsed
*/
gboolean rspamd_parse_host_port (const gchar *str,
- rspamd_inet_addr_t *addr, gchar **name, guint default_port);
+ rspamd_inet_addr_t **addr, guint *max_addrs,
+ gchar **name, guint default_port);
#endif /* ADDR_H_ */
#include "ottery.h"
#include "ref.h"
#include "rdns.h"
+#include "utlist.h"
+
+struct upstream_inet_addr_entry {
+ rspamd_inet_addr_t addr;
+ struct upstream_inet_addr_entry *next;
+};
struct upstream {
guint weight;
guint cur_weight;
guint errors;
- guint port;
gint active_idx;
gchar *name;
struct event ev;
struct timeval tv;
gpointer ud;
struct upstream_list *ls;
- rspamd_inet_addr_t addr;
+
+ struct {
+ rspamd_inet_addr_t *addr;
+ guint count;
+ guint cur;
+ } addrs;
+
+ struct upstream_inet_addr_entry *new_addrs;
+
ref_entry_t ref;
};
const guint default_error_time = 10;
const gdouble default_dns_timeout = 1.0;
const guint default_dns_retransmits = 2;
+const guint default_max_addresses = 1024;
static void
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
{
struct upstream *up = (struct upstream *)arg;
struct rdns_reply_entry *entry;
+ struct upstream_inet_addr_entry *up_ent;
if (reply->code == RDNS_RC_NOERROR) {
-
+ entry = reply->entries;
+
+ while (entry) {
+
+ if (entry->type == RDNS_REQUEST_A) {
+ up_ent = g_malloc (sizeof (*up_ent));
+
+ up_ent->addr.addr.s4.sin_addr = entry->content.a.addr;
+ up_ent->addr.af = AF_INET;
+ up_ent->addr.slen = sizeof (up_ent->addr.addr.s4);
+ LL_PREPEND (up->new_addrs, up_ent);
+ }
+ else if (entry->type == RDNS_REQUEST_AAAA) {
+ up_ent = g_malloc (sizeof (*up_ent));
+
+ memcpy (&up_ent->addr.addr.s6.sin6_addr,
+ &entry->content.aaa.addr, sizeof (struct in6_addr));
+ up_ent->addr.af = AF_INET6;
+ up_ent->addr.slen = sizeof (up_ent->addr.addr.s6);
+ LL_PREPEND (up->new_addrs, up_ent);
+ }
+ entry = entry->next;
+ }
}
REF_RELEASE (up);
}
static void
-rspamd_revive_cb (int fd, short what, void *arg)
+rspamd_upstream_update_addrs (struct upstream *up)
+{
+ guint16 port;
+ guint addr_cnt;
+ struct upstream_inet_addr_entry *cur, *tmp;
+ rspamd_inet_addr_t *new_addrs, *old;
+
+ /*
+ * We need first of all get the saved port, since DNS gives us no
+ * idea about what port has been used previously
+ */
+ if (up->addrs.count > 0 && up->new_addrs) {
+ port = rspamd_inet_address_get_port (&up->addrs.addr[0]);
+
+ /* Now calculate new addrs count */
+ addr_cnt = 0;
+ LL_FOREACH (up->new_addrs, cur) {
+ addr_cnt ++;
+ }
+ new_addrs = g_new (rspamd_inet_addr_t, addr_cnt);
+
+ /* Copy addrs back */
+ addr_cnt = 0;
+ LL_FOREACH (up->new_addrs, cur) {
+ memcpy (&new_addrs[addr_cnt], cur, sizeof (rspamd_inet_addr_t));
+ rspamd_inet_address_set_port (&new_addrs[addr_cnt], port);
+ addr_cnt ++;
+ }
+
+ old = up->addrs.addr;
+ up->addrs.cur = 0;
+ up->addrs.count = addr_cnt;
+ up->addrs.addr = new_addrs;
+ g_free (old);
+ }
+
+ LL_FOREACH_SAFE (up->new_addrs, cur, tmp) {
+ g_free (cur);
+ }
+ up->new_addrs = NULL;
+}
+
+static void
+rspamd_upstream_revive_cb (int fd, short what, void *arg)
{
struct upstream *up = (struct upstream *)arg;
event_del (&up->ev);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
+
+ if (up->new_addrs) {
+ rspamd_upstream_update_addrs (up);
+ }
}
REF_RELEASE (up);
static void
rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
{
- gint query_type = -1;
-
rspamd_mutex_lock (ls->lock);
g_ptr_array_remove_index (ls->alive, up->active_idx);
up->active_idx = -1;
- /* Resolve name of the upstream one more time */
- if (up->addr.af == AF_INET) {
- query_type = RDNS_REQUEST_A;
- }
- else if (up->addr.af == AF_INET6) {
- query_type = RDNS_REQUEST_AAAA;
- }
- if (query_type != -1) {
+ /* Resolve name of the upstream one more time */
+ if (up->name[0] != '/') {
REF_RETAIN (up);
rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
default_dns_timeout, default_dns_retransmits,
RDNS_REQUEST_A, up->name);
+ REF_RETAIN (up);
+ rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+ default_dns_timeout, default_dns_retransmits,
+ RDNS_REQUEST_AAAA, up->name);
}
REF_RETAIN (up);
- evtimer_set (&up->ev, rspamd_revive_cb, up);
+ evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
event_base_set (ev_base, &up->ev);
up->tv.tv_sec = default_revive_time;
up->tv.tv_usec = 0;
ls = g_slice_alloc (sizeof (*ls));
ls->hash_seed = ottery_rand_unsigned ();
- ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list));
+ ls->ups = g_ptr_array_new ();
ls->alive = g_ptr_array_new ();
ls->lock = rspamd_mutex_new ();
static void
rspamd_upstream_dtor (struct upstream *up)
{
+ struct upstream_inet_addr_entry *cur, *tmp;
+
+ if (up->new_addrs) {
+ LL_FOREACH_SAFE(up->new_addrs, cur, tmp) {
+ g_free (cur);
+ }
+ }
+
g_free (up->name);
g_slice_free1 (sizeof (*up), up);
}
rspamd_inet_addr_t*
rspamd_upstream_addr (struct upstream *up)
{
- return &up->addr;
+ return &up->addrs.addr[up->addrs.cur++ % up->addrs.count];
}
gboolean
up = g_slice_alloc0 (sizeof (*up));
- if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight,
+ up->addrs.count = default_max_addresses;
+ if (!rspamd_parse_host_port_priority (str, &up->addrs.addr,
+ &up->addrs.count, &up->weight,
&up->name, def_port)) {
g_slice_free1 (sizeof (*up), up);
return FALSE;
g_ptr_array_add (ups->ups, up);
up->ud = data;
up->cur_weight = up->weight;
- up->port = rspamd_inet_address_get_port (&up->addr);
REF_INIT_RETAIN (up, rspamd_upstream_dtor);
rspamd_upstream_set_active (ups, up);