struct timeval tv;
gpointer ud;
struct upstream_list *ls;
+ GList *ctx_pos;
+ struct upstream_ctx *ctx;
struct {
GPtrArray *addr;
};
struct upstream_list {
+ struct upstream_ctx *ctx;
GPtrArray *ups;
GPtrArray *alive;
rspamd_mutex_t *lock;
guint cur_elt;
};
-static struct rdns_resolver *res = NULL;
-static struct event_base *ev_base = NULL;
+struct upstream_ctx {
+ struct rdns_resolver *res;
+ struct event_base *ev_base;
+ guint max_errors;
+ gdouble revive_time;
+ gdouble revive_jitter;
+ gdouble error_time;
+ gdouble dns_timeout;
+ guint dns_retransmits;
+ GQueue *upstreams;
+ ref_entry_t ref;
+};
+
/* 4 errors in 10 seconds */
static guint default_max_errors = 4;
static gdouble default_revive_time = 60;
static guint default_dns_retransmits = 2;
void
-rspamd_upstreams_library_config (struct rspamd_config *cfg)
+rspamd_upstreams_library_config (struct rspamd_config *cfg,
+ struct upstream_ctx *ctx)
{
if (cfg->upstream_error_time) {
- default_error_time = cfg->upstream_error_time;
+ ctx->error_time = cfg->upstream_error_time;
}
if (cfg->upstream_max_errors) {
- default_max_errors = cfg->upstream_max_errors;
+ ctx->max_errors = cfg->upstream_max_errors;
}
if (cfg->upstream_revive_time) {
- default_revive_time = cfg->upstream_max_errors;
+ ctx->revive_time = cfg->upstream_max_errors;
}
if (cfg->dns_retransmits) {
- default_dns_retransmits = cfg->dns_retransmits;
+ ctx->dns_retransmits = cfg->dns_retransmits;
}
if (cfg->dns_timeout) {
- default_dns_timeout = cfg->dns_timeout;
+ ctx->dns_timeout = cfg->dns_timeout;
}
}
-void
+static void
+rspamd_upstream_ctx_dtor (struct upstream_ctx *ctx)
+{
+ GList *cur;
+ struct upstream *u;
+
+ cur = ctx->upstreams->head;
+
+ while (cur) {
+ u = cur->data;
+ u->ctx = NULL;
+ u->ctx_pos = NULL;
+ cur = g_list_next (cur);
+ }
+
+ g_queue_free (ctx->upstreams);
+ g_slice_free1 (sizeof (*ctx), ctx);
+}
+
+struct upstream_ctx *
rspamd_upstreams_library_init (struct rdns_resolver *resolver,
struct event_base *base)
{
- res = resolver;
- ev_base = base;
+ struct upstream_ctx *ctx;
+
+ ctx = g_slice_alloc0 (sizeof (*ctx));
+ ctx->error_time = default_error_time;
+ ctx->max_errors = default_max_errors;
+ ctx->dns_retransmits = default_dns_retransmits;
+ ctx->dns_timeout = default_dns_timeout;
+ ctx->revive_jitter = default_revive_jitter;
+ ctx->revive_time = default_revive_time;
+
+ ctx->res = resolver;
+ ctx->ev_base = base;
+ ctx->upstreams = g_queue_new ();
+ REF_INIT_RETAIN (ctx, rspamd_upstream_ctx_dtor);
+
+ return ctx;
}
static gint
g_ptr_array_remove_index (ls->alive, up->active_idx);
up->active_idx = -1;
- if (res != NULL) {
+ if (up->ctx->res != NULL) {
/* Resolve name of the upstream one more time */
if (up->name[0] != '/') {
REF_RETAIN (up);
- rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+ rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
default_dns_timeout, default_dns_retransmits,
1, up->name, RDNS_REQUEST_A);
REF_RETAIN (up);
- rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
+ rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
default_dns_timeout, default_dns_retransmits,
1, up->name, RDNS_REQUEST_AAAA);
}
REF_RETAIN (up);
evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
- if (ev_base != NULL) {
- event_base_set (ev_base, &up->ev);
+ if (up->ctx->ev_base != NULL) {
+ event_base_set (up->ctx->ev_base, &up->ev);
}
ntim = default_revive_time + ottery_rand_range (
#define SEED_CONSTANT 0xa574de7df64e9b9dULL
struct upstream_list*
-rspamd_upstreams_create (void)
+rspamd_upstreams_create (struct upstream_ctx *ctx)
{
struct upstream_list *ls;
ls->alive = g_ptr_array_new ();
ls->lock = rspamd_mutex_new ();
ls->cur_elt = 0;
+ ls->ctx = ctx;
return ls;
}
rspamd_mutex_free (up->lock);
g_free (up->name);
+
+ if (up->ctx) {
+ g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
+ REF_RELEASE (up->ctx);
+ }
+ g_list_free (up->ctx_pos);
g_slice_free1 (sizeof (*up), up);
}
up->ls = ups;
REF_INIT_RETAIN (up, rspamd_upstream_dtor);
up->lock = rspamd_mutex_new ();
+ up->ctx = ups->ctx;
+ REF_RETAIN (ups->ctx);
+ g_queue_push_tail (ups->ctx->upstreams, up);
+ up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
rspamd_upstream_set_active (ups, up);