Browse Source

Add rspamd_upstream_reresolve routine.

tags/1.1.0
Vsevolod Stakhov 8 years ago
parent
commit
33dc0d893c
2 changed files with 97 additions and 45 deletions
  1. 92
    45
      src/libutil/upstream.c
  2. 5
    0
      src/libutil/upstream.h

+ 92
- 45
src/libutil/upstream.c View File

@@ -40,6 +40,7 @@ struct upstream {
guint weight;
guint cur_weight;
guint errors;
guint dns_requests;
gint active_idx;
gchar *name;
struct event ev;
@@ -210,39 +211,6 @@ rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
rspamd_mutex_unlock (ls->lock);
}

static void
rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
{
struct upstream *up = (struct upstream *)arg;
struct rdns_reply_entry *entry;
struct upstream_inet_addr_entry *up_ent;

if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;

rspamd_mutex_lock (up->lock);
while (entry) {

if (entry->type == RDNS_REQUEST_A) {
up_ent = g_malloc0 (sizeof (*up_ent));
up_ent->addr = rspamd_inet_address_new (AF_INET,
&entry->content.a.addr);
LL_PREPEND (up->new_addrs, up_ent);
}
else if (entry->type == RDNS_REQUEST_AAAA) {
up_ent = g_malloc0 (sizeof (*up_ent));
up_ent->addr = rspamd_inet_address_new (AF_INET6,
&entry->content.aaa.addr);
LL_PREPEND (up->new_addrs, up_ent);
}
entry = entry->next;
}
rspamd_mutex_unlock (up->lock);
}

REF_RELEASE (up);
}

static void
rspamd_upstream_update_addrs (struct upstream *up)
{
@@ -255,8 +223,11 @@ rspamd_upstream_update_addrs (struct upstream *up)
* We need first of all get the saved port, since DNS gives us no
* idea about what port has been used previously
*/
rspamd_mutex_lock (up->lock);

if (up->addrs.addr->len > 0 && up->new_addrs) {
port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr, 0));
port = rspamd_inet_address_get_port (g_ptr_array_index (up->addrs.addr,
0));

/* Free old addresses */
g_ptr_array_free (up->addrs.addr, TRUE);
@@ -264,10 +235,10 @@ rspamd_upstream_update_addrs (struct upstream *up)
/* Now calculate new addrs count */
addr_cnt = 0;
LL_FOREACH (up->new_addrs, cur) {
addr_cnt ++;
addr_cnt++;
}
new_addrs = g_ptr_array_new_full (addr_cnt,
(GDestroyNotify)rspamd_inet_address_destroy);
(GDestroyNotify) rspamd_inet_address_destroy);

/* Copy addrs back */
LL_FOREACH (up->new_addrs, cur) {
@@ -285,6 +256,48 @@ rspamd_upstream_update_addrs (struct upstream *up)
g_free (cur);
}
up->new_addrs = NULL;

rspamd_mutex_unlock (up->lock);
}

static void
rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
{
struct upstream *up = (struct upstream *)arg;
struct rdns_reply_entry *entry;
struct upstream_inet_addr_entry *up_ent;

if (reply->code == RDNS_RC_NOERROR) {
entry = reply->entries;

rspamd_mutex_lock (up->lock);
while (entry) {

if (entry->type == RDNS_REQUEST_A) {
up_ent = g_malloc0 (sizeof (*up_ent));
up_ent->addr = rspamd_inet_address_new (AF_INET,
&entry->content.a.addr);
LL_PREPEND (up->new_addrs, up_ent);
}
else if (entry->type == RDNS_REQUEST_AAAA) {
up_ent = g_malloc0 (sizeof (*up_ent));
up_ent->addr = rspamd_inet_address_new (AF_INET6,
&entry->content.aaa.addr);
LL_PREPEND (up->new_addrs, up_ent);
}
entry = entry->next;
}

rspamd_mutex_unlock (up->lock);
}

up->dns_requests--;

if (up->dns_requests == 0) {
rspamd_upstream_update_addrs (up);
}

REF_RELEASE (up);
}

static void
@@ -296,10 +309,6 @@ rspamd_upstream_revive_cb (int fd, short what, void *arg)
event_del (&up->ev);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);

if (up->new_addrs) {
rspamd_upstream_update_addrs (up);
}
}

rspamd_mutex_unlock (up->lock);
@@ -322,12 +331,14 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
up->ctx->dns_timeout, up->ctx->dns_retransmits,
1, up->name, RDNS_REQUEST_A) != NULL) {
up->dns_requests ++;
REF_RETAIN (up);
}

if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
up->ctx->dns_timeout, up->ctx->dns_retransmits,
1, up->name, RDNS_REQUEST_AAAA) != NULL) {
up->dns_requests ++;
REF_RETAIN (up);
}
}
@@ -647,11 +658,6 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
/* Here the upstreams list is already locked */
rspamd_mutex_lock (up->lock);
event_del (&up->ev);

if (up->new_addrs) {
rspamd_upstream_update_addrs (up);
}

g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
rspamd_mutex_unlock (up->lock);
@@ -785,3 +791,44 @@ rspamd_upstream_get (struct upstream_list *ups,
/* Silent stupid compilers */
return NULL;
}

void
rspamd_upstream_reresolve (struct upstream_ctx *ctx)
{
GList *cur;
struct upstream *up;

cur = ctx->upstreams->head;

while (cur) {
up = cur->data;

if (up->name[0] != '/') {
if (rdns_make_request_full (ctx->res,
rspamd_upstream_dns_cb,
up,
ctx->dns_timeout,
ctx->dns_retransmits,
1,
up->name,
RDNS_REQUEST_A) != NULL) {
up->dns_requests++;
REF_RETAIN (up);
}

if (rdns_make_request_full (ctx->res,
rspamd_upstream_dns_cb,
up,
ctx->dns_timeout,
ctx->dns_retransmits,
1,
up->name,
RDNS_REQUEST_AAAA) != NULL) {
up->dns_requests++;
REF_RETAIN (up);
}
}

cur = g_list_next (cur);
}
}

+ 5
- 0
src/libutil/upstream.h View File

@@ -150,6 +150,11 @@ struct upstream* rspamd_upstream_get (struct upstream_list *ups,
enum rspamd_upstream_rotation default_type,
const guchar *key, gsize keylen);

/**
* Re-resolve addresses for all upstreams registered
*/
void rspamd_upstream_reresolve (struct upstream_ctx *ctx);

#endif /* UPSTREAM_H */
/*
* vi:ts=4

Loading…
Cancel
Save