]> source.dussan.org Git - rspamd.git/commitdiff
Upstreams get implementation.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 17:04:43 +0000 (17:04 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 17:04:43 +0000 (17:04 +0000)
src/libutil/upstream.c

index 55f44ec80e337e5a995ab40a040b8c3bcf918874..ca3c4a947fbd1506ba6fe9d13c779a268c8a4979 100644 (file)
@@ -27,6 +27,7 @@
 #include "ottery.h"
 #include "ref.h"
 #include "rdns.h"
+#include "xxhash.h"
 #include "utlist.h"
 
 struct upstream_inet_addr_entry {
@@ -52,6 +53,7 @@ struct upstream {
        } addrs;
 
        struct upstream_inet_addr_entry *new_addrs;
+       rspamd_mutex_t *lock;
 
        ref_entry_t ref;
 };
@@ -60,7 +62,7 @@ struct upstream_list {
        GPtrArray *ups;
        GPtrArray *alive;
        rspamd_mutex_t *lock;
-       guint hash_seed;
+       guint64 hash_seed;
 };
 
 static struct rdns_resolver *res = NULL;
@@ -92,6 +94,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
        if (reply->code == RDNS_RC_NOERROR) {
                entry = reply->entries;
 
+               rspamd_mutex_lock (up->lock);
                while (entry) {
 
                        if (entry->type == RDNS_REQUEST_A) {
@@ -113,6 +116,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
                        }
                        entry = entry->next;
                }
+               rspamd_mutex_unlock (up->lock);
        }
 
        REF_RELEASE (up);
@@ -166,6 +170,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
 {
        struct upstream *up = (struct upstream *)arg;
 
+       rspamd_mutex_lock (up->lock);
        event_del (&up->ev);
        if (up->ls) {
                rspamd_upstream_set_active (up->ls, up);
@@ -175,6 +180,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
                }
        }
 
+       rspamd_mutex_unlock (up->lock);
        REF_RELEASE (up);
 }
 
@@ -221,6 +227,7 @@ rspamd_upstream_fail (struct upstream *up)
        struct timeval tv;
        gdouble error_rate, max_error_rate;
 
+       rspamd_mutex_lock (up->lock);
        if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
                gettimeofday (&up->tv, NULL);
                up->errors ++;
@@ -238,18 +245,19 @@ rspamd_upstream_fail (struct upstream *up)
                /* Remove upstream from the active list */
                rspamd_upstream_set_inactive (up->ls, up);
        }
+       rspamd_mutex_unlock (up->lock);
 }
 
 void
 rspamd_upstream_ok (struct upstream *up)
 {
+       rspamd_mutex_lock (up->lock);
        if (up->errors > 0) {
                up->errors = 0;
                rspamd_upstream_set_active (up->ls, up);
        }
 
-       /* Rotate weight of the alive upstream */
-       up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight;
+       rspamd_mutex_unlock (up->lock);
 }
 
 struct upstream_list*
@@ -258,7 +266,7 @@ rspamd_upstreams_create (void)
        struct upstream_list *ls;
 
        ls = g_slice_alloc (sizeof (*ls));
-       ls->hash_seed = ottery_rand_unsigned ();
+       ottery_rand_bytes (&ls->hash_seed, sizeof (ls->hash_seed));
        ls->ups = g_ptr_array_new ();
        ls->alive = g_ptr_array_new ();
        ls->lock = rspamd_mutex_new ();
@@ -277,6 +285,7 @@ rspamd_upstream_dtor (struct upstream *up)
                }
        }
 
+       rspamd_mutex_free (up->lock);
        g_free (up->name);
        g_slice_free1 (sizeof (*up), up);
 }
@@ -307,6 +316,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
        up->ud = data;
        up->cur_weight = up->weight;
        REF_INIT_RETAIN (up, rspamd_upstream_dtor);
+       up->lock = rspamd_mutex_new ();
 
        rspamd_upstream_set_active (ups, up);
 
@@ -331,3 +341,143 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
        rspamd_mutex_free (ups->lock);
        g_slice_free1 (sizeof (*ups), ups);
 }
