]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Start SRV upstreams implementation
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 1 Oct 2019 16:40:26 +0000 (17:40 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 1 Oct 2019 16:40:26 +0000 (17:40 +0100)
src/libutil/map.c
src/libutil/upstream.c
src/libutil/upstream.h

index 4f0e2354cf9ba6ae5aaa1ba2026ebfd046ea0ea6..42134921aee474fbec19491e32a2bb183df605de 100644 (file)
@@ -970,7 +970,9 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
        }
 
        if (periodic->locked) {
-               rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE);
+               if (!periodic->map->wrk->wanna_die) {
+                       rspamd_map_schedule_periodic (periodic->map, FALSE, FALSE, FALSE);
+               }
                g_atomic_int_set (periodic->map->locked, 0);
                msg_debug_map ("unlocked map");
        }
index e2bfd94d993d1454a3777e09890d468188efb4d6..22e5200c7c1410c85e9cb5ec69576901f0f291a2 100644 (file)
@@ -24,6 +24,7 @@
 #include "logger.h"
 
 #include <math.h>
+#include <contrib/librdns/rdns.h>
 
 struct upstream_inet_addr_entry {
        rspamd_inet_addr_t *addr;
@@ -32,6 +33,7 @@ struct upstream_inet_addr_entry {
 
 struct upstream_addr_elt {
        rspamd_inet_addr_t *addr;
+       guint priority;
        guint errors;
 };
 
@@ -174,8 +176,17 @@ rspamd_upstreams_library_config (struct rspamd_config *cfg,
                        upstream = cur->data;
                        if (!ev_can_stop (&upstream->ev) && upstream->ls &&
                                                !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
-                               gdouble when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
-                                               upstream->ls->limits.lazy_resolve_time * .1);
+                               gdouble when;
+
+                               if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+                                       /* Resolve them immediately ! */
+                                       when = 0.0;
+                               }
+                               else {
+                                       when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+                                                       upstream->ls->limits.lazy_resolve_time * .1);
+                               }
+
                                ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
                                                when, 0);
                                upstream->ev.data = upstream;
@@ -261,13 +272,20 @@ rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
 static gint
 rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
 {
-       const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
-                       **ip2 = (const struct upstream_addr_elt **)b;
+       const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **)a,
+                       *ip2 = *(const struct upstream_addr_elt **)b;
        gint w1, w2;
 
-       w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
-       w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
+       if (ip1->priority == 0 && ip2->priority == 0) {
+               w1 = rspamd_upstream_af_to_weight (ip1->addr);
+               w2 = rspamd_upstream_af_to_weight (ip2->addr);
+       }
+       else {
+               w1 = ip1->priority;
+               w2 = ip2->priority;
+       }
 
+       /* Inverse order */
        return w2 - w1;
 }
 
@@ -284,9 +302,18 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
                if (ev_can_stop (&upstream->ev)) {
                        ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
                }
-               /* Start lazy names resolution */
-               gdouble when = rspamd_time_jitter (ls->limits.lazy_resolve_time,
-                               ls->limits.lazy_resolve_time * 0.1);
+
+               /* Start lazy (or not so lazy) names resolution */
+               gdouble when;
+
+               if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+                       /* Resolve them immediately ! */
+                       when = 0.0;
+               }
+               else {
+                       when = rspamd_time_jitter (upstream->ls->limits.lazy_resolve_time,
+                                       upstream->ls->limits.lazy_resolve_time * .1);
+               }
                ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
                                when, 0);
                upstream->ev.data = upstream;
@@ -435,6 +462,122 @@ rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
        REF_RELEASE (up);
 }
 
