diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-12-04 17:05:18 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-12-04 17:05:18 +0000 |
commit | a6a1a8d5aea8439b78645d2dc77b61a898020b7d (patch) | |
tree | 1b94a56d91e74bb4799911a88da2268ae5ff444c | |
parent | b99cd00a121e1f80077144678207f21c82690544 (diff) | |
download | rspamd-a6a1a8d5aea8439b78645d2dc77b61a898020b7d.tar.gz rspamd-a6a1a8d5aea8439b78645d2dc77b61a898020b7d.zip |
[Feature] Implement event watchers for upstreams
-rw-r--r-- | src/libutil/upstream.c | 70 | ||||
-rw-r--r-- | src/libutil/upstream.h | 25 |
2 files changed, 93 insertions, 2 deletions
diff --git a/src/libutil/upstream.c b/src/libutil/upstream.c index 4ed657da4..90f792bbe 100644 --- a/src/libutil/upstream.c +++ b/src/libutil/upstream.c @@ -34,6 +34,13 @@ struct upstream_addr_elt { guint errors; }; +struct upstream_list_watcher { + rspamd_upstream_watch_func func; + gpointer ud; + enum rspamd_upstreams_watch_event events_mask; + struct upstream_list_watcher *next, *prev; +}; + struct upstream { guint weight; guint cur_weight; @@ -73,6 +80,7 @@ struct upstream_list { struct upstream_ctx *ctx; GPtrArray *ups; GPtrArray *alive; + struct upstream_list_watcher *watchers; rspamd_mutex_t *lock; guint64 hash_seed; struct upstream_limits limits; @@ -109,8 +117,9 @@ static guint default_dns_retransmits = 2; void rspamd_upstreams_library_config (struct rspamd_config *cfg, - struct upstream_ctx *ctx, struct event_base *ev_base, - struct rdns_resolver *resolver) + struct upstream_ctx *ctx, + struct event_base *ev_base, + struct rdns_resolver *resolver) { g_assert (ctx != NULL); g_assert (cfg != NULL); @@ -405,6 +414,7 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) guint i; struct upstream *cur; struct timeval tv; + struct upstream_list_watcher *w; RSPAMD_UPSTREAM_LOCK (ls->lock); g_ptr_array_remove_index (ls->alive, up->active_idx); @@ -431,6 +441,12 @@ rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up) event_add (&up->ev, &tv); } + DL_FOREACH (up->ls->watchers, w) { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) { + w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud); + } + } + RSPAMD_UPSTREAM_UNLOCK (ls->lock); } @@ -440,6 +456,7 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure) gdouble error_rate, max_error_rate; gdouble sec_last, sec_cur; struct upstream_addr_elt *addr_elt; + struct upstream_list_watcher *w; if (up->ctx && up->active_idx != -1) { sec_cur = rspamd_get_ticks (FALSE); @@ -449,6 +466,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure) /* We have the first error */ up->last_fail = sec_cur; up->errors = 1; + + DL_FOREACH (up->ls->watchers, w) { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) { + w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud); + } + } } else { sec_last = up->last_fail; @@ -456,6 +479,12 @@ rspamd_upstream_fail (struct upstream *up, gboolean addr_failure) if (sec_cur >= sec_last) { up->errors ++; + DL_FOREACH (up->ls->watchers, w) { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) { + w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, up->errors, w->ud); + } + } + if (sec_cur > sec_last) { error_rate = ((gdouble)up->errors) / (sec_cur - sec_last); max_error_rate = ((gdouble)up->ls->limits.max_errors) / @@ -499,6 +528,7 @@ void rspamd_upstream_ok (struct upstream *up) { struct upstream_addr_elt *addr_elt; + struct upstream_list_watcher *w; RSPAMD_UPSTREAM_LOCK (up->lock); if (up->errors > 0 && up->active_idx != -1) { @@ -509,6 +539,12 @@ rspamd_upstream_ok (struct upstream *up) addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur); addr_elt->errors = 0; } + + DL_FOREACH (up->ls->watchers, w) { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) { + w->func (up, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud); + } + } } RSPAMD_UPSTREAM_UNLOCK (up->lock); @@ -831,6 +867,7 @@ rspamd_upstreams_destroy (struct upstream_list *ups) { guint i; struct upstream *up; + struct upstream_list_watcher *w, *tmp; if (ups != NULL) { g_ptr_array_free (ups->alive, TRUE); @@ -841,6 +878,10 @@ rspamd_upstreams_destroy (struct upstream_list *ups) REF_RELEASE (up); } + DL_FOREACH_SAFE (ups->watchers, w, tmp) { + g_free (w); + } + g_ptr_array_free (ups->ups, TRUE); rspamd_mutex_free (ups->lock); g_free (ups); @@ -852,6 +893,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) { struct upstream *up = (struct upstream *)elt; struct upstream_list *ups = (struct upstream_list *)ls; + struct upstream_list_watcher *w; /* Here the upstreams list is already locked */ RSPAMD_UPSTREAM_LOCK (up->lock); @@ -862,6 +904,13 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls) g_ptr_array_add (ups->alive, up); up->active_idx = ups->alive->len - 1; RSPAMD_UPSTREAM_UNLOCK (up->lock); + + DL_FOREACH (up->ls->watchers, w) { + if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) { + w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud); + } + } + /* For revive event */ REF_RELEASE (up); } @@ -1125,3 +1174,20 @@ rspamd_upstreams_set_limits (struct upstream_list *ups, ups->limits.dns_retransmits = dns_retransmits; } } + +void rspamd_upstreams_add_watch_callback (struct upstream_list *ups, + enum rspamd_upstreams_watch_event events, + rspamd_upstream_watch_func func, + gpointer ud) +{ + struct upstream_list_watcher *nw; + + g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0); + + nw = g_malloc (sizeof (*nw)); + nw->func = func; + nw->events_mask = events; + nw->ud = ud; + + DL_APPEND (ups->watchers, nw); +} diff --git a/src/libutil/upstream.h b/src/libutil/upstream.h index 9b5c7794c..56d6fa6c5 100644 --- a/src/libutil/upstream.h +++ b/src/libutil/upstream.h @@ -181,6 +181,31 @@ typedef void (*rspamd_upstream_traverse_func) (struct upstream *up, guint idx, void rspamd_upstreams_foreach (struct upstream_list *ups, rspamd_upstream_traverse_func cb, void *ud); +enum rspamd_upstreams_watch_event { + RSPAMD_UPSTREAM_WATCH_SUCCESS = 1u << 0, + RSPAMD_UPSTREAM_WATCH_FAILURE = 1u << 1, + RSPAMD_UPSTREAM_WATCH_OFFLINE = 1u << 2, + RSPAMD_UPSTREAM_WATCH_ONLINE = 1u << 3, + RSPAMD_UPSTREAM_WATCH_ALL = (1u << 0) | (1u << 1) | (1u << 2) | (1u << 3), +}; + +typedef void (*rspamd_upstream_watch_func) (struct upstream *up, + enum rspamd_upstreams_watch_event event, + guint cur_errors, + void *ud); + +/** + * Adds new watcher to the upstreams list + * @param ups + * @param events + * @param func + * @param ud + */ +void rspamd_upstreams_add_watch_callback (struct upstream_list *ups, + enum rspamd_upstreams_watch_event events, + rspamd_upstream_watch_func func, + gpointer ud); + /** * Returns the current IP address of the upstream * @param up |