From e2b9ffea8d40c4f58239b8e29c2ec80e5aa8c250 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 24 Oct 2015 14:33:02 +0100 Subject: [PATCH] Start conversion history to a flexible form. --- src/libserver/roll_history.c | 185 ++++++++++++++++++++++++++--------- src/libserver/roll_history.h | 24 +++-- 2 files changed, 148 insertions(+), 61 deletions(-) diff --git a/src/libserver/roll_history.c b/src/libserver/roll_history.c index 6c48fc224..798e1f855 100644 --- a/src/libserver/roll_history.c +++ b/src/libserver/roll_history.c @@ -26,9 +26,10 @@ #include "config.h" #include "rspamd.h" #include "roll_history.h" +#include "ucl.h" #include "unix-std.h" -static const gchar rspamd_history_magic[] = {'r', 's', 'h', '1'}; +static const gchar rspamd_history_magic_old[] = {'r', 's', 'h', '1'}; /** * Returns new roll history @@ -36,17 +37,18 @@ static const gchar rspamd_history_magic[] = {'r', 's', 'h', '1'}; * @return new structure */ struct roll_history * -rspamd_roll_history_new (rspamd_mempool_t *pool) +rspamd_roll_history_new (rspamd_mempool_t *pool, guint max_rows) { struct roll_history *new; - if (pool == NULL) { + if (pool == NULL || max_rows == 0) { return NULL; } new = rspamd_mempool_alloc0_shared (pool, sizeof (struct roll_history)); - new->pool = pool; - new->mtx = rspamd_mempool_get_mutex (pool); + new->rows = rspamd_mempool_alloc0_shared (pool, + sizeof (struct roll_history_row) * max_rows); + new->nrows = max_rows; return new; } @@ -84,24 +86,17 @@ rspamd_roll_history_update (struct roll_history *history, struct metric_result *metric_res; struct history_metric_callback_data cbdata; - if (history->need_lock) { - /* Some process is getting history, so wait on a mutex */ - rspamd_mempool_lock_mutex (history->mtx); - history->need_lock = FALSE; - rspamd_mempool_unlock_mutex (history->mtx); - } - /* First of all obtain check and obtain row number */ - g_atomic_int_compare_and_exchange (&history->cur_row, HISTORY_MAX_ROWS, 0); + g_atomic_int_compare_and_exchange (&history->cur_row, history->nrows, 0); #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) row_num = g_atomic_int_add (&history->cur_row, 1); #else row_num = g_atomic_int_exchange_and_add (&history->cur_row, 1); #endif - if (row_num < HISTORY_MAX_ROWS) { + if (row_num < history->nrows) { row = &history->rows[row_num]; - row->completed = FALSE; + g_atomic_int_set (&row->completed, FALSE); } else { /* Race condition */ @@ -119,7 +114,7 @@ rspamd_roll_history_update (struct roll_history *history, rspamd_strlcpy (row->from_addr, "unknown", sizeof (row->from_addr)); } - memcpy (&row->tv, &task->tv, sizeof (row->tv)); + memcpy (&row->tv, &task->tv, sizeof (row->tv)); /* Strings */ rspamd_strlcpy (row->message_id, task->message_id, @@ -157,7 +152,7 @@ rspamd_roll_history_update (struct roll_history *history, row->scan_time = rspamd_get_ticks () - task->time_real; row->len = task->msg.len; - row->completed = TRUE; + g_atomic_int_set (&row->completed, TRUE); } /** @@ -171,51 +166,159 @@ rspamd_roll_history_load (struct roll_history *history, const gchar *filename) { gint fd; struct stat st; - gchar magic[sizeof(rspamd_history_magic)]; - rspamd_mempool_t *pool; + gchar magic[sizeof(rspamd_history_magic_old)]; + ucl_object_t *top; + const ucl_object_t *cur, *elt; + struct ucl_parser *parser; + struct roll_history_row *row; + guint n, i; g_assert (history != NULL); - pool = history->pool; if (stat (filename, &st) == -1) { - msg_info_pool ("cannot load history from %s: %s", filename, + msg_info ("cannot load history from %s: %s", filename, strerror (errno)); return FALSE; } - if (st.st_size != sizeof (history->rows) + sizeof (rspamd_history_magic)) { - msg_info_pool ("cannot load history from %s: size mismatch", filename); - return FALSE; - } - if ((fd = open (filename, O_RDONLY)) == -1) { - msg_info_pool ("cannot load history from %s: %s", filename, + msg_info ("cannot load history from %s: %s", filename, strerror (errno)); return FALSE; } + /* Check for old format */ if (read (fd, magic, sizeof (magic)) == -1) { close (fd); - msg_info_pool ("cannot read history from %s: %s", filename, + msg_info ("cannot read history from %s: %s", filename, strerror (errno)); return FALSE; } - if (memcmp (magic, rspamd_history_magic, sizeof (magic)) != 0) { + if (memcmp (magic, rspamd_history_magic_old, sizeof (magic)) == 0) { close (fd); - msg_info_pool ("cannot read history from %s: bad magic", filename); + msg_warn ("cannot read history from old format %s, " + "it will be replaced after restart", filename); return FALSE; } - if (read (fd, history->rows, sizeof (history->rows)) == -1) { + parser = ucl_parser_new (0); + + if (!ucl_parser_add_fd (parser, fd)) { + msg_warn ("cannot parse history file %s: %s", filename, + ucl_parser_get_error (parser)); + ucl_parser_free (parser); close (fd); - msg_info_pool ("cannot read history from %s: %s", filename, - strerror (errno)); + return FALSE; } + top = ucl_parser_get_object (parser); + ucl_parser_free (parser); close (fd); + g_assert (top != NULL); + + if (ucl_object_type (top) != UCL_ARRAY) { + msg_warn ("invalid object type read from: %s", filename); + ucl_object_unref (top); + + return FALSE; + } + + if (top->len > history->nrows) { + msg_warn ("stored history is larger than the current one: %ud (file) vs " + "%ud (history)", top->len, history->nrows); + n = history->nrows; + } + else if (top->len < history->nrows) { + msg_warn ( + "stored history is smaller than the current one: %ud (file) vs " + "%ud (history)", + top->len, history->nrows); + n = top->len; + } + else { + n = top->len; + } + + for (i = 0; i < n; i ++) { + cur = ucl_array_find_index (top, i); + + if (cur != NULL && ucl_object_type (cur) == UCL_OBJECT) { + row = &history->rows[i]; + memset (row, 0, sizeof (*row)); + + elt = ucl_object_find_key (cur, "time"); + + if (elt && ucl_object_type (elt) == UCL_FLOAT) { + double_to_tv (ucl_object_todouble (elt), &row->tv); + } + + elt = ucl_object_find_key (cur, "id"); + + if (elt && ucl_object_type (elt) == UCL_STRING) { + rspamd_strlcpy (row->message_id, ucl_object_tostring (elt), + sizeof (row->message_id)); + } + + elt = ucl_object_find_key (cur, "symbols"); + + if (elt && ucl_object_type (elt) == UCL_STRING) { + rspamd_strlcpy (row->symbols, ucl_object_tostring (elt), + sizeof (row->symbols)); + } + + elt = ucl_object_find_key (cur, "user"); + + if (elt && ucl_object_type (elt) == UCL_STRING) { + rspamd_strlcpy (row->user, ucl_object_tostring (elt), + sizeof (row->user)); + } + + elt = ucl_object_find_key (cur, "from"); + + if (elt && ucl_object_type (elt) == UCL_STRING) { + rspamd_strlcpy (row->from_addr, ucl_object_tostring (elt), + sizeof (row->from_addr)); + } + + elt = ucl_object_find_key (cur, "len"); + + if (elt && ucl_object_type (elt) == UCL_INT) { + row->len = ucl_object_toint (elt); + } + + elt = ucl_object_find_key (cur, "scan_time"); + + if (elt && ucl_object_type (elt) == UCL_FLOAT) { + row->scan_time = ucl_object_todouble (elt); + } + + elt = ucl_object_find_key (cur, "score"); + + if (elt && ucl_object_type (elt) == UCL_FLOAT) { + row->score = ucl_object_todouble (elt); + } + + elt = ucl_object_find_key (cur, "required_score"); + + if (elt && ucl_object_type (elt) == UCL_FLOAT) { + row->required_score = ucl_object_todouble (elt); + } + + elt = ucl_object_find_key (cur, "action"); + + if (elt && ucl_object_type (elt) == UCL_INT) { + row->action = ucl_object_toint (elt); + } + + row->completed = TRUE; + } + } + + history->cur_row = n; + return TRUE; } @@ -229,25 +332,11 @@ gboolean rspamd_roll_history_save (struct roll_history *history, const gchar *filename) { gint fd; - rspamd_mempool_t *pool; g_assert (history != NULL); - pool = history->pool; if ((fd = open (filename, O_WRONLY | O_CREAT | O_TRUNC, 00600)) == -1) { - msg_info_pool ("cannot save history to %s: %s", filename, strerror (errno)); - return FALSE; - } - - if (write (fd, rspamd_history_magic, sizeof (rspamd_history_magic)) == -1) { - close (fd); - msg_info_pool ("cannot write history to %s: %s", filename, strerror (errno)); - return FALSE; - } - - if (write (fd, history->rows, sizeof (history->rows)) == -1) { - close (fd); - msg_info_pool ("cannot write history to %s: %s", filename, strerror (errno)); + msg_info ("cannot save history to %s: %s", filename, strerror (errno)); return FALSE; } diff --git a/src/libserver/roll_history.h b/src/libserver/roll_history.h index 2e8e1e5cd..5bcd03702 100644 --- a/src/libserver/roll_history.h +++ b/src/libserver/roll_history.h @@ -33,11 +33,10 @@ * and displaying them in webui */ -#define HISTORY_MAX_ID 100 -#define HISTORY_MAX_SYMBOLS 200 -#define HISTORY_MAX_USER 20 +#define HISTORY_MAX_ID 64 +#define HISTORY_MAX_SYMBOLS 128 +#define HISTORY_MAX_USER 32 #define HISTORY_MAX_ADDR 32 -#define HISTORY_MAX_ROWS 200 struct rspamd_task; @@ -48,19 +47,17 @@ struct roll_history_row { gchar user[HISTORY_MAX_USER]; gchar from_addr[HISTORY_MAX_ADDR]; gsize len; - guint scan_time; - gint action; + gdouble scan_time; gdouble score; gdouble required_score; - guint8 completed; + gint action; + guint completed; }; struct roll_history { - struct roll_history_row rows[HISTORY_MAX_ROWS]; - gint cur_row; - rspamd_mempool_t *pool; - gboolean need_lock; - rspamd_mempool_mutex_t *mtx; + struct roll_history_row *rows; + guint nrows; + guint cur_row; }; /** @@ -68,7 +65,8 @@ struct roll_history { * @param pool pool for shared memory * @return new structure */ -struct roll_history * rspamd_roll_history_new (rspamd_mempool_t *pool); +struct roll_history * rspamd_roll_history_new (rspamd_mempool_t *pool, + guint max_rows); /** * Update roll history with data from task -- 2.39.5