Browse Source

Upstreams get implementation.

tags/0.7.3
Vsevolod Stakhov 9 years ago
parent
commit
88490d9f18
1 changed files with 154 additions and 4 deletions
  1. 154
    4
      src/libutil/upstream.c

+ 154
- 4
src/libutil/upstream.c View File

@@ -27,6 +27,7 @@
#include "ottery.h"
#include "ref.h"
#include "rdns.h"
#include "xxhash.h"
#include "utlist.h"

struct upstream_inet_addr_entry {
@@ -52,6 +53,7 @@ struct upstream {
} addrs;

struct upstream_inet_addr_entry *new_addrs;
rspamd_mutex_t *lock;

ref_entry_t ref;
};
@@ -60,7 +62,7 @@ struct upstream_list {
GPtrArray *ups;
GPtrArray *alive;
rspamd_mutex_t *lock;
guint hash_seed;
guint64 hash_seed;
};

static struct rdns_resolver *res = NULL;
@@ -92,6 +94,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;

rspamd_mutex_lock (up->lock);
while (entry) {

if (entry->type == RDNS_REQUEST_A) {
@@ -113,6 +116,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
}
entry = entry->next;
}
rspamd_mutex_unlock (up->lock);
}

REF_RELEASE (up);
@@ -166,6 +170,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
{
struct upstream *up = (struct upstream *)arg;

rspamd_mutex_lock (up->lock);
event_del (&up->ev);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
@@ -175,6 +180,7 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
}
}

rspamd_mutex_unlock (up->lock);
REF_RELEASE (up);
}

@@ -221,6 +227,7 @@ rspamd_upstream_fail (struct upstream *up)
struct timeval tv;
gdouble error_rate, max_error_rate;

rspamd_mutex_lock (up->lock);
if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
gettimeofday (&up->tv, NULL);
up->errors ++;
@@ -238,18 +245,19 @@ rspamd_upstream_fail (struct upstream *up)
/* Remove upstream from the active list */
rspamd_upstream_set_inactive (up->ls, up);
}
rspamd_mutex_unlock (up->lock);
}

void
rspamd_upstream_ok (struct upstream *up)
{
rspamd_mutex_lock (up->lock);
if (up->errors > 0) {
up->errors = 0;
rspamd_upstream_set_active (up->ls, up);
}

/* Rotate weight of the alive upstream */
up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight;
rspamd_mutex_unlock (up->lock);
}

struct upstream_list*
@@ -258,7 +266,7 @@ rspamd_upstreams_create (void)
struct upstream_list *ls;

ls = g_slice_alloc (sizeof (*ls));
ls->hash_seed = ottery_rand_unsigned ();
ottery_rand_bytes (&ls->hash_seed, sizeof (ls->hash_seed));
ls->ups = g_ptr_array_new ();
ls->alive = g_ptr_array_new ();
ls->lock = rspamd_mutex_new ();
@@ -277,6 +285,7 @@ rspamd_upstream_dtor (struct upstream *up)
}
}

rspamd_mutex_free (up->lock);
g_free (up->name);
g_slice_free1 (sizeof (*up), up);
}
@@ -307,6 +316,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
up->ud = data;
up->cur_weight = up->weight;
REF_INIT_RETAIN (up, rspamd_upstream_dtor);
up->lock = rspamd_mutex_new ();

rspamd_upstream_set_active (ups, up);

@@ -331,3 +341,143 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
rspamd_mutex_free (ups->lock);
g_slice_free1 (sizeof (*ups), ups);
}

static void
rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
{
struct upstream *up = (struct upstream *)elt;
struct upstream_list *ups = (struct upstream_list *)ls;

/* Here the upstreams list is already locked */
rspamd_mutex_lock (up->lock);
event_del (&up->ev);

if (up->new_addrs) {
rspamd_upstream_update_addrs (up);
}

g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
rspamd_mutex_lock (up->lock);
/* For revive event */
REF_RELEASE (up);
}

static struct upstream*
rspamd_upstream_get_random (struct upstream_list *ups)
{
guint idx = ottery_rand_range (ups->alive->len - 1);

return g_ptr_array_index (ups->alive, idx);
}

static struct upstream*
rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
{
guint max_weight = 0;
struct upstream *up, *selected;
guint i;

/* Select upstream with the maximum cur_weight */
rspamd_mutex_lock (ups->lock);
for (i = 0; i < ups->alive->len; i ++) {
up = g_ptr_array_index (ups->alive, i);
if (use_cur) {
if (up->cur_weight > max_weight) {
selected = up;
max_weight = up->cur_weight;
}
}
else {
if (up->weight > max_weight) {
selected = up;
max_weight = up->weight;
}
}
}

if (use_cur) {
if (selected->cur_weight > 0) {
selected->cur_weight--;
}
else {
selected->cur_weight = selected->weight;
}
}
rspamd_mutex_unlock (ups->lock);

return selected;
}

/*
* The key idea of this function is obtained from the following paper:
* A Fast, Minimal Memory, Consistent Hash Algorithm
* John Lamping, Eric Veach
*
* http://arxiv.org/abs/1406.2294
*/
static guint32
rspamd_consistent_hash (guint64 key, guint32 nbuckets)
{
gint64 b = -1, j = 0;

while (j < nbuckets) {
b = j;
key *= 2862933555777941757ULL + 1;
j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL);
}

return b;
}

static struct upstream*
rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen)
{
union {
guint64 k64;
guint32 k32[2];
} h;

guint32 idx;

/* Generate 64 bits input key */
h.k32[0] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[0]);
h.k32[1] = XXH32 (key, keylen, ((guint32*)&ups->hash_seed)[1]);

rspamd_mutex_lock (ups->lock);
idx = rspamd_consistent_hash (h.k64, ups->alive->len);
rspamd_mutex_unlock (ups->lock);

return g_ptr_array_index (ups->alive, idx);
}

struct upstream*
rspamd_upstream_get (struct upstream_list *ups,
enum rspamd_upstream_rotation type, ...)
{
va_list ap;
const guint8 *key;
guint keylen;

rspamd_mutex_lock (ups->lock);
if (ups->alive->len == 0) {
/* We have no upstreams alive */
g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
}
rspamd_mutex_unlock (ups->lock);

switch (type) {
case RSPAMD_UPSTREAM_RANDOM:
return rspamd_upstream_get_random (ups);
case RSPAMD_UPSTREAM_HASHED:
va_start (ap, type);
key = va_arg (ap, const guint8 *);
keylen = va_arg (ap, guint);
va_end (ap);
return rspamd_upstream_get_hashed (ups, key, keylen);
case RSPAMD_UPSTREAM_ROUND_ROBIN:
return rspamd_upstream_get_round_robin (ups, TRUE);
case RSPAMD_UPSTREAM_MASTER_SLAVE:
return rspamd_upstream_get_round_robin (ups, FALSE);
}
}

Loading…
Cancel
Save