From a42332ca48be149ed950ed9ccaf81d58783a13c9 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 20 Nov 2015 15:48:32 +0000 Subject: [PATCH] Rework upstreams context --- src/libutil/upstream.c | 93 ++++++++++++++++++++++++++++++++++-------- src/libutil/upstream.h | 8 ++-- 2 files changed, 81 insertions(+), 20 deletions(-) diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 686e32160..2218220b6 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -46,6 +46,8 @@ struct upstream { struct timeval tv; gpointer ud; struct upstream_list *ls; + GList *ctx_pos; + struct upstream_ctx *ctx; struct { GPtrArray *addr; @@ -59,6 +61,7 @@ struct upstream { }; struct upstream_list { + struct upstream_ctx *ctx; GPtrArray *ups; GPtrArray *alive; rspamd_mutex_t *lock; @@ -66,8 +69,19 @@ struct upstream_list { guint cur_elt; }; -static struct rdns_resolver *res = NULL; -static struct event_base *ev_base = NULL; +struct upstream_ctx { + struct rdns_resolver *res; + struct event_base *ev_base; + guint max_errors; + gdouble revive_time; + gdouble revive_jitter; + gdouble error_time; + gdouble dns_timeout; + guint dns_retransmits; + GQueue *upstreams; + ref_entry_t ref; +}; + /* 4 errors in 10 seconds */ static guint default_max_errors = 4; static gdouble default_revive_time = 60; @@ -77,31 +91,65 @@ static gdouble default_dns_timeout = 1.0; static guint default_dns_retransmits = 2; void -rspamd_upstreams_library_config (struct rspamd_config *cfg) +rspamd_upstreams_library_config (struct rspamd_config *cfg, + struct upstream_ctx *ctx) { if (cfg->upstream_error_time) { - default_error_time = cfg->upstream_error_time; + ctx->error_time = cfg->upstream_error_time; } if (cfg->upstream_max_errors) { - default_max_errors = cfg->upstream_max_errors; + ctx->max_errors = cfg->upstream_max_errors; } if (cfg->upstream_revive_time) { - default_revive_time = cfg->upstream_max_errors; + ctx->revive_time = cfg->upstream_max_errors; } if (cfg->dns_retransmits) { - default_dns_retransmits = cfg->dns_retransmits; + ctx->dns_retransmits = cfg->dns_retransmits; } if (cfg->dns_timeout) { - default_dns_timeout = cfg->dns_timeout; + ctx->dns_timeout = cfg->dns_timeout; } } -void +static void +rspamd_upstream_ctx_dtor (struct upstream_ctx *ctx) +{ + GList *cur; + struct upstream *u; + + cur = ctx->upstreams->head; + + while (cur) { + u = cur->data; + u->ctx = NULL; + u->ctx_pos = NULL; + cur = g_list_next (cur); + } + + g_queue_free (ctx->upstreams); + g_slice_free1 (sizeof (*ctx), ctx); +} + +struct upstream_ctx * rspamd_upstreams_library_init (struct rdns_resolver *resolver, struct event_base *base) { - res = resolver; - ev_base = base; + struct upstream_ctx *ctx; + + ctx = g_slice_alloc0 (sizeof (*ctx)); + ctx->error_time = default_error_time; + ctx->max_errors = default_max_errors; + ctx->dns_retransmits = default_dns_retransmits; + ctx->dns_timeout = default_dns_timeout; + ctx->revive_jitter = default_revive_jitter; + ctx->revive_time = default_revive_time; + + ctx->res = resolver; + ctx->ev_base = base; + ctx->upstreams = g_queue_new (); + REF_INIT_RETAIN (ctx, rspamd_upstream_ctx_dtor); + + return ctx; } static gint @@ -254,15 +302,15 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) g_ptr_array_remove_index (ls->alive, up->active_idx); up->active_idx = -1; - if (res != NULL) { + if (up->ctx->res != NULL) { /* Resolve name of the upstream one more time */ if (up->name[0] != '/') { REF_RETAIN (up); - rdns_make_request_full (res, rspamd_upstream_dns_cb, up, + rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up, default_dns_timeout, default_dns_retransmits, 1, up->name, RDNS_REQUEST_A); REF_RETAIN (up); - rdns_make_request_full (res, rspamd_upstream_dns_cb, up, + rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up, default_dns_timeout, default_dns_retransmits, 1, up->name, RDNS_REQUEST_AAAA); } @@ -270,8 +318,8 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) REF_RETAIN (up); evtimer_set (&up->ev, rspamd_upstream_revive_cb, up); - if (ev_base != NULL) { - event_base_set (ev_base, &up->ev); + if (up->ctx->ev_base != NULL) { + event_base_set (up->ctx->ev_base, &up->ev); } ntim = default_revive_time + ottery_rand_range ( @@ -336,7 +384,7 @@ rspamd_upstream_ok (struct upstream *up) #define SEED_CONSTANT 0xa574de7df64e9b9dULL struct upstream_list* -rspamd_upstreams_create (void) +rspamd_upstreams_create (struct upstream_ctx *ctx) { struct upstream_list *ls; @@ -346,6 +394,7 @@ rspamd_upstreams_create (void) ls->alive = g_ptr_array_new (); ls->lock = rspamd_mutex_new (); ls->cur_elt = 0; + ls->ctx = ctx; return ls; } @@ -381,6 +430,12 @@ rspamd_upstream_dtor (struct upstream *up) rspamd_mutex_free (up->lock); g_free (up->name); + + if (up->ctx) { + g_queue_delete_link (up->ctx->upstreams, up->ctx_pos); + REF_RELEASE (up->ctx); + } + g_list_free (up->ctx_pos); g_slice_free1 (sizeof (*up), up); } @@ -440,6 +495,10 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups, up->ls = ups; REF_INIT_RETAIN (up, rspamd_upstream_dtor); up->lock = rspamd_mutex_new (); + up->ctx = ups->ctx; + REF_RETAIN (ups->ctx); + g_queue_push_tail (ups->ctx->upstreams, up); + up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams); g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func); rspamd_upstream_set_active (ups, up); diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 3fb5d24a5..4e48a4289 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -19,19 +19,21 @@ struct rspamd_config; /* Opaque upstream structures */ struct upstream; struct upstream_list; +struct upstream_ctx; /** * Init upstreams library * @param resolver */ -void rspamd_upstreams_library_init (struct rdns_resolver *resolver, +struct upstream_ctx* rspamd_upstreams_library_init (struct rdns_resolver *resolver, struct event_base *base); /** * Configure attributes of upstreams library * @param cfg */ -void rspamd_upstreams_library_config (struct rspamd_config *cfg); +void rspamd_upstreams_library_config (struct rspamd_config *cfg, + struct upstream_ctx *ctx); /** * Upstream error logic @@ -55,7 +57,7 @@ void rspamd_upstream_ok (struct upstream *up); * Create new list of upstreams * @return */ -struct upstream_list* rspamd_upstreams_create (void); +struct upstream_list* rspamd_upstreams_create (struct upstream_ctx *ctx); /** * Destroy list of upstreams * @param ups -- 2.39.5