From: Vsevolod Stakhov Date: Fri, 26 Jul 2019 17:16:07 +0000 (+0100) Subject: [Feature] Upstreams: Set noresolve flag on numeric upstreams X-Git-Tag: 2.0~484 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=f3d197ff3d67e5435882dd4c79f4e56ef03c0f5d;p=rspamd.git [Feature] Upstreams: Set noresolve flag on numeric upstreams --- diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 4cd39f5ea..d1da9f7d3 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -21,6 +21,7 @@ #include "rdns.h" #include "cryptobox.h" #include "utlist.h" +#include "logger.h" #include @@ -53,6 +54,7 @@ struct upstream { ev_timer ev; gdouble last_fail; gpointer ud; + enum rspamd_upstream_flag flags; struct upstream_list *ls; GList *ctx_pos; struct upstream_ctx *ctx; @@ -65,6 +67,7 @@ struct upstream { struct upstream_inet_addr_entry *new_addrs; rspamd_mutex_t *lock; gpointer data; + gchar uid[8]; ref_entry_t ref; }; @@ -86,8 +89,8 @@ struct upstream_list { rspamd_mutex_t *lock; guint64 hash_seed; struct upstream_limits limits; - guint cur_elt; enum rspamd_upstream_flag flags; + guint cur_elt; enum rspamd_upstream_rotation rot_alg; }; @@ -109,6 +112,13 @@ struct upstream_ctx { #define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x) #endif +#define msg_debug_upstream(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_upstream_log_id, "upstream", upstream->uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(upstream) + /* 4 errors in 10 seconds */ static guint default_max_errors = 4; static gdouble default_revive_time = 60; @@ -139,6 +149,9 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg, if (cfg->upstream_revive_time) { ctx->limits.revive_time = cfg->upstream_max_errors; } + if (cfg->upstream_lazy_resolve_time) { + ctx->limits.lazy_resolve_time = cfg->upstream_lazy_resolve_time; + } if (cfg->dns_retransmits) { ctx->limits.dns_retransmits = cfg->dns_retransmits; } @@ -153,20 +166,20 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg, /* Start lazy resolving */ if (event_loop && resolver) { GList *cur; - struct upstream *u; + struct upstream *upstream; cur = ctx->upstreams->head; while (cur) { - u = cur->data; - if (!ev_is_active (&u->ev) && u->ls && - !(u->ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { - gdouble when = rspamd_time_jitter (u->ls->limits.lazy_resolve_time, - u->ls->limits.lazy_resolve_time * .1); - ev_timer_init (&u->ev, rspamd_upstream_lazy_resolve_cb, + upstream = cur->data; + if (!ev_is_active (&upstream->ev) && upstream->ls && + !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { + gdouble when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time, + upstream->ls->limits.lazy_resolve_time * .1); + ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb, when, 0); - u->ev.data = u; - ev_timer_start (ctx->event_loop, &u->ev); + upstream->ev.data = upstream; + ev_timer_start (ctx->event_loop, &upstream->ev); } cur = g_list_next (cur); @@ -259,24 +272,27 @@ rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) } static void -rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up) +rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream) { RSPAMD_UPSTREAM_LOCK (ls->lock); - g_ptr_array_add (ls->alive, up); - up->active_idx = ls->alive->len - 1; + g_ptr_array_add (ls->alive, upstream); + upstream->active_idx = ls->alive->len - 1; - if (up->ctx && up->ctx->configured && - !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { + if (upstream->ctx && upstream->ctx->configured && + !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { - if (ev_is_active (&up->ev)) { - ev_timer_stop (up->ctx->event_loop, &up->ev); + if (ev_is_active (&upstream->ev)) { + ev_timer_stop (upstream->ctx->event_loop, &upstream->ev); } /* Start lazy names resolution */ gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time, ls->limits.lazy_resolve_time * 0.1); - ev_timer_init (&up->ev, rspamd_upstream_lazy_resolve_cb, when, 0); - up->ev.data = up; - ev_timer_start (up->ctx->event_loop, &up->ev); + ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb, + when, 0); + upstream->ev.data = upstream; + msg_debug_upstream ("start lazy resolving for %s in %.0f seconds", + upstream->name, when); + ev_timer_start (upstream->ctx->event_loop, &upstream->ev); } RSPAMD_UPSTREAM_UNLOCK (ls->lock); @@ -294,7 +310,7 @@ rspamd_upstream_addr_elt_dtor (gpointer a) } static void -rspamd_upstream_update_addrs (struct upstream *up) +rspamd_upstream_update_addrs (struct upstream *upstream) { guint addr_cnt, i, port; gboolean seen_addr, reset_errors = FALSE; @@ -306,33 +322,35 @@ rspamd_upstream_update_addrs (struct upstream *up) * We need first of all get the saved port, since DNS gives us no * idea about what port has been used previously */ - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (upstream->lock); - if (up->addrs.addr->len > 0 && up->new_addrs) { - addr_elt = g_ptr_array_index (up->addrs.addr, 0); + if (upstream->addrs.addr->len > 0 && upstream->new_addrs) { + addr_elt = g_ptr_array_index (upstream->addrs.addr, 0); port = rspamd_inet_address_get_port (addr_elt->addr); /* Now calculate new addrs count */ addr_cnt = 0; - LL_FOREACH (up->new_addrs, cur) { + LL_FOREACH (upstream->new_addrs, cur) { addr_cnt++; } /* At 10% probability reset errors on addr elements */ if (rspamd_random_double_fast () > 0.9) { reset_errors = TRUE; + msg_debug_upstream ("reset errors on upstream %s", + upstream->name); } new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor); /* Copy addrs back */ - LL_FOREACH (up->new_addrs, cur) { + LL_FOREACH (upstream->new_addrs, cur) { seen_addr = FALSE; naddr = NULL; /* Ports are problematic, set to compare in the next block */ rspamd_inet_address_set_port (cur->addr, port); - PTR_ARRAY_FOREACH (up->addrs.addr, i, addr_elt) { + PTR_ARRAY_FOREACH (upstream->addrs.addr, i, addr_elt) { if (rspamd_inet_address_compare (addr_elt->addr, cur->addr, FALSE) == 0) { naddr = g_malloc0 (sizeof (*naddr)); naddr->addr = cur->addr; @@ -347,26 +365,34 @@ rspamd_upstream_update_addrs (struct upstream *up) naddr = g_malloc0 (sizeof (*naddr)); naddr->addr = cur->addr; naddr->errors = 0; + msg_debug_upstream ("new address for %s: %s", + upstream->name, + rspamd_inet_address_to_string_pretty (naddr->addr)); + } + else { + msg_debug_upstream ("existing address for %s: %s", + upstream->name, + rspamd_inet_address_to_string_pretty (cur->addr)); } g_ptr_array_add (new_addrs, naddr); } /* Free old addresses */ - g_ptr_array_free (up->addrs.addr, TRUE); + g_ptr_array_free (upstream->addrs.addr, TRUE); - up->addrs.cur = 0; - up->addrs.addr = new_addrs; - g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func); + upstream->addrs.cur = 0; + upstream->addrs.addr = new_addrs; + g_ptr_array_sort (upstream->addrs.addr, rspamd_upstream_addr_sort_func); } - LL_FOREACH_SAFE (up->new_addrs, cur, tmp) { + LL_FOREACH_SAFE (upstream->new_addrs, cur, tmp) { /* Do not free inet address pointer since it has been transferred to up */ g_free (cur); } - up->new_addrs = NULL; - RSPAMD_UPSTREAM_UNLOCK (up->lock); + upstream->new_addrs = NULL; + RSPAMD_UPSTREAM_UNLOCK (upstream->lock); } static void @@ -412,17 +438,19 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) static void rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents) { - struct upstream *up = (struct upstream *)w->data; + struct upstream *upstream = (struct upstream *)w->data; - RSPAMD_UPSTREAM_LOCK (up->lock); + RSPAMD_UPSTREAM_LOCK (upstream->lock); ev_timer_stop (loop, w); - if (up->ls) { - rspamd_upstream_set_active (up->ls, up); + msg_debug_upstream ("revive upstream %s", upstream->name); + + if (upstream->ls) { + rspamd_upstream_set_active (upstream->ls, upstream); } - RSPAMD_UPSTREAM_UNLOCK (up->lock); - REF_RELEASE (up); + RSPAMD_UPSTREAM_UNLOCK (upstream->lock); + REF_RELEASE (upstream); } static void @@ -432,7 +460,7 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls, if (up->ctx->res != NULL && up->ctx->configured && up->dns_requests == 0 && - !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { + !(up->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { /* Resolve name of the upstream one more time */ if (up->name[0] != '/') { @@ -473,7 +501,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents) } static void -rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) +rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstream) { gdouble ntim; guint i; @@ -481,8 +509,8 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) struct upstream_list_watcher *w; RSPAMD_UPSTREAM_LOCK (ls->lock); - g_ptr_array_remove_index (ls->alive, up->active_idx); - up->active_idx = -1; + g_ptr_array_remove_index (ls->alive, upstream->active_idx); + upstream->active_idx = -1; /* We need to update all indicies */ for (i = 0; i < ls->alive->len; i ++) { @@ -490,28 +518,30 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) cur->active_idx = i; } - if (up->ctx) { - rspamd_upstream_resolve_addrs (ls, up); + if (upstream->ctx) { + rspamd_upstream_resolve_addrs (ls, upstream); - REF_RETAIN (up); + REF_RETAIN (upstream); ntim = rspamd_time_jitter (ls->limits.revive_time, ls->limits.revive_jitter); - if (ev_is_active (&up->ev)) { - ev_timer_stop (up->ctx->event_loop, &up->ev); + if (ev_is_active (&upstream->ev)) { + ev_timer_stop (upstream->ctx->event_loop, &upstream->ev); } - ev_timer_init (&up->ev, rspamd_upstream_revive_cb, ntim, 0); - up->ev.data = up; + msg_debug_upstream ("mark upstream %s inactive; revive in %.0f seconds", + upstream->name, ntim); + ev_timer_init (&upstream->ev, rspamd_upstream_revive_cb, ntim, 0); + upstream->ev.data = upstream; - if (up->ctx->event_loop != NULL && up->ctx->configured) { - ev_timer_start (up->ctx->event_loop, &up->ev); + if (upstream->ctx->event_loop != NULL && upstream->ctx->configured) { + ev_timer_start (upstream->ctx->event_loop, &upstream->ev); } } - DL_FOREACH (up->ls->watchers, w) { + DL_FOREACH (upstream->ls->watchers, w) { if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) { - w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud); + w->func (upstream, RSPAMD_UPSTREAM_WATCH_OFFLINE, upstream->errors, w->ud); } } @@ -740,32 +770,30 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, guint16 def_port, enum rspamd_upstream_parse_type parse_type, void *data) { - struct upstream *up; + struct upstream *upstream; GPtrArray *addrs = NULL; guint i; rspamd_inet_addr_t *addr; - gboolean ret = FALSE; + enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL; - up = g_malloc0 (sizeof (*up)); + upstream = g_malloc0 (sizeof (*upstream)); switch (parse_type) { case RSPAMD_UPSTREAM_PARSE_DEFAULT: ret = rspamd_parse_host_port_priority (str, &addrs, - &up->weight, - &up->name, def_port, ups->ctx ? ups->ctx->pool : NULL); + &upstream->weight, + &upstream->name, def_port, + ups->ctx ? ups->ctx->pool : NULL); break; case RSPAMD_UPSTREAM_PARSE_NAMESERVER: addrs = g_ptr_array_sized_new (1); - ret = rspamd_parse_inet_address (&addr, str, strlen (str)); - - if (ups->ctx) { - up->name = rspamd_mempool_strdup (ups->ctx->pool, str); - } - else { - up->name = g_strdup (str); - } - - if (ret) { + if (rspamd_parse_inet_address (&addr, str, strlen (str))) { + if (ups->ctx) { + upstream->name = rspamd_mempool_strdup (ups->ctx->pool, str); + } + else { + upstream->name = g_strdup (str); + } if (rspamd_inet_address_get_port (addr) == 0) { rspamd_inet_address_set_port (addr, def_port); } @@ -788,41 +816,55 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, break; } - if (!ret) { - g_free (up); + if (ret == RSPAMD_PARSE_ADDR_FAIL) { + g_free (upstream); return FALSE; } else { + upstream->flags = ups->flags; + + if (ret == RSPAMD_PARSE_ADDR_NUMERIC) { + /* Add noresolve flag */ + upstream->flags |= RSPAMD_UPSTREAM_FLAG_NORESOLVE; + } for (i = 0; i < addrs->len; i ++) { addr = g_ptr_array_index (addrs, i); - rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr)); + rspamd_upstream_add_addr (upstream, rspamd_inet_address_copy (addr)); } } - if (up->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) { + if (upstream->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) { /* Special heuristic for master-slave rotation */ if (ups->ups->len == 0) { /* Prioritize the first */ - up->weight = 1; + upstream->weight = 1; } } - g_ptr_array_add (ups->ups, up); - up->ud = data; - up->cur_weight = up->weight; - up->ls = ups; - REF_INIT_RETAIN (up, rspamd_upstream_dtor); - up->lock = rspamd_mutex_new (); - up->ctx = ups->ctx; + g_ptr_array_add (ups->ups, upstream); + upstream->ud = data; + upstream->cur_weight = upstream->weight; + upstream->ls = ups; + REF_INIT_RETAIN (upstream, rspamd_upstream_dtor); + upstream->lock = rspamd_mutex_new (); + upstream->ctx = ups->ctx; - if (up->ctx) { + if (upstream->ctx) { REF_RETAIN (ups->ctx); - g_queue_push_tail (ups->ctx->upstreams, up); - up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams); + g_queue_push_tail (ups->ctx->upstreams, upstream); + upstream->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams); } - g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func); - rspamd_upstream_set_active (ups, up); + guint h = rspamd_cryptobox_fast_hash (upstream->name, + strlen (upstream->name), 0); + memset (upstream->uid, 0, sizeof (upstream->uid)); + rspamd_encode_base32_buf ((const guchar *)&h, sizeof (h), + upstream->uid, sizeof (upstream->uid) - 1); + + msg_debug_upstream ("added upstream %s (%s)", upstream->name, + upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name"); + g_ptr_array_sort (upstream->addrs.addr, rspamd_upstream_addr_sort_func); + rspamd_upstream_set_active (ups, upstream); return TRUE; }