From 012167478aa431136aa5e1522d38f6fc971868b5 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 7 Jan 2013 18:32:15 +0400 Subject: [PATCH] * Add rolling history feature saving last 200 scanned messages. Add /history command handler in webui. --- lib/CMakeLists.txt | 1 + src/main.c | 3 + src/main.h | 3 + src/protocol.c | 15 ++++- src/roll_history.c | 145 +++++++++++++++++++++++++++++++++++++++++++++ src/roll_history.h | 90 ++++++++++++++++++++++++++++ src/smtp_utils.c | 4 +- src/util.c | 7 ++- src/util.h | 4 +- src/webui.c | 139 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 403 insertions(+), 8 deletions(-) create mode 100644 src/roll_history.c create mode 100644 src/roll_history.h diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 4f0ede5b8..846b68fc2 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -39,6 +39,7 @@ SET(LIBRSPAMDSERVERSRC ../src/kvstorage_file.c ../src/lmtp_proto.c ../src/proxy.c + ../src/roll_history.c ../src/settings.c ../src/spf.c ../src/statfile.c diff --git a/src/main.c b/src/main.c index 94001298d..c450500ad 100644 --- a/src/main.c +++ b/src/main.c @@ -1022,6 +1022,9 @@ main (gint argc, gchar **argv, gchar **env) config_logger (rspamd_main, type, TRUE); + /* Create rolling history */ + rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool); + msg_info ("rspamd " RVERSION " is starting, build id: " RID); rspamd_main->cfg->cfg_name = memory_pool_strdup (rspamd_main->cfg->cfg_pool, rspamd_main->cfg->cfg_name); diff --git a/src/main.h b/src/main.h index 9a28793cd..521e0740a 100644 --- a/src/main.h +++ b/src/main.h @@ -19,6 +19,7 @@ #include "events.h" #include "util.h" #include "logger.h" +#include "roll_history.h" /* Default values */ #define FIXED_CONFIG_FILE ETC_PREFIX "/rspamd.xml" @@ -104,6 +105,7 @@ struct rspamd_main { uid_t workers_uid; /**< worker's uid running to */ gid_t workers_gid; /**< worker's gid running to */ gboolean is_privilleged; /**< true if run in privilleged mode */ + struct roll_history *history; /**< rolling history */ }; struct counter_data { @@ -258,6 +260,7 @@ struct worker_task { #endif struct timeval tv; /**< time of connection */ struct rspamd_view *view; /**< matching view */ + guint32 scan_milliseconds; /**< how much milliseconds passed */ gboolean view_checked; gboolean pass_all_filters; /**< pass task throught every rule */ guint32 parser_recursion; /**< for avoiding recursion stack overflow */ diff --git a/src/protocol.c b/src/protocol.c index a9c2638ef..a303b4285 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -1291,11 +1291,11 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s, dns req: %d,", task->msg->len, calculate_check_time (&task->tv, &task->ts, - task->cfg->clock_res), task->dns_requests); + task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests); #else cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s, dns req: %d,", - task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res), task->dns_requests); + task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests); #endif } } @@ -1362,6 +1362,7 @@ write_check_reply (struct worker_task *task) } cd.alive = TRUE; + if (task->proto == SPAMC_PROTO && !task->is_http) { /* Ignore metrics, just write report for 'default' metric */ @@ -1379,6 +1380,8 @@ write_check_reply (struct worker_task *task) return FALSE; } } + /* Update history */ + rspamd_roll_history_update (task->worker->srv->history, task); } else { /* Show default metric first */ @@ -1401,6 +1404,8 @@ write_check_reply (struct worker_task *task) return FALSE; } } + /* Update history */ + rspamd_roll_history_update (task->worker->srv->history, task); g_hash_table_remove (task->results, "default"); /* Write result for each metric separately */ @@ -1433,6 +1438,7 @@ write_check_reply (struct worker_task *task) write_hashes_to_log (task, logbuf, cd.log_offset, cd.log_size); msg_info ("%s", logbuf); + if (!task->is_json) { /* Write message id */ if (task->proto == RSPAMC_PROTO && task->proto_ver >= 12) { @@ -1518,6 +1524,7 @@ write_process_reply (struct worker_task *task) cd.log_size = sizeof (logbuf); cd.alive = TRUE; + if (task->proto == SPAMC_PROTO) { /* Ignore metrics, just write report for 'default' metric */ metric_res = g_hash_table_lookup (task->results, "default"); @@ -1534,6 +1541,8 @@ write_process_reply (struct worker_task *task) return FALSE; } } + /* Update history */ + rspamd_roll_history_update (task->worker->srv->history, task); } else { /* Show default metric first */ @@ -1551,6 +1560,8 @@ write_process_reply (struct worker_task *task) return FALSE; } } + /* Update history */ + rspamd_roll_history_update (task->worker->srv->history, task); g_hash_table_remove (task->results, "default"); /* Write result for each metric separately */ diff --git a/src/roll_history.c b/src/roll_history.c new file mode 100644 index 000000000..fec13045a --- /dev/null +++ b/src/roll_history.c @@ -0,0 +1,145 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#include "config.h" +#include "main.h" +#include "roll_history.h" + + +/** + * Returns new roll history + * @param pool pool for shared memory + * @return new structure + */ +struct roll_history* +rspamd_roll_history_new (memory_pool_t *pool) +{ + struct roll_history *new; + + if (pool == NULL) { + return NULL; + } + + new = memory_pool_alloc0_shared (pool, sizeof (struct roll_history)); + new->pool = pool; + new->mtx = memory_pool_get_mutex (pool); + + return new; +} + +struct history_metric_callback_data { + gchar *pos; + gint remain; +}; + +static void +roll_history_symbols_callback (gpointer key, gpointer value, void *user_data) +{ + struct history_metric_callback_data *cb = user_data; + struct symbol *s = value; + guint wr; + + if (cb->remain > 0) { + wr = rspamd_snprintf (cb->pos, cb->remain, "%s, ", s->name); + cb->pos += wr; + cb->remain -= wr; + } +} + +/** + * Update roll history with data from task + * @param history roll history object + * @param task task object + */ +void +rspamd_roll_history_update (struct roll_history *history, struct worker_task *task) +{ + gint row_num; + struct roll_history_row *row; + struct metric_result *metric_res; + struct history_metric_callback_data cbdata; + + if (history->need_lock) { + /* Some process is getting history, so wait on a mutex */ + memory_pool_lock_mutex (history->mtx); + history->need_lock = FALSE; + memory_pool_unlock_mutex (history->mtx); + } + + /* First of all obtain check and obtain row number */ + g_atomic_int_compare_and_exchange (&history->cur_row, HISTORY_MAX_ROWS, 0); +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + row_num = g_atomic_int_add (&history->cur_row, 1); +#else + row_num = g_atomic_int_exchange_and_add (&history->cur_row, 1); +#endif + + if (row_num < HISTORY_MAX_ROWS) { + row = &history->rows[row_num]; + row->completed = FALSE; + } + else { + msg_err ("internal error with history roll occured, row number is invalid: %d", row_num); + return; + } + + /* Add information from task to roll history */ + memcpy (&row->from_addr, &task->from_addr, sizeof (row->from_addr)); + memcpy (&row->tv, &task->tv, sizeof (row->tv)); + + /* Strings */ + rspamd_strlcpy (row->message_id, task->message_id, sizeof (row->message_id)); + if (task->user) { + rspamd_strlcpy (row->user, task->user, sizeof (row->message_id)); + } + else { + row->user[0] = '\0'; + } + + /* Get default metric */ + metric_res = g_hash_table_lookup (task->results, DEFAULT_METRIC); + if (metric_res == NULL) { + row->symbols[0] = '\0'; + row->action = METRIC_ACTION_NOACTION; + } + else { + row->score = metric_res->score; + row->required_score = metric_res->metric->required_score; + row->action = check_metric_action (metric_res->score, metric_res->metric->required_score, metric_res->metric); + cbdata.pos = row->symbols; + cbdata.remain = sizeof (row->symbols); + g_hash_table_foreach (metric_res->symbols, roll_history_symbols_callback, &cbdata); + if (cbdata.remain > 0) { + /* Remove last whitespace and comma */ + *cbdata.pos-- = '\0'; + *cbdata.pos-- = '\0'; + *cbdata.pos = '\0'; + } + } + + row->scan_time = task->scan_milliseconds; + row->len = task->msg->len; + row->completed = TRUE; +} diff --git a/src/roll_history.h b/src/roll_history.h new file mode 100644 index 000000000..c2e378bf9 --- /dev/null +++ b/src/roll_history.h @@ -0,0 +1,90 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef ROLL_HISTORY_H_ +#define ROLL_HISTORY_H_ + +#include "config.h" +#include "mem_pool.h" + +/* + * Roll history is a special cycled buffer for checked messages, it is designed for writing history messages + * and displaying them in webui + */ + +#define HISTORY_MAX_ID 100 +#define HISTORY_MAX_SYMBOLS 200 +#define HISTORY_MAX_USER 20 +#define HISTORY_MAX_ROWS 200 + +struct worker_task; + +struct roll_history_row { + struct timeval tv; + gchar message_id[HISTORY_MAX_ID]; + gchar symbols[HISTORY_MAX_SYMBOLS]; + gchar user[HISTORY_MAX_USER]; +#ifdef HAVE_INET_PTON + struct { + union { + struct in_addr in4; + struct in6_addr in6; + } d; + gboolean ipv6; + gboolean has_addr; + } from_addr; +#else + struct in_addr from_addr; +#endif + gsize len; + guint scan_time; + gint action; + gdouble score; + gdouble required_score; + guint8 completed; +}; + +struct roll_history { + struct roll_history_row rows[HISTORY_MAX_ROWS]; + gint cur_row; + memory_pool_t *pool; + gboolean need_lock; + memory_pool_mutex_t *mtx; +}; + +/** + * Returns new roll history + * @param pool pool for shared memory + * @return new structure + */ +struct roll_history* rspamd_roll_history_new (memory_pool_t *pool); + +/** + * Update roll history with data from task + * @param history roll history object + * @param task task object + */ +void rspamd_roll_history_update (struct roll_history *history, struct worker_task *task); + +#endif /* ROLL_HISTORY_H_ */ diff --git a/src/smtp_utils.c b/src/smtp_utils.c index 4f5e8d117..4c87b6382 100644 --- a/src/smtp_utils.c +++ b/src/smtp_utils.c @@ -184,10 +184,10 @@ smtp_metric_callback (gpointer key, gpointer value, gpointer ud) #ifdef HAVE_CLOCK_GETTIME cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s,", - task->msg->len, calculate_check_time (&task->tv, &task->ts, task->cfg->clock_res)); + task->msg->len, calculate_check_time (&task->tv, &task->ts, task->cfg->clock_res, &task->scan_milliseconds)); #else cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s,", - task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res)); + task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds)); #endif } diff --git a/src/util.c b/src/util.c index 707f4c449..2784e6a78 100644 --- a/src/util.c +++ b/src/util.c @@ -902,16 +902,17 @@ resolve_stat_filename (memory_pool_t * pool, gchar *pattern, gchar *rcpt, gchar #ifdef HAVE_CLOCK_GETTIME const gchar * -calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution) +calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution, guint32 *scan_time) #else const gchar * -calculate_check_time (struct timeval *begin, gint resolution) +calculate_check_time (struct timeval *begin, gint resolution, guint32 *scan_time) #endif { double vdiff, diff; static gchar res[64]; static gchar fmt[sizeof ("%.10f ms real, %.10f ms virtual")]; struct timeval tv_now; + if (gettimeofday (&tv_now, NULL) == -1) { msg_warn ("gettimeofday failed: %s", strerror (errno)); } @@ -937,6 +938,8 @@ calculate_check_time (struct timeval *begin, gint resolution) vdiff = diff; #endif + *scan_time = diff; + sprintf (fmt, "%%.%dfms real, %%.%dfms virtual", resolution, resolution); snprintf (res, sizeof (res), fmt, diff, vdiff); diff --git a/src/util.h b/src/util.h index c8fd8158a..bd57ef59b 100644 --- a/src/util.h +++ b/src/util.h @@ -128,9 +128,9 @@ gchar* resolve_stat_filename (memory_pool_t *pool, gchar *pattern, gchar *rcpt, /* * Calculate check time with specified resolution of timer */ -const gchar* calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution); +const gchar* calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution, guint32 *scan_ms); #else -const gchar* calculate_check_time (struct timeval *begin, gint resolution); +const gchar* calculate_check_time (struct timeval *begin, gint resolution, guint32 *scan_ms); #endif /* diff --git a/src/webui.c b/src/webui.c index 8b6f26904..657750b46 100644 --- a/src/webui.c +++ b/src/webui.c @@ -66,6 +66,8 @@ #define PATH_MAPS "/maps" #define PATH_GET_MAP "/getmap" #define PATH_GRAPH "/graph" +#define PATH_PIE_CHART "/pie" +#define PATH_HISTORY "/history" /* Graph colors */ #define COLOR_CLEAN "#58A458" @@ -640,6 +642,141 @@ http_handle_graph (struct evhttp_request *req, gpointer arg) evbuffer_free (evb); } +/* + * Pie chart command handler: + * request: /pie + * headers: Password + * reply: json [ + * { label: "Foo", data: 11 }, + * { label: "Bar", data: 20 }, + * {...} + * ] + */ +static void +http_handle_pie_chart (struct evhttp_request *req, gpointer arg) +{ + struct rspamd_webui_worker_ctx *ctx = arg; + struct evbuffer *evb; + gdouble data[4], total; + + evb = evbuffer_new (); + if (!evb) { + msg_err ("cannot allocate evbuffer for reply"); + evhttp_send_reply (req, HTTP_INTERNAL, "500 insufficient memory", NULL); + return; + } + + total = ctx->srv->stat->messages_scanned; + if (total != 0) { + data[0] = ctx->srv->stat->actions_stat[METRIC_ACTION_NOACTION] / total * 100.; + data[1] = (ctx->srv->stat->actions_stat[METRIC_ACTION_ADD_HEADER] + ctx->srv->stat->actions_stat[METRIC_ACTION_REWRITE_SUBJECT]) / total * 100.; + data[2] = ctx->srv->stat->actions_stat[METRIC_ACTION_GREYLIST] / total * 100.; + data[3] = ctx->srv->stat->actions_stat[METRIC_ACTION_REJECT] / total * 100.; + + evbuffer_add_printf (evb, "[{\"label\": \"Clean messages\", \"color\": \"" + COLOR_CLEAN "\", \"data\":%.2f},", data[0]); + evbuffer_add_printf (evb, "{\"label\": \"Probable spam messages\", \"color\": \"" + COLOR_PROBABLE_SPAM "\", \"data\":%.2f},", data[1]); + evbuffer_add_printf (evb, "{\"label\": \"Greylisted messages\", \"color\": \"" + COLOR_GREYLIST "\", \"data\":%.2f},", data[2]); + evbuffer_add_printf (evb, "{\"label\": \"Rejected messages\", \"color\": \"" + COLOR_REJECT "\", \"data\":%.2f}]" CRLF, data[3]); + } + else { + evbuffer_add_printf (evb, "[{\"label\": \"Not scanned messages\", \"data\": 0}]" CRLF); + } + + + evhttp_add_header (req->output_headers, "Connection", "close"); + http_calculate_content_length (evb, req); + + evhttp_send_reply (req, HTTP_OK, "OK", evb); + evbuffer_free (evb); +} + +/* + * History command handler: + * request: /history + * headers: Password + * reply: json [ + * { label: "Foo", data: 11 }, + * { label: "Bar", data: 20 }, + * {...} + * ] + */ +static void +http_handle_history (struct evhttp_request *req, gpointer arg) +{ + struct rspamd_webui_worker_ctx *ctx = arg; + struct evbuffer *evb; + struct roll_history_row *row; + struct roll_history copied_history; + gint i, row_num; + struct tm *tm; + gchar timebuf[32]; + gchar ip_buf[INET6_ADDRSTRLEN]; + + evb = evbuffer_new (); + if (!evb) { + msg_err ("cannot allocate evbuffer for reply"); + evhttp_send_reply (req, HTTP_INTERNAL, "500 insufficient memory", NULL); + return; + } + + /* Set lock on history */ + memory_pool_lock_mutex (ctx->srv->history->mtx); + ctx->srv->history->need_lock = TRUE; + /* Copy locked */ + memcpy (&copied_history, ctx->srv->history, sizeof (copied_history)); + memory_pool_unlock_mutex (ctx->srv->history->mtx); + + /* Trailer */ + evbuffer_add (evb, "[", 1); + + /* Go throught all rows */ + row_num = copied_history.cur_row; + for (i = 0; i < HISTORY_MAX_ROWS; i ++, row_num ++) { + if (row_num == HISTORY_MAX_ROWS) { + row_num = 0; + } + row = &copied_history.rows[row_num]; + /* Get only completed rows */ + if (row->completed) { + tm = localtime (&row->tv.tv_sec); + strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", tm); +#ifdef HAVE_INET_PTON + if (row->from_addr.ipv6) { + inet_ntop (AF_INET6, &row->from_addr.d.in6, ip_buf, sizeof (ip_buf)); + } + else { + inet_ntop (AF_INET, &row->from_addr.d.in4, ip_buf, sizeof (ip_buf)); + } +#else + rspamd_strlcpy (ip_buf, inet_ntoa (task->from_addr), sizeof (ip_buf)); +#endif + if (row->user[0] != '\0') { + evbuffer_add_printf (evb, "{\"time\":\"%s\",\"id\":\"%s\",\"ip\":\"%s\",\"action\":\"%s\"," + "\"score\":%.2f,\"required_score\": %.2f,\"symbols\":\"%s\",\"size\":%zd,\"scan_time\":%u," + "\"user\":\"%s\"}%s", timebuf, row->message_id, ip_buf, str_action_metric (row->action), + row->score, row->required_score, row->symbols, row->len, row->scan_time, row->user, i == HISTORY_MAX_ROWS - 1 ? "" : ","); + } + else { + evbuffer_add_printf (evb, "{\"time\":\"%s\",\"id\":\"%s\",\"ip\":\"%s\",\"action\":\"%s\"," + "\"score\": %.2f,\"required_score\":%.2f,\"symbols\":\"%s\",\"size\":%zd,\"scan_time\":%u}%s", + timebuf, row->message_id, ip_buf, str_action_metric (row->action), + row->score, row->required_score, row->symbols, row->len, row->scan_time, i == HISTORY_MAX_ROWS - 1 ? "" : ","); + } + } + } + + evbuffer_add (evb, "]" CRLF, 3); + + evhttp_add_header (req->output_headers, "Connection", "close"); + http_calculate_content_length (evb, req); + + evhttp_send_reply (req, HTTP_OK, "OK", evb); + evbuffer_free (evb); +} gpointer init_webui_worker (void) @@ -719,6 +856,8 @@ start_webui_worker (struct rspamd_worker *worker) evhttp_set_cb (ctx->http, PATH_MAPS, http_handle_maps, ctx); evhttp_set_cb (ctx->http, PATH_GET_MAP, http_handle_get_map, ctx); evhttp_set_cb (ctx->http, PATH_GRAPH, http_handle_graph, ctx); + evhttp_set_cb (ctx->http, PATH_PIE_CHART, http_handle_pie_chart, ctx); + evhttp_set_cb (ctx->http, PATH_HISTORY, http_handle_history, ctx); ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); -- 2.39.5