]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Upstreams: Add lazy resolving logic to all upstreams
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 26 Jul 2019 16:26:23 +0000 (17:26 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 26 Jul 2019 16:26:23 +0000 (17:26 +0100)
src/libutil/upstream.c

index c445751b44afd49f90bc33e9bfd3abb42f90921a..4cd39f5ea43d5c591f54bf25796f546b933c77d7 100644 (file)
@@ -73,6 +73,7 @@ struct upstream_limits {
        gdouble revive_jitter;
        gdouble error_time;
        gdouble dns_timeout;
+       gdouble lazy_resolve_time;
        guint max_errors;
        guint dns_retransmits;
 };
@@ -115,6 +116,10 @@ static gdouble default_revive_jitter = 0.4;
 static gdouble default_error_time = 10;
 static gdouble default_dns_timeout = 1.0;
 static guint default_dns_retransmits = 2;
+/* TODO: make it configurable */
+static gdouble default_lazy_resolve_time = 3600.0;
+
+static void rspamd_upstream_lazy_resolve_cb (struct ev_loop *, ev_timer *, int );
 
 void
 rspamd_upstreams_library_config (struct rspamd_config *cfg,
@@ -144,6 +149,29 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
        ctx->event_loop = event_loop;
        ctx->res = resolver;
        ctx->configured = TRUE;
+
+       /* Start lazy resolving */
+       if (event_loop && resolver) {
+               GList *cur;
+               struct upstream *u;
+
+               cur = ctx->upstreams->head;
+
+               while (cur) {
+                       u = cur->data;
+                       if (!ev_is_active (&u->ev) && u->ls &&
+                               !(u->ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+                               gdouble when = rspamd_time_jitter (u->ls->limits.lazy_resolve_time,
+                                               u->ls->limits.lazy_resolve_time * .1);
+                               ev_timer_init (&u->ev, rspamd_upstream_lazy_resolve_cb,
+                                               when, 0);
+                               u->ev.data = u;
+                               ev_timer_start (ctx->event_loop, &u->ev);
+                       }
+
+                       cur = g_list_next (cur);
+               }
+       }
 }
 
 static void
@@ -184,6 +212,7 @@ rspamd_upstreams_library_init (void)
        ctx->limits.dns_timeout = default_dns_timeout;
        ctx->limits.revive_jitter = default_revive_jitter;
        ctx->limits.revive_time = default_revive_time;
+       ctx->limits.lazy_resolve_time = default_lazy_resolve_time;
        ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
                        "upstreams");
 
@@ -235,6 +264,21 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
        RSPAMD_UPSTREAM_LOCK (ls->lock);
        g_ptr_array_add (ls->alive, up);
        up->active_idx = ls->alive->len - 1;
+
+       if (up->ctx && up->ctx->configured &&
+               !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
+
+               if (ev_is_active (&up->ev)) {
+                       ev_timer_stop (up->ctx->event_loop, &up->ev);
+               }
+               /* Start lazy names resolution */
+               gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
+                               ls->limits.lazy_resolve_time * 0.1);
+               ev_timer_init (&up->ev, rspamd_upstream_lazy_resolve_cb, when, 0);
+               up->ev.data = up;
+               ev_timer_start (up->ctx->event_loop, &up->ev);
+       }
+
        RSPAMD_UPSTREAM_UNLOCK (ls->lock);
 }
 
@@ -372,6 +416,7 @@ rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
 
        RSPAMD_UPSTREAM_LOCK (up->lock);
        ev_timer_stop (loop, w);
+
        if (up->ls) {
                rspamd_upstream_set_active (up->ls, up);
        }
@@ -408,6 +453,25 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
        }
 }
 
+static void
+rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
+{
+       struct upstream *up = (struct upstream *)w->data;
+
+       RSPAMD_UPSTREAM_LOCK (up->lock);
+       ev_timer_stop (loop, w);
+
+       if (up->ls) {
+               rspamd_upstream_resolve_addrs (up->ls, up);
+
+               w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time,
+                               up->ls->limits.lazy_resolve_time * .1);
+               ev_timer_again (loop, w);
+       }
+
+       RSPAMD_UPSTREAM_UNLOCK (up->lock);
+}
+
 static void
 rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 {
@@ -432,6 +496,11 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
                REF_RETAIN (up);
                ntim = rspamd_time_jitter (ls->limits.revive_time,
                                ls->limits.revive_jitter);
+
+               if (ev_is_active (&up->ev)) {
+                       ev_timer_stop (up->ctx->event_loop, &up->ev);
+               }
+
                ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0);
                up->ev.data = up;
 
@@ -583,6 +652,7 @@ rspamd_upstreams_create (struct upstream_ctx *ctx)
                ls->limits.dns_timeout = default_dns_timeout;
                ls->limits.revive_jitter = default_revive_jitter;
                ls->limits.revive_time = default_revive_time;
+               ls->limits.lazy_resolve_time = default_lazy_resolve_time;
        }
 
        return ls;
@@ -620,6 +690,11 @@ rspamd_upstream_dtor (struct upstream *up)
        rspamd_mutex_free (up->lock);
 
        if (up->ctx) {
+
+               if (ev_is_active (&up->ev)) {
+                       ev_timer_stop (up->ctx->event_loop, &up->ev);
+               }
+
                g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
                REF_RELEASE (up->ctx);
        }
@@ -747,7 +822,6 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
        }
 
        g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
-
        rspamd_upstream_set_active (ups, up);
 
        return TRUE;
@@ -913,7 +987,10 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
        /* Here the upstreams list is already locked */
        RSPAMD_UPSTREAM_LOCK (up->lock);
 
-       ev_timer_stop (up->ctx->event_loop, &up->ev);
+       if (ev_is_active (&up->ev)) {
+               ev_timer_stop (up->ctx->event_loop, &up->ev);
+       }
+
        g_ptr_array_add (ups->alive, up);
        up->active_idx = ups->alive->len - 1;
        RSPAMD_UPSTREAM_UNLOCK (up->lock);