]> source.dussan.org Git - rspamd.git/commitdiff
Add some basic logic for upstreams.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 15:19:06 +0000 (15:19 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 15:19:06 +0000 (15:19 +0000)
src/libutil/addr.c
src/libutil/addr.h
src/libutil/upstream.c

index 00b371ab261a41df4423be9ed398b2892fda31b0..362ab399482f9c257d20c4e5aa83fb5a5888c927 100644 (file)
@@ -185,7 +185,7 @@ rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type,
 
 gboolean
 rspamd_parse_host_port_priority_strv (gchar **tokens,
-       rspamd_inet_addr_t *addr, guint *priority, guint default_port)
+       rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port)
 {
        gchar *err_str, portbuf[8];
        const gchar *cur_tok, *cur_port;
@@ -267,6 +267,7 @@ rspamd_parse_host_port_priority_strv (gchar **tokens,
        if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) {
                memcpy (&addr->addr, res->ai_addr,
                        MIN (sizeof (addr->addr), res->ai_addrlen));
+               addr->af = res->ai_family;
                freeaddrinfo (res);
        }
        else {
@@ -277,6 +278,9 @@ rspamd_parse_host_port_priority_strv (gchar **tokens,
        }
 
        /* Restore errno */
+       if (name != NULL) {
+               *name = g_strdup (tokens[0]);
+       }
        errno = saved_errno;
        return TRUE;
 
@@ -290,6 +294,7 @@ rspamd_parse_host_port_priority (
        const gchar *str,
        rspamd_inet_addr_t *addr,
        guint *priority,
+       gchar **name,
        guint default_port)
 {
        gchar **tokens;
@@ -300,7 +305,8 @@ rspamd_parse_host_port_priority (
                return FALSE;
        }
 
-       ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, default_port);
+       ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, name,
+                       default_port);
 
        g_strfreev (tokens);
 
@@ -310,7 +316,8 @@ rspamd_parse_host_port_priority (
 gboolean
 rspamd_parse_host_port (const gchar *str,
        rspamd_inet_addr_t *addr,
+       gchar **name,
        guint default_port)
 {
-       return rspamd_parse_host_port_priority (str, addr, NULL, default_port);
+       return rspamd_parse_host_port_priority (str, addr, NULL, name, default_port);
 }
index 706f2bf87a090738be7ebb806851dceb821279a7..4e3a936f8602431ffa4449cef49a91af56f15078 100644 (file)
@@ -96,7 +96,7 @@ gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr);
 gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr);
 
 gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
-       rspamd_inet_addr_t *addr, guint *priority, guint default_port);
+       rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);
 
 /**
  * Parse host[:port[:priority]] line
@@ -106,7 +106,7 @@ gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
  * @return TRUE if string was parsed
  */
 gboolean rspamd_parse_host_port_priority (const gchar *str,
-               rspamd_inet_addr_t *addr, guint *priority, guint default_port);
+               rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);
 
 /**
  * Parse host:port line
@@ -115,7 +115,7 @@ gboolean rspamd_parse_host_port_priority (const gchar *str,
  * @return TRUE if string was parsed
  */
 gboolean rspamd_parse_host_port (const gchar *str,
-       rspamd_inet_addr_t *addr, guint default_port);
+       rspamd_inet_addr_t *addr, gchar **name, guint default_port);
 
 
 #endif /* ADDR_H_ */
index f8a0d761842ad23a48a049aa40663483205a52f8..44a0455adce6f160f926a0cbd7f30121c264892e 100644 (file)
 #include "config.h"
 #include "upstream.h"
 #include "ottery.h"
