From f046f61e40a6033828e89be935637c9d8a389092 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 28 Oct 2014 15:19:06 +0000 Subject: [PATCH] Add some basic logic for upstreams. --- src/libutil/addr.c | 13 +++- src/libutil/addr.h | 6 +- src/libutil/upstream.c | 134 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 138 insertions(+), 15 deletions(-) diff --git a/src/libutil/addr.c b/src/libutil/addr.c index 00b371ab2..362ab3994 100644 --- a/src/libutil/addr.c +++ b/src/libutil/addr.c @@ -185,7 +185,7 @@ rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type, gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, - rspamd_inet_addr_t *addr, guint *priority, guint default_port) + rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port) { gchar *err_str, portbuf[8]; const gchar *cur_tok, *cur_port; @@ -267,6 +267,7 @@ rspamd_parse_host_port_priority_strv (gchar **tokens, if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) { memcpy (&addr->addr, res->ai_addr, MIN (sizeof (addr->addr), res->ai_addrlen)); + addr->af = res->ai_family; freeaddrinfo (res); } else { @@ -277,6 +278,9 @@ rspamd_parse_host_port_priority_strv (gchar **tokens, } /* Restore errno */ + if (name != NULL) { + *name = g_strdup (tokens[0]); + } errno = saved_errno; return TRUE; @@ -290,6 +294,7 @@ rspamd_parse_host_port_priority ( const gchar *str, rspamd_inet_addr_t *addr, guint *priority, + gchar **name, guint default_port) { gchar **tokens; @@ -300,7 +305,8 @@ rspamd_parse_host_port_priority ( return FALSE; } - ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, default_port); + ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, name, + default_port); g_strfreev (tokens); @@ -310,7 +316,8 @@ rspamd_parse_host_port_priority ( gboolean rspamd_parse_host_port (const gchar *str, rspamd_inet_addr_t *addr, + gchar **name, guint default_port) { - return rspamd_parse_host_port_priority (str, addr, NULL, default_port); + return rspamd_parse_host_port_priority (str, addr, NULL, name, default_port); } diff --git a/src/libutil/addr.h b/src/libutil/addr.h index 706f2bf87..4e3a936f8 100644 --- a/src/libutil/addr.h +++ b/src/libutil/addr.h @@ -96,7 +96,7 @@ gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr); gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr); gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, - rspamd_inet_addr_t *addr, guint *priority, guint default_port); + rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port); /** * Parse host[:port[:priority]] line @@ -106,7 +106,7 @@ gboolean rspamd_parse_host_port_priority_strv (gchar **tokens, * @return TRUE if string was parsed */ gboolean rspamd_parse_host_port_priority (const gchar *str, - rspamd_inet_addr_t *addr, guint *priority, guint default_port); + rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port); /** * Parse host:port line @@ -115,7 +115,7 @@ gboolean rspamd_parse_host_port_priority (const gchar *str, * @return TRUE if string was parsed */ gboolean rspamd_parse_host_port (const gchar *str, - rspamd_inet_addr_t *addr, guint default_port); + rspamd_inet_addr_t *addr, gchar **name, guint default_port); #endif /* ADDR_H_ */ diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index f8a0d7618..44a0455ad 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -25,23 +25,26 @@ #include "config.h" #include "upstream.h" #include "ottery.h" +#include "ref.h" +#include "rdns.h" struct upstream { guint weight; guint cur_weight; guint errors; guint port; - guint active_idx; + gint active_idx; gchar *name; struct event ev; struct timeval tv; gpointer ud; struct upstream_list *ls; rspamd_inet_addr_t addr; + ref_entry_t ref; }; struct upstream_list { - GArray *ups; + GPtrArray *ups; GPtrArray *alive; rspamd_mutex_t *lock; guint hash_seed; @@ -49,23 +52,78 @@ struct upstream_list { static struct rdns_resolver *res = NULL; static struct event_base *ev_base = NULL; +/* 4 errors in 10 seconds */ const guint default_max_errors = 4; +const guint default_revive_time = 60; +const guint default_error_time = 10; +const gdouble default_dns_timeout = 1.0; +const guint default_dns_retransmits = 2; static void -rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) +rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) { rspamd_mutex_lock (ls->lock); - g_ptr_array_remove_index (ls->alive, up->active_idx); - up->active_idx = 0; + g_ptr_array_add (ls->alive, up); + up->active_idx = ls->alive->len - 1; rspamd_mutex_unlock (ls->lock); } static void -rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) +rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) { + struct upstream *up = (struct upstream *)arg; + struct rdns_reply_entry *entry; + + if (reply->code == RDNS_RC_NOERROR) { + + } + + REF_RELEASE (up); +} + +static void +rspamd_revive_cb (int fd, short what, void *arg) +{ + struct upstream *up = (struct upstream *)arg; + + event_del (&up->ev); + if (up->ls) { + rspamd_upstream_set_active (up->ls, up); + } + + REF_RELEASE (up); +} + +static void +rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) +{ + gint query_type = -1; + rspamd_mutex_lock (ls->lock); - g_ptr_array_add (ls->alive, up); - up->active_idx = ls->alive->len - 1; + g_ptr_array_remove_index (ls->alive, up->active_idx); + up->active_idx = -1; + /* Resolve name of the upstream one more time */ + if (up->addr.af == AF_INET) { + query_type = RDNS_REQUEST_A; + } + else if (up->addr.af == AF_INET6) { + query_type = RDNS_REQUEST_AAAA; + } + + if (query_type != -1) { + REF_RETAIN (up); + rdns_make_request_full (res, rspamd_upstream_dns_cb, up, + default_dns_timeout, default_dns_retransmits, + RDNS_REQUEST_A, up->name); + } + + REF_RETAIN (up); + evtimer_set (&up->ev, rspamd_revive_cb, up); + event_base_set (ev_base, &up->ev); + up->tv.tv_sec = default_revive_time; + up->tv.tv_usec = 0; + event_add (&up->ev, &up->tv); + rspamd_mutex_unlock (ls->lock); } @@ -80,6 +138,9 @@ rspamd_upstreams_library_init (struct rdns_resolver *resolver, void rspamd_upstream_fail (struct upstream *up) { + struct timeval tv; + gdouble error_rate, max_error_rate; + if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) { gettimeofday (&up->tv, NULL); up->errors ++; @@ -88,7 +149,12 @@ rspamd_upstream_fail (struct upstream *up) g_atomic_int_inc (&up->errors); } - if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) { + gettimeofday (&tv, NULL); + + error_rate = ((gdouble)up->errors) / (tv.tv_sec - up->tv.tv_sec); + max_error_rate = (gdouble)default_max_errors / (gdouble)default_error_time; + + if (error_rate > max_error_rate) { /* Remove upstream from the active list */ rspamd_upstream_set_inactive (up->ls, up); } @@ -120,9 +186,59 @@ rspamd_upstreams_create (void) return ls; } +static void +rspamd_upstream_dtor (struct upstream *up) +{ + g_free (up->name); + g_slice_free1 (sizeof (*up), up); +} rspamd_inet_addr_t* rspamd_upstream_addr (struct upstream *up) { return &up->addr; } + +gboolean +rspamd_upstreams_add_upstream (struct upstream_list *ups, + const gchar *str, guint16 def_port, void *data) +{ + struct upstream *up; + + up = g_slice_alloc0 (sizeof (*up)); + + if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight, + &up->name, def_port)) { + g_slice_free1 (sizeof (*up), up); + return FALSE; + } + + g_ptr_array_add (ups->ups, up); + up->ud = data; + up->cur_weight = up->weight; + up->port = rspamd_inet_address_get_port (&up->addr); + REF_INIT_RETAIN (up, rspamd_upstream_dtor); + + rspamd_upstream_set_active (ups, up); + + return TRUE; +} + +void +rspamd_upstreams_destroy (struct upstream_list *ups) +{ + guint i; + struct upstream *up; + + g_ptr_array_free (ups->alive, TRUE); + + for (i = 0; i < ups->ups->len; i ++) { + up = g_ptr_array_index (ups->ups, i); + up->ls = NULL; + REF_RELEASE (up); + } + + g_ptr_array_free (ups->ups, TRUE); + rspamd_mutex_free (ups->lock); + g_slice_free1 (sizeof (*ups), ups); +} -- 2.39.5