Browse Source

[Feature] Implement event watchers for upstreams

tags/1.9.0
Vsevolod Stakhov 5 years ago
parent
commit
a6a1a8d5ae
2 changed files with 93 additions and 2 deletions
  1. 68
    2
      src/libutil/upstream.c
  2. 25
    0
      src/libutil/upstream.h

+ 68
- 2
src/libutil/upstream.c View File

@@ -34,6 +34,13 @@ struct upstream_addr_elt {
guint errors;
};

struct upstream_list_watcher {
rspamd_upstream_watch_func func;
gpointer ud;
enum rspamd_upstreams_watch_event events_mask;
struct upstream_list_watcher *next, *prev;
};

struct upstream {
guint weight;
guint cur_weight;
@@ -73,6 +80,7 @@ struct upstream_list {
struct upstream_ctx *ctx;
GPtrArray *ups;
GPtrArray *alive;
struct upstream_list_watcher *watchers;
rspamd_mutex_t *lock;
guint64 hash_seed;
struct upstream_limits limits;
@@ -109,8 +117,9 @@ static guint default_dns_retransmits = 2;

void
rspamd_upstreams_library_config (struct rspamd_config *cfg,
struct upstream_ctx *ctx, struct event_base *ev_base,
struct rdns_resolver *resolver)
struct upstream_ctx *ctx,
struct event_base *ev_base,
struct rdns_resolver *resolver)
{
g_assert (ctx != NULL);
g_assert (cfg != NULL);
@@ -405,6 +414,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
guint i;
struct upstream *cur;
struct timeval tv;
struct upstream_list_watcher *w;

RSPAMD_UPSTREAM_LOCK (ls->lock);
g_ptr_array_remove_index (ls->alive, up->active_idx);
@@ -431,6 +441,12 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
event_add (&up->ev, &tv);
}

DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud);
}
}

RSPAMD_UPSTREAM_UNLOCK (ls->lock);
}

@@ -440,6 +456,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
gdouble error_rate, max_error_rate;
gdouble sec_last, sec_cur;
struct upstream_addr_elt *addr_elt;
struct upstream_list_watcher *w;

if (up->ctx && up->active_idx != -1) {
sec_cur = rspamd_get_ticks (FALSE);
@@ -449,6 +466,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
/* We have the first error */
up->last_fail = sec_cur;
up->errors = 1;

DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
}
}
}
else {
sec_last = up->last_fail;
@@ -456,6 +479,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
if (sec_cur >= sec_last) {
up->errors ++;

DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, up->errors, w->ud);
}
}

if (sec_cur > sec_last) {
error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
max_error_rate = ((gdouble)up->ls->limits.max_errors) /
@@ -499,6 +528,7 @@ void
rspamd_upstream_ok (struct upstream *up)
{
struct upstream_addr_elt *addr_elt;
struct upstream_list_watcher *w;

RSPAMD_UPSTREAM_LOCK (up->lock);
if (up->errors > 0 && up->active_idx != -1) {
@@ -509,6 +539,12 @@ rspamd_upstream_ok (struct upstream *up)
addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
addr_elt->errors = 0;
}

DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
w->func (up, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
}
}
}

RSPAMD_UPSTREAM_UNLOCK (up->lock);
@@ -831,6 +867,7 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
{
guint i;
struct upstream *up;
struct upstream_list_watcher *w, *tmp;

if (ups != NULL) {
g_ptr_array_free (ups->alive, TRUE);
@@ -841,6 +878,10 @@ rspamd_upstreams_destroy (struct upstream_list *ups)
REF_RELEASE (up);
}

DL_FOREACH_SAFE (ups->watchers, w, tmp) {
g_free (w);
}

g_ptr_array_free (ups->ups, TRUE);
rspamd_mutex_free (ups->lock);
g_free (ups);
@@ -852,6 +893,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
{
struct upstream *up = (struct upstream *)elt;
struct upstream_list *ups = (struct upstream_list *)ls;
struct upstream_list_watcher *w;

/* Here the upstreams list is already locked */
RSPAMD_UPSTREAM_LOCK (up->lock);
@@ -862,6 +904,13 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
RSPAMD_UPSTREAM_UNLOCK (up->lock);

DL_FOREACH (up->ls->watchers, w) {
if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
}
}

/* For revive event */
REF_RELEASE (up);
}
@@ -1125,3 +1174,20 @@ rspamd_upstreams_set_limits (struct upstream_list *ups,
ups->limits.dns_retransmits = dns_retransmits;
}
}

void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
enum rspamd_upstreams_watch_event events,
rspamd_upstream_watch_func func,
gpointer ud)
{
struct upstream_list_watcher *nw;

g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);

nw = g_malloc (sizeof (*nw));
nw->func = func;
nw->events_mask = events;
nw->ud = ud;

DL_APPEND (ups->watchers, nw);
}

+ 25
- 0
src/libutil/upstream.h View File

@@ -181,6 +181,31 @@ typedef void (*rspamd_upstream_traverse_func) (struct upstream *up, guint idx,
void rspamd_upstreams_foreach (struct upstream_list *ups,
rspamd_upstream_traverse_func cb, void *ud);

enum rspamd_upstreams_watch_event {
RSPAMD_UPSTREAM_WATCH_SUCCESS = 1u << 0,
RSPAMD_UPSTREAM_WATCH_FAILURE = 1u << 1,
RSPAMD_UPSTREAM_WATCH_OFFLINE = 1u << 2,
RSPAMD_UPSTREAM_WATCH_ONLINE = 1u << 3,
RSPAMD_UPSTREAM_WATCH_ALL = (1u << 0) | (1u << 1) | (1u << 2) | (1u << 3),
};

typedef void (*rspamd_upstream_watch_func) (struct upstream *up,
enum rspamd_upstreams_watch_event event,
guint cur_errors,
void *ud);

/**
* Adds new watcher to the upstreams list
* @param ups
* @param events
* @param func
* @param ud
*/
void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
enum rspamd_upstreams_watch_event events,
rspamd_upstream_watch_func func,
gpointer ud);

/**
* Returns the current IP address of the upstream
* @param up

Loading…
Cancel
Save