summaryrefslogtreecommitdiffstats
path: root/src/libutil/upstream.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-10-28 15:19:06 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-10-28 15:19:06 +0000
commitf046f61e40a6033828e89be935637c9d8a389092 (patch)
treeff91d938496a06aa7a8a6b389e766409b500f6b9 /src/libutil/upstream.c
parent4a4ff2f073debbe1b5967121fc4e1782bd004ed6 (diff)
downloadrspamd-f046f61e40a6033828e89be935637c9d8a389092.tar.gz
rspamd-f046f61e40a6033828e89be935637c9d8a389092.zip
Add some basic logic for upstreams.
Diffstat (limited to 'src/libutil/upstream.c')
-rw-r--r--src/libutil/upstream.c134
1 files changed, 125 insertions, 9 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index f8a0d7618..44a0455ad 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -25,23 +25,26 @@
#include "config.h"
#include "upstream.h"
#include "ottery.h"
+#include "ref.h"
+#include "rdns.h"
struct upstream {
guint weight;
guint cur_weight;
guint errors;
guint port;
- guint active_idx;
+ gint active_idx;
gchar *name;
struct event ev;
struct timeval tv;
gpointer ud;
struct upstream_list *ls;
rspamd_inet_addr_t addr;
+ ref_entry_t ref;
};
struct upstream_list {
- GArray *ups;
+ GPtrArray *ups;
GPtrArray *alive;
rspamd_mutex_t *lock;
guint hash_seed;
@@ -49,23 +52,78 @@ struct upstream_list {
static struct rdns_resolver *res = NULL;
static struct event_base *ev_base = NULL;
+/* 4 errors in 10 seconds */
const guint default_max_errors = 4;
+const guint default_revive_time = 60;
+const guint default_error_time = 10;
+const gdouble default_dns_timeout = 1.0;
+const guint default_dns_retransmits = 2;
static void
-rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
{
rspamd_mutex_lock (ls->lock);
- g_ptr_array_remove_index (ls->alive, up->active_idx);
- up->active_idx = 0;
+ g_ptr_array_add (ls->alive, up);
+ up->active_idx = ls->alive->len - 1;
rspamd_mutex_unlock (ls->lock);
}
static void
-rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
+rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
{
+ struct upstream *up = (struct upstream *)arg;
+ struct rdns_reply_entry *entry;
+
+ if (reply->code == RDNS_RC_NOERROR) {
+
+ }
+
+ REF_RELEASE (up);
+}
+
+static void
+rspamd_revive_cb (int fd, short what, void *arg)
+{
+ struct upstream *up = (struct upstream *)arg;
+
+ event_del (&up->ev);
+ if (up->ls) {
+ rspamd_upstream_set_active (up->ls, up);
+ }
+
+ REF_RELEASE (up);
+}
+
+static void
+rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
+{
+ gint query_type = -1;
+
rspamd_mutex_lock (ls->lock);
- g_ptr_array_add (ls->alive, up);
- up->active_idx = ls->alive->len - 1;
+ g_ptr_array_remove_index (ls->alive, up->active_idx);
+ up->active_idx = -1;
+ /* Resolve name of the upstream one more time */
+ if (up->addr.af == AF_INET) {
+ query_type = RDNS_REQUEST_A;
+ }
+ else if (up->addr.af == AF_INET6) {
+ query_type = RDNS_REQUEST_AAAA;
+ }
+
+ if (query_type != -1) {
+ REF_RETAIN (up);
+ rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+ default_dns_timeout, default_dns_retransmits,
+ RDNS_REQUEST_A, up->name);
+ }
+
+ REF_RETAIN (up);
+ evtimer_set (&up->ev, rspamd_revive_cb, up);
+ event_base_set (ev_base, &up->ev);
+ up->tv.tv_sec = default_revive_time;
+ up->tv.tv_usec = 0;
+ event_add (&up->ev, &up->tv);
+
rspamd_mutex_unlock (ls->lock);
}
@@ -80,6 +138,9 @@ rspamd_upstreams_library_init (struct rdns_resolver *resolver,
void
rspamd_upstream_fail (struct upstream *up)
{
+ struct timeval tv;
+ gdouble error_rate, max_error_rate;
+
if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
gettimeofday (&up->tv, NULL);
up->errors ++;
@@ -88,7 +149,12 @@ rspamd_upstream_fail (struct upstream *up)
g_atomic_int_inc (&up->errors);
}
- if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) {
+ gettimeofday (&tv, NULL);
+
+ error_rate = ((gdouble)up->errors) / (tv.tv_sec - up->tv.tv_sec);
+ max_error_rate = (gdouble)default_max_errors / (gdouble)default_error_time;
+
+ if (error_rate > max_error_rate) {
/* Remove upstream from the active list */
rspamd_upstream_set_inactive (up->ls, up);
}
@@ -120,9 +186,59 @@ rspamd_upstreams_create (void)
return ls;
}
+static void
+rspamd_upstream_dtor (struct upstream *up)
+{
+ g_free (up->name);
+ g_slice_free1 (sizeof (*up), up);
+}
rspamd_inet_addr_t*
rspamd_upstream_addr (struct upstream *up)
{
return &up->addr;
}
+
+gboolean
+rspamd_upstreams_add_upstream (struct upstream_list *ups,
+ const gchar *str, guint16 def_port, void *data)
+{
+ struct upstream *up;
+
+ up = g_slice_alloc0 (sizeof (*up));
+
+ if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight,
+ &up->name, def_port)) {
+ g_slice_free1 (sizeof (*up), up);
+ return FALSE;
+ }
+
+ g_ptr_array_add (ups->ups, up);
+ up->ud = data;
+ up->cur_weight = up->weight;
+ up->port = rspamd_inet_address_get_port (&up->addr);
+ REF_INIT_RETAIN (up, rspamd_upstream_dtor);
+
+ rspamd_upstream_set_active (ups, up);
+
+ return TRUE;
+}
+
+void
+rspamd_upstreams_destroy (struct upstream_list *ups)
+{
+ guint i;
+ struct upstream *up;
+
+ g_ptr_array_free (ups->alive, TRUE);
+
+ for (i = 0; i < ups->ups->len; i ++) {
+ up = g_ptr_array_index (ups->ups, i);
+ up->ls = NULL;
+ REF_RELEASE (up);
+ }
+
+ g_ptr_array_free (ups->ups, TRUE);
+ rspamd_mutex_free (ups->lock);
+ g_slice_free1 (sizeof (*ups), ups);
+}