Browse Source

Add some basic logic for upstreams.

tags/0.7.3
Vsevolod Stakhov 9 years ago
parent
commit
f046f61e40
3 changed files with 138 additions and 15 deletions
  1. 10
    3
      src/libutil/addr.c
  2. 3
    3
      src/libutil/addr.h
  3. 125
    9
      src/libutil/upstream.c

+ 10
- 3
src/libutil/addr.c View File

@@ -185,7 +185,7 @@ rspamd_inet_address_connect (rspamd_inet_addr_t *addr, gint type,

gboolean
rspamd_parse_host_port_priority_strv (gchar **tokens,
rspamd_inet_addr_t *addr, guint *priority, guint default_port)
rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port)
{
gchar *err_str, portbuf[8];
const gchar *cur_tok, *cur_port;
@@ -267,6 +267,7 @@ rspamd_parse_host_port_priority_strv (gchar **tokens,
if ((r = getaddrinfo (cur_tok, cur_port, &hints, &res)) == 0) {
memcpy (&addr->addr, res->ai_addr,
MIN (sizeof (addr->addr), res->ai_addrlen));
addr->af = res->ai_family;
freeaddrinfo (res);
}
else {
@@ -277,6 +278,9 @@ rspamd_parse_host_port_priority_strv (gchar **tokens,
}

/* Restore errno */
if (name != NULL) {
*name = g_strdup (tokens[0]);
}
errno = saved_errno;
return TRUE;

@@ -290,6 +294,7 @@ rspamd_parse_host_port_priority (
const gchar *str,
rspamd_inet_addr_t *addr,
guint *priority,
gchar **name,
guint default_port)
{
gchar **tokens;
@@ -300,7 +305,8 @@ rspamd_parse_host_port_priority (
return FALSE;
}

ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, default_port);
ret = rspamd_parse_host_port_priority_strv (tokens, addr, priority, name,
default_port);

g_strfreev (tokens);

@@ -310,7 +316,8 @@ rspamd_parse_host_port_priority (
gboolean
rspamd_parse_host_port (const gchar *str,
rspamd_inet_addr_t *addr,
gchar **name,
guint default_port)
{
return rspamd_parse_host_port_priority (str, addr, NULL, default_port);
return rspamd_parse_host_port_priority (str, addr, NULL, name, default_port);
}

+ 3
- 3
src/libutil/addr.h View File

@@ -96,7 +96,7 @@ gboolean rspamd_ip_is_valid (rspamd_inet_addr_t *addr);
gint rspamd_accept_from_socket (gint sock, rspamd_inet_addr_t *addr);

gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
rspamd_inet_addr_t *addr, guint *priority, guint default_port);
rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);

/**
* Parse host[:port[:priority]] line
@@ -106,7 +106,7 @@ gboolean rspamd_parse_host_port_priority_strv (gchar **tokens,
* @return TRUE if string was parsed
*/
gboolean rspamd_parse_host_port_priority (const gchar *str,
rspamd_inet_addr_t *addr, guint *priority, guint default_port);
rspamd_inet_addr_t *addr, guint *priority, gchar **name, guint default_port);

/**
* Parse host:port line
@@ -115,7 +115,7 @@ gboolean rspamd_parse_host_port_priority (const gchar *str,
* @return TRUE if string was parsed
*/
gboolean rspamd_parse_host_port (const gchar *str,
rspamd_inet_addr_t *addr, guint default_port);
rspamd_inet_addr_t *addr, gchar **name, guint default_port);


#endif /* ADDR_H_ */

+ 125
- 9
src/libutil/upstream.c View File

@@ -25,23 +25,26 @@
#include "config.h"
#include "upstream.h"
#include "ottery.h"
#include "ref.h"
#include "rdns.h"

struct upstream {
guint weight;
guint cur_weight;
guint errors;
guint port;
guint active_idx;
gint active_idx;
gchar *name;
struct event ev;
struct timeval tv;
gpointer ud;
struct upstream_list *ls;
rspamd_inet_addr_t addr;
ref_entry_t ref;
};

