aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-06-24 20:01:45 +0600
committerGitHub <noreply@github.com>2024-06-24 20:01:45 +0600
commit1995012959e219e02aef939f9fd3765ffd718e04 (patch)
treeb4b807aa0ce73c5c530a0ce8c7b72638acf0f417
parent90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd (diff)
parent655afcbbe99a841275ba34534f9300a67fede996 (diff)
downloadrspamd-1995012959e219e02aef939f9fd3765ffd718e04.tar.gz
rspamd-1995012959e219e02aef939f9fd3765ffd718e04.zip
Merge pull request #5026 from rspamd/vstakhov-metrics-worker
[Rework] Allow metrics endpoint to be enabled for each scanner
-rw-r--r--src/controller.c236
-rw-r--r--src/libserver/protocol.c30
-rw-r--r--src/libserver/protocol.h4
-rw-r--r--src/libserver/protocol_internal.h8
-rw-r--r--src/libserver/task.c2
-rw-r--r--src/libserver/task.h7
-rw-r--r--src/libserver/worker_util.c202
-rw-r--r--src/libserver/worker_util.h32
-rw-r--r--src/rspamd.c1
-rw-r--r--src/rspamd.h1
-rw-r--r--src/rspamd_proxy.c13
-rw-r--r--src/worker.c2
-rw-r--r--src/worker_private.h6
13 files changed, 306 insertions, 238 deletions
diff --git a/src/controller.c b/src/controller.c
index 5526d9197..d91f99098 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -149,8 +149,6 @@ struct rspamd_controller_worker_ctx {
/* HTTP server */
struct rspamd_http_context *http_ctx;
struct rspamd_http_connection_router *http;
- /* Server's start time */
- ev_tstamp start_time;
/* Main server */
struct rspamd_main *srv;
/* SSL cert */
@@ -762,7 +760,7 @@ rspamd_controller_handle_auth(struct rspamd_http_connection_entry *conn_ent,
data[4] = st.actions_stat[METRIC_ACTION_SOFT_REJECT];
/* Get uptime */
- uptime = ev_time() - session->ctx->start_time;
+ uptime = ev_time() - session->ctx->srv->start_time;
ucl_object_insert_key(obj, ucl_object_fromstring(RVERSION), "version", 0, false);
ucl_object_insert_key(obj, ucl_object_fromstring("ok"), "auth", 0, false);
@@ -2695,7 +2693,7 @@ rspamd_controller_handle_stat_common(
ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false);
ucl_object_insert_key(top, ucl_object_fromstring(session->ctx->cfg->checksum), "config_id", 0, false);
- uptime = ev_time() - session->ctx->start_time;
+ uptime = ev_time() - session->ctx->srv->start_time;
ucl_object_insert_key(top, ucl_object_fromint(uptime), "uptime", 0, false);
ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only),
"read_only", 0, false);
@@ -2811,19 +2809,6 @@ rspamd_controller_handle_statreset(
return rspamd_controller_handle_stat_common(conn_ent, msg, TRUE);
}
-static inline void
-rspamd_controller_metrics_add_integer(rspamd_fstring_t **output,
- const ucl_object_t *top,
- const char *name,
- const char *type,
- const char *description,
- const char *ucl_key)
-{
- rspamd_printf_fstring(output, "# HELP %s %s\n", name, description);
- rspamd_printf_fstring(output, "# TYPE %s %s\n", name, type);
- rspamd_printf_fstring(output, "%s %L\n", name,
- ucl_object_toint(ucl_object_lookup(top, ucl_key)));
-}
/*
* Metrics command handler:
@@ -2836,138 +2821,11 @@ rspamd_controller_metrics_fin_task(void *ud)
{
struct rspamd_stat_cbdata *cbdata = ud;
struct rspamd_http_connection_entry *conn_ent;
- ucl_object_t *top;
struct rspamd_fuzzy_stat_entry *entry;
rspamd_fstring_t *output;
- int i;
conn_ent = cbdata->conn_ent;
- top = cbdata->top;
-
- output = rspamd_fstring_sized_new(1024);
- rspamd_printf_fstring(&output, "# HELP rspamd_build_info A metric with a constant '1' value "
- "labeled by version from which rspamd was built.\n");
- rspamd_printf_fstring(&output, "# TYPE rspamd_build_info gauge\n");
- rspamd_printf_fstring(&output, "rspamd_build_info{version=\"%s\"} 1\n",
- ucl_object_tostring(ucl_object_lookup(top, "version")));
- rspamd_printf_fstring(&output, "# HELP rspamd_config A metric with a constant '1' value "
- "labeled by id of the current config.\n");
- rspamd_printf_fstring(&output, "# TYPE rspamd_config gauge\n");
- rspamd_printf_fstring(&output, "rspamd_config{id=\"%s\"} 1\n",
- ucl_object_tostring(ucl_object_lookup(top, "config_id")));
-
- gsize cnt = MAX_AVG_TIME_SLOTS;
- float sum = rspamd_sum_floats(cbdata->ctx->worker->srv->stat->avg_time.avg_time, &cnt);
- rspamd_printf_fstring(&output, "# HELP rspamd_scan_time_average Average messages scan time.\n");
- rspamd_printf_fstring(&output, "# TYPE rspamd_scan_time_average gauge\n");
- rspamd_printf_fstring(&output, "rspamd_scan_time_average %f\n",
- cnt > 0 ? (double) sum / cnt : 0.0);
-
- rspamd_controller_metrics_add_integer(&output, top,
- "process_start_time_seconds",
- "gauge",
- "Start time of the process since unix epoch in seconds.",
- "start_time");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_read_only",
- "gauge",
- "Whether the rspamd instance is read-only.",
- "read_only");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_scanned_total",
- "counter",
- "Scanned messages.",
- "scanned");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_learned_total",
- "counter",
- "Learned messages.",
- "learned");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_spam_total",
- "counter",
- "Messages classified as spam.",
- "spam_count");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_ham_total",
- "counter",
- "Messages classified as ham.",
- "ham_count");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_connections",
- "gauge",
- "Active connections.",
- "connections");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_control_connections_total",
- "gauge",
- "Control connections.",
- "control_connections");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_pools_allocated",
- "gauge",
- "Pools allocated.",
- "pools_allocated");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_pools_freed",
- "gauge",
- "Pools freed.",
- "pools_freed");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_allocated_bytes",
- "gauge",
- "Bytes allocated.",
- "bytes_allocated");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_chunks_allocated",
- "gauge",
- "Memory pools: current chunks allocated.",
- "chunks_allocated");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_shared_chunks_allocated",
- "gauge",
- "Memory pools: current shared chunks allocated.",
- "shared_chunks_allocated");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_chunks_freed",
- "gauge",
- "Memory pools: current chunks freed.",
- "chunks_freed");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_chunks_oversized",
- "gauge",
- "Memory pools: current chunks oversized (needs extra allocation/fragmentation).",
- "chunks_oversized");
- rspamd_controller_metrics_add_integer(&output, top,
- "rspamd_fragmented",
- "gauge",
- "Memory pools: fragmented memory waste.",
- "fragmented");
-
- rspamd_printf_fstring(&output, "# HELP rspamd_learns_total Total learns.\n");
- rspamd_printf_fstring(&output, "# TYPE rspamd_learns_total counter\n");
- rspamd_printf_fstring(&output, "rspamd_learns_total %L\n", cbdata->learned);
-
- const ucl_object_t *acts_obj = ucl_object_lookup(top, "actions");
-
- if (acts_obj) {
- rspamd_printf_fstring(&output, "# HELP rspamd_actions_total Actions labelled by action type.\n");
- rspamd_printf_fstring(&output, "# TYPE rspamd_actions_total counter\n");
- for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
- const char *str_act = rspamd_action_to_str(i);
- const ucl_object_t *act = ucl_object_lookup(acts_obj, str_act);
-
- if (act) {
- rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} %L\n",
- str_act,
- ucl_object_toint(act));
- }
- else {
- rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} 0\n",
- str_act);
- }
- }
- }
+ output = rspamd_metrics_to_prometheus_string(cbdata->top);
if (cbdata->stat) {
const ucl_object_t *cur_elt;
@@ -3081,7 +2939,6 @@ rspamd_controller_metrics_fin_task(void *ud)
}
rspamd_printf_fstring(&output, "# EOF\n");
-
rspamd_controller_send_openmetrics(conn_ent, output);
return TRUE;
@@ -3094,22 +2951,21 @@ rspamd_controller_handle_metrics_common(
gboolean do_reset)
{
struct rspamd_controller_session *session = conn_ent->ud;
- ucl_object_t *top, *sub;
- int i;
- int64_t uptime;
- uint64_t spam = 0, ham = 0;
- rspamd_mempool_stat_t mem_st;
- struct rspamd_stat *stat, stat_copy;
+ ucl_object_t *top;
+ ev_tstamp uptime;
+ struct rspamd_stat stat_copy;
struct rspamd_controller_worker_ctx *ctx;
struct rspamd_task *task;
struct rspamd_stat_cbdata *cbdata;
- memset(&mem_st, 0, sizeof(mem_st));
- rspamd_mempool_stat(&mem_st);
- memcpy(&stat_copy, session->ctx->worker->srv->stat, sizeof(stat_copy));
- stat = &stat_copy;
+ uptime = ev_time() - session->ctx->srv->start_time;
ctx = session->ctx;
+ memcpy(&stat_copy, session->ctx->worker->srv->stat, sizeof(stat_copy));
+ top = rspamd_worker_metrics_object(session->ctx->cfg, &stat_copy, uptime);
+ ucl_object_insert_key(top, ucl_object_fromint(session->ctx->srv->start_time), "start_time", 0, false);
+ ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only),
+ "read_only", 0, false);
task = rspamd_task_new(session->ctx->worker, session->cfg, session->pool,
ctx->lang_det, ctx->event_loop, FALSE);
task->resolver = ctx->resolver;
@@ -3117,7 +2973,6 @@ rspamd_controller_handle_metrics_common(
cbdata->conn_ent = conn_ent;
cbdata->task = task;
cbdata->ctx = ctx;
- top = ucl_object_typed_new(UCL_OBJECT);
cbdata->top = top;
task->s = rspamd_session_create(session->pool,
@@ -3127,75 +2982,20 @@ rspamd_controller_handle_metrics_common(
cbdata);
task->fin_arg = cbdata;
task->http_conn = rspamd_http_connection_ref(conn_ent->conn);
- ;
task->sock = conn_ent->conn->fd;
- ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false);
- ucl_object_insert_key(top, ucl_object_fromstring(session->ctx->cfg->checksum), "config_id", 0, false);
- uptime = ev_time() - session->ctx->start_time;
- ucl_object_insert_key(top, ucl_object_fromint(uptime), "uptime", 0, false);
- ucl_object_insert_key(top, ucl_object_fromint(session->ctx->start_time), "start_time", 0, false);
- ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only),
- "read_only", 0, false);
- ucl_object_insert_key(top, ucl_object_fromint(stat->messages_scanned), "scanned", 0, false);
- ucl_object_insert_key(top, ucl_object_fromint(stat->messages_learned), "learned", 0, false);
+ if (stat_copy.messages_scanned > 0 && do_reset) {
+ for (int i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
- if (stat->messages_scanned > 0) {
- sub = ucl_object_typed_new(UCL_OBJECT);
- for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
- ucl_object_insert_key(sub,
- ucl_object_fromint(stat->actions_stat[i]),
- rspamd_action_to_str(i), 0, false);
- if (i < METRIC_ACTION_GREYLIST) {
- spam += stat->actions_stat[i];
- }
- else {
- ham += stat->actions_stat[i];
- }
- if (do_reset) {
#ifndef HAVE_ATOMIC_BUILTINS
- session->ctx->worker->srv->stat->actions_stat[i] = 0;
+ session->ctx->worker->srv->stat->actions_stat[i] = 0;
#else
- __atomic_store_n(&session->ctx->worker->srv->stat->actions_stat[i],
- 0, __ATOMIC_RELEASE);
+ __atomic_store_n(&session->ctx->worker->srv->stat->actions_stat[i],
+ 0, __ATOMIC_RELEASE);
#endif
- }
}
- ucl_object_insert_key(top, sub, "actions", 0, false);
}
- ucl_object_insert_key(top, ucl_object_fromint(spam), "spam_count", 0, false);
- ucl_object_insert_key(top, ucl_object_fromint(ham), "ham_count", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(stat->connections_count), "connections", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(stat->control_connections_count),
- "control_connections", 0, false);
-
- ucl_object_insert_key(top,
- ucl_object_fromint(mem_st.pools_allocated), "pools_allocated", 0,
- false);
- ucl_object_insert_key(top,
- ucl_object_fromint(mem_st.pools_freed), "pools_freed", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(mem_st.bytes_allocated), "bytes_allocated", 0,
- false);
- ucl_object_insert_key(top,
- ucl_object_fromint(
- mem_st.chunks_allocated),
- "chunks_allocated", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(mem_st.shared_chunks_allocated),
- "shared_chunks_allocated", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(mem_st.chunks_freed), "chunks_freed", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(
- mem_st.oversized_chunks),
- "chunks_oversized", 0, false);
- ucl_object_insert_key(top,
- ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false);
-
if (do_reset) {
session->ctx->srv->stat->messages_scanned = 0;
session->ctx->srv->stat->messages_learned = 0;
@@ -3210,7 +3010,6 @@ rspamd_controller_handle_metrics_common(
rspamd_stat_statistics(task, session->ctx->cfg, &cbdata->learned,
&cbdata->stat);
session->task = task;
-
rspamd_session_pending(task->s);
@@ -4083,7 +3882,6 @@ start_controller_worker(struct rspamd_worker *worker)
"controller",
rspamd_controller_accept_socket);
- ctx->start_time = ev_time();
ctx->worker = worker;
ctx->cfg = worker->srv->cfg;
ctx->srv = worker->srv;
diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c
index c8c7bc76a..db83b0bfb 100644
--- a/src/libserver/protocol.c
+++ b/src/libserver/protocol.c
@@ -25,6 +25,7 @@
#include "unix-std.h"
#include "protocol_internal.h"
#include "libserver/mempool_vars_internal.h"
+#include "libserver/worker_util.h"
#include "contrib/fastutf8/fastutf8.h"
#include "task.h"
#include "lua/lua_classnames.h"
@@ -210,6 +211,19 @@ rspamd_protocol_handle_url(struct rspamd_task *task,
goto err;
}
break;
+ case 'M':
+ case 'm':
+ /* metrics, process */
+ if (COMPARE_CMD(p, MSG_CMD_METRICS, pathlen)) {
+ msg_debug_protocol("got metrics command");
+ task->cmd = CMD_METRICS;
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ task->processed_stages |= RSPAMD_TASK_STAGE_DONE; /* Skip all */
+ }
+ else {
+ goto err;
+ }
+ break;
default:
goto err;
}
@@ -2098,11 +2112,12 @@ void rspamd_protocol_write_log_pipe(struct rspamd_task *task)
g_array_free(extra, TRUE);
}
-void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout)
+void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout, struct rspamd_main *srv)
{
struct rspamd_http_message *msg;
const char *ctype = "application/json";
rspamd_fstring_t *reply;
+ ev_tstamp now = ev_time();
msg = rspamd_http_new_message(HTTP_RESPONSE);
@@ -2163,6 +2178,8 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout)
}
}
else {
+ rspamd_fstring_t *output;
+ struct rspamd_stat stat_copy;
msg->status = rspamd_fstring_new_init("OK", 2);
switch (task->cmd) {
@@ -2179,6 +2196,15 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout)
rspamd_http_message_set_body(msg, "pong" CRLF, 6);
ctype = "text/plain";
break;
+ case CMD_METRICS:
+ msg_debug_protocol("writing metrics to client");
+
+ memcpy(&stat_copy, srv->stat, sizeof(stat_copy));
+ output = rspamd_metrics_to_prometheus_string(
+ rspamd_worker_metrics_object(srv->cfg, &stat_copy, now - srv->start_time));
+ rspamd_http_message_set_body_from_fstring_steal(msg, output);
+ ctype = "application/openmetrics-text; version=1.0.0; charset=utf-8";
+ break;
default:
msg_err_protocol("BROKEN");
break;
@@ -2186,7 +2212,7 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout)
}
ev_now_update(task->event_loop);
- msg->date = ev_time();
+ msg->date = now;
rspamd_http_connection_reset(task->http_conn);
rspamd_http_connection_write_message(task->http_conn, msg, NULL,
diff --git a/src/libserver/protocol.h b/src/libserver/protocol.h
index 94fbcbf04..9d2b985da 100644
--- a/src/libserver/protocol.h
+++ b/src/libserver/protocol.h
@@ -51,7 +51,7 @@ struct rspamd_protocol_log_message_sum {
struct rspamd_protocol_log_symbol_result results[];
};
-struct rspamd_metric;
+struct rspamd_main;
/**
* Process headers into HTTP message and set appropriate task fields
@@ -126,7 +126,7 @@ ucl_object_t *rspamd_protocol_write_ucl(struct rspamd_task *task,
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
-void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout);
+void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout, struct rspamd_main *srv);
/**
* Convert rspamd output to legacy protocol reply
diff --git a/src/libserver/protocol_internal.h b/src/libserver/protocol_internal.h
index c604e9630..7a70ccef0 100644
--- a/src/libserver/protocol_internal.h
+++ b/src/libserver/protocol_internal.h
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2017 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -55,6 +55,8 @@ extern "C" {
* Return a confirmation that spamd is alive
*/
#define MSG_CMD_PING "ping"
+
+#define MSG_CMD_METRICS "metrics"
/*
* Process this message as described above and return modified message
*/
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 63babf990..637f401a9 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -133,7 +133,7 @@ rspamd_task_reply(struct rspamd_task *task)
}
else {
if (!(task->processed_stages & RSPAMD_TASK_STAGE_REPLIED)) {
- rspamd_protocol_write_reply(task, write_timeout);
+ rspamd_protocol_write_reply(task, write_timeout, task->worker->srv);
}
}
}
diff --git a/src/libserver/task.h b/src/libserver/task.h
index cba9bbbd4..7e6341a84 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -36,6 +36,7 @@ enum rspamd_command {
CMD_CHECK_RSPAMC, /* Legacy rspamc format (like SA one) */
CMD_CHECK, /* Legacy check - metric json reply */
CMD_CHECK_V2, /* Modern check - symbols in json reply */
+ CMD_METRICS,
};
enum rspamd_task_stage {
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 02b0cd5ae..383d89c14 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -2354,3 +2354,205 @@ rspamd_worker_check_and_adjust_timeout(struct rspamd_config *cfg, double timeout
/* TODO: maybe adjust timeout */
return timeout;
}
+
+ucl_object_t *
+rspamd_worker_metrics_object(struct rspamd_config *cfg, struct rspamd_stat *stat, ev_tstamp uptime)
+{
+ rspamd_mempool_stat_t mem_st;
+ memset(&mem_st, 0, sizeof(mem_st));
+ rspamd_mempool_stat(&mem_st);
+
+ ucl_object_t *top = ucl_object_typed_new(UCL_OBJECT);
+
+ ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromstring(cfg->checksum), "config_id", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromdouble(uptime), "uptime", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromint(stat->messages_scanned), "scanned", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromint(stat->messages_learned), "learned", 0, false);
+
+ gsize cnt = MAX_AVG_TIME_SLOTS;
+ float sum = rspamd_sum_floats(stat->avg_time.avg_time, &cnt);
+
+ ucl_object_insert_key(top,
+ ucl_object_fromdouble(cnt > 0 ? (double) sum / cnt : 0.0), "avg_scan_time", 0, false);
+
+ unsigned spam = 0, ham = 0;
+
+ if (stat->messages_scanned > 0) {
+ ucl_object_t *sub = ucl_object_typed_new(UCL_OBJECT);
+ for (int i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
+ ucl_object_insert_key(sub,
+ ucl_object_fromint(stat->actions_stat[i]),
+ rspamd_action_to_str(i), 0, false);
+ if (i < METRIC_ACTION_GREYLIST) {
+ spam += stat->actions_stat[i];
+ }
+ else {
+ ham += stat->actions_stat[i];
+ }
+ }
+ ucl_object_insert_key(top, sub, "actions", 0, false);
+ }
+
+ ucl_object_insert_key(top, ucl_object_fromint(spam), "spam_count", 0, false);
+ ucl_object_insert_key(top, ucl_object_fromint(ham), "ham_count", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(stat->connections_count), "connections", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(stat->control_connections_count),
+ "control_connections", 0, false);
+
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.pools_allocated), "pools_allocated", 0,
+ false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.pools_freed), "pools_freed", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.bytes_allocated), "bytes_allocated", 0,
+ false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(
+ mem_st.chunks_allocated),
+ "chunks_allocated", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.shared_chunks_allocated),
+ "shared_chunks_allocated", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.chunks_freed), "chunks_freed", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(
+ mem_st.oversized_chunks),
+ "chunks_oversized", 0, false);
+ ucl_object_insert_key(top,
+ ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false);
+
+ return top;
+}
+
+rspamd_fstring_t *
+rspamd_metrics_to_prometheus_string(const ucl_object_t *top)
+{
+ rspamd_fstring_t *output = rspamd_fstring_sized_new(1024);
+
+ rspamd_printf_fstring(&output, "# HELP rspamd_build_info A metric with a constant '1' value "
+ "labeled by version from which rspamd was built.\n");
+ rspamd_printf_fstring(&output, "# TYPE rspamd_build_info gauge\n");
+ rspamd_printf_fstring(&output, "rspamd_build_info{version=\"%s\"} 1\n",
+ ucl_object_tostring(ucl_object_lookup(top, "version")));
+ rspamd_printf_fstring(&output, "# HELP rspamd_config A metric with a constant '1' value "
+ "labeled by id of the current config.\n");
+ rspamd_printf_fstring(&output, "# TYPE rspamd_config gauge\n");
+ rspamd_printf_fstring(&output, "rspamd_config{id=\"%s\"} 1\n",
+ ucl_object_tostring(ucl_object_lookup(top, "config_id")));
+
+
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_scan_time_average",
+ "gauge",
+ "Average messages scan time.",
+ "avg_scan_time");
+ rspamd_metrics_add_integer(&output, top,
+ "process_start_time_seconds",
+ "gauge",
+ "Start time of the process since unix epoch in seconds.",
+ "start_time");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_read_only",
+ "gauge",
+ "Whether the rspamd instance is read-only.",
+ "read_only");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_scanned_total",
+ "counter",
+ "Scanned messages.",
+ "scanned");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_learned_total",
+ "counter",
+ "Learned messages.",
+ "learned");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_spam_total",
+ "counter",
+ "Messages classified as spam.",
+ "spam_count");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_ham_total",
+ "counter",
+ "Messages classified as ham.",
+ "ham_count");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_connections",
+ "gauge",
+ "Active connections.",
+ "connections");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_control_connections_total",
+ "gauge",
+ "Control connections.",
+ "control_connections");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_pools_allocated",
+ "gauge",
+ "Pools allocated.",
+ "pools_allocated");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_pools_freed",
+ "gauge",
+ "Pools freed.",
+ "pools_freed");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_allocated_bytes",
+ "gauge",
+ "Bytes allocated.",
+ "bytes_allocated");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_chunks_allocated",
+ "gauge",
+ "Memory pools: current chunks allocated.",
+ "chunks_allocated");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_shared_chunks_allocated",
+ "gauge",
+ "Memory pools: current shared chunks allocated.",
+ "shared_chunks_allocated");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_chunks_freed",
+ "gauge",
+ "Memory pools: current chunks freed.",
+ "chunks_freed");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_chunks_oversized",
+ "gauge",
+ "Memory pools: current chunks oversized (needs extra allocation/fragmentation).",
+ "chunks_oversized");
+ rspamd_metrics_add_integer(&output, top,
+ "rspamd_fragmented",
+ "gauge",
+ "Memory pools: fragmented memory waste.",
+ "fragmented");
+
+ const ucl_object_t *acts_obj = ucl_object_lookup(top, "actions");
+
+ if (acts_obj) {
+ rspamd_printf_fstring(&output, "# HELP rspamd_actions_total Actions labelled by action type.\n");
+ rspamd_printf_fstring(&output, "# TYPE rspamd_actions_total counter\n");
+ for (int i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
+ const char *str_act = rspamd_action_to_str(i);
+ const ucl_object_t *act = ucl_object_lookup(acts_obj, str_act);
+
+ if (act) {
+ rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} %L\n",
+ str_act,
+ ucl_object_toint(act));
+ }
+ else {
+ rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} 0\n",
+ str_act);
+ }
+ }
+ }
+
+ /* Must be finalized and freed by caller */
+ return output;
+} \ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index e3627092d..b58a43f7f 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -299,6 +299,32 @@ void rspamd_worker_init_controller(struct rspamd_worker *worker,
void rspamd_controller_store_saved_stats(struct rspamd_main *rspamd_main,
struct rspamd_config *cfg);
+/**
+ * Get metrics object for a worker
+ */
+ucl_object_t *rspamd_worker_metrics_object(struct rspamd_config *cfg, struct rspamd_stat *stat, ev_tstamp uptime);
+
+
+static inline void
+rspamd_metrics_add_integer(rspamd_fstring_t **output,
+ const ucl_object_t *top,
+ const char *name,
+ const char *type,
+ const char *description,
+ const char *ucl_key)
+{
+ rspamd_printf_fstring(output, "# HELP %s %s\n", name, description);
+ rspamd_printf_fstring(output, "# TYPE %s %s\n", name, type);
+ rspamd_printf_fstring(output, "%s %L\n", name,
+ ucl_object_toint(ucl_object_lookup(top, ucl_key)));
+}
+/**
+ * Convert metrics to the prometheus format
+ * @param top
+ * @return
+ */
+rspamd_fstring_t *rspamd_metrics_to_prometheus_string(const ucl_object_t *top);
+
#ifdef WITH_HYPERSCAN
struct rspamd_control_command;
diff --git a/src/rspamd.c b/src/rspamd.c
index dc5535f0c..117f3b995 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -1644,6 +1644,7 @@ int main(int argc, char **argv, char **env)
exit(EXIT_FAILURE);
}
+ rspamd_main->start_time = ev_time();
/* Unblock signals */
sigemptyset(&signals.sa_mask);
sigprocmask(SIG_SETMASK, &signals.sa_mask, NULL);
diff --git a/src/rspamd.h b/src/rspamd.h
index 60613ce0a..a5ef068e1 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -320,6 +320,7 @@ struct rspamd_main {
struct roll_history *history; /**< rolling history */
struct ev_loop *event_loop;
ev_signal term_ev, int_ev, hup_ev, usr1_ev; /**< signals */
+ ev_tstamp start_time;
struct rspamd_http_context *http_ctx;
};
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 5bf1fb8f2..4f08e81b9 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -160,6 +160,7 @@ struct rspamd_proxy_ctx {
/* Language detector */
struct rspamd_lang_detector *lang_det;
double task_timeout;
+ struct rspamd_main *srv;
};
enum rspamd_backend_flags {
@@ -1734,6 +1735,8 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task)
int out_type = UCL_EMIT_JSON_COMPACT;
const char *ctype = "application/json";
const rspamd_ftok_t *accept_hdr = rspamd_task_get_request_header(task, "Accept");
+ rspamd_fstring_t *output;
+ struct rspamd_stat stat_copy;
if (accept_hdr && rspamd_substring_search(accept_hdr->begin, accept_hdr->len,
"application/msgpack", sizeof("application/msgpack") - 1) != -1) {
@@ -1759,6 +1762,13 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task)
rspamd_http_message_set_body(msg, "pong" CRLF, 6);
ctype = "text/plain";
break;
+ case CMD_METRICS:
+ memcpy(&stat_copy, session->ctx->srv->stat, sizeof(stat_copy));
+ output = rspamd_metrics_to_prometheus_string(
+ rspamd_worker_metrics_object(task->cfg, &stat_copy, ev_time() - session->ctx->srv->start_time));
+ rspamd_http_message_set_body_from_fstring_steal(msg, output);
+ ctype = "application/openmetrics-text; version=1.0.0; charset=utf-8";
+ break;
default:
msg_err_task("BROKEN");
break;
@@ -1903,7 +1913,7 @@ rspamd_proxy_self_scan(struct rspamd_proxy_session *session)
task->flags |= RSPAMD_TASK_FLAG_SKIP;
}
else {
- if (task->cmd == CMD_PING) {
+ if (task->cmd == CMD_PING || task->cmd == CMD_METRICS) {
task->flags |= RSPAMD_TASK_FLAG_SKIP;
}
else {
@@ -2412,6 +2422,7 @@ start_rspamd_proxy(struct rspamd_worker *worker)
g_assert(rspamd_worker_check_context(worker->ctx, rspamd_rspamd_proxy_magic));
ctx->cfg = worker->srv->cfg;
+ ctx->srv = worker->srv;
ctx->event_loop = rspamd_prepare_worker(worker, "rspamd_proxy",
proxy_accept_socket);
diff --git a/src/worker.c b/src/worker.c
index 150f813c9..5f8ba76c5 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -172,7 +172,7 @@ rspamd_worker_body_handler(struct rspamd_http_connection *conn,
task->flags |= RSPAMD_TASK_FLAG_SKIP;
}
else {
- if (task->cmd == CMD_PING) {
+ if (task->cmd == CMD_PING || task->cmd == CMD_METRICS) {
task->flags |= RSPAMD_TASK_FLAG_SKIP;
}
else {
diff --git a/src/worker_private.h b/src/worker_private.h
index c295eaf85..6fb40e970 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,