From 44f911a00655f5c8caa103f15afa155c25ac25a0 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 26 Jul 2019 17:26:23 +0100 Subject: [PATCH] [Feature] Upstreams: Add lazy resolving logic to all upstreams --- src/libutil/upstream.c | 81 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index c445751b4..4cd39f5ea 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -73,6 +73,7 @@ struct upstream_limits { gdouble revive_jitter; gdouble error_time; gdouble dns_timeout; + gdouble lazy_resolve_time; guint max_errors; guint dns_retransmits; }; @@ -115,6 +116,10 @@ static gdouble default_revive_jitter = 0.4; static gdouble default_error_time = 10; static gdouble default_dns_timeout = 1.0; static guint default_dns_retransmits = 2; +/* TODO: make it configurable */ +static gdouble default_lazy_resolve_time = 3600.0; + +static void rspamd_upstream_lazy_resolve_cb (struct ev_loop *, ev_timer *, int ); void rspamd_upstreams_library_config (struct rspamd_config *cfg, @@ -144,6 +149,29 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg, ctx->event_loop = event_loop; ctx->res = resolver; ctx->configured = TRUE; + + /* Start lazy resolving */ + if (event_loop && resolver) { + GList *cur; + struct upstream *u; + + cur = ctx->upstreams->head; + + while (cur) { + u = cur->data; + if (!ev_is_active (&u->ev) && u->ls && + !(u->ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { + gdouble when = rspamd_time_jitter (u->ls->limits.lazy_resolve_time, + u->ls->limits.lazy_resolve_time * .1); + ev_timer_init (&u->ev, rspamd_upstream_lazy_resolve_cb, + when, 0); + u->ev.data = u; + ev_timer_start (ctx->event_loop, &u->ev); + } + + cur = g_list_next (cur); + } + } } static void @@ -184,6 +212,7 @@ rspamd_upstreams_library_init (void) ctx->limits.dns_timeout = default_dns_timeout; ctx->limits.revive_jitter = default_revive_jitter; ctx->limits.revive_time = default_revive_time; + ctx->limits.lazy_resolve_time = default_lazy_resolve_time; ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "upstreams"); @@ -235,6 +264,21 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) RSPAMD_UPSTREAM_LOCK (ls->lock); g_ptr_array_add (ls->alive, up); up->active_idx = ls->alive->len - 1; + + if (up->ctx && up->ctx->configured && + !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { + + if (ev_is_active (&up->ev)) { + ev_timer_stop (up->ctx->event_loop, &up->ev); + } + /* Start lazy names resolution */ + gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time, + ls->limits.lazy_resolve_time * 0.1); + ev_timer_init (&up->ev, rspamd_upstream_lazy_resolve_cb, when, 0); + up->ev.data = up; + ev_timer_start (up->ctx->event_loop, &up->ev); + } + RSPAMD_UPSTREAM_UNLOCK (ls->lock); } @@ -372,6 +416,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents) RSPAMD_UPSTREAM_LOCK (up->lock); ev_timer_stop (loop, w); + if (up->ls) { rspamd_upstream_set_active (up->ls, up); } @@ -408,6 +453,25 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls, } } +static void +rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents) +{ + struct upstream *up = (struct upstream *)w->data; + + RSPAMD_UPSTREAM_LOCK (up->lock); + ev_timer_stop (loop, w); + + if (up->ls) { + rspamd_upstream_resolve_addrs (up->ls, up); + + w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time, + up->ls->limits.lazy_resolve_time * .1); + ev_timer_again (loop, w); + } + + RSPAMD_UPSTREAM_UNLOCK (up->lock); +} + static void rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) { @@ -432,6 +496,11 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) REF_RETAIN (up); ntim = rspamd_time_jitter (ls->limits.revive_time, ls->limits.revive_jitter); + + if (ev_is_active (&up->ev)) { + ev_timer_stop (up->ctx->event_loop, &up->ev); + } + ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0); up->ev.data = up; @@ -583,6 +652,7 @@ rspamd_upstreams_create (struct upstream_ctx *ctx) ls->limits.dns_timeout = default_dns_timeout; ls->limits.revive_jitter = default_revive_jitter; ls->limits.revive_time = default_revive_time; + ls->limits.lazy_resolve_time = default_lazy_resolve_time; } return ls; @@ -620,6 +690,11 @@ rspamd_upstream_dtor (struct upstream *up) rspamd_mutex_free (up->lock); if (up->ctx) { + + if (ev_is_active (&up->ev)) { + ev_timer_stop (up->ctx->event_loop, &up->ev); + } + g_queue_delete_link (up->ctx->upstreams, up->ctx_pos); REF_RELEASE (up->ctx); } @@ -747,7 +822,6 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, } g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func); - rspamd_upstream_set_active (ups, up); return TRUE; @@ -913,7 +987,10 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) /* Here the upstreams list is already locked */ RSPAMD_UPSTREAM_LOCK (up->lock); - ev_timer_stop (up->ctx->event_loop, &up->ev); + if (ev_is_active (&up->ev)) { + ev_timer_stop (up->ctx->event_loop, &up->ev); + } + g_ptr_array_add (ups->alive, up); up->active_idx = ups->alive->len - 1; RSPAMD_UPSTREAM_UNLOCK (up->lock); -- 2.39.5