Преглед изворни кода

[Fix] Fix consistent hashing when upstreams are marked inactive

The idea is to rehash the value when we found that the current upstream is dead
and apply consistent hashing algorithm multiple times.  This is limited by
number of attempts (we try 10 times before giving up). Also cleanup locking
stuff.
tags/2.2
Vsevolod Stakhov пре 4 година
родитељ
комит
19e29ff61d
1 измењених фајлова са 91 додато и 37 уклоњено
  1. 91
    37
      src/libutil/upstream.c

+ 91
- 37
src/libutil/upstream.c Прегледај датотеку

@@ -22,9 +22,11 @@
#include "cryptobox.h"
#include "utlist.h"
#include "logger.h"
#include "contrib/librdns/rdns.h"
#include "contrib/mumhash/mum.h"

#include <math.h>
#include <contrib/librdns/rdns.h>

struct upstream_inet_addr_entry {
rspamd_inet_addr_t *addr;
@@ -69,10 +71,12 @@ struct upstream {
} addrs;

struct upstream_inet_addr_entry *new_addrs;
rspamd_mutex_t *lock;
gpointer data;
gchar uid[8];
ref_entry_t ref;
#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_t *lock;
#endif
};

struct upstream_limits {
@@ -86,16 +90,19 @@ struct upstream_limits {
};

struct upstream_list {
gchar *ups_line;
struct upstream_ctx *ctx;
GPtrArray *ups;
GPtrArray *alive;
struct upstream_list_watcher *watchers;
rspamd_mutex_t *lock;
guint64 hash_seed;
struct upstream_limits limits;
enum rspamd_upstream_flag flags;
guint cur_elt;
enum rspamd_upstream_rotation rot_alg;
#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_t *lock;
#endif
};

struct upstream_ctx {
@@ -112,8 +119,8 @@ struct upstream_ctx {
#define RSPAMD_UPSTREAM_LOCK(x) do { } while (0)
#define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0)
#else
#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x)
#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x)
#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x->lock)
#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x->lock)
#endif

#define msg_debug_upstream(...) rspamd_conditional_debug_fast (NULL, NULL, \
@@ -294,7 +301,7 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
static void
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
{
RSPAMD_UPSTREAM_LOCK (ls->lock);
RSPAMD_UPSTREAM_LOCK (ls);
g_ptr_array_add (ls->alive, upstream);
upstream->active_idx = ls->alive->len - 1;

@@ -324,7 +331,7 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
ev_timer_start (upstream->ctx->event_loop, &upstream->ev);
}

RSPAMD_UPSTREAM_UNLOCK (ls->lock);
RSPAMD_UPSTREAM_UNLOCK (ls);
}

static void
@@ -351,7 +358,7 @@ rspamd_upstream_update_addrs (struct upstream *upstream)
* We need first of all get the saved port, since DNS gives us no
* idea about what port has been used previously
*/
RSPAMD_UPSTREAM_LOCK (upstream->lock);
RSPAMD_UPSTREAM_LOCK (upstream);

if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
addr_elt = g_ptr_array_index (upstream->addrs.addr, 0);
@@ -421,7 +428,7 @@ rspamd_upstream_update_addrs (struct upstream *upstream)
}

upstream->new_addrs = NULL;
RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
RSPAMD_UPSTREAM_UNLOCK (upstream);
}

static void
@@ -434,7 +441,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;

RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);
while (entry) {

if (entry->type == RDNS_REQUEST_A) {
@@ -452,7 +459,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
entry = entry->next;
}

RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);
}

up->dns_requests--;
@@ -486,7 +493,7 @@ rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg)
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;

RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);
while (entry) {

if (entry->type == RDNS_REQUEST_A) {
@@ -508,7 +515,7 @@ rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg)
entry = entry->next;
}

RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);
}

up->dns_requests--;
@@ -535,7 +542,7 @@ rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg)
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;

RSPAMD_UPSTREAM_LOCK (upstream->lock);
RSPAMD_UPSTREAM_LOCK (upstream);
while (entry) {
/* XXX: we ignore weight as it contradicts with upstreams logic */
if (entry->type == RDNS_REQUEST_SRV) {
@@ -577,7 +584,7 @@ rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg)
entry = entry->next;
}

RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
RSPAMD_UPSTREAM_UNLOCK (upstream);
}

upstream->dns_requests--;
@@ -589,7 +596,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
{
struct upstream *upstream = (struct upstream *)w->data;

RSPAMD_UPSTREAM_LOCK (upstream->lock);
RSPAMD_UPSTREAM_LOCK (upstream);
ev_timer_stop (loop, w);

msg_debug_upstream ("revive upstream %s", upstream->name);
@@ -598,7 +605,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
rspamd_upstream_set_active (upstream->ls, upstream);
}

RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
RSPAMD_UPSTREAM_UNLOCK (upstream);
g_assert (upstream->ref.refcount > 1);
REF_RELEASE (upstream);
}
@@ -648,7 +655,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
{
struct upstream *up = (struct upstream *)w->data;

RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);
ev_timer_stop (loop, w);

if (up->ls) {
@@ -665,7 +672,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
ev_timer_again (loop, w);
}

RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);
}

static void
@@ -676,7 +683,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstrea
struct upstream *cur;
struct upstream_list_watcher *w;

RSPAMD_UPSTREAM_LOCK (ls->lock);
RSPAMD_UPSTREAM_LOCK (ls);
g_ptr_array_remove_index (ls->alive, upstream->active_idx);
upstream->active_idx = -1;

@@ -713,7 +720,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstrea
}
}

RSPAMD_UPSTREAM_UNLOCK (ls->lock);
RSPAMD_UPSTREAM_UNLOCK (ls);
}

