|
|
@@ -82,6 +82,14 @@ struct upstream_ctx { |
|
|
|
ref_entry_t ref; |
|
|
|
}; |
|
|
|
|
|
|
|
#ifndef UPSTREAMS_THREAD_SAFE |
|
|
|
#define RSPAMD_UPSTREAM_LOCK(x) do { } while (0) |
|
|
|
#define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0) |
|
|
|
#else |
|
|
|
#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x) |
|
|
|
#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x) |
|
|
|
#endif |
|
|
|
|
|
|
|
/* 4 errors in 10 seconds */ |
|
|
|
static guint default_max_errors = 4; |
|
|
|
static gdouble default_revive_time = 60; |
|
|
@@ -202,10 +210,10 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) |
|
|
|
static void |
|
|
|
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) |
|
|
|
{ |
|
|
|
rspamd_mutex_lock (ls->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (ls->lock); |
|
|
|
g_ptr_array_add (ls->alive, up); |
|
|
|
up->active_idx = ls->alive->len - 1; |
|
|
|
rspamd_mutex_unlock (ls->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (ls->lock); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
@@ -230,7 +238,7 @@ rspamd_upstream_update_addrs (struct upstream *up) |
|
|
|
* We need first of all get the saved port, since DNS gives us no |
|
|
|
* idea about what port has been used previously |
|
|
|
*/ |
|
|
|
rspamd_mutex_lock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (up->lock); |
|
|
|
|
|
|
|
if (up->addrs.addr->len > 0 && up->new_addrs) { |
|
|
|
addr_elt = g_ptr_array_index (up->addrs.addr, 0); |
|
|
@@ -266,7 +274,7 @@ rspamd_upstream_update_addrs (struct upstream *up) |
|
|
|
} |
|
|
|
up->new_addrs = NULL; |
|
|
|
|
|
|
|
rspamd_mutex_unlock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (up->lock); |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
@@ -279,7 +287,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) |
|
|
|
if (reply->code == RDNS_RC_NOERROR) { |
|
|
|
entry = reply->entries; |
|
|
|
|
|
|
|
rspamd_mutex_lock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (up->lock); |
|
|
|
while (entry) { |
|
|
|
|
|
|
|
if (entry->type == RDNS_REQUEST_A) { |
|
|
@@ -297,7 +305,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) |
|
|
|
entry = entry->next; |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_mutex_unlock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (up->lock); |
|
|
|
} |
|
|
|
|
|
|
|
up->dns_requests--; |
|
|
@@ -314,13 +322,13 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg) |
|
|
|
{ |
|
|
|
struct upstream *up = (struct upstream *)arg; |
|
|
|
|
|
|
|
rspamd_mutex_lock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (up->lock); |
|
|
|
event_del (&up->ev); |
|
|
|
if (up->ls) { |
|
|
|
rspamd_upstream_set_active (up->ls, up); |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_mutex_unlock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (up->lock); |
|
|
|
REF_RELEASE (up); |
|
|
|
} |
|
|
|
|
|
|
@@ -328,11 +336,19 @@ static void |
|
|
|
rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) |
|
|
|
{ |
|
|
|
gdouble ntim; |
|
|
|
guint i; |
|
|
|
struct upstream *cur; |
|
|
|
|
|
|
|
rspamd_mutex_lock (ls->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (ls->lock); |
|
|
|
g_ptr_array_remove_index (ls->alive, up->active_idx); |
|
|
|
up->active_idx = -1; |
|
|
|
|
|
|
|
/* We need to update all indicies */ |
|
|
|
for (i = 0; i < ls->alive->len; i ++) { |
|
|
|
cur = g_ptr_array_index (ls->alive, i); |
|
|
|
cur->active_idx = i; |
|
|
|
} |
|
|
|
|
|
|
|
if (up->ctx->res != NULL && up->ctx->configured && |
|
|
|
!(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { |
|
|
|
/* Resolve name of the upstream one more time */ |
|
|
@@ -364,7 +380,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) |
|
|
|
double_to_tv (ntim, &up->tv); |
|
|
|
event_add (&up->ev, &up->tv); |
|
|
|
|
|
|
|
rspamd_mutex_unlock (ls->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (ls->lock); |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
@@ -375,45 +391,47 @@ rspamd_upstream_fail (struct upstream *up) |
|
|
|
gdouble sec_last, sec_cur; |
|
|
|
struct upstream_addr_elt *addr_elt; |
|
|
|
|
|
|
|
gettimeofday (&tv, NULL); |
|
|
|
if (up->active_idx != -1) { |
|
|
|
gettimeofday (&tv, NULL); |
|
|
|
|
|
|
|
rspamd_mutex_lock (up->lock); |
|
|
|
if (up->errors == 0 && up->active_idx != -1) { |
|
|
|
/* We have the first error */ |
|
|
|
up->tv = tv; |
|
|
|
up->errors = 1; |
|
|
|
} |
|
|
|
else if (up->active_idx != -1) { |
|
|
|
sec_last = tv_to_double (&up->tv); |
|
|
|
sec_cur = tv_to_double (&tv); |
|
|
|
|
|
|
|
if (sec_cur >= sec_last) { |
|
|
|
up->errors ++; |
|
|
|
|
|
|
|
if (sec_cur > sec_last) { |
|
|
|
error_rate = ((gdouble)up->errors) / (sec_cur - sec_last); |
|
|
|
max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time; |
|
|
|
} |
|
|
|
else { |
|
|
|
error_rate = 1; |
|
|
|
max_error_rate = 0; |
|
|
|
RSPAMD_UPSTREAM_LOCK (up->lock); |
|
|
|
if (up->errors == 0) { |
|
|
|
/* We have the first error */ |
|
|
|
up->tv = tv; |
|
|
|
up->errors = 1; |
|
|
|
} |
|
|
|
else { |
|
|
|
sec_last = tv_to_double (&up->tv); |
|
|
|
sec_cur = tv_to_double (&tv); |
|
|
|
|
|
|
|
if (sec_cur >= sec_last) { |
|
|
|
up->errors ++; |
|
|
|
|
|
|
|
if (sec_cur > sec_last) { |
|
|
|
error_rate = ((gdouble)up->errors) / (sec_cur - sec_last); |
|
|
|
max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time; |
|
|
|
} |
|
|
|
else { |
|
|
|
error_rate = 1; |
|
|
|
max_error_rate = 0; |
|
|
|
} |
|
|
|
|
|
|
|
if (error_rate > max_error_rate && up->active_idx != -1) { |
|
|
|
/* Remove upstream from the active list */ |
|
|
|
up->errors = 0; |
|
|
|
rspamd_upstream_set_inactive (up->ls, up); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (error_rate > max_error_rate && up->active_idx != -1) { |
|
|
|
/* Remove upstream from the active list */ |
|
|
|
up->errors = 0; |
|
|
|
rspamd_upstream_set_inactive (up->ls, up); |
|
|
|
} |
|
|
|
/* Also increase count of errors for this specific address */ |
|
|
|
if (up->addrs.addr) { |
|
|
|
addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); |
|
|
|
addr_elt->errors ++; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* Also increase count of errors for this specific address */ |
|
|
|
if (up->addrs.addr) { |
|
|
|
addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); |
|
|
|
addr_elt->errors ++; |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (up->lock); |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_mutex_unlock (up->lock); |
|
|
|
} |
|
|
|
|
|
|
|
void |
|
|
@@ -421,7 +439,7 @@ rspamd_upstream_ok (struct upstream *up) |
|
|
|
{ |
|
|
|
struct upstream_addr_elt *addr_elt; |
|
|
|
|
|
|
|
rspamd_mutex_lock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (up->lock); |
|
|
|
if (up->errors > 0 && up->active_idx != -1) { |
|
|
|
/* We touch upstream if and only if it is active */ |
|
|
|
up->errors = 0; |
|
|
@@ -433,7 +451,7 @@ rspamd_upstream_ok (struct upstream *up) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_mutex_unlock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (up->lock); |
|
|
|
} |
|
|
|
|
|
|
|
#define SEED_CONSTANT 0xa574de7df64e9b9dULL |
|
|
@@ -704,11 +722,11 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) |
|
|
|
struct upstream_list *ups = (struct upstream_list *)ls; |
|
|
|
|
|
|
|
/* Here the upstreams list is already locked */ |
|
|
|
rspamd_mutex_lock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (up->lock); |
|
|
|
event_del (&up->ev); |
|
|
|
g_ptr_array_add (ups->alive, up); |
|
|
|
up->active_idx = ups->alive->len - 1; |
|
|
|
rspamd_mutex_unlock (up->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (up->lock); |
|
|
|
/* For revive event */ |
|
|
|
REF_RELEASE (up); |
|
|
|
} |
|
|
@@ -729,7 +747,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) |
|
|
|
guint i; |
|
|
|
|
|
|
|
/* Select upstream with the maximum cur_weight */ |
|
|
|
rspamd_mutex_lock (ups->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (ups->lock); |
|
|
|
for (i = 0; i < ups->alive->len; i ++) { |
|
|
|
up = g_ptr_array_index (ups->alive, i); |
|
|
|
if (use_cur) { |
|
|
@@ -764,7 +782,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_mutex_unlock (ups->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (ups->lock); |
|
|
|
|
|
|
|
return selected; |
|
|
|
} |
|
|
@@ -800,9 +818,9 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint |
|
|
|
k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64, |
|
|
|
key, keylen, ups->hash_seed); |
|
|
|
|
|
|
|
rspamd_mutex_lock (ups->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (ups->lock); |
|
|
|
idx = rspamd_consistent_hash (k, ups->alive->len); |
|
|
|
rspamd_mutex_unlock (ups->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (ups->lock); |
|
|
|
|
|
|
|
return g_ptr_array_index (ups->alive, idx); |
|
|
|
} |
|
|
@@ -814,12 +832,12 @@ rspamd_upstream_get_common (struct upstream_list *ups, |
|
|
|
{ |
|
|
|
enum rspamd_upstream_rotation type; |
|
|
|
|
|
|
|
rspamd_mutex_lock (ups->lock); |
|
|
|
RSPAMD_UPSTREAM_LOCK (ups->lock); |
|
|
|
if (ups->alive->len == 0) { |
|
|
|
/* We have no upstreams alive */ |
|
|
|
g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups); |
|
|
|
} |
|
|
|
rspamd_mutex_unlock (ups->lock); |
|
|
|
RSPAMD_UPSTREAM_UNLOCK (ups->lock); |
|
|
|
|
|
|
|
if (!forced) { |
|
|
|
type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type; |