+
+static void
+rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
+{
+       struct upstream *up = (struct upstream *)elt;
+       struct upstream_list *ups = (struct upstream_list *)ls;
+
+       /* Here the upstreams list is already locked */
+       rspamd_mutex_lock (up->lock);
+       event_del (&up->ev);
+
+       if (up->new_addrs) {
+               rspamd_upstream_update_addrs (up);
+       }
+
+       g_ptr_array_add (ups->alive, up);
+       up->active_idx = ups->alive->len - 1;
+       rspamd_mutex_lock (up->lock);
+       /* For revive event */
+       REF_RELEASE (up);
+}
+
+static struct upstream*
+rspamd_upstream_get_random (struct upstream_list *ups)
+{
+       guint idx = ottery_rand_range (ups->alive->len - 1);
+
+       return g_ptr_array_index (ups->alive, idx);
+}
+
+static struct upstream*
+rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
+{
+       guint max_weight = 0;
+       struct upstream *up, *selected;
+       guint i;
+
+       /* Select upstream with the maximum cur_weight */
+       rspamd_mutex_lock (ups->lock);
+       for (i = 0; i < ups->alive->len; i ++) {
+               up = g_ptr_array_index (ups->alive, i);
+               if (use_cur) {
+                       if (up->cur_weight > max_weight) {
+                               selected = up;
+                               max_weight = up->cur_weight;
+                       }
+               }
+               else {
+                       if (up->weight > max_weight) {
+                               selected = up;
+                               max_weight = up->weight;
+                       }
+               }
+       }
+
+       if (use_cur) {
+               if (selected->cur_weight > 0) {
+                       selected->cur_weight--;
+               }
+               else {
+                       selected->cur_weight = selected->weight;
+               }
+       }
+       rspamd_mutex_unlock (ups->lock);
+
+       return selected;
+}
+
+/*
+ * The key idea of this function is obtained from the following paper:
+ * A Fast, Minimal Memory, Consistent Hash Algorithm
+ * John Lamping, Eric Veach
+ *
+ * http://arxiv.org/abs/1406.2294
+ */
+static guint32
+rspamd_consistent_hash (guint64 key, guint32 nbuckets)
+{
+       gint64 b = -1, j = 0;
+
+       while (j < nbuckets) {
+               b = j;
+               key *= 2862933555777941757ULL + 1;
+               j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL);
+       }
+
+       return b;
+}
+
+static struct upstream*
+rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen)
+{
+       union {
+               guint64 k64;
+               guint32 k32[2];
+       } h;
+
+       guint32 idx;
+
+       /* Generate 64 bits input key */
+       h.k32[0] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[0]);
+       h.k32[1] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[1]);
+
+       rspamd_mutex_lock (ups->lock);
+       idx = rspamd_consistent_hash (h.k64, ups->alive->len);
+       rspamd_mutex_unlock (ups->lock);
+
+       return g_ptr_array_index (ups->alive, idx);
+}
+
+struct upstream*
+rspamd_upstream_get (struct upstream_list *ups,
+               enum rspamd_upstream_rotation type, ...)
+{
+       va_list ap;
+       const guint8 *key;
+       guint keylen;
+
+       rspamd_mutex_lock (ups->lock);
+       if (ups->alive->len == 0) {
+               /* We have no upstreams alive */
+               g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
+       }
+       rspamd_mutex_unlock (ups->lock);
+
+       switch (type) {
+       case RSPAMD_UPSTREAM_RANDOM:
+               return rspamd_upstream_get_random (ups);
+       case RSPAMD_UPSTREAM_HASHED:
+               va_start (ap, type);
+               key = va_arg (ap, const guint8 *);
+               keylen = va_arg (ap, guint);
+               va_end (ap);
+               return rspamd_upstream_get_hashed (ups, key, keylen);
+       case RSPAMD_UPSTREAM_ROUND_ROBIN:
+               return rspamd_upstream_get_round_robin (ups, TRUE);
+       case RSPAMD_UPSTREAM_MASTER_SLAVE:
+               return rspamd_upstream_get_round_robin (ups, FALSE);
+       }
+}