]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Another fix for marking upstreams inactive
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 23 Jun 2016 15:06:44 +0000 (16:06 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 23 Jun 2016 15:06:44 +0000 (16:06 +0100)
src/libutil/upstream.c

index e8bf1a19c300ab7bcc94e84e5c5fe05d57b07083..43d9f7448ad898537ee7d4a0959f992627bc4c0f 100644 (file)
@@ -82,6 +82,14 @@ struct upstream_ctx {
        ref_entry_t ref;
 };
 
+#ifndef UPSTREAMS_THREAD_SAFE
+#define RSPAMD_UPSTREAM_LOCK(x) do { } while (0)
+#define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0)
+#else
+#define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x)
+#define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x)
+#endif
+
 /* 4 errors in 10 seconds */
 static guint default_max_errors = 4;
 static gdouble default_revive_time = 60;
@@ -202,10 +210,10 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
 static void
 rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
 {
-       rspamd_mutex_lock (ls->lock);
+       RSPAMD_UPSTREAM_LOCK (ls->lock);
        g_ptr_array_add (ls->alive, up);
        up->active_idx = ls->alive->len - 1;
-       rspamd_mutex_unlock (ls->lock);
+       RSPAMD_UPSTREAM_UNLOCK (ls->lock);
 }
 
 static void
@@ -230,7 +238,7 @@ rspamd_upstream_update_addrs (struct upstream *up)
         * We need first of all get the saved port, since DNS gives us no
         * idea about what port has been used previously
         */
-       rspamd_mutex_lock (up->lock);
+       RSPAMD_UPSTREAM_LOCK (up->lock);
 
        if (up->addrs.addr->len > 0 && up->new_addrs) {
                addr_elt = g_ptr_array_index (up->addrs.addr, 0);
@@ -266,7 +274,7 @@ rspamd_upstream_update_addrs (struct upstream *up)
        }
        up->new_addrs = NULL;
 
-       rspamd_mutex_unlock (up->lock);
+       RSPAMD_UPSTREAM_UNLOCK (up->lock);
 }
 
 static void
