From 19e29ff61de92c0e7f7837d4d694067d74d06cae Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 30 Oct 2019 14:32:45 +0000 Subject: [PATCH] [Fix] Fix consistent hashing when upstreams are marked inactive The idea is to rehash the value when we found that the current upstream is dead and apply consistent hashing algorithm multiple times. This is limited by number of attempts (we try 10 times before giving up). Also cleanup locking stuff. --- src/libutil/upstream.c | 128 +++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 37 deletions(-) diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 543f07201..cef508e4d 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -22,9 +22,11 @@ #include "cryptobox.h" #include "utlist.h" #include "logger.h" +#include "contrib/librdns/rdns.h" +#include "contrib/mumhash/mum.h" #include -#include + struct upstream_inet_addr_entry { rspamd_inet_addr_t *addr; @@ -69,10 +71,12 @@ struct upstream { } addrs; struct upstream_inet_addr_entry *new_addrs; - rspamd_mutex_t *lock; gpointer data; gchar uid[8]; ref_entry_t ref; +#ifdef UPSTREAMS_THREAD_SAFE + rspamd_mutex_t *lock; +#endif }; struct upstream_limits { @@ -86,16 +90,19 @@ struct upstream_limits { }; struct upstream_list { + gchar *ups_line; struct upstream_ctx *ctx; GPtrArray *ups; GPtrArray *alive; struct upstream_list_watcher *watchers; - rspamd_mutex_t *lock; guint64 hash_seed; struct upstream_limits limits; enum rspamd_upstream_flag flags; guint cur_elt; enum rspamd_upstream_rotation rot_alg; +#ifdef UPSTREAMS_THREAD_SAFE + rspamd_mutex_t *lock; +#endif }; struct upstream_ctx { @@ -112,8 +119,8 @@ struct upstream_ctx { #define RSPAMD_UPSTREAM_LOCK(x) do { } while (0) #define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0) #else -#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x) -#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x) +#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x->lock) +#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x->lock) #endif #define msg_debug_upstream(...) rspamd_conditional_debug_fast (NULL, NULL, \ @@ -294,7 +301,7 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) static void rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream) { - RSPAMD_UPSTREAM_LOCK (ls->lock); + RSPAMD_UPSTREAM_LOCK (ls); g_ptr_array_add (ls->alive, upstream); upstream->active_idx = ls->alive->len - 1; @@ -324,7 +331,7 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream) ev_timer_start (upstream->ctx->event_loop, &upstream->ev); } - RSPAMD_UPSTREAM_UNLOCK (ls->lock); + RSPAMD_UPSTREAM_UNLOCK (ls); } static void @@ -351,7 +358,7 @@ rspamd_upstream_update_addrs (struct upstream *upstream) * We need first of all get the saved port, since DNS gives us no * idea about what port has been used previously */ - RSPAMD_UPSTREAM_LOCK (upstream->lock); + RSPAMD_UPSTREAM_LOCK (upstream); if (upstream->addrs.addr->len > 0 && upstream->new_addrs) { addr_elt = g_ptr_array_index (upstream->addrs.addr, 0); @@ -421,7 +428,7 @@ rspamd_upstream_update_addrs (struct upstream *upstream) } upstream->new_addrs = NULL; - RSPAMD_UPSTREAM_UNLOCK (upstream->lock); + RSPAMD_UPSTREAM_UNLOCK (upstream); } static void @@ -434,7 +441,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) if (reply->code == RDNS_RC_NOERROR) { entry = reply->entries; - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); while (entry) { if (entry->type == RDNS_REQUEST_A) { @@ -452,7 +459,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) entry = entry->next; } - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); } up->dns_requests--; @@ -486,7 +493,7 @@ rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg) if (reply->code == RDNS_RC_NOERROR) { entry = reply->entries; - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); while (entry) { if (entry->type == RDNS_REQUEST_A) { @@ -508,7 +515,7 @@ rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg) entry = entry->next; } - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); } up->dns_requests--; @@ -535,7 +542,7 @@ rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg) if (reply->code == RDNS_RC_NOERROR) { entry = reply->entries; - RSPAMD_UPSTREAM_LOCK (upstream->lock); + RSPAMD_UPSTREAM_LOCK (upstream); while (entry) { /* XXX: we ignore weight as it contradicts with upstreams logic */ if (entry->type == RDNS_REQUEST_SRV) { @@ -577,7 +584,7 @@ rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg) entry = entry->next; } - RSPAMD_UPSTREAM_UNLOCK (upstream->lock); + RSPAMD_UPSTREAM_UNLOCK (upstream); } upstream->dns_requests--; @@ -589,7 +596,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents) { struct upstream *upstream = (struct upstream *)w->data; - RSPAMD_UPSTREAM_LOCK (upstream->lock); + RSPAMD_UPSTREAM_LOCK (upstream); ev_timer_stop (loop, w); msg_debug_upstream ("revive upstream %s", upstream->name); @@ -598,7 +605,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents) rspamd_upstream_set_active (upstream->ls, upstream); } - RSPAMD_UPSTREAM_UNLOCK (upstream->lock); + RSPAMD_UPSTREAM_UNLOCK (upstream); g_assert (upstream->ref.refcount > 1); REF_RELEASE (upstream); } @@ -648,7 +655,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents) { struct upstream *up = (struct upstream *)w->data; - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); ev_timer_stop (loop, w); if (up->ls) { @@ -665,7 +672,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents) ev_timer_again (loop, w); } - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); } static void @@ -676,7 +683,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstrea struct upstream *cur; struct upstream_list_watcher *w; - RSPAMD_UPSTREAM_LOCK (ls->lock); + RSPAMD_UPSTREAM_LOCK (ls); g_ptr_array_remove_index (ls->alive, upstream->active_idx); upstream->active_idx = -1; @@ -713,7 +720,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstrea } } - RSPAMD_UPSTREAM_UNLOCK (ls->lock); + RSPAMD_UPSTREAM_UNLOCK (ls); } void @@ -727,7 +734,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure) if (up->ctx && up->active_idx != -1) { sec_cur = rspamd_get_ticks (FALSE); - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); if (up->errors == 0) { /* We have the first error */ up->last_fail = sec_cur; @@ -786,7 +793,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure) } } - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); } } @@ -796,7 +803,7 @@ rspamd_upstream_ok (struct upstream *up) struct upstream_addr_elt *addr_elt; struct upstream_list_watcher *w; - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); if (up->errors > 0 && up->active_idx != -1) { /* We touch upstream if and only if it is active */ up->errors = 0; @@ -813,15 +820,15 @@ rspamd_upstream_ok (struct upstream *up) } } - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); } void rspamd_upstream_set_weight (struct upstream *up, guint weight) { - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); up->weight = weight; - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); } #define SEED_CONSTANT 0xa574de7df64e9b9dULL @@ -835,7 +842,10 @@ rspamd_upstreams_create (struct upstream_ctx *ctx) ls->hash_seed = SEED_CONSTANT; ls->ups = g_ptr_array_new (); ls->alive = g_ptr_array_new (); + +#ifdef UPSTREAMS_THREAD_SAFE ls->lock = rspamd_mutex_new (); +#endif ls->cur_elt = 0; ls->ctx = ctx; ls->rot_alg = RSPAMD_UPSTREAM_UNDEF; @@ -885,7 +895,9 @@ rspamd_upstream_dtor (struct upstream *up) g_ptr_array_free (up->addrs.addr, TRUE); } +#ifdef UPSTREAMS_THREAD_SAFE rspamd_mutex_free (up->lock); +#endif if (up->ctx) { @@ -1059,7 +1071,9 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, upstream->cur_weight = upstream->weight; upstream->ls = ups; REF_INIT_RETAIN (upstream, rspamd_upstream_dtor); +#ifdef UPSTREAMS_THREAD_SAFE upstream->lock = rspamd_mutex_new (); +#endif upstream->ctx = ups->ctx; if (upstream->ctx) { @@ -1169,6 +1183,11 @@ rspamd_upstreams_parse_line_len (struct upstream_list *ups, } } + if (!ups->ups_line) { + ups->ups_line = g_malloc (len + 1); + rspamd_strlcpy (ups->ups_line, str, len + 1); + } + return ret; } @@ -1226,8 +1245,11 @@ rspamd_upstreams_destroy (struct upstream_list *ups) g_free (w); } + g_free (ups->ups_line); g_ptr_array_free (ups->ups, TRUE); +#ifdef UPSTREAMS_THREAD_SAFE rspamd_mutex_free (ups->lock); +#endif g_free (ups); } } @@ -1240,7 +1262,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) struct upstream_list_watcher *w; /* Here the upstreams list is already locked */ - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (up); if (ev_can_stop (&up->ev)) { ev_timer_stop (up->ctx->event_loop, &up->ev); @@ -1248,7 +1270,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) g_ptr_array_add (ups->alive, up); up->active_idx = ups->alive->len - 1; - RSPAMD_UPSTREAM_UNLOCK (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up); DL_FOREACH (up->ls->watchers, w) { if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) { @@ -1277,7 +1299,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) guint i; /* Select upstream with the maximum cur_weight */ - RSPAMD_UPSTREAM_LOCK (ups->lock); + RSPAMD_UPSTREAM_LOCK (ups); for (i = 0; i < ups->alive->len; i ++) { up = g_ptr_array_index (ups->alive, i); @@ -1321,7 +1343,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) } } - RSPAMD_UPSTREAM_UNLOCK (ups->lock); + RSPAMD_UPSTREAM_UNLOCK (ups); return selected; } @@ -1352,16 +1374,46 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint { guint64 k; guint32 idx; + static const guint max_tries = 20; + struct upstream *up = NULL; + + if (ups->alive->len == 1) { + /* Fast path */ + return g_ptr_array_index (ups->alive, 0); + } /* Generate 64 bits input key */ k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64, key, keylen, ups->hash_seed); - RSPAMD_UPSTREAM_LOCK (ups->lock); - idx = rspamd_consistent_hash (k, ups->alive->len); - RSPAMD_UPSTREAM_UNLOCK (ups->lock); + RSPAMD_UPSTREAM_LOCK (ups); + /* + * Select new upstream from all upstreams + */ + for (guint i = 0; i < max_tries; i ++) { + idx = rspamd_consistent_hash (k, ups->ups->len); + up = g_ptr_array_index (ups->ups, idx); - return g_ptr_array_index (ups->alive, idx); + if (up->active_idx < 0) { + /* Found inactive upstream */ + k = mum_hash_step (k, ups->hash_seed); + } + else { + break; + } + } + RSPAMD_UPSTREAM_UNLOCK (ups); + + if (up->active_idx >= 0) { + return up; + } + + /* We failed to find any active upstream */ + up = rspamd_upstream_get_random (ups); + msg_info ("failed to find hashed upstream for %s, fallback to random: %s", + ups->ups_line, up->name); + + return up; } static struct upstream* @@ -1372,12 +1424,14 @@ rspamd_upstream_get_common (struct upstream_list *ups, enum rspamd_upstream_rotation type; struct upstream *up = NULL; - RSPAMD_UPSTREAM_LOCK (ups->lock); + RSPAMD_UPSTREAM_LOCK (ups); if (ups->alive->len == 0) { /* We have no upstreams alive */ + msg_warn ("there are no alive upstreams left for %s, revive all of them", + ups->ups_line); g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups); } - RSPAMD_UPSTREAM_UNLOCK (ups->lock); + RSPAMD_UPSTREAM_UNLOCK (ups); if (!forced) { type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type; -- 2.39.5