#include "config.h"
#include "upstream.h"
+#include "ottery.h"
+
+struct upstream {
+ guint weight;
+ guint cur_weight;
+ guint errors;
+ guint port;
+ guint active_idx;
+ gchar *name;
+ struct event ev;
+ struct timeval tv;
+ gpointer ud;
+ struct upstream_list *ls;
+ rspamd_inet_addr_t addr;
+};
+
+struct upstream_list {
+ GArray *ups;
+ GPtrArray *alive;
+ rspamd_mutex_t *lock;
+ guint hash_seed;
+};
+
+static struct rdns_resolver *res = NULL;
+static struct event_base *ev_base = NULL;
+const guint default_max_errors = 4;
+
+static void
+rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+{
+ rspamd_mutex_lock (ls->lock);
+ g_ptr_array_remove_index (ls->alive, up->active_idx);
+ up->active_idx = 0;
+ rspamd_mutex_unlock (ls->lock);
+}
+
+static void
+rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
+{
+ rspamd_mutex_lock (ls->lock);
+ g_ptr_array_add (ls->alive, up);
+ up->active_idx = ls->alive->len - 1;
+ rspamd_mutex_unlock (ls->lock);
+}
+
+void
+rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+ struct event_base *base)
+{
+ res = resolver;
+ ev_base = base;
+}
+
+void
+rspamd_upstream_fail (struct upstream *up)
+{
+ if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
+ gettimeofday (&up->tv, NULL);
+ up->errors ++;
+ }
+ else {
+ g_atomic_int_inc (&up->errors);
+ }
+
+ if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) {
+ /* Remove upstream from the active list */
+ rspamd_upstream_set_inactive (up->ls, up);
+ }
+}
+
+void
+rspamd_upstream_ok (struct upstream *up)
+{
+ if (up->errors > 0) {
+ up->errors = 0;
+ rspamd_upstream_set_active (up->ls, up);
+ }
+
+ /* Rotate weight of the alive upstream */
+ up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight;
+}
+
+struct upstream_list*
+rspamd_upstreams_create (void)
+{
+ struct upstream_list *ls;
+
+ ls = g_slice_alloc (sizeof (*ls));
+ ls->hash_seed = ottery_rand_unsigned ();
+ ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list));
+ ls->alive = g_ptr_array_new ();
+ ls->lock = rspamd_mutex_new ();
+
+ return ls;
+}
+
+
+rspamd_inet_addr_t*
+rspamd_upstream_addr (struct upstream *up)
+{
+ return &up->addr;
+}
* Init upstreams library
* @param resolver
*/
-void rspamd_upstreams_library_init (struct rdns_resolver *resolver);
+void rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+ struct event_base *base);
/**
* Upstream error logic
/**
* Add an error to an upstream
*/
-void upstream_fail (struct upstream *up);
+void rspamd_upstream_fail (struct upstream *up);
/**
* Increase upstream successes count
*/
-void upstream_ok (struct upstream *up);
+void rspamd_upstream_ok (struct upstream *up);
/**
* Create new list of upstreams