aboutsummaryrefslogtreecommitdiffstats
path: root/src/libutil
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-06-23 16:06:44 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-06-23 16:06:44 +0100
commit8dc67975d826c02e2757ff8e3e758b1839edb044 (patch)
tree5f6a9ece05883c19cfd442aa999ce93e7f16079d /src/libutil
parentaa8c94c4344f4d856700f58624e50d304bcc96b9 (diff)
downloadrspamd-8dc67975d826c02e2757ff8e3e758b1839edb044.tar.gz
rspamd-8dc67975d826c02e2757ff8e3e758b1839edb044.zip
[Fix] Another fix for marking upstreams inactive
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/upstream.c124
1 files changed, 71 insertions, 53 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index e8bf1a19c..43d9f7448 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -82,6 +82,14 @@ struct upstream_ctx {
ref_entry_t ref;
};
+#ifndef UPSTREAMS_THREAD_SAFE
+#define RSPAMD_UPSTREAM_LOCK(x) do { } while (0)
+#define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0)
+#else
+#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x)
+#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x)
+#endif
+
/* 4 errors in 10 seconds */
static guint default_max_errors = 4;
static gdouble default_revive_time = 60;
@@ -202,10 +210,10 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
static void
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
{
- rspamd_mutex_lock (ls->lock);
+ RSPAMD_UPSTREAM_LOCK (ls->lock);
g_ptr_array_add (ls->alive, up);
up->active_idx = ls->alive->len - 1;
- rspamd_mutex_unlock (ls->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ls->lock);
}
static void
@@ -230,7 +238,7 @@ rspamd_upstream_update_addrs (struct upstream *up)
* We need first of all get the saved port, since DNS gives us no
* idea about what port has been used previously
*/
- rspamd_mutex_lock (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up->lock);
if (up->addrs.addr->len > 0 && up->new_addrs) {
addr_elt = g_ptr_array_index (up->addrs.addr, 0);
@@ -266,7 +274,7 @@ rspamd_upstream_update_addrs (struct upstream *up)
}
up->new_addrs = NULL;
- rspamd_mutex_unlock (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
}
static void
@@ -279,7 +287,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;
- rspamd_mutex_lock (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up->lock);
while (entry) {
if (entry->type == RDNS_REQUEST_A) {
@@ -297,7 +305,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
entry = entry->next;
}
- rspamd_mutex_unlock (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
}
up->dns_requests--;
@@ -314,13 +322,13 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
{
struct upstream *up = (struct upstream *)arg;
- rspamd_mutex_lock (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up->lock);
event_del (&up->ev);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
}
- rspamd_mutex_unlock (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
REF_RELEASE (up);
}
@@ -328,11 +336,19 @@ static void
rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
{
gdouble ntim;
+ guint i;
+ struct upstream *cur;
- rspamd_mutex_lock (ls->lock);
+ RSPAMD_UPSTREAM_LOCK (ls->lock);
g_ptr_array_remove_index (ls->alive, up->active_idx);
up->active_idx = -1;
+ /* We need to update all indicies */
+ for (i = 0; i < ls->alive->len; i ++) {
+ cur = g_ptr_array_index (ls->alive, i);
+ cur->active_idx = i;
+ }
+
if (up->ctx->res != NULL && up->ctx->configured &&
!(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
/* Resolve name of the upstream one more time */
@@ -364,7 +380,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
double_to_tv (ntim, &up->tv);
event_add (&up->ev, &up->tv);
- rspamd_mutex_unlock (ls->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ls->lock);
}
void
@@ -375,45 +391,47 @@ rspamd_upstream_fail (struct upstream *up)
gdouble sec_last, sec_cur;
struct upstream_addr_elt *addr_elt;
- gettimeofday (&tv, NULL);
+ if (up->active_idx != -1) {
+ gettimeofday (&tv, NULL);
- rspamd_mutex_lock (up->lock);
- if (up->errors == 0 && up->active_idx != -1) {
- /* We have the first error */
- up->tv = tv;
- up->errors = 1;
- }
- else if (up->active_idx != -1) {
- sec_last = tv_to_double (&up->tv);
- sec_cur = tv_to_double (&tv);
-
- if (sec_cur >= sec_last) {
- up->errors ++;
-
- if (sec_cur > sec_last) {
- error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
- max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time;
- }
- else {
- error_rate = 1;
- max_error_rate = 0;
+ RSPAMD_UPSTREAM_LOCK (up->lock);
+ if (up->errors == 0) {
+ /* We have the first error */
+ up->tv = tv;
+ up->errors = 1;
+ }
+ else {
+ sec_last = tv_to_double (&up->tv);
+ sec_cur = tv_to_double (&tv);
+
+ if (sec_cur >= sec_last) {
+ up->errors ++;
+
+ if (sec_cur > sec_last) {
+ error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
+ max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time;
+ }
+ else {
+ error_rate = 1;
+ max_error_rate = 0;
+ }
+
+ if (error_rate > max_error_rate && up->active_idx != -1) {
+ /* Remove upstream from the active list */
+ up->errors = 0;
+ rspamd_upstream_set_inactive (up->ls, up);
+ }
}
+ }
- if (error_rate > max_error_rate && up->active_idx != -1) {
- /* Remove upstream from the active list */
- up->errors = 0;
- rspamd_upstream_set_inactive (up->ls, up);
- }
+ /* Also increase count of errors for this specific address */
+ if (up->addrs.addr) {
+ addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+ addr_elt->errors ++;
}
- }
- /* Also increase count of errors for this specific address */
- if (up->addrs.addr) {
- addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
- addr_elt->errors ++;
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
}
-
- rspamd_mutex_unlock (up->lock);
}
void
@@ -421,7 +439,7 @@ rspamd_upstream_ok (struct upstream *up)
{
struct upstream_addr_elt *addr_elt;
- rspamd_mutex_lock (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up->lock);
if (up->errors > 0 && up->active_idx != -1) {
/* We touch upstream if and only if it is active */
up->errors = 0;
@@ -433,7 +451,7 @@ rspamd_upstream_ok (struct upstream *up)
}
}
- rspamd_mutex_unlock (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
}
#define SEED_CONSTANT 0xa574de7df64e9b9dULL
@@ -704,11 +722,11 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
struct upstream_list *ups = (struct upstream_list *)ls;
/* Here the upstreams list is already locked */
- rspamd_mutex_lock (up->lock);
+ RSPAMD_UPSTREAM_LOCK (up->lock);
event_del (&up->ev);
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
- rspamd_mutex_unlock (up->lock);
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
/* For revive event */
REF_RELEASE (up);
}
@@ -729,7 +747,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
guint i;
/* Select upstream with the maximum cur_weight */
- rspamd_mutex_lock (ups->lock);
+ RSPAMD_UPSTREAM_LOCK (ups->lock);
for (i = 0; i < ups->alive->len; i ++) {
up = g_ptr_array_index (ups->alive, i);
if (use_cur) {
@@ -764,7 +782,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
}
}
- rspamd_mutex_unlock (ups->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ups->lock);
return selected;
}
@@ -800,9 +818,9 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint
k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
key, keylen, ups->hash_seed);
- rspamd_mutex_lock (ups->lock);
+ RSPAMD_UPSTREAM_LOCK (ups->lock);
idx = rspamd_consistent_hash (k, ups->alive->len);
- rspamd_mutex_unlock (ups->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ups->lock);
return g_ptr_array_index (ups->alive, idx);
}
@@ -814,12 +832,12 @@ rspamd_upstream_get_common (struct upstream_list *ups,
{
enum rspamd_upstream_rotation type;
- rspamd_mutex_lock (ups->lock);
+ RSPAMD_UPSTREAM_LOCK (ups->lock);
if (ups->alive->len == 0) {
/* We have no upstreams alive */
g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
}
- rspamd_mutex_unlock (ups->lock);
+ RSPAMD_UPSTREAM_UNLOCK (ups->lock);
if (!forced) {
type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;