From 655afcbbe99a841275ba34534f9300a67fede996 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 24 Jun 2024 14:48:17 +0100 Subject: [PATCH] [Feature] Support metrics command in normal/proxy workers --- src/controller.c | 11 ++++------- src/libserver/protocol.c | 30 ++++++++++++++++++++++++++++-- src/libserver/protocol.h | 4 ++-- src/libserver/protocol_internal.h | 8 +++++--- src/libserver/task.c | 2 +- src/libserver/task.h | 7 ++++--- src/rspamd.c | 1 + src/rspamd.h | 1 + src/rspamd_proxy.c | 13 ++++++++++++- src/worker.c | 2 +- src/worker_private.h | 6 +++--- 11 files changed, 62 insertions(+), 23 deletions(-) diff --git a/src/controller.c b/src/controller.c index c8dbd9724..d91f99098 100644 --- a/src/controller.c +++ b/src/controller.c @@ -149,8 +149,6 @@ struct rspamd_controller_worker_ctx { /* HTTP server */ struct rspamd_http_context *http_ctx; struct rspamd_http_connection_router *http; - /* Server's start time */ - ev_tstamp start_time; /* Main server */ struct rspamd_main *srv; /* SSL cert */ @@ -762,7 +760,7 @@ rspamd_controller_handle_auth(struct rspamd_http_connection_entry *conn_ent, data[4] = st.actions_stat[METRIC_ACTION_SOFT_REJECT]; /* Get uptime */ - uptime = ev_time() - session->ctx->start_time; + uptime = ev_time() - session->ctx->srv->start_time; ucl_object_insert_key(obj, ucl_object_fromstring(RVERSION), "version", 0, false); ucl_object_insert_key(obj, ucl_object_fromstring("ok"), "auth", 0, false); @@ -2695,7 +2693,7 @@ rspamd_controller_handle_stat_common( ucl_object_insert_key(top, ucl_object_fromstring(RVERSION), "version", 0, false); ucl_object_insert_key(top, ucl_object_fromstring(session->ctx->cfg->checksum), "config_id", 0, false); - uptime = ev_time() - session->ctx->start_time; + uptime = ev_time() - session->ctx->srv->start_time; ucl_object_insert_key(top, ucl_object_fromint(uptime), "uptime", 0, false); ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only), "read_only", 0, false); @@ -2960,12 +2958,12 @@ rspamd_controller_handle_metrics_common( struct rspamd_task *task; struct rspamd_stat_cbdata *cbdata; - uptime = ev_time() - session->ctx->start_time; + uptime = ev_time() - session->ctx->srv->start_time; ctx = session->ctx; memcpy(&stat_copy, session->ctx->worker->srv->stat, sizeof(stat_copy)); top = rspamd_worker_metrics_object(session->ctx->cfg, &stat_copy, uptime); - ucl_object_insert_key(top, ucl_object_fromint(session->ctx->start_time), "start_time", 0, false); + ucl_object_insert_key(top, ucl_object_fromint(session->ctx->srv->start_time), "start_time", 0, false); ucl_object_insert_key(top, ucl_object_frombool(session->is_read_only), "read_only", 0, false); task = rspamd_task_new(session->ctx->worker, session->cfg, session->pool, @@ -3884,7 +3882,6 @@ start_controller_worker(struct rspamd_worker *worker) "controller", rspamd_controller_accept_socket); - ctx->start_time = ev_time(); ctx->worker = worker; ctx->cfg = worker->srv->cfg; ctx->srv = worker->srv; diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c index c8c7bc76a..db83b0bfb 100644 --- a/src/libserver/protocol.c +++ b/src/libserver/protocol.c @@ -25,6 +25,7 @@ #include "unix-std.h" #include "protocol_internal.h" #include "libserver/mempool_vars_internal.h" +#include "libserver/worker_util.h" #include "contrib/fastutf8/fastutf8.h" #include "task.h" #include "lua/lua_classnames.h" @@ -210,6 +211,19 @@ rspamd_protocol_handle_url(struct rspamd_task *task, goto err; } break; + case 'M': + case 'm': + /* metrics, process */ + if (COMPARE_CMD(p, MSG_CMD_METRICS, pathlen)) { + msg_debug_protocol("got metrics command"); + task->cmd = CMD_METRICS; + task->flags |= RSPAMD_TASK_FLAG_SKIP; + task->processed_stages |= RSPAMD_TASK_STAGE_DONE; /* Skip all */ + } + else { + goto err; + } + break; default: goto err; } @@ -2098,11 +2112,12 @@ void rspamd_protocol_write_log_pipe(struct rspamd_task *task) g_array_free(extra, TRUE); } -void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) +void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout, struct rspamd_main *srv) { struct rspamd_http_message *msg; const char *ctype = "application/json"; rspamd_fstring_t *reply; + ev_tstamp now = ev_time(); msg = rspamd_http_new_message(HTTP_RESPONSE); @@ -2163,6 +2178,8 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) } } else { + rspamd_fstring_t *output; + struct rspamd_stat stat_copy; msg->status = rspamd_fstring_new_init("OK", 2); switch (task->cmd) { @@ -2179,6 +2196,15 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) rspamd_http_message_set_body(msg, "pong" CRLF, 6); ctype = "text/plain"; break; + case CMD_METRICS: + msg_debug_protocol("writing metrics to client"); + + memcpy(&stat_copy, srv->stat, sizeof(stat_copy)); + output = rspamd_metrics_to_prometheus_string( + rspamd_worker_metrics_object(srv->cfg, &stat_copy, now - srv->start_time)); + rspamd_http_message_set_body_from_fstring_steal(msg, output); + ctype = "application/openmetrics-text; version=1.0.0; charset=utf-8"; + break; default: msg_err_protocol("BROKEN"); break; @@ -2186,7 +2212,7 @@ void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout) } ev_now_update(task->event_loop); - msg->date = ev_time(); + msg->date = now; rspamd_http_connection_reset(task->http_conn); rspamd_http_connection_write_message(task->http_conn, msg, NULL, diff --git a/src/libserver/protocol.h b/src/libserver/protocol.h index 94fbcbf04..9d2b985da 100644 --- a/src/libserver/protocol.h +++ b/src/libserver/protocol.h @@ -51,7 +51,7 @@ struct rspamd_protocol_log_message_sum { struct rspamd_protocol_log_symbol_result results[]; }; -struct rspamd_metric; +struct rspamd_main; /** * Process headers into HTTP message and set appropriate task fields @@ -126,7 +126,7 @@ ucl_object_t *rspamd_protocol_write_ucl(struct rspamd_task *task, * @param task task object * @return 0 if we wrote reply and -1 if there was some error */ -void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout); +void rspamd_protocol_write_reply(struct rspamd_task *task, ev_tstamp timeout, struct rspamd_main *srv); /** * Convert rspamd output to legacy protocol reply diff --git a/src/libserver/protocol_internal.h b/src/libserver/protocol_internal.h index c604e9630..7a70ccef0 100644 --- a/src/libserver/protocol_internal.h +++ b/src/libserver/protocol_internal.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2017 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -55,6 +55,8 @@ extern "C" { * Return a confirmation that spamd is alive */ #define MSG_CMD_PING "ping" + +#define MSG_CMD_METRICS "metrics" /* * Process this message as described above and return modified message */ diff --git a/src/libserver/task.c b/src/libserver/task.c index 63babf990..637f401a9 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -133,7 +133,7 @@ rspamd_task_reply(struct rspamd_task *task) } else { if (!(task->processed_stages & RSPAMD_TASK_STAGE_REPLIED)) { - rspamd_protocol_write_reply(task, write_timeout); + rspamd_protocol_write_reply(task, write_timeout, task->worker->srv); } } } diff --git a/src/libserver/task.h b/src/libserver/task.h index cba9bbbd4..7e6341a84 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -36,6 +36,7 @@ enum rspamd_command { CMD_CHECK_RSPAMC, /* Legacy rspamc format (like SA one) */ CMD_CHECK, /* Legacy check - metric json reply */ CMD_CHECK_V2, /* Modern check - symbols in json reply */ + CMD_METRICS, }; enum rspamd_task_stage { diff --git a/src/rspamd.c b/src/rspamd.c index dc5535f0c..117f3b995 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -1644,6 +1644,7 @@ int main(int argc, char **argv, char **env) exit(EXIT_FAILURE); } + rspamd_main->start_time = ev_time(); /* Unblock signals */ sigemptyset(&signals.sa_mask); sigprocmask(SIG_SETMASK, &signals.sa_mask, NULL); diff --git a/src/rspamd.h b/src/rspamd.h index 60613ce0a..a5ef068e1 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -320,6 +320,7 @@ struct rspamd_main { struct roll_history *history; /**< rolling history */ struct ev_loop *event_loop; ev_signal term_ev, int_ev, hup_ev, usr1_ev; /**< signals */ + ev_tstamp start_time; struct rspamd_http_context *http_ctx; }; diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 5bf1fb8f2..4f08e81b9 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -160,6 +160,7 @@ struct rspamd_proxy_ctx { /* Language detector */ struct rspamd_lang_detector *lang_det; double task_timeout; + struct rspamd_main *srv; }; enum rspamd_backend_flags { @@ -1734,6 +1735,8 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task) int out_type = UCL_EMIT_JSON_COMPACT; const char *ctype = "application/json"; const rspamd_ftok_t *accept_hdr = rspamd_task_get_request_header(task, "Accept"); + rspamd_fstring_t *output; + struct rspamd_stat stat_copy; if (accept_hdr && rspamd_substring_search(accept_hdr->begin, accept_hdr->len, "application/msgpack", sizeof("application/msgpack") - 1) != -1) { @@ -1759,6 +1762,13 @@ rspamd_proxy_scan_self_reply(struct rspamd_task *task) rspamd_http_message_set_body(msg, "pong" CRLF, 6); ctype = "text/plain"; break; + case CMD_METRICS: + memcpy(&stat_copy, session->ctx->srv->stat, sizeof(stat_copy)); + output = rspamd_metrics_to_prometheus_string( + rspamd_worker_metrics_object(task->cfg, &stat_copy, ev_time() - session->ctx->srv->start_time)); + rspamd_http_message_set_body_from_fstring_steal(msg, output); + ctype = "application/openmetrics-text; version=1.0.0; charset=utf-8"; + break; default: msg_err_task("BROKEN"); break; @@ -1903,7 +1913,7 @@ rspamd_proxy_self_scan(struct rspamd_proxy_session *session) task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { - if (task->cmd == CMD_PING) { + if (task->cmd == CMD_PING || task->cmd == CMD_METRICS) { task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { @@ -2412,6 +2422,7 @@ start_rspamd_proxy(struct rspamd_worker *worker) g_assert(rspamd_worker_check_context(worker->ctx, rspamd_rspamd_proxy_magic)); ctx->cfg = worker->srv->cfg; + ctx->srv = worker->srv; ctx->event_loop = rspamd_prepare_worker(worker, "rspamd_proxy", proxy_accept_socket); diff --git a/src/worker.c b/src/worker.c index 150f813c9..5f8ba76c5 100644 --- a/src/worker.c +++ b/src/worker.c @@ -172,7 +172,7 @@ rspamd_worker_body_handler(struct rspamd_http_connection *conn, task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { - if (task->cmd == CMD_PING) { + if (task->cmd == CMD_PING || task->cmd == CMD_METRICS) { task->flags |= RSPAMD_TASK_FLAG_SKIP; } else { diff --git a/src/worker_private.h b/src/worker_private.h index c295eaf85..6fb40e970 100644 --- a/src/worker_private.h +++ b/src/worker_private.h @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, -- 2.39.5