#include "cryptobox.h"
#include "utlist.h"
#include "logger.h"
+#include "contrib/librdns/rdns.h"
+#include "contrib/mumhash/mum.h"
#include <math.h>
-#include <contrib/librdns/rdns.h>
+
struct upstream_inet_addr_entry {
rspamd_inet_addr_t *addr;
} addrs;
struct upstream_inet_addr_entry *new_addrs;
- rspamd_mutex_t *lock;
gpointer data;
gchar uid[8];
ref_entry_t ref;
+#ifdef UPSTREAMS_THREAD_SAFE
+ rspamd_mutex_t *lock;
+#endif
};
struct upstream_limits {
};
struct upstream_list {
+ gchar *ups_line;
struct upstream_ctx *ctx;
GPtrArray *ups;
GPtrArray *alive;
struct upstream_list_watcher *watchers;
- rspamd_mutex_t *lock;
guint64 hash_seed;
struct upstream_limits limits;
enum rspamd_upstream_flag flags;
guint cur_elt;
enum rspamd_upstream_rotation rot_alg;
+#ifdef UPSTREAMS_THREAD_SAFE
+ rspamd_mutex_t *lock;
+#endif
};
struct upstream_ctx {
#define RSPAMD_UPSTREAM_LOCK(x) do { } while (0)
#define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0)
#else
-#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x)
-#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x)
+#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x->lock)
+#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x->lock)
#endif
#define msg_debug_upstream(...) rspamd_conditional_debug_fast (NULL, NULL, \
static void
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
{
- RSPAMD_UPSTREAM_LOCK (ls->lock);
+ RSPAMD_UPSTREAM_LOCK (ls);
g_ptr_array_add (ls->alive, upstream);
upstream->active_idx = ls->alive->len - 1;
ev_timer_start (upstream->ctx->event_loop, &upstream->ev);
}
- RSPAMD_UPSTREAM_UNLOCK (ls->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ls);
}
static void
* We need first of all get the saved port, since DNS gives us no
* idea about what port has been used previously
*/
- RSPAMD_UPSTREAM_LOCK (upstream->lock);
+ RSPAMD_UPSTREAM_LOCK (upstream);
if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
addr_elt = g_ptr_array_index (upstream->addrs.addr, 0);
}
upstream->new_addrs = NULL;
- RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+ RSPAMD_UPSTREAM_UNLOCK (upstream);
}
static void
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
while (entry) {
if (entry->type == RDNS_REQUEST_A) {
entry = entry->next;
}
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
}
up->dns_requests--;
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
while (entry) {
if (entry->type == RDNS_REQUEST_A) {
entry = entry->next;
}
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
}
up->dns_requests--;
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;
- RSPAMD_UPSTREAM_LOCK (upstream->lock);
+ RSPAMD_UPSTREAM_LOCK (upstream);
while (entry) {
/* XXX: we ignore weight as it contradicts with upstreams logic */
if (entry->type == RDNS_REQUEST_SRV) {
entry = entry->next;
}
- RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+ RSPAMD_UPSTREAM_UNLOCK (upstream);
}
upstream->dns_requests--;
{
struct upstream *upstream = (struct upstream *)w->data;
- RSPAMD_UPSTREAM_LOCK (upstream->lock);
+ RSPAMD_UPSTREAM_LOCK (upstream);
ev_timer_stop (loop, w);
msg_debug_upstream ("revive upstream %s", upstream->name);
rspamd_upstream_set_active (upstream->ls, upstream);
}
- RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+ RSPAMD_UPSTREAM_UNLOCK (upstream);
g_assert (upstream->ref.refcount > 1);
REF_RELEASE (upstream);
}
{
struct upstream *up = (struct upstream *)w->data;
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
ev_timer_stop (loop, w);
if (up->ls) {
ev_timer_again (loop, w);
}
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
}
static void
struct upstream *cur;
struct upstream_list_watcher *w;
- RSPAMD_UPSTREAM_LOCK (ls->lock);
+ RSPAMD_UPSTREAM_LOCK (ls);
g_ptr_array_remove_index (ls->alive, upstream->active_idx);
upstream->active_idx = -1;
}
}
- RSPAMD_UPSTREAM_UNLOCK (ls->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ls);
}
void
if (up->ctx && up->active_idx != -1) {
sec_cur = rspamd_get_ticks (FALSE);
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
if (up->errors == 0) {
/* We have the first error */
up->last_fail = sec_cur;
}
}
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
}
}
struct upstream_addr_elt *addr_elt;
struct upstream_list_watcher *w;
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
if (up->errors > 0 && up->active_idx != -1) {
/* We touch upstream if and only if it is active */
up->errors = 0;
}
}
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
}
void
rspamd_upstream_set_weight (struct upstream *up, guint weight)
{
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
up->weight = weight;
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
}
#define SEED_CONSTANT 0xa574de7df64e9b9dULL
ls->hash_seed = SEED_CONSTANT;
ls->ups = g_ptr_array_new ();
ls->alive = g_ptr_array_new ();
+
+#ifdef UPSTREAMS_THREAD_SAFE
ls->lock = rspamd_mutex_new ();
+#endif
ls->cur_elt = 0;
ls->ctx = ctx;
ls->rot_alg = RSPAMD_UPSTREAM_UNDEF;
g_ptr_array_free (up->addrs.addr, TRUE);
}
+#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_free (up->lock);
+#endif
if (up->ctx) {
upstream->cur_weight = upstream->weight;
upstream->ls = ups;
REF_INIT_RETAIN (upstream, rspamd_upstream_dtor);
+#ifdef UPSTREAMS_THREAD_SAFE
upstream->lock = rspamd_mutex_new ();
+#endif
upstream->ctx = ups->ctx;
if (upstream->ctx) {
}
}
+ if (!ups->ups_line) {
+ ups->ups_line = g_malloc (len + 1);
+ rspamd_strlcpy (ups->ups_line, str, len + 1);
+ }
+
return ret;
}
g_free (w);
}
+ g_free (ups->ups_line);
g_ptr_array_free (ups->ups, TRUE);
+#ifdef UPSTREAMS_THREAD_SAFE
rspamd_mutex_free (ups->lock);
+#endif
g_free (ups);
}
}
struct upstream_list_watcher *w;
/* Here the upstreams list is already locked */
- RSPAMD_UPSTREAM_LOCK (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up);
if (ev_can_stop (&up->ev)) {
ev_timer_stop (up->ctx->event_loop, &up->ev);
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
- RSPAMD_UPSTREAM_UNLOCK (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up);
DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
guint i;
/* Select upstream with the maximum cur_weight */
- RSPAMD_UPSTREAM_LOCK (ups->lock);
+ RSPAMD_UPSTREAM_LOCK (ups);
for (i = 0; i < ups->alive->len; i ++) {
up = g_ptr_array_index (ups->alive, i);
}
}
- RSPAMD_UPSTREAM_UNLOCK (ups->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ups);
return selected;
}
{
guint64 k;
guint32 idx;
+ static const guint max_tries = 20;
+ struct upstream *up = NULL;
+
+ if (ups->alive->len == 1) {
+ /* Fast path */
+ return g_ptr_array_index (ups->alive, 0);
+ }
/* Generate 64 bits input key */
k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
key, keylen, ups->hash_seed);
- RSPAMD_UPSTREAM_LOCK (ups->lock);
- idx = rspamd_consistent_hash (k, ups->alive->len);
- RSPAMD_UPSTREAM_UNLOCK (ups->lock);
+ RSPAMD_UPSTREAM_LOCK (ups);
+ /*
+ * Select new upstream from all upstreams
+ */
+ for (guint i = 0; i < max_tries; i ++) {
+ idx = rspamd_consistent_hash (k, ups->ups->len);
+ up = g_ptr_array_index (ups->ups, idx);
- return g_ptr_array_index (ups->alive, idx);
+ if (up->active_idx < 0) {
+ /* Found inactive upstream */
+ k = mum_hash_step (k, ups->hash_seed);
+ }
+ else {
+ break;
+ }
+ }
+ RSPAMD_UPSTREAM_UNLOCK (ups);
+
+ if (up->active_idx >= 0) {
+ return up;
+ }
+
+ /* We failed to find any active upstream */
+ up = rspamd_upstream_get_random (ups);
+ msg_info ("failed to find hashed upstream for %s, fallback to random: %s",
+ ups->ups_line, up->name);
+
+ return up;
}
static struct upstream*
enum rspamd_upstream_rotation type;
struct upstream *up = NULL;
- RSPAMD_UPSTREAM_LOCK (ups->lock);
+ RSPAMD_UPSTREAM_LOCK (ups);
if (ups->alive->len == 0) {
/* We have no upstreams alive */
+ msg_warn ("there are no alive upstreams left for %s, revive all of them",
+ ups->ups_line);
g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
}
- RSPAMD_UPSTREAM_UNLOCK (ups->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ups);
if (!forced) {
type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;