diff options
Diffstat (limited to 'src/libutil/upstream.c')
-rw-r--r-- | src/libutil/upstream.c | 124 |
1 files changed, 71 insertions, 53 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index e8bf1a19c..43d9f7448 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -82,6 +82,14 @@ struct upstream_ctx { ref_entry_t ref; }; +#ifndef UPSTREAMS_THREAD_SAFE +#define RSPAMD_UPSTREAM_LOCK(x) do { } while (0) +#define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0) +#else +#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x) +#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x) +#endif + /* 4 errors in 10 seconds */ static guint default_max_errors = 4; static gdouble default_revive_time = 60; @@ -202,10 +210,10 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) static void rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) { - rspamd_mutex_lock (ls->lock); + RSPAMD_UPSTREAM_LOCK (ls->lock); g_ptr_array_add (ls->alive, up); up->active_idx = ls->alive->len - 1; - rspamd_mutex_unlock (ls->lock); + RSPAMD_UPSTREAM_UNLOCK (ls->lock); } static void @@ -230,7 +238,7 @@ rspamd_upstream_update_addrs (struct upstream *up) * We need first of all get the saved port, since DNS gives us no * idea about what port has been used previously */ - rspamd_mutex_lock (up->lock); + RSPAMD_UPSTREAM_LOCK (up->lock); if (up->addrs.addr->len > 0 && up->new_addrs) { addr_elt = g_ptr_array_index (up->addrs.addr, 0); @@ -266,7 +274,7 @@ rspamd_upstream_update_addrs (struct upstream *up) } up->new_addrs = NULL; - rspamd_mutex_unlock (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up->lock); } static void @@ -279,7 +287,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) if (reply->code == RDNS_RC_NOERROR) { entry = reply->entries; - rspamd_mutex_lock (up->lock); + RSPAMD_UPSTREAM_LOCK (up->lock); while (entry) { if (entry->type == RDNS_REQUEST_A) { @@ -297,7 +305,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) entry = entry->next; } - rspamd_mutex_unlock (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up->lock); } up->dns_requests--; @@ -314,13 +322,13 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg) { struct upstream *up = (struct upstream *)arg; - rspamd_mutex_lock (up->lock); + RSPAMD_UPSTREAM_LOCK (up->lock); event_del (&up->ev); if (up->ls) { rspamd_upstream_set_active (up->ls, up); } - rspamd_mutex_unlock (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up->lock); REF_RELEASE (up); } @@ -328,11 +336,19 @@ static void rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) { gdouble ntim; + guint i; + struct upstream *cur; - rspamd_mutex_lock (ls->lock); + RSPAMD_UPSTREAM_LOCK (ls->lock); g_ptr_array_remove_index (ls->alive, up->active_idx); up->active_idx = -1; + /* We need to update all indicies */ + for (i = 0; i < ls->alive->len; i ++) { + cur = g_ptr_array_index (ls->alive, i); + cur->active_idx = i; + } + if (up->ctx->res != NULL && up->ctx->configured && !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { /* Resolve name of the upstream one more time */ @@ -364,7 +380,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) double_to_tv (ntim, &up->tv); event_add (&up->ev, &up->tv); - rspamd_mutex_unlock (ls->lock); + RSPAMD_UPSTREAM_UNLOCK (ls->lock); } void @@ -375,45 +391,47 @@ rspamd_upstream_fail (struct upstream *up) gdouble sec_last, sec_cur; struct upstream_addr_elt *addr_elt; - gettimeofday (&tv, NULL); + if (up->active_idx != -1) { + gettimeofday (&tv, NULL); - rspamd_mutex_lock (up->lock); - if (up->errors == 0 && up->active_idx != -1) { - /* We have the first error */ - up->tv = tv; - up->errors = 1; - } - else if (up->active_idx != -1) { - sec_last = tv_to_double (&up->tv); - sec_cur = tv_to_double (&tv); - - if (sec_cur >= sec_last) { - up->errors ++; - - if (sec_cur > sec_last) { - error_rate = ((gdouble)up->errors) / (sec_cur - sec_last); - max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time; - } - else { - error_rate = 1; - max_error_rate = 0; + RSPAMD_UPSTREAM_LOCK (up->lock); + if (up->errors == 0) { + /* We have the first error */ + up->tv = tv; + up->errors = 1; + } + else { + sec_last = tv_to_double (&up->tv); + sec_cur = tv_to_double (&tv); + + if (sec_cur >= sec_last) { + up->errors ++; + + if (sec_cur > sec_last) { + error_rate = ((gdouble)up->errors) / (sec_cur - sec_last); + max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time; + } + else { + error_rate = 1; + max_error_rate = 0; + } + + if (error_rate > max_error_rate && up->active_idx != -1) { + /* Remove upstream from the active list */ + up->errors = 0; + rspamd_upstream_set_inactive (up->ls, up); + } } + } - if (error_rate > max_error_rate && up->active_idx != -1) { - /* Remove upstream from the active list */ - up->errors = 0; - rspamd_upstream_set_inactive (up->ls, up); - } + /* Also increase count of errors for this specific address */ + if (up->addrs.addr) { + addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); + addr_elt->errors ++; } - } - /* Also increase count of errors for this specific address */ - if (up->addrs.addr) { - addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); - addr_elt->errors ++; + RSPAMD_UPSTREAM_UNLOCK (up->lock); } - - rspamd_mutex_unlock (up->lock); } void @@ -421,7 +439,7 @@ rspamd_upstream_ok (struct upstream *up) { struct upstream_addr_elt *addr_elt; - rspamd_mutex_lock (up->lock); + RSPAMD_UPSTREAM_LOCK (up->lock); if (up->errors > 0 && up->active_idx != -1) { /* We touch upstream if and only if it is active */ up->errors = 0; @@ -433,7 +451,7 @@ rspamd_upstream_ok (struct upstream *up) } } - rspamd_mutex_unlock (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up->lock); } #define SEED_CONSTANT 0xa574de7df64e9b9dULL @@ -704,11 +722,11 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) struct upstream_list *ups = (struct upstream_list *)ls; /* Here the upstreams list is already locked */ - rspamd_mutex_lock (up->lock); + RSPAMD_UPSTREAM_LOCK (up->lock); event_del (&up->ev); g_ptr_array_add (ups->alive, up); up->active_idx = ups->alive->len - 1; - rspamd_mutex_unlock (up->lock); + RSPAMD_UPSTREAM_UNLOCK (up->lock); /* For revive event */ REF_RELEASE (up); } @@ -729,7 +747,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) guint i; /* Select upstream with the maximum cur_weight */ - rspamd_mutex_lock (ups->lock); + RSPAMD_UPSTREAM_LOCK (ups->lock); for (i = 0; i < ups->alive->len; i ++) { up = g_ptr_array_index (ups->alive, i); if (use_cur) { @@ -764,7 +782,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur) } } - rspamd_mutex_unlock (ups->lock); + RSPAMD_UPSTREAM_UNLOCK (ups->lock); return selected; } @@ -800,9 +818,9 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64, key, keylen, ups->hash_seed); - rspamd_mutex_lock (ups->lock); + RSPAMD_UPSTREAM_LOCK (ups->lock); idx = rspamd_consistent_hash (k, ups->alive->len); - rspamd_mutex_unlock (ups->lock); + RSPAMD_UPSTREAM_UNLOCK (ups->lock); return g_ptr_array_index (ups->alive, idx); } @@ -814,12 +832,12 @@ rspamd_upstream_get_common (struct upstream_list *ups, { enum rspamd_upstream_rotation type; - rspamd_mutex_lock (ups->lock); + RSPAMD_UPSTREAM_LOCK (ups->lock); if (ups->alive->len == 0) { /* We have no upstreams alive */ g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups); } - rspamd_mutex_unlock (ups->lock); + RSPAMD_UPSTREAM_UNLOCK (ups->lock); if (!forced) { type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type; |