aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/worker_util.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-11-11 14:41:57 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-11-11 15:30:30 +0000
commit63800059e8dacc1ba69b623719c21355a77301c5 (patch)
tree4fa217e014f7932c547e1815e217c332f4611309 /src/libserver/worker_util.c
parent82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22 (diff)
downloadrspamd-63800059e8dacc1ba69b623719c21355a77301c5.tar.gz
rspamd-63800059e8dacc1ba69b623719c21355a77301c5.zip
[Rework] Further isolation of the controller's functions
Diffstat (limited to 'src/libserver/worker_util.c')
-rw-r--r--src/libserver/worker_util.c278
1 files changed, 278 insertions, 0 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 05997d8b3..08933060c 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -25,6 +25,7 @@
#include "libutil/map_private.h"
#include "libutil/http_private.h"
#include "libutil/http_router.h"
+#include "libutil/rrd.h"
#ifdef WITH_GPERF_TOOLS
#include <gperftools/profiler.h>
@@ -1716,4 +1717,281 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
worker->srv->cfg);
*plang_det = worker->srv->cfg->lang_det;
+}
+
+void
+rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
+ struct rspamd_config *cfg)
+{
+ struct rspamd_stat *stat;
+ ucl_object_t *top, *sub;
+ struct ucl_emitter_functions *efuncs;
+ gint i, fd;
+ gchar fpath[PATH_MAX];
+
+ if (cfg->stats_file == NULL) {
+ return;
+ }
+
+ rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
+ fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
+
+ if (fd == -1) {
+ msg_err_config ("cannot open for writing controller stats from %s: %s",
+ fpath, strerror (errno));
+ return;
+ }
+
+ stat = rspamd_main->stat;
+
+ top = ucl_object_typed_new (UCL_OBJECT);
+ ucl_object_insert_key (top, ucl_object_fromint (
+ stat->messages_scanned), "scanned", 0, false);
+ ucl_object_insert_key (top, ucl_object_fromint (
+ stat->messages_learned), "learned", 0, false);
+
+ if (stat->messages_scanned > 0) {
+ sub = ucl_object_typed_new (UCL_OBJECT);
+ for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
+ ucl_object_insert_key (sub,
+ ucl_object_fromint (stat->actions_stat[i]),
+ rspamd_action_to_str (i), 0, false);
+ }
+ ucl_object_insert_key (top, sub, "actions", 0, false);
+ }
+
+ ucl_object_insert_key (top,
+ ucl_object_fromint (stat->connections_count),
+ "connections", 0, false);
+ ucl_object_insert_key (top,
+ ucl_object_fromint (stat->control_connections_count),
+ "control_connections", 0, false);
+
+ efuncs = ucl_object_emit_fd_funcs (fd);
+ if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
+ efuncs, NULL)) {
+ msg_err_config ("cannot write stats to %s: %s",
+ fpath, strerror (errno));
+
+ unlink (fpath);
+ }
+ else {
+ if (rename (fpath, cfg->stats_file) == -1) {
+ msg_err_config ("cannot rename stats from %s to %s: %s",
+ fpath, cfg->stats_file, strerror (errno));
+ }
+ }
+
+ ucl_object_unref (top);
+ close (fd);
+ ucl_object_emit_funcs_free (efuncs);
+}
+
+static ev_timer rrd_timer;
+
+void
+rspamd_controller_on_terminate (struct rspamd_worker *worker,
+ struct rspamd_rrd_file *rrd)
+{
+ struct rspamd_abstract_worker_ctx *ctx;
+
+ ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
+ rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
+
+ if (rrd) {
+ ev_timer_stop (ctx->event_loop, &rrd_timer);
+ msg_info ("closing rrd file: %s", rrd->filename);
+ rspamd_rrd_close (rrd);
+ }
+}
+
+static void
+rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
+ struct rspamd_config *cfg)
+{
+ struct ucl_parser *parser;
+ ucl_object_t *obj;
+ const ucl_object_t *elt, *subelt;
+ struct rspamd_stat *stat, stat_copy;
+ gint i;
+
+ if (cfg->stats_file == NULL) {
+ return;
+ }
+
+ if (access (cfg->stats_file, R_OK) == -1) {
+ msg_err_config ("cannot load controller stats from %s: %s",
+ cfg->stats_file, strerror (errno));
+ return;
+ }
+
+ parser = ucl_parser_new (0);
+
+ if (!ucl_parser_add_file (parser, cfg->stats_file)) {
+ msg_err_config ("cannot parse controller stats from %s: %s",
+ cfg->stats_file, ucl_parser_get_error (parser));
+ ucl_parser_free (parser);
+
+ return;
+ }
+
+ obj = ucl_parser_get_object (parser);
+ ucl_parser_free (parser);
+
+ stat = rspamd_main->stat;
+ memcpy (&stat_copy, stat, sizeof (stat_copy));
+
+ elt = ucl_object_lookup (obj, "scanned");
+
+ if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
+ stat_copy.messages_scanned = ucl_object_toint (elt);
+ }
+
+ elt = ucl_object_lookup (obj, "learned");
+
+ if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
+ stat_copy.messages_learned = ucl_object_toint (elt);
+ }
+
+ elt = ucl_object_lookup (obj, "actions");
+
+ if (elt != NULL) {
+ for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
+ subelt = ucl_object_lookup (elt, rspamd_action_to_str (i));
+
+ if (subelt && ucl_object_type (subelt) == UCL_INT) {
+ stat_copy.actions_stat[i] = ucl_object_toint (subelt);
+ }
+ }
+ }
+
+ elt = ucl_object_lookup (obj, "connections_count");
+
+ if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
+ stat_copy.connections_count = ucl_object_toint (elt);
+ }
+
+ elt = ucl_object_lookup (obj, "control_connections_count");
+
+ if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
+ stat_copy.control_connections_count = ucl_object_toint (elt);
+ }
+
+ ucl_object_unref (obj);
+ memcpy (stat, &stat_copy, sizeof (stat_copy));
+}
+
+struct rspamd_controller_periodics_cbdata {
+ struct rspamd_worker *worker;
+ struct rspamd_rrd_file *rrd;
+ struct rspamd_stat *stat;
+ ev_timer save_stats_event;
+};
+
+static void
+rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_controller_periodics_cbdata *cbd =
+ (struct rspamd_controller_periodics_cbdata *)w->data;
+ struct rspamd_stat *stat;
+ GArray ar;
+ gdouble points[METRIC_ACTION_MAX];
+ GError *err = NULL;
+ guint i;
+
+ g_assert (cbd->rrd != NULL);
+ stat = cbd->stat;
+
+ for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) {
+ points[i] = stat->actions_stat[i];
+ }
+
+ ar.data = (gchar *)points;
+ ar.len = sizeof (points);
+
+ if (!rspamd_rrd_add_record (cbd->rrd, &ar, rspamd_get_calendar_ticks (),
+ &err)) {
+ msg_err ("cannot update rrd file: %e", err);
+ g_error_free (err);
+ }
+
+ /* Plan new event */
+ ev_timer_again (EV_A_ w);
+}
+
+static void
+rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_controller_periodics_cbdata *cbd =
+ (struct rspamd_controller_periodics_cbdata *)w->data;
+
+ rspamd_controller_store_saved_stats (cbd->worker->srv, cbd->worker->srv->cfg);
+ ev_timer_again (EV_A_ w);
+}
+
+void
+rspamd_worker_init_controller (struct rspamd_worker *worker,
+ struct rspamd_rrd_file **prrd)
+{
+ struct rspamd_abstract_worker_ctx *ctx;
+ static const ev_tstamp rrd_update_time = 1.0;
+
+ ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
+ rspamd_controller_load_saved_stats (worker->srv, worker->srv->cfg);
+
+ if (worker->index == 0) {
+ /* Enable periodics and other stuff */
+ static struct rspamd_controller_periodics_cbdata cbd;
+ const ev_tstamp save_stats_interval = 60; /* 1 minute */
+
+ memset (&cbd, 0, sizeof (cbd));
+ cbd.save_stats_event.data = &cbd;
+ cbd.worker = worker;
+ cbd.stat = worker->srv->stat;
+
+ ev_timer_init (&cbd.save_stats_event,
+ rspamd_controller_stats_save_periodic,
+ save_stats_interval, save_stats_interval);
+ ev_timer_start (ctx->event_loop, &cbd.save_stats_event);
+
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
+ ctx->resolver, worker, TRUE);
+
+ if (prrd != NULL) {
+ if (ctx->cfg->rrd_file && worker->index == 0) {
+ GError *rrd_err = NULL;
+
+ *prrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
+
+ if (*prrd) {
+ cbd.rrd = *prrd;
+ rrd_timer.data = &cbd;
+ ev_timer_init (&rrd_timer, rspamd_controller_rrd_update,
+ rrd_update_time, rrd_update_time);
+ ev_timer_start (ctx->event_loop, &rrd_timer);
+ }
+ else if (rrd_err) {
+ msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
+ rrd_err);
+ g_error_free (rrd_err);
+ }
+ else {
+ msg_err ("cannot load rrd from %s: unknown error",
+ ctx->cfg->rrd_file);
+ }
+ }
+ else {
+ *prrd = NULL;
+ }
+ }
+
+ if (!ctx->cfg->disable_monitored) {
+ rspamd_worker_init_monitored (worker,
+ ctx->event_loop, ctx->resolver);
+ }
+ }
+ else {
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
+ ctx->resolver, worker, FALSE);
+ }
} \ No newline at end of file