struct upstream_list {
GArray *ups;
GPtrArray *ups;
GPtrArray *alive;
rspamd_mutex_t *lock;
guint hash_seed;
@@ -49,23 +52,78 @@ struct upstream_list {

static struct rdns_resolver *res = NULL;
static struct event_base *ev_base = NULL;
/* 4 errors in 10 seconds */
const guint default_max_errors = 4;
const guint default_revive_time = 60;
const guint default_error_time = 10;
const gdouble default_dns_timeout = 1.0;
const guint default_dns_retransmits = 2;

static void
rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
{
rspamd_mutex_lock (ls->lock);
g_ptr_array_remove_index (ls->alive, up->active_idx);
up->active_idx = 0;
g_ptr_array_add (ls->alive, up);
up->active_idx = ls->alive->len - 1;
rspamd_mutex_unlock (ls->lock);
}

static void
rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
{
struct upstream *up = (struct upstream *)arg;
struct rdns_reply_entry *entry;

if (reply->code == RDNS_RC_NOERROR) {

}

REF_RELEASE (up);
}

static void
rspamd_revive_cb (int fd, short what, void *arg)
{
struct upstream *up = (struct upstream *)arg;

event_del (&up->ev);
if (up->ls) {
rspamd_upstream_set_active (up->ls, up);
}

REF_RELEASE (up);
}

static void
rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
{
gint query_type = -1;

rspamd_mutex_lock (ls->lock);
g_ptr_array_add (ls->alive, up);
up->active_idx = ls->alive->len - 1;
g_ptr_array_remove_index (ls->alive, up->active_idx);
up->active_idx = -1;
/* Resolve name of the upstream one more time */
if (up->addr.af == AF_INET) {
query_type = RDNS_REQUEST_A;
}
else if (up->addr.af == AF_INET6) {
query_type = RDNS_REQUEST_AAAA;
}

if (query_type != -1) {
REF_RETAIN (up);
rdns_make_request_full (res, rspamd_upstream_dns_cb, up,
default_dns_timeout, default_dns_retransmits,
RDNS_REQUEST_A, up->name);
}

REF_RETAIN (up);
evtimer_set (&up->ev, rspamd_revive_cb, up);
event_base_set (ev_base, &up->ev);
up->tv.tv_sec = default_revive_time;
up->tv.tv_usec = 0;
event_add (&up->ev, &up->tv);

rspamd_mutex_unlock (ls->lock);
}

@@ -80,6 +138,9 @@ rspamd_upstreams_library_init (struct rdns_resolver *resolver,
void
rspamd_upstream_fail (struct upstream *up)
{
struct timeval tv;
gdouble error_rate, max_error_rate;

if (g_atomic_int_compare_and_exchange (&up->errors, 0, 1)) {
gettimeofday (&up->tv, NULL);
up->errors ++;
@@ -88,7 +149,12 @@ rspamd_upstream_fail (struct upstream *up)
g_atomic_int_inc (&up->errors);
}

if (g_atomic_int_compare_and_exchange (&up->errors, default_max_errors, 0)) {
gettimeofday (&tv, NULL);

error_rate = ((gdouble)up->errors) / (tv.tv_sec - up->tv.tv_sec);
max_error_rate = (gdouble)default_max_errors / (gdouble)default_error_time;

if (error_rate > max_error_rate) {
/* Remove upstream from the active list */
rspamd_upstream_set_inactive (up->ls, up);
}
@@ -120,9 +186,59 @@ rspamd_upstreams_create (void)
return ls;
}

static void
rspamd_upstream_dtor (struct upstream *up)
{
g_free (up->name);
g_slice_free1 (sizeof (*up), up);
}

rspamd_inet_addr_t*
rspamd_upstream_addr (struct upstream *up)
{
return &up->addr;
}

gboolean
rspamd_upstreams_add_upstream (struct upstream_list *ups,
const gchar *str, guint16 def_port, void *data)
{
struct upstream *up;

up = g_slice_alloc0 (sizeof (*up));

if (!rspamd_parse_host_port_priority (str, &up->addr, &up->weight,
&up->name, def_port)) {
g_slice_free1 (sizeof (*up), up);
return FALSE;
}

g_ptr_array_add (ups->ups, up);
up->ud = data;
up->cur_weight = up->weight;
up->port = rspamd_inet_address_get_port (&up->addr);
REF_INIT_RETAIN (up, rspamd_upstream_dtor);

rspamd_upstream_set_active (ups, up);

return TRUE;
}

void
rspamd_upstreams_destroy (struct upstream_list *ups)
{
guint i;
struct upstream *up;

g_ptr_array_free (ups->alive, TRUE);

for (i = 0; i < ups->ups->len; i ++) {
up = g_ptr_array_index (ups->ups, i);
up->ls = NULL;
REF_RELEASE (up);
}

g_ptr_array_free (ups->ups, TRUE);
rspamd_mutex_free (ups->lock);
g_slice_free1 (sizeof (*ups), ups);
}

Loading…
Cancel
Save