+struct rspamd_upstream_srv_dns_cb {
+       struct upstream *up;
+       guint ttl;
+       guint priority;
+       guint port;
+       guint requests_inflight;
+};
+
+/* Used when we have resolved SRV record and resolved addrs */
+static void
+rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg)
+{
+       struct rspamd_upstream_srv_dns_cb *cbdata =
+                       (struct rspamd_upstream_srv_dns_cb *)arg;
+       struct upstream *up;
+       struct rdns_reply_entry *entry;
+       struct upstream_inet_addr_entry *up_ent;
+
+       up = cbdata->up;
+
+       if (reply->code == RDNS_RC_NOERROR) {
+               entry = reply->entries;
+
+               RSPAMD_UPSTREAM_LOCK (up->lock);
+               while (entry) {
+
+                       if (entry->type == RDNS_REQUEST_A) {
+                               up_ent = g_malloc0 (sizeof (*up_ent));
+                               up_ent->addr = rspamd_inet_address_new (AF_INET,
+                                               &entry->content.a.addr);
+                               LL_PREPEND (up->new_addrs, up_ent);
+                       }
+                       else if (entry->type == RDNS_REQUEST_AAAA) {
+                               up_ent = g_malloc0 (sizeof (*up_ent));
+                               up_ent->addr = rspamd_inet_address_new (AF_INET6,
+                                               &entry->content.aaa.addr);
+                               LL_PREPEND (up->new_addrs, up_ent);
+                       }
+                       entry = entry->next;
+               }
+
+               RSPAMD_UPSTREAM_UNLOCK (up->lock);
+       }
+
+       up->dns_requests--;
+       cbdata->requests_inflight --;
+
+       if (cbdata->requests_inflight == 0) {
+               g_free (cbdata);
+       }
+
+       if (up->dns_requests == 0) {
+               rspamd_upstream_update_addrs (up);
+       }
+
+       REF_RELEASE (up);
+}
+
+static void
+rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg)
+{
+       struct upstream *upstream = (struct upstream *) arg;
+       struct rdns_reply_entry *entry;
+       struct rspamd_upstream_srv_dns_cb *ncbdata;
+
+       if (reply->code == RDNS_RC_NOERROR) {
+               entry = reply->entries;
+
+               RSPAMD_UPSTREAM_LOCK (upstream->lock);
+               while (entry) {
+                       /* XXX: we ignore weight as it contradicts with upstreams logic */
+                       if (entry->type == RDNS_REQUEST_SRV) {
+                               msg_debug_upstream ("got srv reply for %s: %s "
+                                               "(weight=%d, priority=%d, port=%d)",
+                                               upstream->name, entry->content.srv.target,
+                                               entry->content.srv.weight, entry->content.srv.priority,
+                                               entry->content.srv.port);
+                               ncbdata = g_malloc0 (sizeof (*ncbdata));
+                               ncbdata->priority = entry->content.srv.weight;
+                               ncbdata->port = entry->content.srv.port;
+                               ncbdata->ttl = entry->ttl;
+
+                               if (rdns_make_request_full (upstream->ctx->res,
+                                               rspamd_upstream_dns_srv_phase2_cb, ncbdata,
+                                               upstream->ls->limits.dns_timeout,
+                                               upstream->ls->limits.dns_retransmits,
+                                               1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
+                                       upstream->dns_requests++;
+                                       REF_RETAIN (upstream);
+                                       ncbdata->requests_inflight ++;
+                               }
+
+                               if (rdns_make_request_full (upstream->ctx->res,
+                                               rspamd_upstream_dns_srv_phase2_cb, ncbdata,
+                                               upstream->ls->limits.dns_timeout,
+                                               upstream->ls->limits.dns_retransmits,
+                                               1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
+                                       upstream->dns_requests++;
+                                       REF_RETAIN (upstream);
+                                       ncbdata->requests_inflight ++;
+                               }
+
+                               if (ncbdata->requests_inflight == 0) {
+                                       g_free (ncbdata);
+                               }
+                       }
+                       entry = entry->next;
+               }
+
+               RSPAMD_UPSTREAM_UNLOCK (upstream->lock);
+       }
+
+       upstream->dns_requests--;
+       REF_RELEASE (upstream);
+}
+
 static void
 rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
 {
@@ -464,19 +607,31 @@ rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
                        !(up->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
                /* Resolve name of the upstream one more time */
                if (up->name[0] != '/') {
-
-                       if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
-                                       ls->limits.dns_timeout, ls->limits.dns_retransmits,
-                                       1, up->name, RDNS_REQUEST_A) != NULL) {
-                               up->dns_requests ++;
-                               REF_RETAIN (up);
+                       if (up->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
+                               if (rdns_make_request_full (up->ctx->res,
+                                               rspamd_upstream_dns_srv_cb, up,
+                                               ls->limits.dns_timeout, ls->limits.dns_retransmits,
+                                               1, up->name, RDNS_REQUEST_SRV) != NULL) {
+                                       up->dns_requests++;
+                                       REF_RETAIN (up);
+                               }
                        }
+                       else {
+                               if (rdns_make_request_full (up->ctx->res,
+                                               rspamd_upstream_dns_cb, up,
+                                               ls->limits.dns_timeout, ls->limits.dns_retransmits,
+                                               1, up->name, RDNS_REQUEST_A) != NULL) {
+                                       up->dns_requests++;
+                                       REF_RETAIN (up);
+                               }
 
-                       if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
-                                       ls->limits.dns_timeout, ls->limits.dns_retransmits,
-                                       1, up->name, RDNS_REQUEST_AAAA) != NULL) {
-                               up->dns_requests ++;
-                               REF_RETAIN (up);
+                               if (rdns_make_request_full (up->ctx->res,
+                                               rspamd_upstream_dns_cb, up,
+                                               ls->limits.dns_timeout, ls->limits.dns_retransmits,
+                                               1, up->name, RDNS_REQUEST_AAAA) != NULL) {
+                                       up->dns_requests++;
+                                       REF_RETAIN (up);
+                               }
                        }
                }
        }
@@ -491,6 +646,7 @@ rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
        ev_timer_stop (loop, w);
 
        if (up->ls) {
+
                rspamd_upstream_resolve_addrs (up->ls, up);
 
                w->repeat = rspamd_time_jitter (up->ls->limits.lazy_resolve_time,
@@ -773,18 +929,61 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
 {
        struct upstream *upstream;
        GPtrArray *addrs = NULL;
-       guint i;
+       guint i, slen;
        rspamd_inet_addr_t *addr;
        enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
 
        upstream = g_malloc0 (sizeof (*upstream));
+       slen = strlen (str);
 
        switch (parse_type) {
        case RSPAMD_UPSTREAM_PARSE_DEFAULT:
-               ret = rspamd_parse_host_port_priority (str, &addrs,
-                               &upstream->weight,
-                               &upstream->name, def_port,
-                               ups->ctx ? ups->ctx->pool : NULL);
+               if (slen > sizeof ("service=") &&
+                       RSPAMD_LEN_CHECK_STARTS_WITH (str, slen, "service=")) {
+                       const gchar *plus_pos, *service_pos, *semicolon_pos;
+
+                       /* Accept service=srv_name+hostname[:priority] */
+                       service_pos = str + sizeof ("service=") - 1;
+                       plus_pos = strchr (service_pos, '+');
+
+                       if (plus_pos != NULL) {
+                               semicolon_pos = strchr (plus_pos + 1, ':');
+
+                               if (semicolon_pos) {
+                                       upstream->weight = strtoul (semicolon_pos + 1, NULL, 10);
+                               }
+                               else {
+                                       semicolon_pos = plus_pos + strlen (plus_pos);
+                               }
+
+                               /*
+                                * Now our name is _service._tcp.<domain>
+                                * where <domain> is string between semicolon_pos and plus_pos +1
+                                * while service is a string between service_pos and plus_pos
+                                */
+                               guint namelen = (semicolon_pos - (plus_pos + 1)) +
+                                               (plus_pos - service_pos) +
+                                               (sizeof ("tcp") - 1) +
+                                               4;
+                               addrs = g_ptr_array_sized_new (1);
+                               upstream->name = ups->ctx ?
+                                               rspamd_mempool_alloc (ups->ctx->pool, namelen + 1) :
+                                               g_malloc (namelen + 1);
+
+                               rspamd_snprintf (upstream->name, namelen + 1,
+                                               "_%*s._tcp.%*s",
+                                               (gint)(plus_pos - service_pos), service_pos,
+                                               (gint)(semicolon_pos - (plus_pos + 1)), plus_pos + 1);
+                               upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
+                               ret = RSPAMD_PARSE_ADDR_RESOLVED;
+                       }
+               }
+               else {
+                       ret = rspamd_parse_host_port_priority (str, &addrs,
+                                       &upstream->weight,
+                                       &upstream->name, def_port,
+                                       ups->ctx ? ups->ctx->pool : NULL);
+               }
                break;
        case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
                addrs = g_ptr_array_sized_new (1);
@@ -824,7 +1023,7 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
                return FALSE;
        }
        else {
-               upstream->flags = ups->flags;
+               upstream->flags |= ups->flags;
 
                if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
                        /* Add noresolve flag */
index 04ec6d9843bd4e08035cfc52444efec2a26df17d..5d3e00514f1b782c85b6ec1793a93917b82c2e26 100644 (file)
@@ -21,6 +21,7 @@ enum rspamd_upstream_rotation {
 
 enum rspamd_upstream_flag {
        RSPAMD_UPSTREAM_FLAG_NORESOLVE = (1 << 0),
+       RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE = (1 << 1),
 };
 
 struct rspamd_config;