From: Vsevolod Stakhov Date: Tue, 28 Oct 2014 17:04:43 +0000 (+0000) Subject: Upstreams get implementation. X-Git-Tag: 0.7.3~36^2~1 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=88490d9f18997e2acc347e6891953c19eaf7c7df;p=rspamd.git Upstreams get implementation. --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 55f44ec80..ca3c4a947 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -27,6 +27,7 @@ #include "ottery.h" #include "ref.h" #include "rdns.h" +#include "xxhash.h" #include "utlist.h" struct upstream_inet_addr_entry { @@ -52,6 +53,7 @@ struct upstream { } addrs; struct upstream_inet_addr_entry *new_addrs; + rspamd_mutex_t *lock; ref_entry_t ref; }; @@ -60,7 +62,7 @@ struct upstream_list { GPtrArray *ups; GPtrArray *alive; rspamd_mutex_t *lock; - guint hash_seed; + guint64 hash_seed; }; static struct rdns_resolver *res = NULL; @@ -92,6 +94,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) if (reply->code == RDNS_RC_NOERROR) { entry = reply->entries; + rspamd_mutex_lock (up->lock); while (entry) { if (entry->type == RDNS_REQUEST_A) { @@ -113,6 +116,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) } entry = entry->next; } + rspamd_mutex_unlock (up->lock); } REF_RELEASE (up); @@ -166,6 +170,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg) { struct upstream *up = (struct upstream *)arg; + rspamd_mutex_lock (up->lock); event_del (&up->ev); if (up->ls) { rspamd_upstream_set_active (up->ls, up); @@ -175,6 +180,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg) } } + rspamd_mutex_unlock (up->lock); REF_RELEASE (up); } @@ -221,6 +227,7 @@ rspamd_upstream_fail (struct upstream *up) struct timeval tv; gdouble error_rate, max_error_rate; + rspamd_mutex_lock (up->lock); if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) { gettimeofday (&up->tv, NULL); up->errors ++; @@ -238,18 +245,19 @@ rspamd_upstream_fail (struct upstream *up) /* Remove upstream from the active list */ rspamd_upstream_set_inactive (up->ls, up); } + rspamd_mutex_unlock (up->lock); } void rspamd_upstream_ok (struct upstream *up) { + rspamd_mutex_lock (up->lock); if (up->errors > 0) { up->errors = 0; rspamd_upstream_set_active (up->ls, up); } - /* Rotate weight of the alive upstream */ - up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight; + rspamd_mutex_unlock (up->lock); } struct upstream_list* @@ -258,7 +266,7 @@ rspamd_upstreams_create (void) struct upstream_list *ls; ls = g_slice_alloc (sizeof (*ls)); - ls->hash_seed = ottery_rand_unsigned (); + ottery_rand_bytes (&ls->hash_seed, sizeof (ls->hash_seed)); ls->ups = g_ptr_array_new (); ls->alive = g_ptr_array_new (); ls->lock = rspamd_mutex_new (); @@ -277,6 +285,7 @@ rspamd_upstream_dtor (struct upstream *up) } } + rspamd_mutex_free (up->lock); g_free (up->name); g_slice_free1 (sizeof (*up), up); } @@ -307,6 +316,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, up->ud = data; up->cur_weight = up->weight; REF_INIT_RETAIN (up, rspamd_upstream_dtor); + up->lock = rspamd_mutex_new (); rspamd_upstream_set_active (ups, up); @@ -331,3 +341,143 @@ rspamd_upstreams_destroy (struct upstream_list *ups) rspamd_mutex_free (ups->lock); g_slice_free1 (sizeof (*ups), ups); } + +static void +rspamd_upstream_restore_cb (gpointer elt, gpointer ls) +{ + struct upstream *up = (struct upstream *)elt; + struct upstream_list *ups = (struct upstream_list *)ls; + + /* Here the upstreams list is already locked */ + rspamd_mutex_lock (up->lock); + event_del (&up->ev); + + if (up->new_addrs) { + rspamd_upstream_update_addrs (up); + } + + g_ptr_array_add (ups->alive, up); + up->active_idx = ups->alive->len - 1; + rspamd_mutex_lock (up->lock); + /* For revive event */ + REF_RELEASE (up); +} + +static struct upstream* +rspamd_upstream_get_random (struct upstream_list *ups) +{ + guint idx = ottery_rand_range (ups->alive->len - 1); + + return g_ptr_array_index (ups->alive, idx); +} + +static struct upstream* +rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) +{ + guint max_weight = 0; + struct upstream *up, *selected; + guint i; + + /* Select upstream with the maximum cur_weight */ + rspamd_mutex_lock (ups->lock); + for (i = 0; i < ups->alive->len; i ++) { + up = g_ptr_array_index (ups->alive, i); + if (use_cur) { + if (up->cur_weight > max_weight) { + selected = up; + max_weight = up->cur_weight; + } + } + else { + if (up->weight > max_weight) { + selected = up; + max_weight = up->weight; + } + } + } + + if (use_cur) { + if (selected->cur_weight > 0) { + selected->cur_weight--; + } + else { + selected->cur_weight = selected->weight; + } + } + rspamd_mutex_unlock (ups->lock); + + return selected; +} + +/* + * The key idea of this function is obtained from the following paper: + * A Fast, Minimal Memory, Consistent Hash Algorithm + * John Lamping, Eric Veach + * + * http://arxiv.org/abs/1406.2294 + */ +static guint32 +rspamd_consistent_hash (guint64 key, guint32 nbuckets) +{ + gint64 b = -1, j = 0; + + while (j < nbuckets) { + b = j; + key *= 2862933555777941757ULL + 1; + j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL); + } + + return b; +} + +static struct upstream* +rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen) +{ + union { + guint64 k64; + guint32 k32[2]; + } h; + + guint32 idx; + + /* Generate 64 bits input key */ + h.k32[0] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[0]); + h.k32[1] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[1]); + + rspamd_mutex_lock (ups->lock); + idx = rspamd_consistent_hash (h.k64, ups->alive->len); + rspamd_mutex_unlock (ups->lock); + + return g_ptr_array_index (ups->alive, idx); +} + +struct upstream* +rspamd_upstream_get (struct upstream_list *ups, + enum rspamd_upstream_rotation type, ...) +{ + va_list ap; + const guint8 *key; + guint keylen; + + rspamd_mutex_lock (ups->lock); + if (ups->alive->len == 0) { + /* We have no upstreams alive */ + g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups); + } + rspamd_mutex_unlock (ups->lock); + + switch (type) { + case RSPAMD_UPSTREAM_RANDOM: + return rspamd_upstream_get_random (ups); + case RSPAMD_UPSTREAM_HASHED: + va_start (ap, type); + key = va_arg (ap, const guint8 *); + keylen = va_arg (ap, guint); + va_end (ap); + return rspamd_upstream_get_hashed (ups, key, keylen); + case RSPAMD_UPSTREAM_ROUND_ROBIN: + return rspamd_upstream_get_round_robin (ups, TRUE); + case RSPAMD_UPSTREAM_MASTER_SLAVE: + return rspamd_upstream_get_round_robin (ups, FALSE); + } +}