From 3d97675cf4361d30dd541eff5b6b13c57cf36b80 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 10 Sep 2019 17:42:18 +0100 Subject: [PATCH] [Project] Add preliminary support of the heartbeats --- src/libserver/cfg_file.h | 1 + src/libserver/cfg_rcl.c | 6 +++ src/libserver/cfg_rcl.h | 1 + src/libserver/cfg_utils.c | 1 + src/libserver/worker_util.c | 91 +++++++++++++++++++++++++++++++++++++ src/rspamd.c | 1 + src/rspamd.h | 9 +++- 7 files changed, 109 insertions(+), 1 deletion(-) diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 263d00f38..7186a73ec 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -380,6 +380,7 @@ struct rspamd_config { gsize images_cache_size; /**< size of LRU cache for DCT data from images */ gdouble task_timeout; /**< maximum message processing time */ gint default_max_shots; /**< default maximum count of symbols hits permitted (-1 for unlimited) */ + gdouble heartbeat_interval; /**< interval for heartbeats for workers */ enum rspamd_log_type log_type; /**< log type */ gint log_facility; /**< log facility in case of syslog */ diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c index fb2cbf052..5a1d3a639 100644 --- a/src/libserver/cfg_rcl.c +++ b/src/libserver/cfg_rcl.c @@ -2182,6 +2182,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections) G_STRUCT_OFFSET (struct rspamd_config, full_gc_iters), RSPAMD_CL_FLAG_UINT, "Task scanned before memory gc is performed (default: 0 - disabled)"); + rspamd_rcl_add_default_handler (sub, + "heartbeat_interval", + rspamd_rcl_parse_struct_time, + G_STRUCT_OFFSET (struct rspamd_config, heartbeat_interval), + RSPAMD_CL_FLAG_TIME_FLOAT, + "Time between workers heartbeats"); /* Neighbours configuration */ rspamd_rcl_add_section_doc (&sub->subsections, "neighbours", "name", diff --git a/src/libserver/cfg_rcl.h b/src/libserver/cfg_rcl.h index 7f97b100d..1a2d69c58 100644 --- a/src/libserver/cfg_rcl.h +++ b/src/libserver/cfg_rcl.h @@ -413,6 +413,7 @@ ucl_object_t *rspamd_rcl_add_doc_by_path (struct rspamd_config *cfg, const char *default_value, gboolean required); + /** * Parses example and adds documentation according to the example: * diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index 619e8784e..bd8595514 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -233,6 +233,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags) cfg->max_sessions_cache = DEFAULT_MAX_SESSIONS; cfg->maps_cache_dir = rspamd_mempool_strdup (cfg->cfg_pool, RSPAMD_DBDIR); cfg->c_modules = g_ptr_array_new (); + cfg->heartbeat_interval = 10.0; REF_INIT_RETAIN (cfg, rspamd_config_free); diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 511289a2d..47812ded1 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -63,6 +63,10 @@ #include "contrib/libev/ev.h" +/* Forward declaration */ +static void rspamd_worker_heartbeat_start (struct rspamd_worker *, + struct ev_loop *); + static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *); /** * Return worker's control structure by its type @@ -362,6 +366,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, rspamd_worker_init_signals (worker, event_loop); rspamd_control_worker_add_default_handler (worker, event_loop); + rspamd_worker_heartbeat_start (worker, event_loop); #ifdef WITH_HIREDIS rspamd_redis_pool_config (worker->srv->cfg->redis_pool, worker->srv->cfg, event_loop); @@ -683,6 +688,91 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents) } } +static void +rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; + +} + +static void +rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop) +{ + wrk->hb.heartbeat_ev.data = (void *)wrk; + ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb, + 0.0, wrk->srv->cfg->heartbeat_interval); + ev_timer_start (event_loop, &wrk->hb.heartbeat_ev); +} + +static void +rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; + gdouble time_from_last = ev_time (); + struct rspamd_main *rspamd_main; + struct tm tm; + gchar timebuf[64]; + gchar usec_buf[16]; + gint r; + + time_from_last -= wrk->hb.last_event; + rspamd_main = wrk->srv; + + if (time_from_last > 0 && time_from_last > rspamd_main->cfg->heartbeat_interval) { + rspamd_localtime (wrk->hb.last_event, &tm); + r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm); + rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f", + wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); + rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, + "%s", usec_buf + 1); + + if (wrk->hb.nbeats > 0) { + /* First time lost event */ + msg_warn_main ("lost heartbeat from worker type %s with pid %P, " + "last beat on: %s (%L beats received previously)", + g_quark_to_string (wrk->type), wrk->pid, + timebuf, + wrk->hb.nbeats); + wrk->hb.nbeats = -1; + /* TODO: send notify about worker problem */ + } + else { + wrk->hb.nbeats --; + msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, " + "last beat on: %s", + -(wrk->hb.nbeats), + g_quark_to_string (wrk->type), + wrk->pid, + timebuf); + } + } + else if (wrk->hb.nbeats < 0) { + rspamd_localtime (wrk->hb.last_event, &tm); + r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm); + rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f", + wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event); + rspamd_snprintf (timebuf + r, sizeof (timebuf) - r, + "%s", usec_buf + 1); + + msg_info_main ("received heartbeat from worker type %s with pid %P, " + "last beat on: %s (%L beats lost previously)", + g_quark_to_string (wrk->type), wrk->pid, + timebuf, + -(wrk->hb.nbeats)); + wrk->hb.nbeats = 1; + /* TODO: send notify about worker restoration */ + } +} + +static void +rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop) +{ + wrk->hb.heartbeat_ev.data = (void *)wrk; + ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb, + 0.0, wrk->srv->cfg->heartbeat_interval * 2); + ev_timer_start (event_loop, &wrk->hb.heartbeat_ev); +} + struct rspamd_worker * rspamd_fork_worker (struct rspamd_main *rspamd_main, struct rspamd_worker_conf *cf, @@ -822,6 +912,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, wrk->cld_ev.data = wrk; ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0); ev_child_start (rspamd_main->event_loop, &wrk->cld_ev); + rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop); /* Insert worker into worker's table, pid is index */ g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER ( wrk->pid), wrk); diff --git a/src/rspamd.c b/src/rspamd.c index 458d3d083..808ca6aaa 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -1008,6 +1008,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main, g_free (wrk->tmp_data); } ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev); + ev_timer_stop (rspamd_main->event_loop, &wrk->hb.heartbeat_ev); } if (wrk->control_pipe[0] != -1) { diff --git a/src/rspamd.h b/src/rspamd.h index ea11965fb..3cd6c391b 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -76,6 +76,12 @@ struct rspamd_worker_accept_event { typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *, struct rspamd_worker *); +struct rspamd_worker_heartbeat { + ev_timer heartbeat_ev; /**< used by main for checking heartbeats and by workers to send heartbeats */ + ev_tstamp last_event; + gint64 nbeats; /* positive for beats received, negative for beats missed */ +}; + /** * Worker process structure */ @@ -90,7 +96,7 @@ struct rspamd_worker { struct rspamd_main *srv; /**< pointer to server structure */ GQuark type; /**< process type */ GHashTable *signal_events; /**< signal events */ - struct rspamd_worker_accept_event *accept_events; /**< socket events */ + struct rspamd_worker_accept_event *accept_events; /**< socket events */ struct rspamd_worker_conf *cf; /**< worker config data */ gpointer ctx; /**< worker's specific data */ enum rspamd_worker_flags flags; /**< worker's flags */ @@ -99,6 +105,7 @@ struct rspamd_worker { gint srv_pipe[2]; /**< used by workers to request something from the main process. [0] - main, [1] - worker */ ev_io srv_ev; /**< used by main for read workers' requests */ + struct rspamd_worker_heartbeat hb; /**< heartbeat data */ gpointer control_data; /**< used by control protocol to handle commands */ gpointer tmp_data; /**< used to avoid race condition to deal with control messages */ GPtrArray *finish_actions; /**< called when worker is terminated */ -- 2.39.5