]> source.dussan.org Git - rspamd.git/commitdiff
Start new upstreams implementation.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 13:29:13 +0000 (13:29 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Oct 2014 13:29:13 +0000 (13:29 +0000)
src/libutil/upstream.c
src/libutil/upstream.h

index a35667eba473ae999195b4efe8a0100e159dd0a3..f8a0d761842ad23a48a049aa40663483205a52f8 100644 (file)
 
 #include "config.h"
 #include "upstream.h"
+#include "ottery.h"
+
+struct upstream {
+       guint weight;
+       guint cur_weight;
+       guint errors;
+       guint port;
+       guint active_idx;
+       gchar *name;
+       struct event ev;
+       struct timeval tv;
+       gpointer ud;
+       struct upstream_list *ls;
+       rspamd_inet_addr_t addr;
+};
+
+struct upstream_list {
+       GArray *ups;
+       GPtrArray *alive;
+       rspamd_mutex_t *lock;
+       guint hash_seed;
+};
+
+static struct rdns_resolver *res = NULL;
+static struct event_base *ev_base = NULL;
+const guint default_max_errors = 4;
+
+static void
+rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+{
+       rspamd_mutex_lock (ls->lock);
+       g_ptr_array_remove_index (ls->alive, up->active_idx);
+       up->active_idx = 0;
+       rspamd_mutex_unlock (ls->lock);
+}
+
+static void
+rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
+{
+       rspamd_mutex_lock (ls->lock);
+       g_ptr_array_add (ls->alive, up);
+       up->active_idx = ls->alive->len - 1;
+       rspamd_mutex_unlock (ls->lock);
+}
+
+void
+rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+               struct event_base *base)
+{
+       res = resolver;
+       ev_base = base;
+}
+
+void
+rspamd_upstream_fail (struct upstream *up)
+{
+       if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
+               gettimeofday (&up->tv, NULL);
+               up->errors ++;
+       }
+       else {
+               g_atomic_int_inc (&up->errors);
+       }
+
+       if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) {
+               /* Remove upstream from the active list */
+               rspamd_upstream_set_inactive (up->ls, up);
+       }
+}
+
+void
+rspamd_upstream_ok (struct upstream *up)
+{
+       if (up->errors > 0) {
+               up->errors = 0;
+               rspamd_upstream_set_active (up->ls, up);
+       }
+
+       /* Rotate weight of the alive upstream */
+       up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight;
+}
+
+struct upstream_list*
+rspamd_upstreams_create (void)
+{
+       struct upstream_list *ls;
+
+       ls = g_slice_alloc (sizeof (*ls));
+       ls->hash_seed = ottery_rand_unsigned ();
+       ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list));
+       ls->alive = g_ptr_array_new ();
+       ls->lock = rspamd_mutex_new ();
+
+       return ls;
+}
+
+
+rspamd_inet_addr_t*
+rspamd_upstream_addr (struct upstream *up)
+{
+       return &up->addr;
+}
index b2235866fa9471de2e62f0e3c7bb3c4748400985..7adad8d22604bd17077c74ac5b5b3a55b2dbf42d 100644 (file)
@@ -20,7 +20,8 @@ struct upstream_list;
  * Init upstreams library
  * @param resolver
  */
-void rspamd_upstreams_library_init (struct rdns_resolver *resolver);
+void rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+               struct event_base *base);
 
 /**
  * Upstream error logic
@@ -33,12 +34,12 @@ void rspamd_upstreams_library_init (struct rdns_resolver *resolver);
 /**
  * Add an error to an upstream
  */
-void upstream_fail (struct upstream *up);
+void rspamd_upstream_fail (struct upstream *up);
 
 /**
  * Increase upstream successes count
  */
-void upstream_ok (struct upstream *up);
+void rspamd_upstream_ok (struct upstream *up);
 
 /**
  * Create new list of upstreams