summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-10-28 13:29:13 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-10-28 13:29:13 +0000
commitd95807104076189411987dcc733a23a94b8812a5 (patch)
tree0a752e15aacb77a7ca0e66b93be8cd2f01b03a3d
parent397dfabb70a3e1169e7bee1b35a4f003f0fa8f81 (diff)
downloadrspamd-d95807104076189411987dcc733a23a94b8812a5.tar.gz
rspamd-d95807104076189411987dcc733a23a94b8812a5.zip
Start new upstreams implementation.
-rw-r--r--src/libutil/upstream.c102
-rw-r--r--src/libutil/upstream.h7
2 files changed, 106 insertions, 3 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index a35667eba..f8a0d7618 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -24,3 +24,105 @@
#include "config.h"
#include "upstream.h"
+#include "ottery.h"
+
+struct upstream {
+ guint weight;
+ guint cur_weight;
+ guint errors;
+ guint port;
+ guint active_idx;
+ gchar *name;
+ struct event ev;
+ struct timeval tv;
+ gpointer ud;
+ struct upstream_list *ls;
+ rspamd_inet_addr_t addr;
+};
+
+struct upstream_list {
+ GArray *ups;
+ GPtrArray *alive;
+ rspamd_mutex_t *lock;
+ guint hash_seed;
+};
+
+static struct rdns_resolver *res = NULL;
+static struct event_base *ev_base = NULL;
+const guint default_max_errors = 4;
+
+static void
+rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+{
+ rspamd_mutex_lock (ls->lock);
+ g_ptr_array_remove_index (ls->alive, up->active_idx);
+ up->active_idx = 0;
+ rspamd_mutex_unlock (ls->lock);
+}
+
+static void
+rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
+{
+ rspamd_mutex_lock (ls->lock);
+ g_ptr_array_add (ls->alive, up);
+ up->active_idx = ls->alive->len - 1;
+ rspamd_mutex_unlock (ls->lock);
+}
+
+void
+rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+ struct event_base *base)
+{
+ res = resolver;
+ ev_base = base;
+}
+
+void
+rspamd_upstream_fail (struct upstream *up)
+{
+ if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
+ gettimeofday (&up->tv, NULL);
+ up->errors ++;
+ }
+ else {
+ g_atomic_int_inc (&up->errors);
+ }
+
+ if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) {
+ /* Remove upstream from the active list */
+ rspamd_upstream_set_inactive (up->ls, up);
+ }
+}
+
+void
+rspamd_upstream_ok (struct upstream *up)
+{
+ if (up->errors > 0) {
+ up->errors = 0;
+ rspamd_upstream_set_active (up->ls, up);
+ }
+
+ /* Rotate weight of the alive upstream */
+ up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight;
+}
+
+struct upstream_list*
+rspamd_upstreams_create (void)
+{
+ struct upstream_list *ls;
+
+ ls = g_slice_alloc (sizeof (*ls));
+ ls->hash_seed = ottery_rand_unsigned ();
+ ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list));
+ ls->alive = g_ptr_array_new ();
+ ls->lock = rspamd_mutex_new ();
+
+ return ls;
+}
+
+
+rspamd_inet_addr_t*
+rspamd_upstream_addr (struct upstream *up)
+{
+ return &up->addr;
+}
diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h
index b2235866f..7adad8d22 100644
--- a/src/libutil/upstream.h
+++ b/src/libutil/upstream.h
@@ -20,7 +20,8 @@ struct upstream_list;
* Init upstreams library
* @param resolver
*/
-void rspamd_upstreams_library_init (struct rdns_resolver *resolver);
+void rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+ struct event_base *base);
/**
* Upstream error logic
@@ -33,12 +34,12 @@ void rspamd_upstreams_library_init (struct rdns_resolver *resolver);
/**
* Add an error to an upstream
*/
-void upstream_fail (struct upstream *up);
+void rspamd_upstream_fail (struct upstream *up);
/**
* Increase upstream successes count
*/
-void upstream_ok (struct upstream *up);
+void rspamd_upstream_ok (struct upstream *up);
/**
* Create new list of upstreams