]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Send health checks from a single worker
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 6 Jul 2017 18:08:15 +0000 (19:08 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 6 Jul 2017 18:08:15 +0000 (19:08 +0100)
src/controller.c
src/libserver/monitored.c
src/libserver/monitored.h
src/libserver/worker_util.c
src/libserver/worker_util.h
src/worker.c

index 7deec25247bf5020b9e6b2c19d0f272221a682b2..f7e7bd0439af26ef32567ce75a089f76e654ec65 100644 (file)
@@ -26,7 +26,6 @@
 #include "cryptobox.h"
 #include "ottery.h"
 #include "fuzzy_wire.h"
-#include "libutil/rrd.h"
 #include "unix-std.h"
 #include "utlist.h"
 #include <math.h>
@@ -3794,6 +3793,10 @@ start_controller_worker (struct rspamd_worker *worker)
                        worker);
        rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
 
+       if (worker->index == 0) {
+               rspamd_worker_init_monitored (worker, ctx->ev_base, ctx->resolver);
+       }
+
        event_base_loop (ctx->ev_base, 0);
        rspamd_worker_block_signals ();
 
index a280298c87f808a852ecfe03f6b90db61cae14b7..548dcf362c8ef5db684d200c55b27885864131c7 100644 (file)
@@ -386,6 +386,13 @@ rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
 }
 
 
+struct event_base *
+rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx)
+{
+       return ctx->ev_base;
+}
+
+
 struct rspamd_monitored *
 rspamd_monitored_create_ (struct rspamd_monitored_ctx *ctx,
                const gchar *line,
@@ -580,5 +587,5 @@ rspamd_monitored_get_tag (struct rspamd_monitored *m,
 {
        g_assert (m != NULL);
 
-       rspamd_strlcpy (m->tag, tag_out, RSPAMD_MONITORED_TAG_LEN);
+       rspamd_strlcpy (tag_out, m->tag, RSPAMD_MONITORED_TAG_LEN);
 }
\ No newline at end of file
index 261227d35c237d773e35f4d3ccb39ba1b80507a5..4db41f9c2ff87b9a8294c229ae9b3d827af2338a 100644 (file)
@@ -57,6 +57,8 @@ void rspamd_monitored_ctx_config (struct rspamd_monitored_ctx *ctx,
                mon_change_cb change_cb,
                gpointer ud);
 
+struct event_base *rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx);
+
 /**
  * Create monitored object
  * @param ctx context
index fcd9a8be9becb57ed56aeb181119d63b712e41b9..0c0cc9344225118f4ec2a8685ff0ab5dc2b4ed6c 100644 (file)
@@ -802,4 +802,37 @@ rspamd_worker_session_cache_remove (void *cache, void *ptr)
        struct rspamd_worker_session_cache *c = cache;
 
        g_hash_table_remove (c->cache, ptr);
+}
+
+static void
+rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx,
+               struct rspamd_monitored *m, gboolean alive,
+               void *ud)
+{
+       struct rspamd_worker *worker = ud;
+       struct rspamd_config *cfg = worker->srv->cfg;
+       struct event_base *ev_base;
+       guchar tag[RSPAMD_MONITORED_TAG_LEN];
+       static struct rspamd_srv_command srv_cmd;
+
+       rspamd_monitored_get_tag (m, tag);
+       ev_base = rspamd_monitored_ctx_get_ev_base (ctx);
+       srv_cmd.type = RSPAMD_SRV_MONITORED_CHANGE;
+       rspamd_strlcpy (srv_cmd.cmd.monitored_change.tag, tag,
+                       sizeof (srv_cmd.cmd.monitored_change.tag));
+       srv_cmd.cmd.monitored_change.alive = alive;
+       msg_info_config ("broadcast monitored update for %s: %s",
+                       srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead");
+
+       rspamd_srv_send_command (worker, ev_base, &srv_cmd, -1, NULL, NULL);
+}
+
+void
+rspamd_worker_init_monitored (struct rspamd_worker *worker,
+               struct event_base *ev_base,
+               struct rspamd_dns_resolver *resolver)
+{
+       rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
+                       worker->srv->cfg, ev_base, resolver->r,
+                       rspamd_worker_monitored_on_change, worker);
 }
\ No newline at end of file
index b21a92bfcccda37da5fafced5073563f076c2caf..5e33c940dd3b87c6d649f3f6f2aec9104b6651a8 100644 (file)
@@ -167,6 +167,16 @@ void rspamd_worker_session_cache_remove (void *cache, void *ptr);
 struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
                struct rspamd_worker_conf *, guint idx, struct event_base *ev_base);
 
+/**
+ * Initialise the main monitoring worker
+ * @param worker
+ * @param ev_base
+ * @param resolver
+ */
+void rspamd_worker_init_monitored (struct rspamd_worker *worker,
+               struct event_base *ev_base,
+               struct rspamd_dns_resolver *resolver);
+
 #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
         rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \
         G_STRFUNC, \
index 12a4ae2992f5f9c56fed0c1347300abe5219d06b..5c81f419ae4b5e1a4e2893ece785b72fe1d05920 100644 (file)
@@ -498,6 +498,7 @@ rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
        struct rspamd_control_reply rep;
        struct rspamd_monitored *m;
        struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+       struct rspamd_config *cfg = worker->srv->cfg;
 
        memset (&rep, 0, sizeof (rep));
        rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
@@ -506,6 +507,9 @@ rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
        if (!m) {
                rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
                rep.reply.monitored_change.status = 1;
+               msg_info_config ("updated monitored status for %s: %s",
+                               cmd->cmd.monitored_change.tag,
+                               cmd->cmd.monitored_change.alive ? "alive" : "dead");
        }
        else {
                msg_err ("cannot find monitored by tag: %*s", 32,
@@ -637,8 +641,6 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
                struct event_base *ev_base,
                struct rspamd_dns_resolver *resolver)
 {
-       rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
-                       worker->srv->cfg, ev_base, resolver->r, NULL, NULL);
        rspamd_stat_init (worker->srv->cfg, ev_base);
        g_ptr_array_add (worker->finish_actions,
                        (gpointer) rspamd_worker_on_terminate);