]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add preliminary support of the heartbeats
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 10 Sep 2019 16:42:18 +0000 (17:42 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 10 Sep 2019 16:42:18 +0000 (17:42 +0100)
src/libserver/cfg_file.h
src/libserver/cfg_rcl.c
src/libserver/cfg_rcl.h
src/libserver/cfg_utils.c
src/libserver/worker_util.c
src/rspamd.c
src/rspamd.h

index 263d00f38c0d5a60461e88c127c4c67ab970ceb2..7186a73ecf0f4a08b6b07df898c736fb43eb83e5 100644 (file)
@@ -380,6 +380,7 @@ struct rspamd_config {
        gsize images_cache_size;                        /**< size of LRU cache for DCT data from images                 */
        gdouble task_timeout;                           /**< maximum message processing time                                    */
        gint default_max_shots;                         /**< default maximum count of symbols hits permitted (-1 for unlimited) */
+       gdouble heartbeat_interval;                     /**< interval for heartbeats for workers                                */
 
        enum rspamd_log_type log_type;                  /**< log type                                                                                   */
        gint log_facility;                              /**< log facility in case of syslog                                             */
index fb2cbf0520a5877e68380d28f862441ca3935e21..5a1d3a639d668b225b6ded3b7796322144b2f77c 100644 (file)
@@ -2182,6 +2182,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
                                G_STRUCT_OFFSET (struct rspamd_config, full_gc_iters),
                                RSPAMD_CL_FLAG_UINT,
                                "Task scanned before memory gc is performed (default: 0 - disabled)");
+               rspamd_rcl_add_default_handler (sub,
+                               "heartbeat_interval",
+                               rspamd_rcl_parse_struct_time,
+                               G_STRUCT_OFFSET (struct rspamd_config, heartbeat_interval),
+                               RSPAMD_CL_FLAG_TIME_FLOAT,
+                               "Time between workers heartbeats");
 
                /* Neighbours configuration */
                rspamd_rcl_add_section_doc (&sub->subsections, "neighbours", "name",
index 7f97b100d010f7f27eb2ce291b061af4e3743433..1a2d69c58f62a44992348b1481f4a25153b6d4d8 100644 (file)
@@ -413,6 +413,7 @@ ucl_object_t *rspamd_rcl_add_doc_by_path (struct rspamd_config *cfg,
                const char *default_value,
                gboolean required);
 
+
 /**
  * Parses example and adds documentation according to the example:
  *
index 619e8784e205f8c43c7e12b5687a156ab10c5229..bd859551454af2714a4c070aa0645f10b6aaa765 100644 (file)
@@ -233,6 +233,7 @@ rspamd_config_new (enum rspamd_config_init_flags flags)
        cfg->max_sessions_cache = DEFAULT_MAX_SESSIONS;
        cfg->maps_cache_dir = rspamd_mempool_strdup (cfg->cfg_pool, RSPAMD_DBDIR);
        cfg->c_modules = g_ptr_array_new ();
+       cfg->heartbeat_interval = 10.0;
 
        REF_INIT_RETAIN (cfg, rspamd_config_free);
 
index 511289a2d3891ae5af22521dd70f233de059ddee..47812ded1f63e38ed5e1afca777812213d8360bc 100644 (file)
 
 #include "contrib/libev/ev.h"
 
+/* Forward declaration */
+static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
+               struct ev_loop *);
+
 static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
 /**
  * Return worker's control structure by its type
@@ -362,6 +366,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
 
        rspamd_worker_init_signals (worker, event_loop);
        rspamd_control_worker_add_default_handler (worker, event_loop);
+       rspamd_worker_heartbeat_start (worker, event_loop);
 #ifdef WITH_HIREDIS
        rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
                        worker->srv->cfg, event_loop);
@@ -683,6 +688,91 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
        }
 }
 
+static void
+rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+       struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+
+}
+
+static void
+rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{
+       wrk->hb.heartbeat_ev.data = (void *)wrk;
+       ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb,
+                       0.0, wrk->srv->cfg->heartbeat_interval);
+       ev_timer_start (event_loop, &wrk->hb.heartbeat_ev);
+}
+
+static void
+rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
+{
+       struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+       gdouble time_from_last = ev_time ();
+       struct rspamd_main *rspamd_main;
+       struct tm tm;
+       gchar timebuf[64];
+       gchar usec_buf[16];
+       gint r;
+
+       time_from_last -= wrk->hb.last_event;
+       rspamd_main = wrk->srv;
+
+       if (time_from_last > 0 && time_from_last > rspamd_main->cfg->heartbeat_interval) {
+               rspamd_localtime (wrk->hb.last_event, &tm);
+               r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
+               rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+                               wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event);
+               rspamd_snprintf (timebuf + r, sizeof (timebuf) - r,
+                               "%s", usec_buf + 1);
+
+               if (wrk->hb.nbeats > 0) {
+                       /* First time lost event */
+                       msg_warn_main ("lost heartbeat from worker type %s with pid %P, "
+                                 "last beat on: %s (%L beats received previously)",
+                                       g_quark_to_string (wrk->type), wrk->pid,
+                                       timebuf,
+                                       wrk->hb.nbeats);
+                       wrk->hb.nbeats = -1;
+                       /* TODO: send notify about worker problem */
+               }
+               else {
+                       wrk->hb.nbeats --;
+                       msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, "
+                                                  "last beat on: %s",
+                                       -(wrk->hb.nbeats),
+                                       g_quark_to_string (wrk->type),
+                                       wrk->pid,
+                                       timebuf);
+               }
+       }
+       else if (wrk->hb.nbeats < 0) {
+               rspamd_localtime (wrk->hb.last_event, &tm);
+               r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
+               rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
+                               wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event);
+               rspamd_snprintf (timebuf + r, sizeof (timebuf) - r,
+                               "%s", usec_buf + 1);
+
+               msg_info_main ("received heartbeat from worker type %s with pid %P, "
+                                          "last beat on: %s (%L beats lost previously)",
+                               g_quark_to_string (wrk->type), wrk->pid,
+                               timebuf,
+                               -(wrk->hb.nbeats));
+               wrk->hb.nbeats = 1;
+               /* TODO: send notify about worker restoration */
+       }
+}
+
+static void
+rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
+{
+       wrk->hb.heartbeat_ev.data = (void *)wrk;
+       ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb,
+                       0.0, wrk->srv->cfg->heartbeat_interval * 2);
+       ev_timer_start (event_loop, &wrk->hb.heartbeat_ev);
+}
+
 struct rspamd_worker *
 rspamd_fork_worker (struct rspamd_main *rspamd_main,
                                        struct rspamd_worker_conf *cf,
@@ -822,6 +912,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
                wrk->cld_ev.data = wrk;
                ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
                ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
+               rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop);
                /* Insert worker into worker's table, pid is index */
                g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
                                wrk->pid), wrk);