@@ -279,7 +287,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
        if (reply->code == RDNS_RC_NOERROR) {
                entry = reply->entries;
 
-               rspamd_mutex_lock (up->lock);
+               RSPAMD_UPSTREAM_LOCK (up->lock);
                while (entry) {
 
                        if (entry->type == RDNS_REQUEST_A) {
@@ -297,7 +305,7 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
                        entry = entry->next;
                }
 
-               rspamd_mutex_unlock (up->lock);
+               RSPAMD_UPSTREAM_UNLOCK (up->lock);
        }
 
        up->dns_requests--;
@@ -314,13 +322,13 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
 {
        struct upstream *up = (struct upstream *)arg;
 
-       rspamd_mutex_lock (up->lock);
+       RSPAMD_UPSTREAM_LOCK (up->lock);
        event_del (&up->ev);
        if (up->ls) {
                rspamd_upstream_set_active (up->ls, up);
        }
 
-       rspamd_mutex_unlock (up->lock);
+       RSPAMD_UPSTREAM_UNLOCK (up->lock);
        REF_RELEASE (up);
 }
 
@@ -328,11 +336,19 @@ static void
 rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 {
        gdouble ntim;
+       guint i;
+       struct upstream *cur;
 
-       rspamd_mutex_lock (ls->lock);
+       RSPAMD_UPSTREAM_LOCK (ls->lock);
        g_ptr_array_remove_index (ls->alive, up->active_idx);
        up->active_idx = -1;
 
+       /* We need to update all indicies */
+       for (i = 0; i < ls->alive->len; i ++) {
+               cur = g_ptr_array_index (ls->alive, i);
+               cur->active_idx = i;
+       }
+
        if (up->ctx->res != NULL && up->ctx->configured &&
                        !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
                /* Resolve name of the upstream one more time */
@@ -364,7 +380,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
        double_to_tv (ntim, &up->tv);
        event_add (&up->ev, &up->tv);
 
-       rspamd_mutex_unlock (ls->lock);
+       RSPAMD_UPSTREAM_UNLOCK (ls->lock);
 }
 
 void
@@ -375,45 +391,47 @@ rspamd_upstream_fail (struct upstream *up)
        gdouble sec_last, sec_cur;
        struct upstream_addr_elt *addr_elt;
 
-       gettimeofday (&tv, NULL);
+       if (up->active_idx != -1) {
+               gettimeofday (&tv, NULL);
 
-       rspamd_mutex_lock (up->lock);
-       if (up->errors == 0 && up->active_idx != -1) {
-               /* We have the first error */
-               up->tv = tv;
-               up->errors = 1;
-       }
-       else if (up->active_idx != -1) {
-               sec_last = tv_to_double (&up->tv);
-               sec_cur = tv_to_double (&tv);
-
-               if (sec_cur >= sec_last) {
-                       up->errors ++;
-
-                       if (sec_cur > sec_last) {
-                               error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
-                               max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time;
-                       }
-                       else {
-                               error_rate = 1;
-                               max_error_rate = 0;
+               RSPAMD_UPSTREAM_LOCK (up->lock);
+               if (up->errors == 0) {
+                       /* We have the first error */
+                       up->tv = tv;
+                       up->errors = 1;
+               }
+               else {
+                       sec_last = tv_to_double (&up->tv);
+                       sec_cur = tv_to_double (&tv);
+
+                       if (sec_cur >= sec_last) {
+                               up->errors ++;
+
+                               if (sec_cur > sec_last) {
+                                       error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
+                                       max_error_rate = ((gdouble)up->ctx->max_errors) / up->ctx->error_time;
+                               }
+                               else {
+                                       error_rate = 1;
+                                       max_error_rate = 0;
+                               }
+
+                               if (error_rate > max_error_rate && up->active_idx != -1) {
+                                       /* Remove upstream from the active list */
+                                       up->errors = 0;
+                                       rspamd_upstream_set_inactive (up->ls, up);
+                               }
                        }
+               }
 
-                       if (error_rate > max_error_rate && up->active_idx != -1) {
-                               /* Remove upstream from the active list */
-                               up->errors = 0;
-                               rspamd_upstream_set_inactive (up->ls, up);
-                       }
+               /* Also increase count of errors for this specific address */
+               if (up->addrs.addr) {
+                       addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
+                       addr_elt->errors ++;
                }
-       }
 
-       /* Also increase count of errors for this specific address */
-       if (up->addrs.addr) {
-               addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
-               addr_elt->errors ++;
+               RSPAMD_UPSTREAM_UNLOCK (up->lock);
        }
-
-       rspamd_mutex_unlock (up->lock);
 }
 
 void
@@ -421,7 +439,7 @@ rspamd_upstream_ok (struct upstream *up)
 {
        struct upstream_addr_elt *addr_elt;
 
-       rspamd_mutex_lock (up->lock);
+       RSPAMD_UPSTREAM_LOCK (up->lock);
        if (up->errors > 0 && up->active_idx != -1) {
                /* We touch upstream if and only if it is active */
                up->errors = 0;
@@ -433,7 +451,7 @@ rspamd_upstream_ok (struct upstream *up)
                }
        }
 
-       rspamd_mutex_unlock (up->lock);
+       RSPAMD_UPSTREAM_UNLOCK (up->lock);
 }
 
 #define SEED_CONSTANT 0xa574de7df64e9b9dULL
@@ -704,11 +722,11 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
        struct upstream_list *ups = (struct upstream_list *)ls;
 
        /* Here the upstreams list is already locked */
-       rspamd_mutex_lock (up->lock);
+       RSPAMD_UPSTREAM_LOCK (up->lock);
        event_del (&up->ev);
        g_ptr_array_add (ups->alive, up);
        up->active_idx = ups->alive->len - 1;
-       rspamd_mutex_unlock (up->lock);
+       RSPAMD_UPSTREAM_UNLOCK (up->lock);
        /* For revive event */
        REF_RELEASE (up);
 }
@@ -729,7 +747,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
        guint i;
 
        /* Select upstream with the maximum cur_weight */
-       rspamd_mutex_lock (ups->lock);
+       RSPAMD_UPSTREAM_LOCK (ups->lock);
        for (i = 0; i < ups->alive->len; i ++) {
                up = g_ptr_array_index (ups->alive, i);
                if (use_cur) {
@@ -764,7 +782,7 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
                }
        }
 
-       rspamd_mutex_unlock (ups->lock);
+       RSPAMD_UPSTREAM_UNLOCK (ups->lock);
 
        return selected;
 }
@@ -800,9 +818,9 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint
        k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
                        key, keylen, ups->hash_seed);
 
-       rspamd_mutex_lock (ups->lock);
+       RSPAMD_UPSTREAM_LOCK (ups->lock);
        idx = rspamd_consistent_hash (k, ups->alive->len);
-       rspamd_mutex_unlock (ups->lock);
+       RSPAMD_UPSTREAM_UNLOCK (ups->lock);
 
        return g_ptr_array_index (ups->alive, idx);
 }
@@ -814,12 +832,12 @@ rspamd_upstream_get_common (struct upstream_list *ups,
 {
        enum rspamd_upstream_rotation type;
 
-       rspamd_mutex_lock (ups->lock);
+       RSPAMD_UPSTREAM_LOCK (ups->lock);
        if (ups->alive->len == 0) {
                /* We have no upstreams alive */
                g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
        }
-       rspamd_mutex_unlock (ups->lock);
+       RSPAMD_UPSTREAM_UNLOCK (ups->lock);
 
        if (!forced) {
                type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;