diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2024-06-24 20:01:45 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-24 20:01:45 +0600 |
commit | 1995012959e219e02aef939f9fd3765ffd718e04 (patch) | |
tree | b4b807aa0ce73c5c530a0ce8c7b72638acf0f417 | |
parent | 90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd (diff) | |
parent | 655afcbbe99a841275ba34534f9300a67fede996 (diff) | |
download | rspamd-1995012959e219e02aef939f9fd3765ffd718e04.tar.gz rspamd-1995012959e219e02aef939f9fd3765ffd718e04.zip |
Merge pull request #5026 from rspamd/vstakhov-metrics-worker
[Rework] Allow metrics endpoint to be enabled for each scanner
-rw-r--r-- | src/controller.c | 236 | ||||
-rw-r--r-- | src/libserver/protocol.c | 30 | ||||
-rw-r--r-- | src/libserver/protocol.h | 4 | ||||
-rw-r--r-- | src/libserver/protocol_internal.h | 8 | ||||
-rw-r--r-- | src/libserver/task.c | 2 | ||||
-rw-r--r-- | src/libserver/task.h | 7 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 202 | ||||
-rw-r--r-- | src/libserver/worker_util.h | 32 | ||||
-rw-r--r-- | src/rspamd.c | 1 | ||||
-rw-r--r-- | src/rspamd.h | 1 | ||||
-rw-r--r-- | src/rspamd_proxy.c | 13 | ||||
-rw-r--r-- | src/worker.c | 2 | ||||
-rw-r--r-- | src/worker_private.h | 6 |
13 files changed, 306 insertions, 238 deletions
diff --git a/src/controller.c b/src/controller.c index 5526d9197..d91f99098 100644 --- a/src/controller.c +++ b/src/controller.c @@ -149,8 +149,6 @@ struct rspamd_controller_worker_ctx { /* HTTP server */ struct rspamd_http_context *http_ctx; struct rspamd_http_connection_router *http; - /* Server's start time */ - ev_tstamp start_time; /* Main server */ struct rspamd_main *srv; /* SSL cert */ @@ -762,7 +760,7 @@ rspamd_controller_handle_auth(struct rspamd_http_connection_entry *conn_ent, data[4] = st.actions_stat[METRIC_ACTION_SOFT_REJECT]; /* Get uptime */ - uptime = ev_time() - session->ctx->start_time; + uptime = ev_time() - session->ctx->srv->start_time; ucl_object_insert_key(obj, ucl_object_fromstring(RVERSION), "version", 0, false); ucl_object_insert_key(obj, ucl_object_fromstring("ok"), "auth", 0, false); @@ -2695,7 +2693,7 @@ rspamd_controller_handle_stat_common( ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false); ucl_object_insert_key(top, ucl_object_fromstring(session->ctx->cfg->checksum), "config_id", 0, false); - uptime = ev_time() - session->ctx->start_time; + uptime = ev_time() - session->ctx->srv->start_time; ucl_object_insert_key(top, ucl_object_fromint(uptime), "uptime", 0, false); ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only), "read_only", 0, false); @@ -2811,19 +2809,6 @@ rspamd_controller_handle_statreset( return rspamd_controller_handle_stat_common(conn_ent, msg, TRUE); } -static inline void -rspamd_controller_metrics_add_integer(rspamd_fstring_t **output, - const ucl_object_t *top, - const char *name, - const char *type, - const char *description, - const char *ucl_key) -{ - rspamd_printf_fstring(output, "# HELP %s %s\n", name, description); - rspamd_printf_fstring(output, "# TYPE %s %s\n", name, type); - rspamd_printf_fstring(output, "%s %L\n", name, - ucl_object_toint(ucl_object_lookup(top, ucl_key))); -} /* * Metrics command handler: @@ -2836,138 +2821,11 @@ rspamd_controller_metrics_fin_task(void *ud) { struct rspamd_stat_cbdata *cbdata = ud; struct rspamd_http_connection_entry *conn_ent; - ucl_object_t *top; struct rspamd_fuzzy_stat_entry *entry; rspamd_fstring_t *output; - int i; conn_ent = cbdata->conn_ent; - top = cbdata->top; - - output = rspamd_fstring_sized_new(1024); - rspamd_printf_fstring(&output, "# HELP rspamd_build_info A metric with a constant '1' value " - "labeled by version from which rspamd was built.\n"); - rspamd_printf_fstring(&output, "# TYPE rspamd_build_info gauge\n"); - rspamd_printf_fstring(&output, "rspamd_build_info{version=\"%s\"} 1\n", - ucl_object_tostring(ucl_object_lookup(top, "version"))); - rspamd_printf_fstring(&output, "# HELP rspamd_config A metric with a constant '1' value " - "labeled by id of the current config.\n"); - rspamd_printf_fstring(&output, "# TYPE rspamd_config gauge\n"); - rspamd_printf_fstring(&output, "rspamd_config{id=\"%s\"} 1\n", - ucl_object_tostring(ucl_object_lookup(top, "config_id"))); - - gsize cnt = MAX_AVG_TIME_SLOTS; - float sum = rspamd_sum_floats(cbdata->ctx->worker->srv->stat->avg_time.avg_time, &cnt); - rspamd_printf_fstring(&output, "# HELP rspamd_scan_time_average Average messages scan time.\n"); - rspamd_printf_fstring(&output, "# TYPE rspamd_scan_time_average gauge\n"); - rspamd_printf_fstring(&output, "rspamd_scan_time_average %f\n", - cnt > 0 ? (double) sum / cnt : 0.0); - - rspamd_controller_metrics_add_integer(&output, top, - "process_start_time_seconds", - "gauge", - "Start time of the process since unix epoch in seconds.", - "start_time"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_read_only", - "gauge", - "Whether the rspamd instance is read-only.", - "read_only"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_scanned_total", - "counter", - "Scanned messages.", - "scanned"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_learned_total", - "counter", - "Learned messages.", - "learned"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_spam_total", - "counter", - "Messages classified as spam.", - "spam_count"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_ham_total", - "counter", - "Messages classified as ham.", - "ham_count"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_connections", - "gauge", - "Active connections.", - "connections"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_control_connections_total", - "gauge", - "Control connections.", - "control_connections"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_pools_allocated", - "gauge", - "Pools allocated.", - "pools_allocated"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_pools_freed", - "gauge", - "Pools freed.", - "pools_freed"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_allocated_bytes", - "gauge", - "Bytes allocated.", - "bytes_allocated"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_chunks_allocated", - "gauge", - "Memory pools: current chunks allocated.", - "chunks_allocated"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_shared_chunks_allocated", - "gauge", - "Memory pools: current shared chunks allocated.", - "shared_chunks_allocated"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_chunks_freed", - "gauge", - "Memory pools: current chunks freed.", - "chunks_freed"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_chunks_oversized", - "gauge", - "Memory pools: current chunks oversized (needs extra allocation/fragmentation).", - "chunks_oversized"); - rspamd_controller_metrics_add_integer(&output, top, - "rspamd_fragmented", - "gauge", - "Memory pools: fragmented memory waste.", - "fragmented"); - - rspamd_printf_fstring(&output, "# HELP rspamd_learns_total Total learns.\n"); - rspamd_printf_fstring(&output, "# TYPE rspamd_learns_total counter\n"); - rspamd_printf_fstring(&output, "rspamd_learns_total %L\n", cbdata->learned); - - const ucl_object_t *acts_obj = ucl_object_lookup(top, "actions"); - - if (acts_obj) { - rspamd_printf_fstring(&output, "# HELP rspamd_actions_total Actions labelled by action type.\n"); - rspamd_printf_fstring(&output, "# TYPE rspamd_actions_total counter\n"); - for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { - const char *str_act = rspamd_action_to_str(i); - const ucl_object_t *act = ucl_object_lookup(acts_obj, str_act); - - if (act) { - rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} %L\n", - str_act, - ucl_object_toint(act)); - } - else { - rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} 0\n", - str_act); - } - } - } + output = rspamd_metrics_to_prometheus_string(cbdata->top); if (cbdata->stat) { const ucl_object_t *cur_elt; @@ -3081,7 +2939,6 @@ rspamd_controller_metrics_fin_task(void *ud) } rspamd_printf_fstring(&output, "# EOF\n"); - rspamd_controller_send_openmetrics(conn_ent, output); return TRUE; @@ -3094,22 +2951,21 @@ rspamd_controller_handle_metrics_common( gboolean do_reset) { struct rspamd_controller_session *session = conn_ent->ud; - ucl_object_t *top, *sub; - int i; - int64_t uptime; - uint64_t spam = 0, ham = 0; - rspamd_mempool_stat_t mem_st; - struct rspamd_stat *stat, stat_copy; + ucl_object_t *top; + ev_tstamp uptime; + struct rspamd_stat stat_copy; struct rspamd_controller_worker_ctx *ctx; struct rspamd_task *task; struct rspamd_stat_cbdata *cbdata; - memset(&mem_st, 0, sizeof(mem_st)); - rspamd_mempool_stat(&mem_st); - memcpy(&stat_copy, session->ctx->worker->srv->stat, sizeof(stat_copy)); - stat = &stat_copy; + uptime = ev_time() - session->ctx->srv->start_time; ctx = session->ctx; + memcpy(&stat_copy, session->ctx->worker->srv->stat, sizeof(stat_copy)); + top = rspamd_worker_metrics_object(session->ctx->cfg, &stat_copy, uptime); + ucl_object_insert_key(top, ucl_object_fromint(session->ctx->srv->start_time), "start_time", 0, false); + ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only), + "read_only", 0, false); task = rspamd_task_new(session->ctx->worker, session->cfg, session->pool, ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; @@ -3117,7 +2973,6 @@ rspamd_controller_handle_metrics_common( cbdata->conn_ent = conn_ent; cbdata->task = task; cbdata->ctx = ctx; - top = ucl_object_typed_new(UCL_OBJECT); cbdata->top = top; task->s = rspamd_session_create(session->pool, @@ -3127,75 +2982,20 @@ rspamd_controller_handle_metrics_common( cbdata); task->fin_arg = cbdata; task->http_conn = rspamd_http_connection_ref(conn_ent->conn); - ; task->sock = conn_ent->conn->fd; - ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false); - ucl_object_insert_key(top, ucl_object_fromstring(session->ctx->cfg->checksum), "config_id", 0, false); - uptime = ev_time() - session->ctx->start_time; - ucl_object_insert_key(top, ucl_object_fromint(uptime), "uptime", 0, false); - ucl_object_insert_key(top, ucl_object_fromint(session->ctx->start_time), "start_time", 0, false); - ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only), - "read_only", 0, false); - ucl_object_insert_key(top, ucl_object_fromint(stat->messages_scanned), "scanned", 0, false); - ucl_object_insert_key(top, ucl_object_fromint(stat->messages_learned), "learned", 0, false); + if (stat_copy.messages_scanned > 0 && do_reset) { + for (int i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { - if (stat->messages_scanned > 0) { - sub = ucl_object_typed_new(UCL_OBJECT); - for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { - ucl_object_insert_key(sub, - ucl_object_fromint(stat->actions_stat[i]), - rspamd_action_to_str(i), 0, false); - if (i < METRIC_ACTION_GREYLIST) { - spam += stat->actions_stat[i]; - } - else { - ham += stat->actions_stat[i]; - } - if (do_reset) { #ifndef HAVE_ATOMIC_BUILTINS - session->ctx->worker->srv->stat->actions_stat[i] = 0; + session->ctx->worker->srv->stat->actions_stat[i] = 0; #else - __atomic_store_n(&session->ctx->worker->srv->stat->actions_stat[i], - 0, __ATOMIC_RELEASE); + __atomic_store_n(&session->ctx->worker->srv->stat->actions_stat[i], + 0, __ATOMIC_RELEASE); #endif - } } - ucl_object_insert_key(top, sub, "actions", 0, false); } - ucl_object_insert_key(top, ucl_object_fromint(spam), "spam_count", 0, false); - ucl_object_insert_key(top, ucl_object_fromint(ham), "ham_count", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint(stat->connections_count), "connections", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint(stat->control_connections_count), - "control_connections", 0, false); - - ucl_object_insert_key(top, - ucl_object_fromint(mem_st.pools_allocated), "pools_allocated", 0, - false); - ucl_object_insert_key(top, - ucl_object_fromint(mem_st.pools_freed), "pools_freed", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint(mem_st.bytes_allocated), "bytes_allocated", 0, - false); - ucl_object_insert_key(top, - ucl_object_fromint( - mem_st.chunks_allocated), - "chunks_allocated", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint(mem_st.shared_chunks_allocated), - "shared_chunks_allocated", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint(mem_st.chunks_freed), "chunks_freed", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint( - mem_st.oversized_chunks), - "chunks_oversized", 0, false); - ucl_object_insert_key(top, - ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false); - if (do_reset) { session->ctx->srv->stat->messages_scanned = 0; session->ctx->srv->stat->messages_learned = 0; @@ -3210,7 +3010,6 @@ rspamd_controller_handle_metrics_common( rspamd_stat_statistics(task, session->ctx->cfg, &cbdata->learned, &cbdata->stat); session->task = task; - rspamd_session_pending(task->s); @@ -4083,7 +3882,6 @@ start_controller_worker(struct rspamd_worker *worker) "controller", rspamd_controller_accept_socket); - ctx->start_time = ev_time(); ctx->worker = worker; ctx->cfg = worker->srv->cfg; ctx->srv = worker->srv; diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index c8c7bc76a..db83b0bfb 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -25,6 +25,7 @@ #include "unix-std.h" #include "protocol_internal.h" #include "libserver/mempool_vars_internal.h" +#include "libserver/worker_util.h" #include "contrib/fastutf8/fastutf8.h" #include "task.h" #include "lua/lua_classnames.h" @@ -210,6 +211,19 @@ rspamd_protocol_handle_url(struct rspamd_task *task, goto err; } break; + case 'M': + case 'm': + /* metrics, process */ + if (COMPARE_CMD(p, MSG_CMD_METRICS, pathlen)) { + msg_debug_protocol("got metrics command"); + task->cmd = CMD_METRICS; + task->flags |= RSPAMD_TASK_FLAG_SKIP; + task->processed_stages |= RSPAMD_TASK_STAGE_DONE; /* Skip all */ + } + else { + goto err; + } + break; default: goto err; } @@ -2098,11 +2112,12 @@ void rspamd_protocol_write_log_pipe(struct rspamd_task *task) g_array_free(extra, TRUE); } -void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) +void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout, struct rspamd_main *srv) { struct rspamd_http_message *msg; const char *ctype = "application/json"; rspamd_fstring_t *reply; + ev_tstamp now = ev_time(); msg = rspamd_http_new_message(HTTP_RESPONSE); @@ -2163,6 +2178,8 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) } } else { + rspamd_fstring_t *output; + struct rspamd_stat stat_copy; msg->status = rspamd_fstring_new_init("OK", 2); switch (task->cmd) { @@ -2179,6 +2196,15 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) rspamd_http_message_set_body(msg, "pong" CRLF, 6); ctype = "text/plain"; break; + case CMD_METRICS: + msg_debug_protocol("writing metrics to client"); + + memcpy(&stat_copy, srv->stat, sizeof(stat_copy)); + output = rspamd_metrics_to_prometheus_string( + rspamd_worker_metrics_object(srv->cfg, &stat_copy, now - srv->start_time)); + rspamd_http_message_set_body_from_fstring_steal(msg, output); + ctype = "application/openmetrics-text; version=1.0.0; charset=utf-8"; + break; default: msg_err_protocol("BROKEN"); break; @@ -2186,7 +2212,7 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) } ev_now_update(task->event_loop); - msg->date = ev_time(); + msg->date = now; rspamd_http_connection_reset(task->http_conn); rspamd_http_connection_write_message(task->http_conn, msg, NULL, diff --git a/src/libserver/protocol.h b/src/libserver/protocol.h index 94fbcbf04..9d2b985da 100644 --- a/src/libserver/protocol.h +++ b/src/libserver/protocol.h @@ -51,7 +51,7 @@ struct rspamd_protocol_log_message_sum { struct rspamd_protocol_log_symbol_result results[]; }; -struct rspamd_metric; +struct rspamd_main; /** * Process headers into HTTP message and set appropriate task fields @@ -126,7 +126,7 @@ ucl_object_t *rspamd_protocol_write_ucl(struct rspamd_task *task, * @param task task object * @return 0 if we wrote reply and -1 if there was some error */ -void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout); +void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout, struct rspamd_main *srv); /** * Convert rspamd output to legacy protocol reply diff --git a/src/libserver/protocol_internal.h b/src/libserver/protocol_internal.h index c604e9630..7a70ccef0 100644 --- a/src/libserver/protocol_internal.h +++ b/src/libserver/protocol_internal.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2017 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -55,6 +55,8 @@ extern "C" { * Return a confirmation that spamd is alive */ #define MSG_CMD_PING "ping" + +#define MSG_CMD_METRICS "metrics" /* * Process this message as described above and return modified message */ diff --git a/src/libserver/task.c b/src/libserver/task.c index 63babf990..637f401a9 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -133,7 +133,7 @@ rspamd_task_reply(struct rspamd_task *task) } else { if (!(task->processed_stages & RSPAMD_TASK_STAGE_REPLIED)) { - rspamd_protocol_write_reply(task, write_timeout); + rspamd_protocol_write_reply(task, write_timeout, task->worker->srv); } } } diff --git a/src/libserver/task.h b/src/libserver/task.h index cba9bbbd4..7e6341a84 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -36,6 +36,7 @@ enum rspamd_command { CMD_CHECK_RSPAMC, /* Legacy rspamc format (like SA one) */ CMD_CHECK, /* Legacy check - metric json reply */ CMD_CHECK_V2, /* Modern check - symbols in json reply */ + CMD_METRICS, }; enum rspamd_task_stage { diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 02b0cd5ae..383d89c14 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -2354,3 +2354,205 @@ rspamd_worker_check_and_adjust_timeout(struct rspamd_config *cfg, double timeout /* TODO: maybe adjust timeout */ return timeout; } + +ucl_object_t * +rspamd_worker_metrics_object(struct rspamd_config *cfg, struct rspamd_stat *stat, ev_tstamp uptime) +{ + rspamd_mempool_stat_t mem_st; + memset(&mem_st, 0, sizeof(mem_st)); + rspamd_mempool_stat(&mem_st); + + ucl_object_t *top = ucl_object_typed_new(UCL_OBJECT); + + ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false); + ucl_object_insert_key(top, ucl_object_fromstring(cfg->checksum), "config_id", 0, false); + ucl_object_insert_key(top, ucl_object_fromdouble(uptime), "uptime", 0, false); + ucl_object_insert_key(top, ucl_object_fromint(stat->messages_scanned), "scanned", 0, false); + ucl_object_insert_key(top, ucl_object_fromint(stat->messages_learned), "learned", 0, false); + + gsize cnt = MAX_AVG_TIME_SLOTS; + float sum = rspamd_sum_floats(stat->avg_time.avg_time, &cnt); + + ucl_object_insert_key(top, + ucl_object_fromdouble(cnt > 0 ? (double) sum / cnt : 0.0), "avg_scan_time", 0, false); + + unsigned spam = 0, ham = 0; + + if (stat->messages_scanned > 0) { + ucl_object_t *sub = ucl_object_typed_new(UCL_OBJECT); + for (int i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { + ucl_object_insert_key(sub, + ucl_object_fromint(stat->actions_stat[i]), + rspamd_action_to_str(i), 0, false); + if (i < METRIC_ACTION_GREYLIST) { + spam += stat->actions_stat[i]; + } + else { + ham += stat->actions_stat[i]; + } + } + ucl_object_insert_key(top, sub, "actions", 0, false); + } + + ucl_object_insert_key(top, ucl_object_fromint(spam), "spam_count", 0, false); + ucl_object_insert_key(top, ucl_object_fromint(ham), "ham_count", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(stat->connections_count), "connections", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(stat->control_connections_count), + "control_connections", 0, false); + + ucl_object_insert_key(top, + ucl_object_fromint(mem_st.pools_allocated), "pools_allocated", 0, + false); + ucl_object_insert_key(top, + ucl_object_fromint(mem_st.pools_freed), "pools_freed", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(mem_st.bytes_allocated), "bytes_allocated", 0, + false); + ucl_object_insert_key(top, + ucl_object_fromint( + mem_st.chunks_allocated), + "chunks_allocated", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(mem_st.shared_chunks_allocated), + "shared_chunks_allocated", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(mem_st.chunks_freed), "chunks_freed", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint( + mem_st.oversized_chunks), + "chunks_oversized", 0, false); + ucl_object_insert_key(top, + ucl_object_fromint(mem_st.fragmented_size), "fragmented", 0, false); + + return top; +} + +rspamd_fstring_t * +rspamd_metrics_to_prometheus_string(const ucl_object_t *top) +{ + rspamd_fstring_t *output = rspamd_fstring_sized_new(1024); + + rspamd_printf_fstring(&output, "# HELP rspamd_build_info A metric with a constant '1' value " + "labeled by version from which rspamd was built.\n"); + rspamd_printf_fstring(&output, "# TYPE rspamd_build_info gauge\n"); + rspamd_printf_fstring(&output, "rspamd_build_info{version=\"%s\"} 1\n", + ucl_object_tostring(ucl_object_lookup(top, "version"))); + rspamd_printf_fstring(&output, "# HELP rspamd_config A metric with a constant '1' value " + "labeled by id of the current config.\n"); + rspamd_printf_fstring(&output, "# TYPE rspamd_config gauge\n"); + rspamd_printf_fstring(&output, "rspamd_config{id=\"%s\"} 1\n", + ucl_object_tostring(ucl_object_lookup(top, "config_id"))); + + + rspamd_metrics_add_integer(&output, top, + "rspamd_scan_time_average", + "gauge", + "Average messages scan time.", + "avg_scan_time"); + rspamd_metrics_add_integer(&output, top, + "process_start_time_seconds", + "gauge", + "Start time of the process since unix epoch in seconds.", + "start_time"); + rspamd_metrics_add_integer(&output, top, + "rspamd_read_only", + "gauge", + "Whether the rspamd instance is read-only.", + "read_only"); + rspamd_metrics_add_integer(&output, top, + "rspamd_scanned_total", + "counter", + "Scanned messages.", + "scanned"); + rspamd_metrics_add_integer(&output, top, + "rspamd_learned_total", + "counter", + "Learned messages.", + "learned"); + rspamd_metrics_add_integer(&output, top, + "rspamd_spam_total", + "counter", + "Messages classified as spam.", + "spam_count"); + rspamd_metrics_add_integer(&output, top, + "rspamd_ham_total", + "counter", + "Messages classified as ham.", + "ham_count"); + rspamd_metrics_add_integer(&output, top, + "rspamd_connections", + "gauge", + "Active connections.", + "connections"); + rspamd_metrics_add_integer(&output, top, + "rspamd_control_connections_total", + "gauge", + "Control connections.", + "control_connections"); + rspamd_metrics_add_integer(&output, top, + "rspamd_pools_allocated", + "gauge", + "Pools allocated.", + "pools_allocated"); + rspamd_metrics_add_integer(&output, top, + "rspamd_pools_freed", + "gauge", + "Pools freed.", + "pools_freed"); + rspamd_metrics_add_integer(&output, top, + "rspamd_allocated_bytes", + "gauge", + "Bytes allocated.", + "bytes_allocated"); + rspamd_metrics_add_integer(&output, top, + "rspamd_chunks_allocated", + "gauge", + "Memory pools: current chunks allocated.", + "chunks_allocated"); + rspamd_metrics_add_integer(&output, top, + "rspamd_shared_chunks_allocated", + "gauge", + "Memory pools: current shared chunks allocated.", + "shared_chunks_allocated"); + rspamd_metrics_add_integer(&output, top, + "rspamd_chunks_freed", + "gauge", + "Memory pools: current chunks freed.", + "chunks_freed"); + rspamd_metrics_add_integer(&output, top, + "rspamd_chunks_oversized", + "gauge", + "Memory pools: current chunks oversized (needs extra allocation/fragmentation).", + "chunks_oversized"); + rspamd_metrics_add_integer(&output, top, + "rspamd_fragmented", + "gauge", + "Memory pools: fragmented memory waste.", + "fragmented"); + + const ucl_object_t *acts_obj = ucl_object_lookup(top, "actions"); + + if (acts_obj) { + rspamd_printf_fstring(&output, "# HELP rspamd_actions_total Actions labelled by action type.\n"); + rspamd_printf_fstring(&output, "# TYPE rspamd_actions_total counter\n"); + for (int i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) { + const char *str_act = rspamd_action_to_str(i); + const ucl_object_t *act = ucl_object_lookup(acts_obj, str_act); + + if (act) { + rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} %L\n", + str_act, + ucl_object_toint(act)); + } + else { + rspamd_printf_fstring(&output, "rspamd_actions_total{type=\"%s\"} 0\n", + str_act); + } + } + } + + /* Must be finalized and freed by caller */ + return output; +}
\ No newline at end of file diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index e3627092d..b58a43f7f 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -299,6 +299,32 @@ void rspamd_worker_init_controller(struct rspamd_worker *worker, void rspamd_controller_store_saved_stats(struct rspamd_main *rspamd_main, struct rspamd_config *cfg); +/** + * Get metrics object for a worker + */ +ucl_object_t *rspamd_worker_metrics_object(struct rspamd_config *cfg, struct rspamd_stat *stat, ev_tstamp uptime); + + +static inline void +rspamd_metrics_add_integer(rspamd_fstring_t **output, + const ucl_object_t *top, + const char *name, + const char *type, + const char *description, + const char *ucl_key) +{ + rspamd_printf_fstring(output, "# HELP %s %s\n", name, description); + rspamd_printf_fstring(output, "# TYPE %s %s\n", name, type); + rspamd_printf_fstring(output, "%s %L\n", name, + ucl_object_toint(ucl_object_lookup(top, ucl_key))); +} +/** + * Convert metrics to the prometheus format + * @param top + * @return + */ +rspamd_fstring_t *rspamd_metrics_to_prometheus_string(const ucl_object_t *top); + #ifdef WITH_HYPERSCAN struct rspamd_control_command; diff --git a/src/rspamd.c b/src/rspamd.c index dc5535f0c..117f3b995 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -1644,6 +1644,7 @@ int main(int argc, char **argv, char **env) exit(EXIT_FAILURE); } + rspamd_main->start_time = ev_time(); /* Unblock signals */ sigemptyset(&signals.sa_mask); sigprocmask(SIG_SETMASK, &signals.sa_mask, NULL); diff --git a/src/rspamd.h b/src/rspamd.h index 60613ce0a..a5ef068e1 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -320,6 +320,7 @@ struct rspamd_main { struct roll_history *history; /**< rolling history */ struct ev_loop *event_loop; ev_signal term_ev, int_ev, hup_ev, usr1_ev; /**< signals */ + ev_tstamp start_time; struct rspamd_http_context *http_ctx; }; diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 5bf1fb8f2..4f08e81b9 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -160,6 +160,7 @@ struct rspamd_proxy_ctx { /* Language detector */ struct rspamd_lang_detector *lang_det; double task_timeout; + struct rspamd_main *srv; }; enum rspamd_backend_flags { @@ -1734,6 +1735,8 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task) int out_type = UCL_EMIT_JSON_COMPACT; const char *ctype = "application/json"; const rspamd_ftok_t *accept_hdr = rspamd_task_get_request_header(task, "Accept"); + rspamd_fstring_t *output; + struct rspamd_stat stat_copy; if (accept_hdr && rspamd_substring_search(accept_hdr->begin, accept_hdr->len, "application/msgpack", sizeof("application/msgpack") - 1) != -1) { @@ -1759,6 +1762,13 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task) rspamd_http_message_set_body(msg, "pong" CRLF, 6); ctype = "text/plain"; break; + case CMD_METRICS: + memcpy(&stat_copy, session->ctx->srv->stat, sizeof(stat_copy)); + output = rspamd_metrics_to_prometheus_string( + rspamd_worker_metrics_object(task->cfg, &stat_copy, ev_time() - session->ctx->srv->start_time)); + rspamd_http_message_set_body_from_fstring_steal(msg, output); + ctype = "application/openmetrics-text; version=1.0.0; charset=utf-8"; + break; default: msg_err_task("BROKEN"); break; @@ -1903,7 +1913,7 @@ rspamd_proxy_self_scan(struct rspamd_proxy_session *session) task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { - if (task->cmd == CMD_PING) { + if (task->cmd == CMD_PING || task->cmd == CMD_METRICS) { task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { @@ -2412,6 +2422,7 @@ start_rspamd_proxy(struct rspamd_worker *worker) g_assert(rspamd_worker_check_context(worker->ctx, rspamd_rspamd_proxy_magic)); ctx->cfg = worker->srv->cfg; + ctx->srv = worker->srv; ctx->event_loop = rspamd_prepare_worker(worker, "rspamd_proxy", proxy_accept_socket); diff --git a/src/worker.c b/src/worker.c index 150f813c9..5f8ba76c5 100644 --- a/src/worker.c +++ b/src/worker.c @@ -172,7 +172,7 @@ rspamd_worker_body_handler(struct rspamd_http_connection *conn, task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { - if (task->cmd == CMD_PING) { + if (task->cmd == CMD_PING || task->cmd == CMD_METRICS) { task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { diff --git a/src/worker_private.h b/src/worker_private.h index c295eaf85..6fb40e970 100644 --- a/src/worker_private.h +++ b/src/worker_private.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, |