From b3eb4d1800eee3527772f26201f6a8a6a3a56022 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 1 Oct 2019 17:40:26 +0100 Subject: [PATCH] [Project] Start SRV upstreams implementation --- src/libutil/map.c | 4 +- src/libutil/upstream.c | 251 ++++++++++++++++++++++++++++++++++++----- src/libutil/upstream.h | 1 + 3 files changed, 229 insertions(+), 27 deletions(-) diff --git a/src/libutil/map.c b/src/libutil/map.c index 4f0e2354c..42134921a 100644 --- a/src/libutil/map.c +++ b/src/libutil/map.c @@ -970,7 +970,9 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic) } if (periodic->locked) { - rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE); + if (!periodic->map->wrk->wanna_die) { + rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE); + } g_atomic_int_set (periodic->map->locked, 0); msg_debug_map ("unlocked map"); } diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index e2bfd94d9..22e5200c7 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -24,6 +24,7 @@ #include "logger.h" #include +#include struct upstream_inet_addr_entry { rspamd_inet_addr_t *addr; @@ -32,6 +33,7 @@ struct upstream_inet_addr_entry { struct upstream_addr_elt { rspamd_inet_addr_t *addr; + guint priority; guint errors; }; @@ -174,8 +176,17 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg, upstream = cur->data; if (!ev_can_stop (&upstream->ev) && upstream->ls && !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { - gdouble when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time, - upstream->ls->limits.lazy_resolve_time * .1); + gdouble when; + + if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + /* Resolve them immediately ! */ + when = 0.0; + } + else { + when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time, + upstream->ls->limits.lazy_resolve_time * .1); + } + ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb, when, 0); upstream->ev.data = upstream; @@ -261,13 +272,20 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr) static gint rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b) { - const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a, - **ip2 = (const struct upstream_addr_elt **)b; + const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **)a, + *ip2 = *(const struct upstream_addr_elt **)b; gint w1, w2; - w1 = rspamd_upstream_af_to_weight ((*ip1)->addr); - w2 = rspamd_upstream_af_to_weight ((*ip2)->addr); + if (ip1->priority == 0 && ip2->priority == 0) { + w1 = rspamd_upstream_af_to_weight (ip1->addr); + w2 = rspamd_upstream_af_to_weight (ip2->addr); + } + else { + w1 = ip1->priority; + w2 = ip2->priority; + } + /* Inverse order */ return w2 - w1; } @@ -284,9 +302,18 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream) if (ev_can_stop (&upstream->ev)) { ev_timer_stop (upstream->ctx->event_loop, &upstream->ev); } - /* Start lazy names resolution */ - gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time, - ls->limits.lazy_resolve_time * 0.1); + + /* Start lazy (or not so lazy) names resolution */ + gdouble when; + + if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + /* Resolve them immediately ! */ + when = 0.0; + } + else { + when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time, + upstream->ls->limits.lazy_resolve_time * .1); + } ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb, when, 0); upstream->ev.data = upstream; @@ -435,6 +462,122 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg) REF_RELEASE (up); } +struct rspamd_upstream_srv_dns_cb { + struct upstream *up; + guint ttl; + guint priority; + guint port; + guint requests_inflight; +}; + +/* Used when we have resolved SRV record and resolved addrs */ +static void +rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg) +{ + struct rspamd_upstream_srv_dns_cb *cbdata = + (struct rspamd_upstream_srv_dns_cb *)arg; + struct upstream *up; + struct rdns_reply_entry *entry; + struct upstream_inet_addr_entry *up_ent; + + up = cbdata->up; + + if (reply->code == RDNS_RC_NOERROR) { + entry = reply->entries; + + RSPAMD_UPSTREAM_LOCK (up->lock); + while (entry) { + + if (entry->type == RDNS_REQUEST_A) { + up_ent = g_malloc0 (sizeof (*up_ent)); + up_ent->addr = rspamd_inet_address_new (AF_INET, + &entry->content.a.addr); + LL_PREPEND (up->new_addrs, up_ent); + } + else if (entry->type == RDNS_REQUEST_AAAA) { + up_ent = g_malloc0 (sizeof (*up_ent)); + up_ent->addr = rspamd_inet_address_new (AF_INET6, + &entry->content.aaa.addr); + LL_PREPEND (up->new_addrs, up_ent); + } + entry = entry->next; + } + + RSPAMD_UPSTREAM_UNLOCK (up->lock); + } + + up->dns_requests--; + cbdata->requests_inflight --; + + if (cbdata->requests_inflight == 0) { + g_free (cbdata); + } + + if (up->dns_requests == 0) { + rspamd_upstream_update_addrs (up); + } + + REF_RELEASE (up); +} + +static void +rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg) +{ + struct upstream *upstream = (struct upstream *) arg; + struct rdns_reply_entry *entry; + struct rspamd_upstream_srv_dns_cb *ncbdata; + + if (reply->code == RDNS_RC_NOERROR) { + entry = reply->entries; + + RSPAMD_UPSTREAM_LOCK (upstream->lock); + while (entry) { + /* XXX: we ignore weight as it contradicts with upstreams logic */ + if (entry->type == RDNS_REQUEST_SRV) { + msg_debug_upstream ("got srv reply for %s: %s " + "(weight=%d, priority=%d, port=%d)", + upstream->name, entry->content.srv.target, + entry->content.srv.weight, entry->content.srv.priority, + entry->content.srv.port); + ncbdata = g_malloc0 (sizeof (*ncbdata)); + ncbdata->priority = entry->content.srv.weight; + ncbdata->port = entry->content.srv.port; + ncbdata->ttl = entry->ttl; + + if (rdns_make_request_full (upstream->ctx->res, + rspamd_upstream_dns_srv_phase2_cb, ncbdata, + upstream->ls->limits.dns_timeout, + upstream->ls->limits.dns_retransmits, + 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) { + upstream->dns_requests++; + REF_RETAIN (upstream); + ncbdata->requests_inflight ++; + } + + if (rdns_make_request_full (upstream->ctx->res, + rspamd_upstream_dns_srv_phase2_cb, ncbdata, + upstream->ls->limits.dns_timeout, + upstream->ls->limits.dns_retransmits, + 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) { + upstream->dns_requests++; + REF_RETAIN (upstream); + ncbdata->requests_inflight ++; + } + + if (ncbdata->requests_inflight == 0) { + g_free (ncbdata); + } + } + entry = entry->next; + } + + RSPAMD_UPSTREAM_UNLOCK (upstream->lock); + } + + upstream->dns_requests--; + REF_RELEASE (upstream); +} + static void rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents) { @@ -464,19 +607,31 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls, !(up->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) { /* Resolve name of the upstream one more time */ if (up->name[0] != '/') { - - if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up, - ls->limits.dns_timeout, ls->limits.dns_retransmits, - 1, up->name, RDNS_REQUEST_A) != NULL) { - up->dns_requests ++; - REF_RETAIN (up); + if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) { + if (rdns_make_request_full (up->ctx->res, + rspamd_upstream_dns_srv_cb, up, + ls->limits.dns_timeout, ls->limits.dns_retransmits, + 1, up->name, RDNS_REQUEST_SRV) != NULL) { + up->dns_requests++; + REF_RETAIN (up); + } } + else { + if (rdns_make_request_full (up->ctx->res, + rspamd_upstream_dns_cb, up, + ls->limits.dns_timeout, ls->limits.dns_retransmits, + 1, up->name, RDNS_REQUEST_A) != NULL) { + up->dns_requests++; + REF_RETAIN (up); + } - if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up, - ls->limits.dns_timeout, ls->limits.dns_retransmits, - 1, up->name, RDNS_REQUEST_AAAA) != NULL) { - up->dns_requests ++; - REF_RETAIN (up); + if (rdns_make_request_full (up->ctx->res, + rspamd_upstream_dns_cb, up, + ls->limits.dns_timeout, ls->limits.dns_retransmits, + 1, up->name, RDNS_REQUEST_AAAA) != NULL) { + up->dns_requests++; + REF_RETAIN (up); + } } } } @@ -491,6 +646,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents) ev_timer_stop (loop, w); if (up->ls) { + rspamd_upstream_resolve_addrs (up->ls, up); w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time, @@ -773,18 +929,61 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, { struct upstream *upstream; GPtrArray *addrs = NULL; - guint i; + guint i, slen; rspamd_inet_addr_t *addr; enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL; upstream = g_malloc0 (sizeof (*upstream)); + slen = strlen (str); switch (parse_type) { case RSPAMD_UPSTREAM_PARSE_DEFAULT: - ret = rspamd_parse_host_port_priority (str, &addrs, - &upstream->weight, - &upstream->name, def_port, - ups->ctx ? ups->ctx->pool : NULL); + if (slen > sizeof ("service=") && + RSPAMD_LEN_CHECK_STARTS_WITH (str, slen, "service=")) { + const gchar *plus_pos, *service_pos, *semicolon_pos; + + /* Accept service=srv_name+hostname[:priority] */ + service_pos = str + sizeof ("service=") - 1; + plus_pos = strchr (service_pos, '+'); + + if (plus_pos != NULL) { + semicolon_pos = strchr (plus_pos + 1, ':'); + + if (semicolon_pos) { + upstream->weight = strtoul (semicolon_pos + 1, NULL, 10); + } + else { + semicolon_pos = plus_pos + strlen (plus_pos); + } + + /* + * Now our name is _service._tcp. + * where is string between semicolon_pos and plus_pos +1 + * while service is a string between service_pos and plus_pos + */ + guint namelen = (semicolon_pos - (plus_pos + 1)) + + (plus_pos - service_pos) + + (sizeof ("tcp") - 1) + + 4; + addrs = g_ptr_array_sized_new (1); + upstream->name = ups->ctx ? + rspamd_mempool_alloc (ups->ctx->pool, namelen + 1) : + g_malloc (namelen + 1); + + rspamd_snprintf (upstream->name, namelen + 1, + "_%*s._tcp.%*s", + (gint)(plus_pos - service_pos), service_pos, + (gint)(semicolon_pos - (plus_pos + 1)), plus_pos + 1); + upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE; + ret = RSPAMD_PARSE_ADDR_RESOLVED; + } + } + else { + ret = rspamd_parse_host_port_priority (str, &addrs, + &upstream->weight, + &upstream->name, def_port, + ups->ctx ? ups->ctx->pool : NULL); + } break; case RSPAMD_UPSTREAM_PARSE_NAMESERVER: addrs = g_ptr_array_sized_new (1); @@ -824,7 +1023,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str, return FALSE; } else { - upstream->flags = ups->flags; + upstream->flags |= ups->flags; if (ret == RSPAMD_PARSE_ADDR_NUMERIC) { /* Add noresolve flag */ diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 04ec6d984..5d3e00514 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -21,6 +21,7 @@ enum rspamd_upstream_rotation { enum rspamd_upstream_flag { RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0), + RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1), }; struct rspamd_config; -- 2.39.5