summaryrefslogtreecommitdiffstats
path: root/src/libutil
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-10-28 17:04:43 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-10-28 17:04:43 +0000
commit88490d9f18997e2acc347e6891953c19eaf7c7df (patch)
tree97c7f56588605eb2a79453b738e4015764016e03 /src/libutil
parentcbde625fb24020aa9038947f865cd41e91687553 (diff)
downloadrspamd-88490d9f18997e2acc347e6891953c19eaf7c7df.tar.gz
rspamd-88490d9f18997e2acc347e6891953c19eaf7c7df.zip
Upstreams get implementation.
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/upstream.c158
1 files changed, 154 insertions, 4 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index 55f44ec80..ca3c4a947 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -27,6 +27,7 @@
#include "ottery.h"
#include "ref.h"
#include "rdns.h"
+#include "xxhash.h"
#include "utlist.h"
struct upstream_inet_addr_entry {
@@ -52,6 +53,7 @@ struct upstream {
} addrs;
struct upstream_inet_addr_entry *new_addrs;
+ rspamd_mutex_t *lock;
ref_entry_t ref;
};
@@ -60,7 +62,7 @@ struct upstream_list {
GPtrArray *ups;
GPtrArray *alive;
rspamd_mutex_t *lock;
- guint hash_seed;
+ guint64 hash_seed;
};
static struct rdns_resolver *res = NULL;
@@ -92,6 +94,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;
+ rspamd_mutex_lock (up->lock);
while (entry) {
if (entry->type == RDNS_REQUEST_A) {
@@ -113,6 +116,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
}
entry = entry->next;
}
+ rspamd_mutex_unlock (up->lock);
}
REF_RELEASE (up);
@@ -166,6 +170,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
{
struct upstream *up = (struct upstream *)arg;
+ rspamd_mutex_lock (up->lock);
event_del (&up->ev);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
@@ -175,6 +180,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
}
}
+ rspamd_mutex_unlock (up->lock);
REF_RELEASE (up);
}
@@ -221,6 +227,7 @@ rspamd_upstream_fail (struct upstream *up)
struct timeval tv;
gdouble error_rate, max_error_rate;
+ rspamd_mutex_lock (up->lock);
if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
gettimeofday (&up->tv, NULL);
up->errors ++;
@@ -238,18 +245,19 @@ rspamd_upstream_fail (struct upstream *up)
/* Remove upstream from the active list */
rspamd_upstream_set_inactive (up->ls, up);
}
+ rspamd_mutex_unlock (up->lock);
}
void
rspamd_upstream_ok (struct upstream *up)
{
+ rspamd_mutex_lock (up->lock);
if (up->errors > 0) {
up->errors = 0;
rspamd_upstream_set_active (up->ls, up);
}
- /* Rotate weight of the alive upstream */
- up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight;
+ rspamd_mutex_unlock (up->lock);
}
struct upstream_list*
@@ -258,7 +266,7 @@ rspamd_upstreams_create (void)
struct upstream_list *ls;
ls = g_slice_alloc (sizeof (*ls));
- ls->hash_seed = ottery_rand_unsigned ();
+ ottery_rand_bytes (&ls->hash_seed, sizeof (ls->hash_seed));
ls->ups = g_ptr_array_new ();
ls->alive = g_ptr_array_new ();
ls->lock = rspamd_mutex_new ();
@@ -277,6 +285,7 @@ rspamd_upstream_dtor (struct upstream *up)
}
}
+ rspamd_mutex_free (up->lock);
g_free (up->name);
g_slice_free1 (sizeof (*up), up);
}
@@ -307,6 +316,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
up->ud = data;
up->cur_weight = up->weight;
REF_INIT_RETAIN (up, rspamd_upstream_dtor);
+ up->lock = rspamd_mutex_new ();
rspamd_upstream_set_active (ups, up);
@@ -331,3 +341,143 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
rspamd_mutex_free (ups->lock);
g_slice_free1 (sizeof (*ups), ups);
}
+
+static void
+rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
+{
+ struct upstream *up = (struct upstream *)elt;
+ struct upstream_list *ups = (struct upstream_list *)ls;
+
+ /* Here the upstreams list is already locked */
+ rspamd_mutex_lock (up->lock);
+ event_del (&up->ev);
+
+ if (up->new_addrs) {
+ rspamd_upstream_update_addrs (up);
+ }
+
+ g_ptr_array_add (ups->alive, up);
+ up->active_idx = ups->alive->len - 1;
+ rspamd_mutex_lock (up->lock);
+ /* For revive event */
+ REF_RELEASE (up);
+}
+
+static struct upstream*
+rspamd_upstream_get_random (struct upstream_list *ups)
+{
+ guint idx = ottery_rand_range (ups->alive->len - 1);
+
+ return g_ptr_array_index (ups->alive, idx);
+}
+
+static struct upstream*
+rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
+{
+ guint max_weight = 0;
+ struct upstream *up, *selected;
+ guint i;
+
+ /* Select upstream with the maximum cur_weight */
+ rspamd_mutex_lock (ups->lock);
+ for (i = 0; i < ups->alive->len; i ++) {
+ up = g_ptr_array_index (ups->alive, i);
+ if (use_cur) {
+ if (up->cur_weight > max_weight) {
+ selected = up;
+ max_weight = up->cur_weight;
+ }
+ }
+ else {
+ if (up->weight > max_weight) {
+ selected = up;
+ max_weight = up->weight;
+ }
+ }
+ }
+
+ if (use_cur) {
+ if (selected->cur_weight > 0) {
+ selected->cur_weight--;
+ }
+ else {
+ selected->cur_weight = selected->weight;
+ }
+ }
+ rspamd_mutex_unlock (ups->lock);
+
+ return selected;
+}
+
+/*
+ * The key idea of this function is obtained from the following paper:
+ * A Fast, Minimal Memory, Consistent Hash Algorithm
+ * John Lamping, Eric Veach
+ *
+ * http://arxiv.org/abs/1406.2294
+ */
+static guint32
+rspamd_consistent_hash (guint64 key, guint32 nbuckets)
+{
+ gint64 b = -1, j = 0;
+
+ while (j < nbuckets) {
+ b = j;
+ key *= 2862933555777941757ULL + 1;
+ j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL);
+ }
+
+ return b;
+}
+
+static struct upstream*
+rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen)
+{
+ union {
+ guint64 k64;
+ guint32 k32[2];
+ } h;
+
+ guint32 idx;
+
+ /* Generate 64 bits input key */
+ h.k32[0] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[0]);
+ h.k32[1] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[1]);
+
+ rspamd_mutex_lock (ups->lock);
+ idx = rspamd_consistent_hash (h.k64, ups->alive->len);
+ rspamd_mutex_unlock (ups->lock);
+
+ return g_ptr_array_index (ups->alive, idx);
+}
+
+struct upstream*
+rspamd_upstream_get (struct upstream_list *ups,
+ enum rspamd_upstream_rotation type, ...)
+{
+ va_list ap;
+ const guint8 *key;
+ guint keylen;
+
+ rspamd_mutex_lock (ups->lock);
+ if (ups->alive->len == 0) {
+ /* We have no upstreams alive */
+ g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
+ }
+ rspamd_mutex_unlock (ups->lock);
+
+ switch (type) {
+ case RSPAMD_UPSTREAM_RANDOM:
+ return rspamd_upstream_get_random (ups);
+ case RSPAMD_UPSTREAM_HASHED:
+ va_start (ap, type);
+ key = va_arg (ap, const guint8 *);
+ keylen = va_arg (ap, guint);
+ va_end (ap);
+ return rspamd_upstream_get_hashed (ups, key, keylen);
+ case RSPAMD_UPSTREAM_ROUND_ROBIN:
+ return rspamd_upstream_get_round_robin (ups, TRUE);
+ case RSPAMD_UPSTREAM_MASTER_SLAVE:
+ return rspamd_upstream_get_round_robin (ups, FALSE);
+ }
+}