]> source.dussan.org Git - rspamd.git/commitdiff
Rework upstreams context
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 20 Nov 2015 15:48:32 +0000 (15:48 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 20 Nov 2015 15:48:32 +0000 (15:48 +0000)
src/libutil/upstream.c
src/libutil/upstream.h

index 686e32160d2b6d5bab968557274e79b9cadbe537..2218220b688d72accfc33e0be64f077295b64fd7 100644 (file)
@@ -46,6 +46,8 @@ struct upstream {
        struct timeval tv;
        gpointer ud;
        struct upstream_list *ls;
+       GList *ctx_pos;
+       struct upstream_ctx *ctx;
 
        struct {
                GPtrArray *addr;
@@ -59,6 +61,7 @@ struct upstream {
 };
 
 struct upstream_list {
+       struct upstream_ctx *ctx;
        GPtrArray *ups;
        GPtrArray *alive;
        rspamd_mutex_t *lock;
@@ -66,8 +69,19 @@ struct upstream_list {
        guint cur_elt;
 };
 
-static struct rdns_resolver *res = NULL;
-static struct event_base *ev_base = NULL;
+struct upstream_ctx {
+       struct rdns_resolver *res;
+       struct event_base *ev_base;
+       guint max_errors;
+       gdouble revive_time;
+       gdouble revive_jitter;
+       gdouble error_time;
+       gdouble dns_timeout;
+       guint dns_retransmits;
+       GQueue *upstreams;
+       ref_entry_t ref;
+};
+
 /* 4 errors in 10 seconds */
 static guint default_max_errors = 4;
 static gdouble default_revive_time = 60;
@@ -77,31 +91,65 @@ static gdouble default_dns_timeout = 1.0;
 static guint default_dns_retransmits = 2;
 
 void
-rspamd_upstreams_library_config (struct rspamd_config *cfg)
+rspamd_upstreams_library_config (struct rspamd_config *cfg,
+               struct upstream_ctx *ctx)
 {
        if (cfg->upstream_error_time) {
-               default_error_time = cfg->upstream_error_time;
+               ctx->error_time = cfg->upstream_error_time;
        }
        if (cfg->upstream_max_errors) {
-               default_max_errors = cfg->upstream_max_errors;
+               ctx->max_errors = cfg->upstream_max_errors;
        }
        if (cfg->upstream_revive_time) {
-               default_revive_time = cfg->upstream_max_errors;
+               ctx->revive_time = cfg->upstream_max_errors;
        }
        if (cfg->dns_retransmits) {
-               default_dns_retransmits = cfg->dns_retransmits;
+               ctx->dns_retransmits = cfg->dns_retransmits;
        }
        if (cfg->dns_timeout) {
-               default_dns_timeout = cfg->dns_timeout;
+               ctx->dns_timeout = cfg->dns_timeout;
        }
 }
 
-void
+static void
+rspamd_upstream_ctx_dtor (struct upstream_ctx *ctx)
+{
+       GList *cur;
+       struct upstream *u;
+
+       cur = ctx->upstreams->head;
+
+       while (cur) {
+               u = cur->data;
+               u->ctx = NULL;
+               u->ctx_pos = NULL;
+               cur = g_list_next (cur);
+       }
+
+       g_queue_free (ctx->upstreams);
+       g_slice_free1 (sizeof (*ctx), ctx);
+}
+
+struct upstream_ctx *
 rspamd_upstreams_library_init (struct rdns_resolver *resolver,
                struct event_base *base)
 {
-       res = resolver;
-       ev_base = base;
+       struct upstream_ctx *ctx;
+
+       ctx = g_slice_alloc0 (sizeof (*ctx));
+       ctx->error_time = default_error_time;
+       ctx->max_errors = default_max_errors;
+       ctx->dns_retransmits = default_dns_retransmits;
+       ctx->dns_timeout = default_dns_timeout;
+       ctx->revive_jitter = default_revive_jitter;
+       ctx->revive_time = default_revive_time;
+
+       ctx->res = resolver;
+       ctx->ev_base = base;
+       ctx->upstreams = g_queue_new ();
+       REF_INIT_RETAIN (ctx, rspamd_upstream_ctx_dtor);
+
+       return ctx;
 }
 
 static gint
@@ -254,15 +302,15 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
        g_ptr_array_remove_index (ls->alive, up->active_idx);
        up->active_idx = -1;
 
-       if (res != NULL) {
+       if (up->ctx->res != NULL) {
                /* Resolve name of the upstream one more time */
                if (up->name[0] != '/') {
                        REF_RETAIN (up);
-                       rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+                       rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
                                        default_dns_timeout, default_dns_retransmits,
                                        1, up->name, RDNS_REQUEST_A);
                        REF_RETAIN (up);
-                       rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+                       rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
                                        default_dns_timeout, default_dns_retransmits,
                                        1, up->name, RDNS_REQUEST_AAAA);
                }
@@ -270,8 +318,8 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
 
        REF_RETAIN (up);
        evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
-       if (ev_base != NULL) {
-               event_base_set (ev_base, &up->ev);
+       if (up->ctx->ev_base != NULL) {
+               event_base_set (up->ctx->ev_base, &up->ev);
        }
 
        ntim = default_revive_time + ottery_rand_range (
@@ -336,7 +384,7 @@ rspamd_upstream_ok (struct upstream *up)
 #define SEED_CONSTANT 0xa574de7df64e9b9dULL
 
 struct upstream_list*
-rspamd_upstreams_create (void)
+rspamd_upstreams_create (struct upstream_ctx *ctx)
 {
        struct upstream_list *ls;
 
@@ -346,6 +394,7 @@ rspamd_upstreams_create (void)
        ls->alive = g_ptr_array_new ();
        ls->lock = rspamd_mutex_new ();
        ls->cur_elt = 0;
+       ls->ctx = ctx;
 
        return ls;
 }
@@ -381,6 +430,12 @@ rspamd_upstream_dtor (struct upstream *up)
 
        rspamd_mutex_free (up->lock);
        g_free (up->name);
+
+       if (up->ctx) {
+               g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
+               REF_RELEASE (up->ctx);
+       }
+       g_list_free (up->ctx_pos);
        g_slice_free1 (sizeof (*up), up);
 }
 
@@ -440,6 +495,10 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
        up->ls = ups;
        REF_INIT_RETAIN (up, rspamd_upstream_dtor);
        up->lock = rspamd_mutex_new ();
+       up->ctx = ups->ctx;
+       REF_RETAIN (ups->ctx);
+       g_queue_push_tail (ups->ctx->upstreams, up);
+       up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
        g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
 
        rspamd_upstream_set_active (ups, up);
index 3fb5d24a53bb871aca17b78278d018078c698fc2..4e48a42894c728b72205a2a483d5e8f754e08d05 100644 (file)
@@ -19,19 +19,21 @@ struct rspamd_config;
 /* Opaque upstream structures */
 struct upstream;
 struct upstream_list;
+struct upstream_ctx;
 
 /**
  * Init upstreams library
  * @param resolver
  */
-void rspamd_upstreams_library_init (struct rdns_resolver *resolver,
+struct upstream_ctx* rspamd_upstreams_library_init (struct rdns_resolver *resolver,
                struct event_base *base);
 
 /**
  * Configure attributes of upstreams library
  * @param cfg
  */
-void rspamd_upstreams_library_config (struct rspamd_config *cfg);
+void rspamd_upstreams_library_config (struct rspamd_config *cfg,
+               struct upstream_ctx *ctx);
 
 /**
  * Upstream error logic
@@ -55,7 +57,7 @@ void rspamd_upstream_ok (struct upstream *up);
  * Create new list of upstreams
  * @return
  */
-struct upstream_list* rspamd_upstreams_create (void);
+struct upstream_list* rspamd_upstreams_create (struct upstream_ctx *ctx);
 /**
  * Destroy list of upstreams
  * @param ups