From 52b4b8db0b126f7004d6aff79cbea3b06f94c927 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 6 Jul 2017 19:08:15 +0100 Subject: [PATCH] [Rework] Send health checks from a single worker --- src/controller.c | 5 ++++- src/libserver/monitored.c | 9 ++++++++- src/libserver/monitored.h | 2 ++ src/libserver/worker_util.c | 33 +++++++++++++++++++++++++++++++++ src/libserver/worker_util.h | 10 ++++++++++ src/worker.c | 6 ++++-- 6 files changed, 61 insertions(+), 4 deletions(-) diff --git a/src/controller.c b/src/controller.c index 7deec2524..f7e7bd043 100644 --- a/src/controller.c +++ b/src/controller.c @@ -26,7 +26,6 @@ #include "cryptobox.h" #include "ottery.h" #include "fuzzy_wire.h" -#include "libutil/rrd.h" #include "unix-std.h" #include "utlist.h" #include @@ -3794,6 +3793,10 @@ start_controller_worker (struct rspamd_worker *worker) worker); rspamd_stat_init (worker->srv->cfg, ctx->ev_base); + if (worker->index == 0) { + rspamd_worker_init_monitored (worker, ctx->ev_base, ctx->resolver); + } + event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); diff --git a/src/libserver/monitored.c b/src/libserver/monitored.c index a280298c8..548dcf362 100644 --- a/src/libserver/monitored.c +++ b/src/libserver/monitored.c @@ -386,6 +386,13 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx, } +struct event_base * +rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx) +{ + return ctx->ev_base; +} + + struct rspamd_monitored * rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx, const gchar *line, @@ -580,5 +587,5 @@ rspamd_monitored_get_tag (struct rspamd_monitored *m, { g_assert (m != NULL); - rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN); + rspamd_strlcpy (tag_out, m->tag, RSPAMD_MONITORED_TAG_LEN); } \ No newline at end of file diff --git a/src/libserver/monitored.h b/src/libserver/monitored.h index 261227d35..4db41f9c2 100644 --- a/src/libserver/monitored.h +++ b/src/libserver/monitored.h @@ -57,6 +57,8 @@ void rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx, mon_change_cb change_cb, gpointer ud); +struct event_base *rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx); + /** * Create monitored object * @param ctx context diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index fcd9a8be9..0c0cc9344 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -802,4 +802,37 @@ rspamd_worker_session_cache_remove (void *cache, void *ptr) struct rspamd_worker_session_cache *c = cache; g_hash_table_remove (c->cache, ptr); +} + +static void +rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx, + struct rspamd_monitored *m, gboolean alive, + void *ud) +{ + struct rspamd_worker *worker = ud; + struct rspamd_config *cfg = worker->srv->cfg; + struct event_base *ev_base; + guchar tag[RSPAMD_MONITORED_TAG_LEN]; + static struct rspamd_srv_command srv_cmd; + + rspamd_monitored_get_tag (m, tag); + ev_base = rspamd_monitored_ctx_get_ev_base (ctx); + srv_cmd.type = RSPAMD_SRV_MONITORED_CHANGE; + rspamd_strlcpy (srv_cmd.cmd.monitored_change.tag, tag, + sizeof (srv_cmd.cmd.monitored_change.tag)); + srv_cmd.cmd.monitored_change.alive = alive; + msg_info_config ("broadcast monitored update for %s: %s", + srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead"); + + rspamd_srv_send_command (worker, ev_base, &srv_cmd, -1, NULL, NULL); +} + +void +rspamd_worker_init_monitored (struct rspamd_worker *worker, + struct event_base *ev_base, + struct rspamd_dns_resolver *resolver) +{ + rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx, + worker->srv->cfg, ev_base, resolver->r, + rspamd_worker_monitored_on_change, worker); } \ No newline at end of file diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index b21a92bfc..5e33c940d 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -167,6 +167,16 @@ void rspamd_worker_session_cache_remove (void *cache, void *ptr); struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *, struct rspamd_worker_conf *, guint idx, struct event_base *ev_base); +/** + * Initialise the main monitoring worker + * @param worker + * @param ev_base + * @param resolver + */ +void rspamd_worker_init_monitored (struct rspamd_worker *worker, + struct event_base *ev_base, + struct rspamd_dns_resolver *resolver); + #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \ G_STRFUNC, \ diff --git a/src/worker.c b/src/worker.c index 12a4ae299..5c81f419a 100644 --- a/src/worker.c +++ b/src/worker.c @@ -498,6 +498,7 @@ rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main, struct rspamd_control_reply rep; struct rspamd_monitored *m; struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx; + struct rspamd_config *cfg = worker->srv->cfg; memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_MONITORED_CHANGE; @@ -506,6 +507,9 @@ rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main, if (!m) { rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive); rep.reply.monitored_change.status = 1; + msg_info_config ("updated monitored status for %s: %s", + cmd->cmd.monitored_change.tag, + cmd->cmd.monitored_change.alive ? "alive" : "dead"); } else { msg_err ("cannot find monitored by tag: %*s", 32, @@ -637,8 +641,6 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker, struct event_base *ev_base, struct rspamd_dns_resolver *resolver) { - rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx, - worker->srv->cfg, ev_base, resolver->r, NULL, NULL); rspamd_stat_init (worker->srv->cfg, ev_base); g_ptr_array_add (worker->finish_actions, (gpointer) rspamd_worker_on_terminate); -- 2.39.5