+#include "ref.h"
+#include "rdns.h"
 
 struct upstream {
        guint weight;
        guint cur_weight;
        guint errors;
        guint port;
-       guint active_idx;
+       gint active_idx;
        gchar *name;
        struct event ev;
        struct timeval tv;
        gpointer ud;
        struct upstream_list *ls;
        rspamd_inet_addr_t addr;
+       ref_entry_t ref;
 };
 
 struct upstream_list {
-       GArray *ups;
+       GPtrArray *ups;
        GPtrArray *alive;
        rspamd_mutex_t *lock;
        guint hash_seed;
@@ -49,23 +52,78 @@ struct upstream_list {
 
 static struct rdns_resolver *res = NULL;
 static struct event_base *ev_base = NULL;
+/* 4 errors in 10 seconds */
 const guint default_max_errors = 4;
+const guint default_revive_time = 60;
+const guint default_error_time = 10;
+const gdouble default_dns_timeout = 1.0;
+const guint default_dns_retransmits = 2;
 
 static void
-rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
 {
        rspamd_mutex_lock (ls->lock);
-       g_ptr_array_remove_index (ls->alive, up->active_idx);
-       up->active_idx = 0;
+       g_ptr_array_add (ls->alive, up);
+       up->active_idx = ls->alive->len - 1;
        rspamd_mutex_unlock (ls->lock);
 }
 
 static void
-rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
+rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
 {
+       struct upstream *up = (struct upstream *)arg;
+       struct rdns_reply_entry *entry;
+
+       if (reply->code == RDNS_RC_NOERROR) {
+
+       }
+
+       REF_RELEASE (up);
+}
+
+static void
+rspamd_revive_cb (int fd, short what, void *arg)
+{
+       struct upstream *up = (struct upstream *)arg;
+
+       event_del (&up->ev);
+       if (up->ls) {
+               rspamd_upstream_set_active (up->ls, up);
+       }
+
+       REF_RELEASE (up);
+}
+
+static void
+rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+{
+       gint query_type = -1;
+
        rspamd_mutex_lock (ls->lock);
-       g_ptr_array_add (ls->alive, up);
-       up->active_idx = ls->alive->len - 1;
+       g_ptr_array_remove_index (ls->alive, up->active_idx);
+       up->active_idx = -1;
+       /* Resolve name of the upstream one more time */
+       if (up->addr.af == AF_INET) {
+               query_type = RDNS_REQUEST_A;
+       }
+       else if (up->addr.af == AF_INET6) {
+               query_type = RDNS_REQUEST_AAAA;
+       }
+
+       if (query_type != -1) {
+               REF_RETAIN (up);
+               rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+                       default_dns_timeout, default_dns_retransmits,
+                       RDNS_REQUEST_A, up->name);
+       }
+
+       REF_RETAIN (up);
+       evtimer_set (&up->ev, rspamd_revive_cb, up);
+       event_base_set (ev_base, &up->ev);
+       up->tv.tv_sec = default_revive_time;
+       up->tv.tv_usec = 0;
+       event_add (&up->ev, &up->tv);
+
        rspamd_mutex_unlock (ls->lock);
 }
 
@@ -80,6 +138,9 @@ rspamd_upstreams_library_init (struct rdns_resolver *resolver,
 void
 rspamd_upstream_fail (struct upstream *up)
 {
+       struct timeval tv;
+       gdouble error_rate, max_error_rate;
+
        if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
                gettimeofday (&up->tv, NULL);
                up->errors ++;
@@ -88,7 +149,12 @@ rspamd_upstream_fail (struct upstream *up)
                g_atomic_int_inc (&up->errors);
        }
 
-       if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) {
+       gettimeofday (&tv, NULL);
+
+       error_rate = ((gdouble)up->errors) / (tv.tv_sec - up->tv.tv_sec);
+       max_error_rate = (gdouble)default_max_errors / (gdouble)default_error_time;
+
+       if (error_rate > max_error_rate) {
                /* Remove upstream from the active list */
                rspamd_upstream_set_inactive (up->ls, up);
        }
@@ -120,9 +186,59 @@ rspamd_upstreams_create (void)
        return ls;
 }
 
+static void
+rspamd_upstream_dtor (struct upstream *up)
+{
+       g_free (up->name);
+       g_slice_free1 (sizeof (*up), up);
+}
 
 rspamd_inet_addr_t*
 rspamd_upstream_addr (struct upstream *up)
 {
        return &up->addr;
 }
+
+gboolean
+rspamd_upstreams_add_upstream (struct upstream_list *ups,
+               const gchar *str, guint16 def_port, void *data)
+{
+       struct upstream *up;
+
+       up = g_slice_alloc0 (sizeof (*up));
+
+       if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight,
+                       &up->name, def_port)) {
+               g_slice_free1 (sizeof (*up), up);
+               return FALSE;
+       }
+
+       g_ptr_array_add (ups->ups, up);
+       up->ud = data;
+       up->cur_weight = up->weight;
+       up->port = rspamd_inet_address_get_port (&up->addr);
+       REF_INIT_RETAIN (up, rspamd_upstream_dtor);
+
+       rspamd_upstream_set_active (ups, up);
+
+       return TRUE;
+}
+
+void
+rspamd_upstreams_destroy (struct upstream_list *ups)
+{
+       guint i;
+       struct upstream *up;
+
+       g_ptr_array_free (ups->alive, TRUE);
+
+       for (i = 0; i < ups->ups->len; i ++) {
+               up = g_ptr_array_index (ups->ups, i);
+               up->ls = NULL;
+               REF_RELEASE (up);
+       }
+
+       g_ptr_array_free (ups->ups, TRUE);
+       rspamd_mutex_free (ups->lock);
+       g_slice_free1 (sizeof (*ups), ups);
+}