index 458d3d0835cf1ad4435b18bbf74fb163addb635a..808ca6aaa2478018facbc1ad149f39b70ccb3478 100644 (file)
@@ -1008,6 +1008,7 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
                        g_free (wrk->tmp_data);
                }
                ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
+               ev_timer_stop (rspamd_main->event_loop, &wrk->hb.heartbeat_ev);
        }
 
        if (wrk->control_pipe[0] != -1) {
index ea11965fb969052fed500505fb70d0061bcbe730..3cd6c391bd2b01074bfb9a690226984e327af237 100644 (file)
@@ -76,6 +76,12 @@ struct rspamd_worker_accept_event {
 typedef void (*rspamd_worker_term_cb) (EV_P_ ev_child *, struct rspamd_main *,
                                                                           struct rspamd_worker *);
 
+struct rspamd_worker_heartbeat {
+       ev_timer heartbeat_ev;          /**< used by main for checking heartbeats and by workers to send heartbeats */
+       ev_tstamp last_event;
+       gint64 nbeats; /* positive for beats received, negative for beats missed */
+};
+
 /**
  * Worker process structure
  */
@@ -90,7 +96,7 @@ struct rspamd_worker {
        struct rspamd_main *srv;        /**< pointer to server structure                                        */
        GQuark type;                    /**< process type                                                                       */
        GHashTable *signal_events;      /**< signal events                                                                      */
-       struct rspamd_worker_accept_event *accept_events; /**< socket events                                                                    */
+       struct rspamd_worker_accept_event *accept_events; /**< socket events                            */
        struct rspamd_worker_conf *cf;  /**< worker config data                                                         */
        gpointer ctx;                   /**< worker's specific data                                                     */
        enum rspamd_worker_flags flags; /**< worker's flags                                                                     */
@@ -99,6 +105,7 @@ struct rspamd_worker {
        gint srv_pipe[2];               /**< used by workers to request something from the
                                             main process. [0] - main, [1] - worker                     */
        ev_io srv_ev;                   /**< used by main for read workers' requests            */
+       struct rspamd_worker_heartbeat hb; /**< heartbeat data */
        gpointer control_data;          /**< used by control protocol to handle commands        */
        gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
        GPtrArray *finish_actions;      /**< called when worker is terminated                           */