void
@@ -727,7 +734,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
if (up->ctx && up->active_idx != -1) {
sec_cur = rspamd_get_ticks (FALSE);

RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);
if (up->errors == 0) {
/* We have the first error */
up->last_fail = sec_cur;
@@ -786,7 +793,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
}
}

RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);
}
}

@@ -796,7 +803,7 @@ rspamd_upstream_ok (struct upstream *up)
struct upstream_addr_elt *addr_elt;
struct upstream_list_watcher *w;

RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);
if (up->errors > 0 && up->active_idx != -1) {
/* We touch upstream if and only if it is active */
up->errors = 0;
@@ -813,15 +820,15 @@ rspamd_upstream_ok (struct upstream *up)
}
}

RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);
}

void
rspamd_upstream_set_weight (struct upstream *up, guint weight)
{
RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);
up->weight = weight;
RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);
}

#define SEED_CONSTANT 0xa574de7df64e9b9dULL
@@ -835,7 +842,10 @@ rspamd_upstreams_create (struct upstream_ctx *ctx)
ls->hash_seed = SEED_CONSTANT;
ls->ups = g_ptr_array_new ();
ls->alive = g_ptr_array_new ();

#ifdef UPSTREAMS_THREAD_SAFE
ls->lock = rspamd_mutex_new ();
#endif
ls->cur_elt = 0;
ls->ctx = ctx;
ls->rot_alg = RSPAMD_UPSTREAM_UNDEF;
@@ -885,7 +895,9 @@ rspamd_upstream_dtor (struct upstream *up)
g_ptr_array_free (up->addrs.addr, TRUE);
}

#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_free (up->lock);
#endif

if (up->ctx) {

@@ -1059,7 +1071,9 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
upstream->cur_weight = upstream->weight;
upstream->ls = ups;
REF_INIT_RETAIN (upstream, rspamd_upstream_dtor);
#ifdef UPSTREAMS_THREAD_SAFE
upstream->lock = rspamd_mutex_new ();
#endif
upstream->ctx = ups->ctx;

if (upstream->ctx) {
@@ -1169,6 +1183,11 @@ rspamd_upstreams_parse_line_len (struct upstream_list *ups,
}
}

if (!ups->ups_line) {
ups->ups_line = g_malloc (len + 1);
rspamd_strlcpy (ups->ups_line, str, len + 1);
}

return ret;
}

@@ -1226,8 +1245,11 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
g_free (w);
}

g_free (ups->ups_line);
g_ptr_array_free (ups->ups, TRUE);
#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_free (ups->lock);
#endif
g_free (ups);
}
}
@@ -1240,7 +1262,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
struct upstream_list_watcher *w;

/* Here the upstreams list is already locked */
RSPAMD_UPSTREAM_LOCK (up->lock);
RSPAMD_UPSTREAM_LOCK (up);

if (ev_can_stop (&up->ev)) {
ev_timer_stop (up->ctx->event_loop, &up->ev);
@@ -1248,7 +1270,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)

g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
RSPAMD_UPSTREAM_UNLOCK (up->lock);
RSPAMD_UPSTREAM_UNLOCK (up);

DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
@@ -1277,7 +1299,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
guint i;

/* Select upstream with the maximum cur_weight */
RSPAMD_UPSTREAM_LOCK (ups->lock);
RSPAMD_UPSTREAM_LOCK (ups);

for (i = 0; i < ups->alive->len; i ++) {
up = g_ptr_array_index (ups->alive, i);
@@ -1321,7 +1343,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
}
}

RSPAMD_UPSTREAM_UNLOCK (ups->lock);
RSPAMD_UPSTREAM_UNLOCK (ups);

return selected;
}
@@ -1352,16 +1374,46 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint
{
guint64 k;
guint32 idx;
static const guint max_tries = 20;
struct upstream *up = NULL;

if (ups->alive->len == 1) {
/* Fast path */
return g_ptr_array_index (ups->alive, 0);
}

/* Generate 64 bits input key */
k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
key, keylen, ups->hash_seed);

RSPAMD_UPSTREAM_LOCK (ups->lock);
idx = rspamd_consistent_hash (k, ups->alive->len);
RSPAMD_UPSTREAM_UNLOCK (ups->lock);
RSPAMD_UPSTREAM_LOCK (ups);
/*
* Select new upstream from all upstreams
*/
for (guint i = 0; i < max_tries; i ++) {
idx = rspamd_consistent_hash (k, ups->ups->len);
up = g_ptr_array_index (ups->ups, idx);

return g_ptr_array_index (ups->alive, idx);
if (up->active_idx < 0) {
/* Found inactive upstream */
k = mum_hash_step (k, ups->hash_seed);
}
else {
break;
}
}
RSPAMD_UPSTREAM_UNLOCK (ups);

if (up->active_idx >= 0) {
return up;
}

/* We failed to find any active upstream */
up = rspamd_upstream_get_random (ups);
msg_info ("failed to find hashed upstream for %s, fallback to random: %s",
ups->ups_line, up->name);

return up;
}

static struct upstream*
@@ -1372,12 +1424,14 @@ rspamd_upstream_get_common (struct upstream_list *ups,
enum rspamd_upstream_rotation type;
struct upstream *up = NULL;

RSPAMD_UPSTREAM_LOCK (ups->lock);
RSPAMD_UPSTREAM_LOCK (ups);
if (ups->alive->len == 0) {
/* We have no upstreams alive */
msg_warn ("there are no alive upstreams left for %s, revive all of them",
ups->ups_line);
g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
}
RSPAMD_UPSTREAM_UNLOCK (ups->lock);
RSPAMD_UPSTREAM_UNLOCK (ups);

if (!forced) {
type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;

Loading…
Откажи
Сачувај