From d95807104076189411987dcc733a23a94b8812a5 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 28 Oct 2014 13:29:13 +0000 Subject: [PATCH] Start new upstreams implementation. --- src/libutil/upstream.c | 102 +++++++++++++++++++++++++++++++++++++++++ src/libutil/upstream.h | 7 +-- 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index a35667eba..f8a0d7618 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -24,3 +24,105 @@ #include "config.h" #include "upstream.h" +#include "ottery.h" + +struct upstream { + guint weight; + guint cur_weight; + guint errors; + guint port; + guint active_idx; + gchar *name; + struct event ev; + struct timeval tv; + gpointer ud; + struct upstream_list *ls; + rspamd_inet_addr_t addr; +}; + +struct upstream_list { + GArray *ups; + GPtrArray *alive; + rspamd_mutex_t *lock; + guint hash_seed; +}; + +static struct rdns_resolver *res = NULL; +static struct event_base *ev_base = NULL; +const guint default_max_errors = 4; + +static void +rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) +{ + rspamd_mutex_lock (ls->lock); + g_ptr_array_remove_index (ls->alive, up->active_idx); + up->active_idx = 0; + rspamd_mutex_unlock (ls->lock); +} + +static void +rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) +{ + rspamd_mutex_lock (ls->lock); + g_ptr_array_add (ls->alive, up); + up->active_idx = ls->alive->len - 1; + rspamd_mutex_unlock (ls->lock); +} + +void +rspamd_upstreams_library_init (struct rdns_resolver *resolver, + struct event_base *base) +{ + res = resolver; + ev_base = base; +} + +void +rspamd_upstream_fail (struct upstream *up) +{ + if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) { + gettimeofday (&up->tv, NULL); + up->errors ++; + } + else { + g_atomic_int_inc (&up->errors); + } + + if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) { + /* Remove upstream from the active list */ + rspamd_upstream_set_inactive (up->ls, up); + } +} + +void +rspamd_upstream_ok (struct upstream *up) +{ + if (up->errors > 0) { + up->errors = 0; + rspamd_upstream_set_active (up->ls, up); + } + + /* Rotate weight of the alive upstream */ + up->cur_weight = up->cur_weight > 0 ? up->cur_weight -- : up->weight; +} + +struct upstream_list* +rspamd_upstreams_create (void) +{ + struct upstream_list *ls; + + ls = g_slice_alloc (sizeof (*ls)); + ls->hash_seed = ottery_rand_unsigned (); + ls->ups = g_array_new (FALSE, TRUE, sizeof (struct upstream_list)); + ls->alive = g_ptr_array_new (); + ls->lock = rspamd_mutex_new (); + + return ls; +} + + +rspamd_inet_addr_t* +rspamd_upstream_addr (struct upstream *up) +{ + return &up->addr; +} diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index b2235866f..7adad8d22 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -20,7 +20,8 @@ struct upstream_list; * Init upstreams library * @param resolver */ -void rspamd_upstreams_library_init (struct rdns_resolver *resolver); +void rspamd_upstreams_library_init (struct rdns_resolver *resolver, + struct event_base *base); /** * Upstream error logic @@ -33,12 +34,12 @@ void rspamd_upstreams_library_init (struct rdns_resolver *resolver); /** * Add an error to an upstream */ -void upstream_fail (struct upstream *up); +void rspamd_upstream_fail (struct upstream *up); /** * Increase upstream successes count */ -void upstream_ok (struct upstream *up); +void rspamd_upstream_ok (struct upstream *up); /** * Create new list of upstreams -- 2.39.5