Browse Source

Rework upstreams context

tags/1.1.0
Vsevolod Stakhov 8 years ago
parent
commit
a42332ca48
2 changed files with 81 additions and 20 deletions
  1. 76
    17
      src/libutil/upstream.c
  2. 5
    3
      src/libutil/upstream.h

+ 76
- 17
src/libutil/upstream.c View File

@@ -46,6 +46,8 @@ struct upstream {
struct timeval tv;
gpointer ud;
struct upstream_list *ls;
GList *ctx_pos;
struct upstream_ctx *ctx;

struct {
GPtrArray *addr;
@@ -59,6 +61,7 @@ struct upstream {
};

struct upstream_list {
struct upstream_ctx *ctx;
GPtrArray *ups;
GPtrArray *alive;
rspamd_mutex_t *lock;
@@ -66,8 +69,19 @@ struct upstream_list {
guint cur_elt;
};

static struct rdns_resolver *res = NULL;
static struct event_base *ev_base = NULL;
struct upstream_ctx {
struct rdns_resolver *res;
struct event_base *ev_base;
guint max_errors;
gdouble revive_time;
gdouble revive_jitter;
gdouble error_time;
gdouble dns_timeout;
guint dns_retransmits;
GQueue *upstreams;
ref_entry_t ref;
};

/* 4 errors in 10 seconds */
static guint default_max_errors = 4;
static gdouble default_revive_time = 60;
@@ -77,31 +91,65 @@ static gdouble default_dns_timeout = 1.0;
static guint default_dns_retransmits = 2;

void
rspamd_upstreams_library_config (struct rspamd_config *cfg)
rspamd_upstreams_library_config (struct rspamd_config *cfg,
struct upstream_ctx *ctx)
{
if (cfg->upstream_error_time) {
default_error_time = cfg->upstream_error_time;
ctx->error_time = cfg->upstream_error_time;
}
if (cfg->upstream_max_errors) {
default_max_errors = cfg->upstream_max_errors;
ctx->max_errors = cfg->upstream_max_errors;
}
if (cfg->upstream_revive_time) {
default_revive_time = cfg->upstream_max_errors;
ctx->revive_time = cfg->upstream_max_errors;
}
if (cfg->dns_retransmits) {
default_dns_retransmits = cfg->dns_retransmits;
ctx->dns_retransmits = cfg->dns_retransmits;
}
if (cfg->dns_timeout) {
default_dns_timeout = cfg->dns_timeout;
ctx->dns_timeout = cfg->dns_timeout;
}
}

void
static void
rspamd_upstream_ctx_dtor (struct upstream_ctx *ctx)
{
GList *cur;
struct upstream *u;

cur = ctx->upstreams->head;

while (cur) {
u = cur->data;
u->ctx = NULL;
u->ctx_pos = NULL;
cur = g_list_next (cur);
}

g_queue_free (ctx->upstreams);
g_slice_free1 (sizeof (*ctx), ctx);
}

struct upstream_ctx *
rspamd_upstreams_library_init (struct rdns_resolver *resolver,
struct event_base *base)
{
res = resolver;
ev_base = base;
struct upstream_ctx *ctx;

ctx = g_slice_alloc0 (sizeof (*ctx));
ctx->error_time = default_error_time;
ctx->max_errors = default_max_errors;
ctx->dns_retransmits = default_dns_retransmits;
ctx->dns_timeout = default_dns_timeout;
ctx->revive_jitter = default_revive_jitter;
ctx->revive_time = default_revive_time;

ctx->res = resolver;
ctx->ev_base = base;
ctx->upstreams = g_queue_new ();
REF_INIT_RETAIN (ctx, rspamd_upstream_ctx_dtor);

return ctx;
}

static gint
@@ -254,15 +302,15 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
g_ptr_array_remove_index (ls->alive, up->active_idx);
up->active_idx = -1;

if (res != NULL) {
if (up->ctx->res != NULL) {
/* Resolve name of the upstream one more time */
if (up->name[0] != '/') {
REF_RETAIN (up);
rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
default_dns_timeout, default_dns_retransmits,
1, up->name, RDNS_REQUEST_A);
REF_RETAIN (up);
rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
default_dns_timeout, default_dns_retransmits,
1, up->name, RDNS_REQUEST_AAAA);
}
@@ -270,8 +318,8 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)

REF_RETAIN (up);
evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
if (ev_base != NULL) {
event_base_set (ev_base, &up->ev);
if (up->ctx->ev_base != NULL) {
event_base_set (up->ctx->ev_base, &up->ev);
}

ntim = default_revive_time + ottery_rand_range (
@@ -336,7 +384,7 @@ rspamd_upstream_ok (struct upstream *up)
#define SEED_CONSTANT 0xa574de7df64e9b9dULL

struct upstream_list*
rspamd_upstreams_create (void)
rspamd_upstreams_create (struct upstream_ctx *ctx)
{
struct upstream_list *ls;

@@ -346,6 +394,7 @@ rspamd_upstreams_create (void)
ls->alive = g_ptr_array_new ();
ls->lock = rspamd_mutex_new ();
ls->cur_elt = 0;
ls->ctx = ctx;

return ls;
}
@@ -381,6 +430,12 @@ rspamd_upstream_dtor (struct upstream *up)

rspamd_mutex_free (up->lock);
g_free (up->name);

if (up->ctx) {
g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
REF_RELEASE (up->ctx);
}
g_list_free (up->ctx_pos);
g_slice_free1 (sizeof (*up), up);
}

@@ -440,6 +495,10 @@ rspamd_upstreams_add_upstream (struct upstream_list *ups,
up->ls = ups;
REF_INIT_RETAIN (up, rspamd_upstream_dtor);
up->lock = rspamd_mutex_new ();
up->ctx = ups->ctx;
REF_RETAIN (ups->ctx);
g_queue_push_tail (ups->ctx->upstreams, up);
up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);

rspamd_upstream_set_active (ups, up);

+ 5
- 3
src/libutil/upstream.h View File

@@ -19,19 +19,21 @@ struct rspamd_config;
/* Opaque upstream structures */
struct upstream;
struct upstream_list;
struct upstream_ctx;

/**
* Init upstreams library
* @param resolver
*/
void rspamd_upstreams_library_init (struct rdns_resolver *resolver,
struct upstream_ctx* rspamd_upstreams_library_init (struct rdns_resolver *resolver,
struct event_base *base);

/**
* Configure attributes of upstreams library
* @param cfg
*/
void rspamd_upstreams_library_config (struct rspamd_config *cfg);
void rspamd_upstreams_library_config (struct rspamd_config *cfg,
struct upstream_ctx *ctx);

/**
* Upstream error logic
@@ -55,7 +57,7 @@ void rspamd_upstream_ok (struct upstream *up);
* Create new list of upstreams
* @return
*/
struct upstream_list* rspamd_upstreams_create (void);
struct upstream_list* rspamd_upstreams_create (struct upstream_ctx *ctx);
/**
* Destroy list of upstreams
* @param ups

Loading…
Cancel
Save