diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-11-11 14:41:57 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-11-11 15:30:30 +0000 |
commit | 63800059e8dacc1ba69b623719c21355a77301c5 (patch) | |
tree | 4fa217e014f7932c547e1815e217c332f4611309 /src/libserver/worker_util.c | |
parent | 82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22 (diff) | |
download | rspamd-63800059e8dacc1ba69b623719c21355a77301c5.tar.gz rspamd-63800059e8dacc1ba69b623719c21355a77301c5.zip |
[Rework] Further isolation of the controller's functions
Diffstat (limited to 'src/libserver/worker_util.c')
-rw-r--r-- | src/libserver/worker_util.c | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 05997d8b3..08933060c 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -25,6 +25,7 @@ #include "libutil/map_private.h" #include "libutil/http_private.h" #include "libutil/http_router.h" +#include "libutil/rrd.h" #ifdef WITH_GPERF_TOOLS #include <gperftools/profiler.h> @@ -1716,4 +1717,281 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker, worker->srv->cfg); *plang_det = worker->srv->cfg->lang_det; +} + +void +rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main, + struct rspamd_config *cfg) +{ + struct rspamd_stat *stat; + ucl_object_t *top, *sub; + struct ucl_emitter_functions *efuncs; + gint i, fd; + gchar fpath[PATH_MAX]; + + if (cfg->stats_file == NULL) { + return; + } + + rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file); + fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644); + + if (fd == -1) { + msg_err_config ("cannot open for writing controller stats from %s: %s", + fpath, strerror (errno)); + return; + } + + stat = rspamd_main->stat; + + top = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (top, ucl_object_fromint ( + stat->messages_scanned), "scanned", 0, false); + ucl_object_insert_key (top, ucl_object_fromint ( + stat->messages_learned), "learned", 0, false); + + if (stat->messages_scanned > 0) { + sub = ucl_object_typed_new (UCL_OBJECT); + for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { + ucl_object_insert_key (sub, + ucl_object_fromint (stat->actions_stat[i]), + rspamd_action_to_str (i), 0, false); + } + ucl_object_insert_key (top, sub, "actions", 0, false); + } + + ucl_object_insert_key (top, + ucl_object_fromint (stat->connections_count), + "connections", 0, false); + ucl_object_insert_key (top, + ucl_object_fromint (stat->control_connections_count), + "control_connections", 0, false); + + efuncs = ucl_object_emit_fd_funcs (fd); + if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, + efuncs, NULL)) { + msg_err_config ("cannot write stats to %s: %s", + fpath, strerror (errno)); + + unlink (fpath); + } + else { + if (rename (fpath, cfg->stats_file) == -1) { + msg_err_config ("cannot rename stats from %s to %s: %s", + fpath, cfg->stats_file, strerror (errno)); + } + } + + ucl_object_unref (top); + close (fd); + ucl_object_emit_funcs_free (efuncs); +} + +static ev_timer rrd_timer; + +void +rspamd_controller_on_terminate (struct rspamd_worker *worker, + struct rspamd_rrd_file *rrd) +{ + struct rspamd_abstract_worker_ctx *ctx; + + ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; + rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg); + + if (rrd) { + ev_timer_stop (ctx->event_loop, &rrd_timer); + msg_info ("closing rrd file: %s", rrd->filename); + rspamd_rrd_close (rrd); + } +} + +static void +rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main, + struct rspamd_config *cfg) +{ + struct ucl_parser *parser; + ucl_object_t *obj; + const ucl_object_t *elt, *subelt; + struct rspamd_stat *stat, stat_copy; + gint i; + + if (cfg->stats_file == NULL) { + return; + } + + if (access (cfg->stats_file, R_OK) == -1) { + msg_err_config ("cannot load controller stats from %s: %s", + cfg->stats_file, strerror (errno)); + return; + } + + parser = ucl_parser_new (0); + + if (!ucl_parser_add_file (parser, cfg->stats_file)) { + msg_err_config ("cannot parse controller stats from %s: %s", + cfg->stats_file, ucl_parser_get_error (parser)); + ucl_parser_free (parser); + + return; + } + + obj = ucl_parser_get_object (parser); + ucl_parser_free (parser); + + stat = rspamd_main->stat; + memcpy (&stat_copy, stat, sizeof (stat_copy)); + + elt = ucl_object_lookup (obj, "scanned"); + + if (elt != NULL && ucl_object_type (elt) == UCL_INT) { + stat_copy.messages_scanned = ucl_object_toint (elt); + } + + elt = ucl_object_lookup (obj, "learned"); + + if (elt != NULL && ucl_object_type (elt) == UCL_INT) { + stat_copy.messages_learned = ucl_object_toint (elt); + } + + elt = ucl_object_lookup (obj, "actions"); + + if (elt != NULL) { + for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { + subelt = ucl_object_lookup (elt, rspamd_action_to_str (i)); + + if (subelt && ucl_object_type (subelt) == UCL_INT) { + stat_copy.actions_stat[i] = ucl_object_toint (subelt); + } + } + } + + elt = ucl_object_lookup (obj, "connections_count"); + + if (elt != NULL && ucl_object_type (elt) == UCL_INT) { + stat_copy.connections_count = ucl_object_toint (elt); + } + + elt = ucl_object_lookup (obj, "control_connections_count"); + + if (elt != NULL && ucl_object_type (elt) == UCL_INT) { + stat_copy.control_connections_count = ucl_object_toint (elt); + } + + ucl_object_unref (obj); + memcpy (stat, &stat_copy, sizeof (stat_copy)); +} + +struct rspamd_controller_periodics_cbdata { + struct rspamd_worker *worker; + struct rspamd_rrd_file *rrd; + struct rspamd_stat *stat; + ev_timer save_stats_event; +}; + +static void +rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_controller_periodics_cbdata *cbd = + (struct rspamd_controller_periodics_cbdata *)w->data; + struct rspamd_stat *stat; + GArray ar; + gdouble points[METRIC_ACTION_MAX]; + GError *err = NULL; + guint i; + + g_assert (cbd->rrd != NULL); + stat = cbd->stat; + + for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) { + points[i] = stat->actions_stat[i]; + } + + ar.data = (gchar *)points; + ar.len = sizeof (points); + + if (!rspamd_rrd_add_record (cbd->rrd, &ar, rspamd_get_calendar_ticks (), + &err)) { + msg_err ("cannot update rrd file: %e", err); + g_error_free (err); + } + + /* Plan new event */ + ev_timer_again (EV_A_ w); +} + +static void +rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_controller_periodics_cbdata *cbd = + (struct rspamd_controller_periodics_cbdata *)w->data; + + rspamd_controller_store_saved_stats (cbd->worker->srv, cbd->worker->srv->cfg); + ev_timer_again (EV_A_ w); +} + +void +rspamd_worker_init_controller (struct rspamd_worker *worker, + struct rspamd_rrd_file **prrd) +{ + struct rspamd_abstract_worker_ctx *ctx; + static const ev_tstamp rrd_update_time = 1.0; + + ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; + rspamd_controller_load_saved_stats (worker->srv, worker->srv->cfg); + + if (worker->index == 0) { + /* Enable periodics and other stuff */ + static struct rspamd_controller_periodics_cbdata cbd; + const ev_tstamp save_stats_interval = 60; /* 1 minute */ + + memset (&cbd, 0, sizeof (cbd)); + cbd.save_stats_event.data = &cbd; + cbd.worker = worker; + cbd.stat = worker->srv->stat; + + ev_timer_init (&cbd.save_stats_event, + rspamd_controller_stats_save_periodic, + save_stats_interval, save_stats_interval); + ev_timer_start (ctx->event_loop, &cbd.save_stats_event); + + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, + ctx->resolver, worker, TRUE); + + if (prrd != NULL) { + if (ctx->cfg->rrd_file && worker->index == 0) { + GError *rrd_err = NULL; + + *prrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err); + + if (*prrd) { + cbd.rrd = *prrd; + rrd_timer.data = &cbd; + ev_timer_init (&rrd_timer, rspamd_controller_rrd_update, + rrd_update_time, rrd_update_time); + ev_timer_start (ctx->event_loop, &rrd_timer); + } + else if (rrd_err) { + msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file, + rrd_err); + g_error_free (rrd_err); + } + else { + msg_err ("cannot load rrd from %s: unknown error", + ctx->cfg->rrd_file); + } + } + else { + *prrd = NULL; + } + } + + if (!ctx->cfg->disable_monitored) { + rspamd_worker_init_monitored (worker, + ctx->event_loop, ctx->resolver); + } + } + else { + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, + ctx->resolver, worker, FALSE); + } }
\ No newline at end of file |