aboutsummaryrefslogtreecommitdiffstats
path: root/src/libutil/upstream.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-07-26 17:26:23 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-07-26 17:26:23 +0100
commit44f911a00655f5c8caa103f15afa155c25ac25a0 (patch)
tree2ab5411ec8445bce6049577da5a34a5f1108a8cc /src/libutil/upstream.c
parentd299f79c0c92ec03b19b88b6d338bc9f9a54383e (diff)
downloadrspamd-44f911a00655f5c8caa103f15afa155c25ac25a0.tar.gz
rspamd-44f911a00655f5c8caa103f15afa155c25ac25a0.zip
[Feature] Upstreams: Add lazy resolving logic to all upstreams
Diffstat (limited to 'src/libutil/upstream.c')
-rw-r--r--src/libutil/upstream.c81
1 files changed, 79 insertions, 2 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c
index c445751b4..4cd39f5ea 100644
--- a/src/libutil/upstream.c
+++ b/src/libutil/upstream.c
@@ -73,6 +73,7 @@ struct upstream_limits {
gdouble revive_jitter;
gdouble error_time;
gdouble dns_timeout;
+ gdouble lazy_resolve_time;
guint max_errors;
guint dns_retransmits;
};
@@ -115,6 +116,10 @@ static gdouble default_revive_jitter = 0.4;
static gdouble default_error_time = 10;
static gdouble default_dns_timeout = 1.0;
static guint default_dns_retransmits = 2;
+/* TODO: make it configurable */
+static gdouble default_lazy_resolve_time = 3600.0;
+
+static void rspamd_upstream_lazy_resolve_cb (struct ev_loop *, ev_timer *, int );
void
rspamd_upstreams_library_config (struct rspamd_config *cfg,
@@ -144,6 +149,29 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
ctx->event_loop = event_loop;
ctx->res = resolver;
ctx->configured = TRUE;
+
+ /* Start lazy resolving */
+ if (event_loop && resolver) {
+ GList *cur;
+ struct upstream *u;
+
+ cur = ctx->upstreams->head;
+
+ while (cur) {
+ u = cur->data;
+ if (!ev_is_active (&u->ev) && u->ls &&
+ !(u->ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+ gdouble when = rspamd_time_jitter (u->ls->limits.lazy_resolve_time,
+ u->ls->limits.lazy_resolve_time * .1);
+ ev_timer_init (&u->ev, rspamd_upstream_lazy_resolve_cb,
+ when, 0);
+ u->ev.data = u;
+ ev_timer_start (ctx->event_loop, &u->ev);
+ }
+
+ cur = g_list_next (cur);
+ }
+ }
}
static void
@@ -184,6 +212,7 @@ rspamd_upstreams_library_init (void)
ctx->limits.dns_timeout = default_dns_timeout;
ctx->limits.revive_jitter = default_revive_jitter;
ctx->limits.revive_time = default_revive_time;
+ ctx->limits.lazy_resolve_time = default_lazy_resolve_time;
ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
"upstreams");
@@ -235,6 +264,21 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
RSPAMD_UPSTREAM_LOCK (ls->lock);
g_ptr_array_add (ls->alive, up);
up->active_idx = ls->alive->len - 1;
+
+ if (up->ctx && up->ctx->configured &&
+ !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+
+ if (ev_is_active (&up->ev)) {
+ ev_timer_stop (up->ctx->event_loop, &up->ev);
+ }
+ /* Start lazy names resolution */
+ gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
+ ls->limits.lazy_resolve_time * 0.1);
+ ev_timer_init (&up->ev, rspamd_upstream_lazy_resolve_cb, when, 0);
+ up->ev.data = up;
+ ev_timer_start (up->ctx->event_loop, &up->ev);
+ }
+
RSPAMD_UPSTREAM_UNLOCK (ls->lock);
}
@@ -372,6 +416,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
RSPAMD_UPSTREAM_LOCK (up->lock);
ev_timer_stop (loop, w);
+
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
}
@@ -409,6 +454,25 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
}
static void
+rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
+{
+ struct upstream *up = (struct upstream *)w->data;
+
+ RSPAMD_UPSTREAM_LOCK (up->lock);
+ ev_timer_stop (loop, w);
+
+ if (up->ls) {
+ rspamd_upstream_resolve_addrs (up->ls, up);
+
+ w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time,
+ up->ls->limits.lazy_resolve_time * .1);
+ ev_timer_again (loop, w);
+ }
+
+ RSPAMD_UPSTREAM_UNLOCK (up->lock);
+}
+
+static void
rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
{
gdouble ntim;
@@ -432,6 +496,11 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
REF_RETAIN (up);
ntim = rspamd_time_jitter (ls->limits.revive_time,
ls->limits.revive_jitter);
+
+ if (ev_is_active (&up->ev)) {
+ ev_timer_stop (up->ctx->event_loop, &up->ev);
+ }
+
ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0);
up->ev.data = up;
@@ -583,6 +652,7 @@ rspamd_upstreams_create (struct upstream_ctx *ctx)
ls->limits.dns_timeout = default_dns_timeout;
ls->limits.revive_jitter = default_revive_jitter;
ls->limits.revive_time = default_revive_time;
+ ls->limits.lazy_resolve_time = default_lazy_resolve_time;
}
return ls;
@@ -620,6 +690,11 @@ rspamd_upstream_dtor (struct upstream *up)
rspamd_mutex_free (up->lock);
if (up->ctx) {
+
+ if (ev_is_active (&up->ev)) {
+ ev_timer_stop (up->ctx->event_loop, &up->ev);
+ }
+
g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
REF_RELEASE (up->ctx);
}
@@ -747,7 +822,6 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
}
g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
-
rspamd_upstream_set_active (ups, up);
return TRUE;
@@ -913,7 +987,10 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
/* Here the upstreams list is already locked */
RSPAMD_UPSTREAM_LOCK (up->lock);
- ev_timer_stop (up->ctx->event_loop, &up->ev);
+ if (ev_is_active (&up->ev)) {
+ ev_timer_stop (up->ctx->event_loop, &up->ev);
+ }
+
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
RSPAMD_UPSTREAM_UNLOCK (up->lock);