From: Vsevolod Stakhov Date: Sat, 17 Jan 2015 21:53:49 +0000 (+0000) Subject: Start refactoring of statistics in rspamd. X-Git-Tag: 0.9.0~864 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=665166c376a54f52b070e891780ca6209bbaa2d1;p=rspamd.git Start refactoring of statistics in rspamd. --- diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 5844a945f..1beb51055 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -125,43 +125,6 @@ struct rspamd_symbols_group { GList *symbols; }; -/** - * Statfile section definition - */ -struct rspamd_statfile_section { - guint32 code; /**< section's code */ - guint64 size; /**< size of section */ - double weight; /**< weight coefficient for section */ -}; - -/** - * Statfile autolearn parameters - */ -struct statfile_autolearn_params { - const gchar *metric; /**< metric name for autolearn triggering */ - double threshold_min; /**< threshold mark */ - double threshold_max; /**< threshold mark */ - GList *symbols; /**< list of symbols */ -}; - -/** - * Sync affinity - */ -enum sync_affinity { - AFFINITY_NONE = 0, - AFFINITY_MASTER, - AFFINITY_SLAVE -}; - -/** - * Binlog params - */ -struct statfile_binlog_params { - enum sync_affinity affinity; - time_t rotate_time; - gchar *master_addr; - guint16 master_port; -}; typedef double (*statfile_normalize_func)(struct rspamd_config *cfg, long double score, void *params); @@ -171,15 +134,7 @@ typedef double (*statfile_normalize_func)(struct rspamd_config *cfg, */ struct rspamd_statfile_config { gchar *symbol; /**< symbol of statfile */ - gchar *path; /**< filesystem pattern (with %r or %f) */ gchar *label; /**< label of this statfile */ - gsize size; /**< size of statfile */ - GList *sections; /**< list of sections in statfile */ - struct statfile_autolearn_params *autolearn; /**< autolearn params */ - struct statfile_binlog_params *binlog; /**< binlog params */ - statfile_normalize_func normalizer; /**< function that is used as normaliser 
*/ - void *normalizer_data; /**< normalizer function params */ - gchar *normalizer_str; /**< source string (for dump) */ ucl_object_t *opts; /**< other options */ gboolean is_spam; /**< spam flag */ }; @@ -193,7 +148,7 @@ struct rspamd_classifier_config { gchar *metric; /**< metric of this classifier */ struct classifier *classifier; /**< classifier interface */ struct tokenizer *tokenizer; /**< tokenizer used for classifier */ - GHashTable *opts; /**< other options */ + ucl_object_t *opts; /**< other options */ GList *pre_callbacks; /**< list of callbacks that are called before classification */ GList *post_callbacks; /**< list of callbacks that are called after classification */ }; diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c index 6c77292aa..921464219 100644 --- a/src/libserver/cfg_rcl.c +++ b/src/libserver/cfg_rcl.c @@ -889,14 +889,6 @@ rspamd_rcl_statfile_handler (struct rspamd_config *cfg, const ucl_object_t *obj, return FALSE; } - if (st->path == NULL) { - g_set_error (err, - CFG_RCL_ERROR, - EINVAL, - "statfile must have a path defined"); - return FALSE; - } - st->opts = (ucl_object_t *)obj; val = ucl_object_find_key (obj, "spam"); @@ -967,7 +959,7 @@ rspamd_rcl_classifier_handler (struct rspamd_config *cfg, if (found == NULL) { ccf = rspamd_config_new_classifier (cfg, NULL); - ccf->classifier = get_classifier (type); + ccf->classifier = rspamd_stat_get_classifier (type); } else { ccf = found; @@ -997,13 +989,7 @@ rspamd_rcl_classifier_handler (struct rspamd_config *cfg, } else if (g_ascii_strcasecmp (key, "tokenizer") == 0 && val->type == UCL_STRING) { - ccf->tokenizer = get_tokenizer (ucl_object_tostring (val)); - } - else { - /* Just insert a value of option to the hash */ - g_hash_table_insert (ccf->opts, - (gpointer)key, - (gpointer)ucl_object_tostring_forced (val)); + ccf->tokenizer = rspamd_stat_get_tokenizer (ucl_object_tostring (val)); } } } @@ -1403,21 +1389,11 @@ rspamd_rcl_config_init (void) rspamd_rcl_parse_struct_string, 
G_STRUCT_OFFSET (struct rspamd_statfile_config, symbol), 0); - rspamd_rcl_add_default_handler (ssub, - "path", - rspamd_rcl_parse_struct_string, - G_STRUCT_OFFSET (struct rspamd_statfile_config, path), - RSPAMD_CL_FLAG_STRING_PATH); rspamd_rcl_add_default_handler (ssub, "label", rspamd_rcl_parse_struct_string, G_STRUCT_OFFSET (struct rspamd_statfile_config, label), 0); - rspamd_rcl_add_default_handler (ssub, - "size", - rspamd_rcl_parse_struct_integer, - G_STRUCT_OFFSET (struct rspamd_statfile_config, size), - RSPAMD_CL_FLAG_INT_SIZE); rspamd_rcl_add_default_handler (ssub, "spam", rspamd_rcl_parse_struct_boolean, diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index b53a2690c..c9a9555b1 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -498,12 +498,6 @@ rspamd_config_new_classifier (struct rspamd_config *cfg, rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (struct rspamd_classifier_config)); } - if (c->opts == NULL) { - c->opts = g_hash_table_new (rspamd_str_hash, rspamd_str_equal); - rspamd_mempool_add_destructor (cfg->cfg_pool, - (rspamd_mempool_destruct_t) g_hash_table_destroy, - c->opts); - } if (c->labels == NULL) { c->labels = g_hash_table_new_full (rspamd_str_hash, rspamd_str_equal, diff --git a/src/libserver/statfile.c b/src/libserver/statfile.c deleted file mode 100644 index 066671a95..000000000 --- a/src/libserver/statfile.c +++ /dev/null @@ -1,1083 +0,0 @@ -/* - * Copyright (c) 2009-2012, Vsevolod Stakhov - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. 
- * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include "config.h" - -#include "statfile.h" -#include "main.h" - -#define RSPAMD_STATFILE_VERSION {'1', '2'} -#define BACKUP_SUFFIX ".old" - -/* Maximum number of statistics files */ -#define STATFILES_MAX 255 -static void statfile_pool_set_block_common ( - statfile_pool_t * pool, stat_file_t * file, - guint32 h1, guint32 h2, - time_t t, double value, - gboolean from_now); - -static gint -cmpstatfile (const void *a, const void *b) -{ - const stat_file_t *s1 = a, *s2 = b; - - return g_ascii_strcasecmp (s1->filename, s2->filename); -} - -/* Convert statfile version 1.0 to statfile version 1.2, saving backup */ -struct stat_file_header_10 { - u_char magic[3]; /**< magic signature ('r' 's' 'd') */ - u_char version[2]; /**< version of statfile */ - u_char padding[3]; /**< padding */ - guint64 create_time; /**< create time (time_t->guint64) */ -}; - -static gboolean -convert_statfile_10 (stat_file_t * file) -{ - gchar *backup_name; - struct stat st; - struct stat_file_header header = { - .magic = {'r', 's', 'd'}, - .version 
= RSPAMD_STATFILE_VERSION, - .padding = {0, 0, 0}, - .revision = 0, - .rev_time = 0 - }; - - - /* Format backup name */ - backup_name = g_strdup_printf ("%s.%s", file->filename, BACKUP_SUFFIX); - - msg_info ("convert old statfile %s to version %c.%c, backup in %s", - file->filename, - header.version[0], - header.version[1], - backup_name); - - if (stat (backup_name, &st) != -1) { - msg_info ("replace old %s", backup_name); - unlink (backup_name); - } - - rename (file->filename, backup_name); - g_free (backup_name); - - /* XXX: maybe race condition here */ - rspamd_file_unlock (file->fd, FALSE); - close (file->fd); - if ((file->fd = - open (file->filename, O_RDWR | O_TRUNC | O_CREAT, - S_IWUSR | S_IRUSR)) == -1) { - msg_info ("cannot create file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - rspamd_file_lock (file->fd, FALSE); - /* Now make new header and copy it to new file */ - if (write (file->fd, &header, sizeof (header)) == -1) { - msg_info ("cannot write to file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - /* Now write old map to new file */ - if (write (file->fd, - ((u_char *)file->map + sizeof (struct stat_file_header_10)), - file->len - sizeof (struct stat_file_header_10)) == -1) { - msg_info ("cannot write to file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - /* Unmap old memory and map new */ - munmap (file->map, file->len); - file->len = file->len + sizeof (struct stat_file_header) - - sizeof (struct stat_file_header_10); -#ifdef HAVE_MMAP_NOCORE - if ((file->map = - mmap (NULL, file->len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_NOCORE, - file->fd, 0)) == MAP_FAILED) { -#else - if ((file->map = - mmap (NULL, file->len, PROT_READ | PROT_WRITE, MAP_SHARED, file->fd, - 0)) == MAP_FAILED) { -#endif - msg_info ("cannot mmap file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - - return TRUE; 
-} - -/* Check whether specified file is statistic file and calculate its len in blocks */ -static gint -statfile_pool_check (stat_file_t * file) -{ - struct stat_file *f; - gchar *c; - static gchar valid_version[] = RSPAMD_STATFILE_VERSION; - - - if (!file || !file->map) { - return -1; - } - - if (file->len < sizeof (struct stat_file)) { - msg_info ("file %s is too short to be stat file: %z", - file->filename, - file->len); - return -1; - } - - f = (struct stat_file *)file->map; - c = f->header.magic; - /* Check magic and version */ - if (*c++ != 'r' || *c++ != 's' || *c++ != 'd') { - msg_info ("file %s is invalid stat file", file->filename); - return -1; - } - /* Now check version and convert old version to new one (that can be used for sync */ - if (*c == 1 && *(c + 1) == 0) { - if (!convert_statfile_10 (file)) { - return -1; - } - f = (struct stat_file *)file->map; - } - else if (memcmp (c, valid_version, sizeof (valid_version)) != 0) { - /* Unknown version */ - msg_info ("file %s has invalid version %c.%c", - file->filename, - '0' + *c, - '0' + *(c + 1)); - return -1; - } - - /* Check first section and set new offset */ - file->cur_section.code = f->section.code; - file->cur_section.length = f->section.length; - if (file->cur_section.length * sizeof (struct stat_file_block) > - file->len) { - msg_info ("file %s is truncated: %z, must be %z", - file->filename, - file->len, - file->cur_section.length * sizeof (struct stat_file_block)); - return -1; - } - file->seek_pos = sizeof (struct stat_file) - - sizeof (struct stat_file_block); - - return 0; -} - - -statfile_pool_t * -statfile_pool_new (rspamd_mempool_t *pool, gboolean use_mlock) -{ - statfile_pool_t *new; - - new = rspamd_mempool_alloc0 (pool, sizeof (statfile_pool_t)); - new->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); - new->files = - rspamd_mempool_alloc0 (new->pool, STATFILES_MAX * sizeof (stat_file_t)); - new->lock = rspamd_mempool_get_mutex (new->pool); - new->mlock_ok = use_mlock; - 
- return new; -} - -static stat_file_t * -statfile_pool_reindex (statfile_pool_t * pool, - gchar *filename, - size_t old_size, - size_t size) -{ - gchar *backup; - gint fd; - stat_file_t *new; - u_char *map, *pos; - struct stat_file_block *block; - struct stat_file_header *header; - - if (size < - sizeof (struct stat_file_header) + sizeof (struct stat_file_section) + - sizeof (block)) { - msg_err ("file %s is too small to carry any statistic: %z", - filename, - size); - return NULL; - } - - /* First of all rename old file */ - rspamd_mempool_lock_mutex (pool->lock); - - backup = g_strconcat (filename, ".old", NULL); - if (rename (filename, backup) == -1) { - msg_err ("cannot rename %s to %s: %s", filename, backup, strerror ( - errno)); - g_free (backup); - rspamd_mempool_unlock_mutex (pool->lock); - return NULL; - } - - rspamd_mempool_unlock_mutex (pool->lock); - - /* Now create new file with required size */ - if (statfile_pool_create (pool, filename, size) != 0) { - msg_err ("cannot create new file"); - g_free (backup); - return NULL; - } - /* Now open new file and start copying */ - fd = open (backup, O_RDONLY); - new = statfile_pool_open (pool, filename, size, TRUE); - - if (fd == -1 || new == NULL) { - msg_err ("cannot open file: %s", strerror (errno)); - g_free (backup); - return NULL; - } - - /* Now start reading blocks from old statfile */ - if ((map = - mmap (NULL, old_size, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { - msg_err ("cannot mmap file: %s", strerror (errno)); - close (fd); - g_free (backup); - return NULL; - } - - pos = map + (sizeof (struct stat_file) - sizeof (struct stat_file_block)); - while (old_size - (pos - map) >= sizeof (struct stat_file_block)) { - block = (struct stat_file_block *)pos; - if (block->hash1 != 0 && block->value != 0) { - statfile_pool_set_block_common (pool, - new, - block->hash1, - block->hash2, - 0, - block->value, - FALSE); - } - pos += sizeof (block); - } - - header = (struct stat_file_header *)map; - 
statfile_set_revision (new, header->revision, header->rev_time); - - munmap (map, old_size); - close (fd); - unlink (backup); - g_free (backup); - - return new; - -} - -/* - * Pre-load mmaped file into memory - */ -static void -statfile_preload (stat_file_t *file) -{ - guint8 *pos, *end; - volatile guint8 t; - gsize size; - - pos = (guint8 *)file->map; - end = (guint8 *)file->map + file->len; - - if (madvise (pos, end - pos, MADV_SEQUENTIAL) == -1) { - msg_info ("madvise failed: %s", strerror (errno)); - } - else { - /* Load pages of file */ -#ifdef HAVE_GETPAGESIZE - size = getpagesize (); -#else - size = sysconf (_SC_PAGESIZE); -#endif - while (pos < end) { - t = *pos; - (void)t; - pos += size; - } - } -} - -stat_file_t * -statfile_pool_open (statfile_pool_t * pool, - gchar *filename, - size_t size, - gboolean forced) -{ - struct stat st; - stat_file_t *new_file; - - if ((new_file = statfile_pool_is_open (pool, filename)) != NULL) { - return new_file; - } - - if (pool->opened >= STATFILES_MAX - 1) { - msg_err ("reached hard coded limit of statfiles opened: %d", - STATFILES_MAX); - return NULL; - } - - if (stat (filename, &st) == -1) { - msg_info ("cannot stat file %s, error %s, %d", filename, strerror ( - errno), errno); - return NULL; - } - - rspamd_mempool_lock_mutex (pool->lock); - if (!forced && - labs (size - st.st_size) > (long)sizeof (struct stat_file) * 2 - && size > sizeof (struct stat_file)) { - rspamd_mempool_unlock_mutex (pool->lock); - msg_warn ("need to reindex statfile old size: %Hz, new size: %Hz", - (size_t)st.st_size, size); - return statfile_pool_reindex (pool, filename, st.st_size, size); - } - else if (size < sizeof (struct stat_file)) { - msg_err ("requested to shrink statfile to %Hz but it is too small", - size); - } - - new_file = &pool->files[pool->opened++]; - bzero (new_file, sizeof (stat_file_t)); - if ((new_file->fd = open (filename, O_RDWR)) == -1) { - msg_info ("cannot open file %s, error %d, %s", - filename, - errno, - strerror 
(errno)); - rspamd_mempool_unlock_mutex (pool->lock); - pool->opened--; - return NULL; - } - - if ((new_file->map = - mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, - new_file->fd, 0)) == MAP_FAILED) { - close (new_file->fd); - rspamd_mempool_unlock_mutex (pool->lock); - msg_info ("cannot mmap file %s, error %d, %s", - filename, - errno, - strerror (errno)); - pool->opened--; - return NULL; - - } - - rspamd_strlcpy (new_file->filename, filename, sizeof (new_file->filename)); - new_file->len = st.st_size; - /* Try to lock pages in RAM */ - if (pool->mlock_ok) { - if (mlock (new_file->map, new_file->len) == -1) { - msg_warn ( - "mlock of statfile failed, maybe you need to increase RLIMIT_MEMLOCK limit for a process: %s", - strerror (errno)); - pool->mlock_ok = FALSE; - } - } - /* Acquire lock for this operation */ - rspamd_file_lock (new_file->fd, FALSE); - if (statfile_pool_check (new_file) == -1) { - pool->opened--; - rspamd_mempool_unlock_mutex (pool->lock); - rspamd_file_unlock (new_file->fd, FALSE); - munmap (new_file->map, st.st_size); - return NULL; - } - rspamd_file_unlock (new_file->fd, FALSE); - - new_file->open_time = time (NULL); - new_file->access_time = new_file->open_time; - new_file->lock = rspamd_mempool_get_mutex (pool->pool); - - statfile_preload (new_file); - - rspamd_mempool_unlock_mutex (pool->lock); - - return statfile_pool_is_open (pool, filename); -} - -gint -statfile_pool_close (statfile_pool_t * pool, - stat_file_t * file, - gboolean keep_sorted) -{ - stat_file_t *pos; - - if ((pos = statfile_pool_is_open (pool, file->filename)) == NULL) { - msg_info ("file %s is not opened", file->filename); - return -1; - } - - rspamd_mempool_lock_mutex (pool->lock); - - if (file->map) { - msg_info ("syncing statfile %s", file->filename); - msync (file->map, file->len, MS_ASYNC); - munmap (file->map, file->len); - } - if (file->fd != -1) { - close (file->fd); - } - /* Move the remain statfiles */ - memmove (pos, ((guint8 *)pos) + sizeof 
(stat_file_t), - (--pool->opened - (pos - pool->files)) * sizeof (stat_file_t)); - - rspamd_mempool_unlock_mutex (pool->lock); - - return 0; -} - -gint -statfile_pool_create (statfile_pool_t * pool, gchar *filename, size_t size) -{ - struct stat_file_header header = { - .magic = {'r', 's', 'd'}, - .version = RSPAMD_STATFILE_VERSION, - .padding = {0, 0, 0}, - .revision = 0, - .rev_time = 0, - .used_blocks = 0 - }; - struct stat_file_section section = { - .code = STATFILE_SECTION_COMMON, - }; - struct stat_file_block block = { 0, 0, 0 }; - gint fd; - guint buflen = 0, nblocks; - gchar *buf = NULL; - - if (statfile_pool_is_open (pool, filename) != NULL) { - msg_info ("file %s is already opened", filename); - return 0; - } - - if (size < - sizeof (struct stat_file_header) + sizeof (struct stat_file_section) + - sizeof (block)) { - msg_err ("file %s is too small to carry any statistic: %z", - filename, - size); - return -1; - } - - rspamd_mempool_lock_mutex (pool->lock); - nblocks = - (size - sizeof (struct stat_file_header) - - sizeof (struct stat_file_section)) / sizeof (struct stat_file_block); - header.total_blocks = nblocks; - - if ((fd = - open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { - msg_info ("cannot create file %s, error %d, %s", - filename, - errno, - strerror (errno)); - rspamd_mempool_unlock_mutex (pool->lock); - return -1; - } - - rspamd_fallocate (fd, - 0, - sizeof (header) + sizeof (section) + sizeof (block) * nblocks); - - header.create_time = (guint64) time (NULL); - if (write (fd, &header, sizeof (header)) == -1) { - msg_info ("cannot write header to file %s, error %d, %s", - filename, - errno, - strerror (errno)); - close (fd); - rspamd_mempool_unlock_mutex (pool->lock); - return -1; - } - - section.length = (guint64) nblocks; - if (write (fd, §ion, sizeof (section)) == -1) { - msg_info ("cannot write section header to file %s, error %d, %s", - filename, - errno, - strerror (errno)); - close (fd); - 
rspamd_mempool_unlock_mutex (pool->lock); - return -1; - } - - /* Buffer for write 256 blocks at once */ - if (nblocks > 256) { - buflen = sizeof (block) * 256; - buf = g_malloc0 (buflen); - } - - while (nblocks) { - if (nblocks > 256) { - /* Just write buffer */ - if (write (fd, buf, buflen) == -1) { - msg_info ("cannot write blocks buffer to file %s, error %d, %s", - filename, - errno, - strerror (errno)); - close (fd); - rspamd_mempool_unlock_mutex (pool->lock); - g_free (buf); - return -1; - } - nblocks -= 256; - } - else { - if (write (fd, &block, sizeof (block)) == -1) { - msg_info ("cannot write block to file %s, error %d, %s", - filename, - errno, - strerror (errno)); - close (fd); - if (buf) { - g_free (buf); - } - rspamd_mempool_unlock_mutex (pool->lock); - return -1; - } - nblocks--; - } - } - - close (fd); - rspamd_mempool_unlock_mutex (pool->lock); - - if (buf) { - g_free (buf); - } - - return 0; -} - -void -statfile_pool_delete (statfile_pool_t * pool) -{ - gint i; - - for (i = 0; i < pool->opened; i++) { - statfile_pool_close (pool, &pool->files[i], FALSE); - } - rspamd_mempool_delete (pool->pool); -} - -void -statfile_pool_lock_file (statfile_pool_t * pool, stat_file_t * file) -{ - - rspamd_mempool_lock_mutex (file->lock); -} - -void -statfile_pool_unlock_file (statfile_pool_t * pool, stat_file_t * file) -{ - - rspamd_mempool_unlock_mutex (file->lock); -} - -double -statfile_pool_get_block (statfile_pool_t * pool, - stat_file_t * file, - guint32 h1, - guint32 h2, - time_t now) -{ - struct stat_file_block *block; - guint i, blocknum; - u_char *c; - - - file->access_time = now; - if (!file->map) { - return 0; - } - - blocknum = h1 % file->cur_section.length; - c = (u_char *) file->map + file->seek_pos + blocknum * - sizeof (struct stat_file_block); - block = (struct stat_file_block *)c; - - for (i = 0; i < CHAIN_LENGTH; i++) { - if (i + blocknum >= file->cur_section.length) { - break; - } - if (block->hash1 == h1 && block->hash2 == h2) { - return 
block->value; - } - c += sizeof (struct stat_file_block); - block = (struct stat_file_block *)c; - } - - - return 0; -} - -static void -statfile_pool_set_block_common (statfile_pool_t * pool, - stat_file_t * file, - guint32 h1, - guint32 h2, - time_t t, - double value, - gboolean from_now) -{ - struct stat_file_block *block, *to_expire = NULL; - struct stat_file_header *header; - guint i, blocknum; - u_char *c; - double min = G_MAXDOUBLE; - - if (from_now) { - file->access_time = t; - } - if (!file->map) { - return; - } - - blocknum = h1 % file->cur_section.length; - header = (struct stat_file_header *)file->map; - c = (u_char *) file->map + file->seek_pos + blocknum * - sizeof (struct stat_file_block); - block = (struct stat_file_block *)c; - - for (i = 0; i < CHAIN_LENGTH; i++) { - if (i + blocknum >= file->cur_section.length) { - /* Need to expire some block in chain */ - msg_info ("chain %ud is full in statfile %s, starting expire", - blocknum, - file->filename); - break; - } - /* First try to find block in chain */ - if (block->hash1 == h1 && block->hash2 == h2) { - block->value = value; - return; - } - /* Check whether we have a free block in chain */ - if (block->hash1 == 0 && block->hash2 == 0) { - /* Write new block here */ - msg_debug ("found free block %ud in chain %ud, set h1=%ud, h2=%ud", - i, - blocknum, - h1, - h2); - block->hash1 = h1; - block->hash2 = h2; - block->value = value; - header->used_blocks++; - - return; - } - - /* Expire block with minimum value otherwise */ - if (block->value < min) { - to_expire = block; - min = block->value; - } - c += sizeof (struct stat_file_block); - block = (struct stat_file_block *)c; - } - - /* Try expire some block */ - if (to_expire) { - block = to_expire; - } - else { - /* Expire first block in chain */ - c = (u_char *) file->map + file->seek_pos + blocknum * - sizeof (struct stat_file_block); - block = (struct stat_file_block *)c; - } - - block->hash1 = h1; - block->hash2 = h2; - block->value = value; -} - 
-void -statfile_pool_set_block (statfile_pool_t * pool, - stat_file_t * file, - guint32 h1, - guint32 h2, - time_t now, - double value) -{ - statfile_pool_set_block_common (pool, file, h1, h2, now, value, TRUE); -} - -stat_file_t * -statfile_pool_is_open (statfile_pool_t * pool, gchar *filename) -{ - static stat_file_t f, *ret; - rspamd_strlcpy (f.filename, filename, sizeof (f.filename)); - ret = lfind (&f, - pool->files, - (size_t *)&pool->opened, - sizeof (stat_file_t), - cmpstatfile); - return ret; -} - -guint32 -statfile_pool_get_section (statfile_pool_t * pool, stat_file_t * file) -{ - - return file->cur_section.code; -} - -gboolean -statfile_pool_set_section (statfile_pool_t * pool, - stat_file_t * file, - guint32 code, - gboolean from_begin) -{ - struct stat_file_section *sec; - off_t cur_offset; - - - /* Try to find section */ - if (from_begin) { - cur_offset = sizeof (struct stat_file_header); - } - else { - cur_offset = file->seek_pos - sizeof (struct stat_file_section); - } - while (cur_offset < (off_t)file->len) { - sec = (struct stat_file_section *)((gchar *)file->map + cur_offset); - if (sec->code == code) { - file->cur_section.code = code; - file->cur_section.length = sec->length; - file->seek_pos = cur_offset + sizeof (struct stat_file_section); - return TRUE; - } - cur_offset += sec->length; - } - - return FALSE; -} - -gboolean -statfile_pool_add_section (statfile_pool_t * pool, - stat_file_t * file, - guint32 code, - guint64 length) -{ - struct stat_file_section sect; - struct stat_file_block block = { 0, 0, 0 }; - - if (lseek (file->fd, 0, SEEK_END) == -1) { - msg_info ("cannot lseek file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - - sect.code = code; - sect.length = length; - - if (write (file->fd, §, sizeof (sect)) == -1) { - msg_info ("cannot write block to file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - - while (length--) { - if (write (file->fd, 
&block, sizeof (block)) == -1) { - msg_info ("cannot write block to file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - } - - /* Lock statfile to remap memory */ - statfile_pool_lock_file (pool, file); - munmap (file->map, file->len); - fsync (file->fd); - file->len += length; - - if ((file->map = - mmap (NULL, file->len, PROT_READ | PROT_WRITE, MAP_SHARED, file->fd, - 0)) == NULL) { - msg_info ("cannot mmap file %s, error %d, %s", - file->filename, - errno, - strerror (errno)); - return FALSE; - } - statfile_pool_unlock_file (pool, file); - - return TRUE; - -} - -guint32 -statfile_get_section_by_name (const gchar *name) -{ - if (g_ascii_strcasecmp (name, "common") == 0) { - return STATFILE_SECTION_COMMON; - } - else if (g_ascii_strcasecmp (name, "header") == 0) { - return STATFILE_SECTION_HEADERS; - } - else if (g_ascii_strcasecmp (name, "url") == 0) { - return STATFILE_SECTION_URLS; - } - else if (g_ascii_strcasecmp (name, "regexp") == 0) { - return STATFILE_SECTION_REGEXP; - } - - return 0; -} - -gboolean -statfile_set_revision (stat_file_t *file, guint64 rev, time_t time) -{ - struct stat_file_header *header; - - if (file == NULL || file->map == NULL) { - return FALSE; - } - - header = (struct stat_file_header *)file->map; - - header->revision = rev; - header->rev_time = time; - - return TRUE; -} - -gboolean -statfile_inc_revision (stat_file_t *file) -{ - struct stat_file_header *header; - - if (file == NULL || file->map == NULL) { - return FALSE; - } - - header = (struct stat_file_header *)file->map; - - header->revision++; - - return TRUE; -} - -gboolean -statfile_get_revision (stat_file_t *file, guint64 *rev, time_t *time) -{ - struct stat_file_header *header; - - if (file == NULL || file->map == NULL) { - return FALSE; - } - - header = (struct stat_file_header *)file->map; - - if (rev != NULL) { - *rev = header->revision; - } - if (time != NULL) { - *time = header->rev_time; - } - - return TRUE; -} - -guint64 
-statfile_get_used_blocks (stat_file_t *file) -{ - struct stat_file_header *header; - - if (file == NULL || file->map == NULL) { - return (guint64) - 1; - } - - header = (struct stat_file_header *)file->map; - - return header->used_blocks; -} - -guint64 -statfile_get_total_blocks (stat_file_t *file) -{ - struct stat_file_header *header; - - if (file == NULL || file->map == NULL) { - return (guint64) - 1; - } - - header = (struct stat_file_header *)file->map; - - /* If total blocks is 0 we have old version of header, so set total blocks correctly */ - if (header->total_blocks == 0) { - header->total_blocks = file->cur_section.length; - } - - return header->total_blocks; -} - -static void -statfile_pool_invalidate_callback (gint fd, short what, void *ud) -{ - statfile_pool_t *pool = ud; - stat_file_t *file; - gint i; - - msg_info ("invalidating %d statfiles", pool->opened); - - for (i = 0; i < pool->opened; i++) { - file = &pool->files[i]; - msync (file->map, file->len, MS_ASYNC); - } - -} - - -void -statfile_pool_plan_invalidate (statfile_pool_t *pool, - time_t seconds, - time_t jitter) -{ - gboolean pending; - - - if (pool->invalidate_event != NULL) { - pending = evtimer_pending (pool->invalidate_event, NULL); - if (pending) { - /* Replan event */ - pool->invalidate_tv.tv_sec = seconds + - g_random_int_range (0, jitter); - pool->invalidate_tv.tv_usec = 0; - evtimer_add (pool->invalidate_event, &pool->invalidate_tv); - } - } - else { - pool->invalidate_event = - rspamd_mempool_alloc (pool->pool, sizeof (struct event)); - pool->invalidate_tv.tv_sec = seconds + g_random_int_range (0, jitter); - pool->invalidate_tv.tv_usec = 0; - evtimer_set (pool->invalidate_event, - statfile_pool_invalidate_callback, - pool); - evtimer_add (pool->invalidate_event, &pool->invalidate_tv); - msg_info ("invalidate of statfile pool is planned in %d seconds", - (gint)pool->invalidate_tv.tv_sec); - } -} - - -stat_file_t * -get_statfile_by_symbol (statfile_pool_t *pool, - struct 
rspamd_classifier_config *ccf, - const gchar *symbol, - struct rspamd_statfile_config **st, - gboolean try_create) -{ - stat_file_t *res = NULL; - GList *cur; - - if (pool == NULL || ccf == NULL || symbol == NULL) { - msg_err ("invalid input arguments"); - return NULL; - } - - cur = g_list_first (ccf->statfiles); - while (cur) { - *st = cur->data; - if (strcmp (symbol, (*st)->symbol) == 0) { - break; - } - *st = NULL; - cur = g_list_next (cur); - } - if (*st == NULL) { - msg_info ("cannot find statfile with symbol %s", symbol); - return NULL; - } - - if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) { - if ((res = - statfile_pool_open (pool, (*st)->path, (*st)->size, - FALSE)) == NULL) { - msg_warn ("cannot open %s", (*st)->path); - if (try_create) { - if (statfile_pool_create (pool, (*st)->path, - (*st)->size) == -1) { - msg_err ("cannot create statfile %s", (*st)->path); - return NULL; - } - res = - statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE); - if (res == NULL) { - msg_err ("cannot open statfile %s after creation", - (*st)->path); - } - } - } - } - - return res; -} - -void -statfile_pool_lockall (statfile_pool_t *pool) -{ - stat_file_t *file; - gint i; - - if (pool->mlock_ok) { - for (i = 0; i < pool->opened; i++) { - file = &pool->files[i]; - if (mlock (file->map, file->len) == -1) { - msg_warn ( - "mlock of statfile failed, maybe you need to increase RLIMIT_MEMLOCK limit for a process: %s", - strerror (errno)); - pool->mlock_ok = FALSE; - return; - } - } - } - /* Do not try to lock if mlock failed */ -} - diff --git a/src/libserver/statfile.h b/src/libserver/statfile.h deleted file mode 100644 index f7f632703..000000000 --- a/src/libserver/statfile.h +++ /dev/null @@ -1,310 +0,0 @@ -/** - * @file statfile.h - * Describes common methods for accessing statistics files and caching them in memory - */ - -#ifndef RSPAMD_STATFILE_H -#define RSPAMD_STATFILE_H - -#include "config.h" -#include "mem_pool.h" -#include "hash.h" - -#define 
CHAIN_LENGTH 128 - -/* Section types */ -#define STATFILE_SECTION_COMMON 1 -#define STATFILE_SECTION_HEADERS 2 -#define STATFILE_SECTION_URLS 3 -#define STATFILE_SECTION_REGEXP 4 - -#define DEFAULT_STATFILE_INVALIDATE_TIME 30 -#define DEFAULT_STATFILE_INVALIDATE_JITTER 30 - -/** - * Common statfile header - */ -struct stat_file_header { - u_char magic[3]; /**< magic signature ('r' 's' 'd') */ - u_char version[2]; /**< version of statfile */ - u_char padding[3]; /**< padding */ - guint64 create_time; /**< create time (time_t->guint64) */ - guint64 revision; /**< revision number */ - guint64 rev_time; /**< revision time */ - guint64 used_blocks; /**< used blocks number */ - guint64 total_blocks; /**< total number of blocks */ - u_char unused[239]; /**< some bytes that can be used in future */ -}; - -/** - * Section header - */ -struct stat_file_section { - guint64 code; /**< section's code */ - guint64 length; /**< section's length in blocks */ -}; - -/** - * Block of data in statfile - */ -struct stat_file_block { - guint32 hash1; /**< hash1 (also acts as index) */ - guint32 hash2; /**< hash2 */ - double value; /**< double value */ -}; - -/** - * Statistic file - */ -struct stat_file { - struct stat_file_header header; /**< header */ - struct stat_file_section section; /**< first section */ - struct stat_file_block blocks[1]; /**< first block of data */ -}; - -/** - * Common view of statfile object - */ -typedef struct stat_file_s { -#ifdef HAVE_PATH_MAX - gchar filename[PATH_MAX]; /**< name of file */ -#else - gchar filename[MAXPATHLEN]; /**< name of file */ -#endif - gint fd; /**< descriptor */ - void *map; /**< mmaped area */ - off_t seek_pos; /**< current seek position */ - struct stat_file_section cur_section; /**< current section */ - time_t open_time; /**< time when file was opened */ - time_t access_time; /**< last access time */ - size_t len; /**< length of file(in bytes) */ - rspamd_mempool_mutex_t *lock; /**< mutex */ -} stat_file_t; - -/** - * Statfiles 
pool - */ -typedef struct statfile_pool_s { - stat_file_t *files; /**< hash table of opened files indexed by name */ - void **maps; /**< shared hash table of mmaped areas indexed by name */ - gint opened; /**< number of opened files */ - rspamd_mempool_t *pool; /**< memory pool object */ - rspamd_mempool_mutex_t *lock; /**< mutex */ - struct event *invalidate_event; /**< event for pool invalidation */ - struct timeval invalidate_tv; - gboolean mlock_ok; /**< whether it is possible to use mlock (2) to avoid statfiles unloading */ -} statfile_pool_t; - -/* Forwarded declarations */ -struct rspamd_classifier_config; -struct rspamd_statfile_config; - -/** - * Create new statfile pool - * @param max_size maximum size - * @return statfile pool object - */ -statfile_pool_t * statfile_pool_new (rspamd_mempool_t *pool, - gboolean use_mlock); - -/** - * Open statfile and attach it to pool - * @param pool statfile pool object - * @param filename name of statfile to open - * @return 0 if specified statfile is attached and -1 in case of error - */ -stat_file_t * statfile_pool_open (statfile_pool_t *pool, - gchar *filename, - size_t len, - gboolean forced); - -/** - * Create new statfile but DOES NOT attach it to pool, use @see statfile_pool_open for attaching - * @param pool statfile pool object - * @param filename name of statfile to create - * @param len length of new statfile - * @return 0 if file was created and -1 in case of error - */ -gint statfile_pool_create (statfile_pool_t *pool, gchar *filename, size_t len); - -/** - * Close specified statfile - * @param pool statfile pool object - * @param filename name of statfile to close - * @param remove_hash remove filename from opened files hash also - * @return 0 if file was closed and -1 if statfile was not opened - */ -gint statfile_pool_close (statfile_pool_t *pool, - stat_file_t *file, - gboolean keep_sorted); - -/** - * Delete statfile pool and close all attached statfiles - * @param pool statfile pool object - */ -void 
statfile_pool_delete (statfile_pool_t *pool); - -/** - * Try to lock all statfiles in memory - * @param pool statfile pool object - */ -void statfile_pool_lockall (statfile_pool_t *pool); - -/** - * Lock specified file for exclusive use (eg. learning) - * @param pool statfile pool object - * @param filename name of statfile - */ -void statfile_pool_lock_file (statfile_pool_t *pool, stat_file_t *file); - -/** - * Unlock specified file - * @param pool statfile pool object - * @param filename name of statfile - */ -void statfile_pool_unlock_file (statfile_pool_t *pool, stat_file_t *file); - -/** - * Get block from statfile with h1 and h2 values, use time argument for current time - * @param pool statfile pool object - * @param filename name of statfile - * @param h1 h1 in file - * @param h2 h2 in file - * @param now current time - * @return block value or 0 if block is not found - */ -double statfile_pool_get_block (statfile_pool_t *pool, - stat_file_t *file, - guint32 h1, - guint32 h2, - time_t now); - -/** - * Set specified block in statfile - * @param pool statfile pool object - * @param filename name of statfile - * @param h1 h1 in file - * @param h2 h2 in file - * @param now current time - * @param value value of block - */ -void statfile_pool_set_block (statfile_pool_t *pool, - stat_file_t *file, - guint32 h1, - guint32 h2, - time_t now, - double value); - -/** - * Check whether statfile is opened - * @param pool statfile pool object - * @param filename name of statfile - * @return TRUE if specified statfile is opened and FALSE otherwise - */ -stat_file_t * statfile_pool_is_open (statfile_pool_t *pool, gchar *filename); - -/** - * Returns current statfile section - * @param pool statfile pool object - * @param filename name of statfile - * @return code of section or 0 if file is not opened - */ -guint32 statfile_pool_get_section (statfile_pool_t *pool, stat_file_t *file); - -/** - * Go to other section of statfile - * @param pool statfile pool object - * @param 
filename name of statfile - * @param code code of section to seek to - * @param from_begin search for section from begin of file if true - * @return TRUE if section was set and FALSE otherwise - */ -gboolean statfile_pool_set_section (statfile_pool_t *pool, - stat_file_t *file, - guint32 code, - gboolean from_begin); - -/** - * Add new section to statfile - * @param pool statfile pool object - * @param filename name of statfile - * @param code code of section to seek to - * @param length length in blocks of new section - * @return TRUE if section was successfully added and FALSE in case of error - */ -gboolean statfile_pool_add_section (statfile_pool_t *pool, - stat_file_t *file, - guint32 code, - guint64 length); - - -/** - * Return code of section identified by name - * @param name name of section - * @return code of section or 0 if name of section is unknown - */ -guint32 statfile_get_section_by_name (const gchar *name); - -/** - * Set statfile revision and revision time - * @param filename name of statfile - * @param revision number of revision - * @param time time of revision - * @return TRUE if revision was set - */ -gboolean statfile_set_revision (stat_file_t *file, guint64 rev, time_t time); - -/** - * Increment statfile revision and revision time - * @param filename name of statfile - * @param time time of revision - * @return TRUE if revision was set - */ -gboolean statfile_inc_revision (stat_file_t *file); - -/** - * Set statfile revision and revision time - * @param filename name of statfile - * @param revision saved number of revision - * @param time saved time of revision - * @return TRUE if revision was saved in rev and time - */ -gboolean statfile_get_revision (stat_file_t *file, guint64 *rev, time_t *time); - -/** - * Get statfile used blocks - * @param file file to get number of used blocks - * @return number of used blocks or (guint64)-1 in case of error - */ -guint64 statfile_get_used_blocks (stat_file_t *file); - -/** - * Get statfile total 
blocks - * @param file file to get number of used blocks - * @return number of used blocks or (guint64)-1 in case of error - */ -guint64 statfile_get_total_blocks (stat_file_t *file); - - -/** - * Plan statfile pool invalidation - */ -void statfile_pool_plan_invalidate (statfile_pool_t *pool, - time_t seconds, - time_t jitter); - -/** - * Get a statfile by symbol - * @param pool pool object - * @param ccf ccf classifier config - * @param symbol symbol to search - * @param st statfile to get - * @param try_create whether we need to create statfile if it is absent - */ -stat_file_t * get_statfile_by_symbol (statfile_pool_t *pool, - struct rspamd_classifier_config *ccf, - const gchar *symbol, - struct rspamd_statfile_config **st, - gboolean try_create); - -#endif diff --git a/src/libstat/CMakeLists.txt b/src/libstat/CMakeLists.txt index 810570f20..f1692de63 100644 --- a/src/libstat/CMakeLists.txt +++ b/src/libstat/CMakeLists.txt @@ -1,11 +1,14 @@ # Librspamdserver SET(LIBSTATSRC - ) + stat_config.c) SET(TOKENIZERSSRC tokenizers/tokenizers.c tokenizers/osb.c) SET(CLASSIFIERSSRC classifiers/classifiers.c classifiers/bayes.c) + +SET(BACKENDSSRC backends/backends.c + backends/mmaped_file.c) ADD_LIBRARY(rspamd-stat ${LINK_TYPE} ${LIBSTATSRC} ${TOKENIZERSSRC} ${CLASSIFIERSSRC}) IF(NOT DEBIAN_BUILD) diff --git a/src/libstat/backends.h b/src/libstat/backends.h new file mode 100644 index 000000000..04710b4b2 --- /dev/null +++ b/src/libstat/backends.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2015, Vsevolod Stakhov + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef BACKENDS_H_
+#define BACKENDS_H_
+
+#include "config.h"
+#include "cfg_file.h"
+
+#define RSPAMD_DEFAULT_BACKEND "mmap" /**< name of the first entry of statfile_backends[] */
+
+struct rspamd_stat_backend {
+	const char *name; /**< backend name, matched by rspamd_stat_get_backend () */
+	gpointer (*init)(rspamd_mempool_t *pool, struct rspamd_statfile_config *cfg); /**< initialiser: takes a memory pool and a statfile config, returns an opaque pointer */
+	gpointer ctx; /**< opaque backend data; presumably the value produced by init - TODO confirm */
+};
+
+extern struct rspamd_stat_backend statfile_backends[]; /**< table of known backends, defined in backends/backends.c */
+
+struct rspamd_stat_backend *rspamd_stat_get_backend (const char *name); /**< find a backend in statfile_backends[] by name, NULL if there is no match */
+
+#endif /* BACKENDS_H_ */
diff --git a/src/libstat/backends/backends.c b/src/libstat/backends/backends.c
new file mode 100644
index 000000000..815a66dbd
--- /dev/null
+++ b/src/libstat/backends/backends.c
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2015, Vsevolod Stakhov
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "main.h"
+#include "backends.h"
+#include "mmaped_file.h"
+
+/*
+ * Table of known statistics backends. Only the name of the default backend
+ * is filled in here; the remaining members are zero-initialised.
+ */
+struct rspamd_stat_backend statfile_backends[] = {
+	{RSPAMD_DEFAULT_BACKEND, }
+};
+
+
+/**
+ * Find a backend by its name
+ * @param name backend name; NULL or an empty string selects RSPAMD_DEFAULT_BACKEND
+ * @return pointer to the matching entry of statfile_backends or NULL if
+ * no backend has this name
+ */
+struct rspamd_stat_backend *
+rspamd_stat_get_backend (const char *name)
+{
+	guint i;
+
+	/* strcmp () must not be called with a NULL pointer, fall back to default */
+	if (name == NULL || name[0] == '\0') {
+		name = RSPAMD_DEFAULT_BACKEND;
+	}
+
+	for (i = 0; i < G_N_ELEMENTS (statfile_backends); i++) {
+		if (strcmp (statfile_backends[i].name, name) == 0) {
+			return &statfile_backends[i];
+		}
+	}
+
+	return NULL;
+}
diff --git a/src/libstat/backends/mmaped_file.c b/src/libstat/backends/mmaped_file.c
new file mode 100644
index 000000000..066671a95
--- /dev/null
+++ b/src/libstat/backends/mmaped_file.c
@@ -0,0 +1,1083 @@
+/*
+ * Copyright (c) 2009-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+
+#include "statfile.h"
+#include "main.h"
+
+#define RSPAMD_STATFILE_VERSION {'1', '2'}
+#define BACKUP_SUFFIX ".old"
+
+/* Maximum number of statistics files */
+#define STATFILES_MAX 255
+static void statfile_pool_set_block_common (
+	statfile_pool_t * pool, stat_file_t * file,
+	guint32 h1, guint32 h2,
+	time_t t, double value,
+	gboolean from_now);
+
+/* Comparator for lfind (): case-insensitive comparison of statfile names */
+static gint
+cmpstatfile (const void *a, const void *b)
+{
+	const stat_file_t *s1 = a, *s2 = b;
+
+	return g_ascii_strcasecmp (s1->filename, s2->filename);
+}
+
+/* Convert statfile version 1.0 to statfile version 1.2, saving backup */
+struct stat_file_header_10 {
+	u_char magic[3];                        /**< magic signature ('r' 's' 'd')	*/
+	u_char version[2];                      /**< version of statfile			*/
+	u_char padding[3];                      /**< padding						*/
+	guint64 create_time;                    /**< create time (time_t->guint64)	*/
+};
+
+/*
+ * Rewrite an opened version 1.0 statfile with the current header layout.
+ * The old file is renamed to <filename>.old, a new file is written from the
+ * old mapping and then mapped instead of it.
+ * Returns TRUE on success and FALSE if any step has failed.
+ */
+static gboolean
+convert_statfile_10 (stat_file_t * file)
+{
+	gchar *backup_name;
+	struct stat st;
+	struct stat_file_header header = {
+		.magic = {'r', 's', 'd'},
+		.version = RSPAMD_STATFILE_VERSION,
+		.padding = {0, 0, 0},
+		.revision = 0,
+		.rev_time = 0
+	};
+
+
+	/* Format backup name, BACKUP_SUFFIX already starts with a dot */
+	backup_name = g_strdup_printf ("%s%s", file->filename, BACKUP_SUFFIX);
+
+	msg_info ("convert old statfile %s to version %c.%c, backup in %s",
+		file->filename,
+		header.version[0],
+		header.version[1],
+		backup_name);
+
+	if (stat (backup_name, &st) != -1) {
+		msg_info ("replace old %s", backup_name);
+		unlink (backup_name);
+	}
+
+	/*
+	 * Without a backup the O_TRUNC open below would truncate the very file
+	 * that is still mapped and used as the data source, so stop here
+	 */
+	if (rename (file->filename, backup_name) == -1) {
+		msg_info ("cannot rename %s to %s: %s",
+			file->filename,
+			backup_name,
+			strerror (errno));
+		g_free (backup_name);
+		return FALSE;
+	}
+	g_free (backup_name);
+
+	/* XXX: maybe race condition here */
+	rspamd_file_unlock (file->fd, FALSE);
+	close (file->fd);
+	if ((file->fd =
+		open (file->filename, O_RDWR | O_TRUNC | O_CREAT,
+		S_IWUSR | S_IRUSR)) == -1) {
+		msg_info ("cannot create file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		return FALSE;
+	}
+	rspamd_file_lock (file->fd, FALSE);
+	/* Now make new header and copy it to new file */
+	if (write (file->fd, &header, sizeof (header)) == -1) {
+		msg_info ("cannot write to file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		return FALSE;
+	}
+	/* Now write old map to new file */
+	if (write (file->fd,
+		((u_char *)file->map + sizeof (struct stat_file_header_10)),
+		file->len - sizeof (struct stat_file_header_10)) == -1) {
+		msg_info ("cannot write to file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		return FALSE;
+	}
+	/* Unmap old memory and map new */
+	munmap (file->map, file->len);
+	file->len = file->len + sizeof (struct stat_file_header) -
+		sizeof (struct stat_file_header_10);
+#ifdef HAVE_MMAP_NOCORE
+	if ((file->map =
+		mmap (NULL, file->len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_NOCORE,
+		file->fd, 0)) == MAP_FAILED) {
+#else
+	if ((file->map =
+		mmap (NULL, file->len, PROT_READ | PROT_WRITE, MAP_SHARED, file->fd,
+		0)) == MAP_FAILED) {
+#endif
+		msg_info ("cannot mmap file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		return FALSE;
+	}
+
+	return TRUE;
+}
+
+/* Check whether specified file is statistic file and calculate its len in blocks */
+static gint
+statfile_pool_check (stat_file_t * file)
+{
+	struct stat_file *f;
+	gchar *c;
+	static gchar valid_version[] = RSPAMD_STATFILE_VERSION;
+
+
+	if (!file || !file->map) {
+		return -1;
+	}
+
+	if (file->len < sizeof (struct stat_file)) {
+		msg_info ("file %s is too short to be stat file: %z",
+			file->filename,
+			file->len);
+		return -1;
+	}
+
+	f = (struct stat_file *)file->map;
+	c = (gchar *)f->header.magic;
+	/* Check magic and version */
+	if (*c++ != 'r' || *c++ != 's' || *c++ != 'd') {
+		msg_info ("file %s is invalid stat file", file->filename);
+		return -1;
+	}
+	/* Now check version and convert old version to new one (that can be used for sync) */
+	if (*c == 1 && *(c + 1) == 0) {
+		if (!convert_statfile_10 (file)) {
+			return -1;
+		}
+		/* Conversion replaces the mapping */
+		f = (struct stat_file *)file->map;
+	}
+	else if (memcmp (c, valid_version, sizeof (valid_version)) != 0) {
+		/* Unknown version */
+		msg_info ("file %s has invalid version %c.%c",
+			file->filename,
+			'0' + *c,
+			'0' + *(c + 1));
+		return -1;
+	}
+
+	/* Check first section and set new offset */
+	file->cur_section.code = f->section.code;
+	file->cur_section.length = f->section.length;
+	if (file->cur_section.length * sizeof (struct stat_file_block) >
+		file->len) {
+		msg_info ("file %s is truncated: %z, must be %z",
+			file->filename,
+			file->len,
+			file->cur_section.length * sizeof (struct stat_file_block));
+		return -1;
+	}
+	/* Blocks of the first section start right after its section header */
+	file->seek_pos = sizeof (struct stat_file) -
+		sizeof (struct stat_file_block);
+
+	return 0;
+}
+
+
+/*
+ * Allocate the pool object from `pool`; the table of STATFILES_MAX statfiles
+ * and the pool mutex live in a new memory pool owned by the returned object
+ */
+statfile_pool_t *
+statfile_pool_new (rspamd_mempool_t *pool, gboolean use_mlock)
+{
+	statfile_pool_t *new;
+
+	new = rspamd_mempool_alloc0 (pool, sizeof (statfile_pool_t));
+	new->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
+	new->files =
+		rspamd_mempool_alloc0 (new->pool, STATFILES_MAX * sizeof (stat_file_t));
+	new->lock = rspamd_mempool_get_mutex (new->pool);
+	new->mlock_ok = use_mlock;
+
+	return new;
+}
+
+/*
+ * Recreate statfile `filename` with the new `size`: the old file is renamed
+ * to <filename>.old, a new file is created and opened, all non-empty blocks
+ * and the revision are copied into it and the backup is removed.
+ * Returns the newly opened statfile or NULL on error.
+ */
+static stat_file_t *
+statfile_pool_reindex (statfile_pool_t * pool,
+	gchar *filename,
+	size_t old_size,
+	size_t size)
+{
+	gchar *backup;
+	gint fd;
+	stat_file_t *new;
+	u_char *map, *pos;
+	struct stat_file_block *block;
+	struct stat_file_header *header;
+
+	/* Size of the block structure is required here, not of a pointer to it */
+	if (size <
+		sizeof (struct stat_file_header) + sizeof (struct stat_file_section) +
+		sizeof (struct stat_file_block)) {
+		msg_err ("file %s is too small to carry any statistic: %z",
+			filename,
+			size);
+		return NULL;
+	}
+
+	/* First of all rename old file */
+	rspamd_mempool_lock_mutex (pool->lock);
+
+	backup = g_strconcat (filename, ".old", NULL);
+	if (rename (filename, backup) == -1) {
+		msg_err ("cannot rename %s to %s: %s", filename, backup, strerror (
+				errno));
+		g_free (backup);
+		rspamd_mempool_unlock_mutex (pool->lock);
+		return NULL;
+	}
+
+	rspamd_mempool_unlock_mutex (pool->lock);
+
+	/* Now create new file with required size */
+	if (statfile_pool_create (pool, filename, size) != 0) {
+		msg_err ("cannot create new file");
+		g_free (backup);
+		return NULL;
+	}
+	/* Now open new file and start copying */
+	fd = open (backup, O_RDONLY);
+	new = statfile_pool_open (pool, filename, size, TRUE);
+
+	if (fd == -1 || new == NULL) {
+		msg_err ("cannot open file: %s", strerror (errno));
+		if (fd != -1) {
+			/* Backup was opened but the new statfile was not */
+			close (fd);
+		}
+		g_free (backup);
+		return NULL;
+	}
+
+	/* Now start reading blocks from old statfile */
+	if ((map =
+		mmap (NULL, old_size, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+		msg_err ("cannot mmap file: %s", strerror (errno));
+		close (fd);
+		g_free (backup);
+		return NULL;
+	}
+
+	pos = map + (sizeof (struct stat_file) - sizeof (struct stat_file_block));
+	/* Written as an addition so that a too short old file cannot underflow */
+	while ((size_t)(pos - map) + sizeof (struct stat_file_block) <= old_size) {
+		block = (struct stat_file_block *)pos;
+		if (block->hash1 != 0 && block->value != 0) {
+			statfile_pool_set_block_common (pool,
+				new,
+				block->hash1,
+				block->hash2,
+				0,
+				block->value,
+				FALSE);
+		}
+		/* Step over the whole block */
+		pos += sizeof (struct stat_file_block);
+	}
+
+	if (old_size >= sizeof (struct stat_file_header)) {
+		header = (struct stat_file_header *)map;
+		statfile_set_revision (new, header->revision, header->rev_time);
+	}
+
+	munmap (map, old_size);
+	close (fd);
+	unlink (backup);
+	g_free (backup);
+
+	return new;
+
+}
+
+/*
+ * Pre-load mmaped file into memory
+ */
+static void
+statfile_preload (stat_file_t *file)
+{
+	guint8 *pos, *end;
+	volatile guint8 t;
+	gsize size;
+
+	pos = (guint8 *)file->map;
+	end = (guint8 *)file->map + file->len;
+
+	if (madvise (pos, end - pos, MADV_SEQUENTIAL) == -1) {
+		msg_info ("madvise failed: %s", strerror (errno));
+	}
+	else {
+		/* Load pages of file */
+#ifdef HAVE_GETPAGESIZE
+		size = getpagesize ();
+#else
+		size = sysconf (_SC_PAGESIZE);
+#endif
+		while (pos < end) {
+			/* Volatile read of one byte per page */
+			t = *pos;
+			(void)t;
+			pos += size;
+		}
+	}
+}
+
+/*
+ * Open and map statfile `filename` and register it in the pool. If `forced`
+ * is FALSE and the size on disk differs from `size` by more than two
+ * struct stat_file sizes, the statfile is reindexed to the requested size.
+ * Returns the statfile or NULL on error.
+ */
+stat_file_t *
+statfile_pool_open (statfile_pool_t * pool,
+	gchar *filename,
+	size_t size,
+	gboolean forced)
+{
+	struct stat st;
+	stat_file_t *new_file;
+
+	if ((new_file = statfile_pool_is_open (pool, filename)) != NULL) {
+		return new_file;
+	}
+
+	if (pool->opened >= STATFILES_MAX - 1) {
+		msg_err ("reached hard coded limit of statfiles opened: %d",
+			STATFILES_MAX);
+		return NULL;
+	}
+
+	if (stat (filename, &st) == -1) {
+		msg_info ("cannot stat file %s, error %s, %d", filename, strerror (
+				errno), errno);
+		return NULL;
+	}
+
+	rspamd_mempool_lock_mutex (pool->lock);
+	if (!forced &&
+		labs (size - st.st_size) > (long)sizeof (struct stat_file) * 2
+		&& size > sizeof (struct stat_file)) {
+		rspamd_mempool_unlock_mutex (pool->lock);
+		msg_warn ("need to reindex statfile old size: %Hz, new size: %Hz",
+			(size_t)st.st_size, size);
+		return statfile_pool_reindex (pool, filename, st.st_size, size);
+	}
+	else if (size < sizeof (struct stat_file)) {
+		msg_err ("requested to shrink statfile to %Hz but it is too small",
+			size);
+	}
+
+	new_file = &pool->files[pool->opened++];
+	bzero (new_file, sizeof (stat_file_t));
+	if ((new_file->fd = open (filename, O_RDWR)) == -1) {
+		msg_info ("cannot open file %s, error %d, %s",
+			filename,
+			errno,
+			strerror (errno));
+		/* Release the slot while the pool is still locked */
+		pool->opened--;
+		rspamd_mempool_unlock_mutex (pool->lock);
+		return NULL;
+	}
+
+	if ((new_file->map =
+		mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED,
+		new_file->fd, 0)) == MAP_FAILED) {
+		msg_info ("cannot mmap file %s, error %d, %s",
+			filename,
+			errno,
+			strerror (errno));
+		close (new_file->fd);
+		pool->opened--;
+		rspamd_mempool_unlock_mutex (pool->lock);
+		return NULL;
+
+	}
+
+	rspamd_strlcpy (new_file->filename, filename, sizeof (new_file->filename));
+	new_file->len = st.st_size;
+	/* Try to lock pages in RAM */
+	if (pool->mlock_ok) {
+		if (mlock (new_file->map, new_file->len) == -1) {
+			msg_warn (
+				"mlock of statfile failed, maybe you need to increase RLIMIT_MEMLOCK limit for a process: %s",
+				strerror (errno));
+			pool->mlock_ok = FALSE;
+		}
+	}
+	/* Acquire lock for this operation */
+	rspamd_file_lock (new_file->fd, FALSE);
+	if (statfile_pool_check (new_file) == -1) {
+		pool->opened--;
+		rspamd_mempool_unlock_mutex (pool->lock);
+		/*
+		 * The check may have converted the file, so descriptor, mapping and
+		 * length are taken from the statfile, not from the initial stat ()
+		 */
+		if (new_file->fd != -1) {
+			rspamd_file_unlock (new_file->fd, FALSE);
+			close (new_file->fd);
+		}
+		if (new_file->map != NULL && new_file->map != MAP_FAILED) {
+			munmap (new_file->map, new_file->len);
+		}
+		return NULL;
+	}
+	rspamd_file_unlock (new_file->fd, FALSE);
+
+	new_file->open_time = time (NULL);
+	new_file->access_time = new_file->open_time;
+	new_file->lock = rspamd_mempool_get_mutex (pool->pool);
+
+	statfile_preload (new_file);
+
+	rspamd_mempool_unlock_mutex (pool->lock);
+
+	return statfile_pool_is_open (pool, filename);
+}
+
+/*
+ * Sync, unmap and close `file` and remove it from the pool by shifting the
+ * following entries down by one. `keep_sorted` is not used.
+ * Returns 0 on success and -1 if the statfile is not opened.
+ */
+gint
+statfile_pool_close (statfile_pool_t * pool,
+	stat_file_t * file,
+	gboolean keep_sorted)
+{
+	stat_file_t *pos;
+
+	if ((pos = statfile_pool_is_open (pool, file->filename)) == NULL) {
+		msg_info ("file %s is not opened", file->filename);
+		return -1;
+	}
+
+	rspamd_mempool_lock_mutex (pool->lock);
+
+	if (file->map) {
+		msg_info ("syncing statfile %s", file->filename);
+		msync (file->map, file->len, MS_ASYNC);
+		munmap (file->map, file->len);
+	}
+	if (file->fd != -1) {
+		close (file->fd);
+	}
+	/* Move the remain statfiles */
+	memmove (pos, ((guint8 *)pos) + sizeof (stat_file_t),
+		(--pool->opened - (pos - pool->files)) * sizeof (stat_file_t));
+
+	rspamd_mempool_unlock_mutex (pool->lock);
+
+	return 0;
+}
+
+/*
+ * Create statfile `filename` of `size` bytes: header, one common section and
+ * as many zeroed blocks as fit. The file is not attached to the pool.
+ * Returns 0 on success (or if the file is already opened) and -1 on error.
+ */
+gint
+statfile_pool_create (statfile_pool_t * pool, gchar *filename, size_t size)
+{
+	struct stat_file_header header = {
+		.magic = {'r', 's', 'd'},
+		.version = RSPAMD_STATFILE_VERSION,
+		.padding = {0, 0, 0},
+		.revision = 0,
+		.rev_time = 0,
+		.used_blocks = 0
+	};
+	struct stat_file_section section = {
+		.code = STATFILE_SECTION_COMMON,
+	};
+	struct stat_file_block block = { 0, 0, 0 };
+	gint fd;
+	guint buflen = 0, nblocks;
+	gchar *buf = NULL;
+
+	if (statfile_pool_is_open (pool, filename) != NULL) {
+		msg_info ("file %s is already opened", filename);
+		return 0;
+	}
+
+	if (size <
+		sizeof (struct stat_file_header) + sizeof (struct stat_file_section) +
+		sizeof (block)) {
+		msg_err ("file %s is too small to carry any statistic: %z",
+			filename,
+			size);
+		return -1;
+	}
+
+	rspamd_mempool_lock_mutex (pool->lock);
+	nblocks =
+		(size - sizeof (struct stat_file_header) -
+		sizeof (struct stat_file_section)) / sizeof (struct stat_file_block);
+	header.total_blocks = nblocks;
+
+	if ((fd =
+		open (filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
+		msg_info ("cannot create file %s, error %d, %s",
+			filename,
+			errno,
+			strerror (errno));
+		rspamd_mempool_unlock_mutex (pool->lock);
+		return -1;
+	}
+
+	rspamd_fallocate (fd,
+		0,
+		sizeof (header) + sizeof (section) + sizeof (block) * nblocks);
+
+	header.create_time = (guint64) time (NULL);
+	if (write (fd, &header, sizeof (header)) == -1) {
+		msg_info ("cannot write header to file %s, error %d, %s",
+			filename,
+			errno,
+			strerror (errno));
+		close (fd);
+		rspamd_mempool_unlock_mutex (pool->lock);
+		return -1;
+	}
+
+	section.length = (guint64) nblocks;
+	if (write (fd, &section, sizeof (section)) == -1) {
+		msg_info ("cannot write section header to file %s, error %d, %s",
+			filename,
+			errno,
+			strerror (errno));
+		close (fd);
+		rspamd_mempool_unlock_mutex (pool->lock);
+		return -1;
+	}
+
+	/* Buffer for write 256 blocks at once */
+	if (nblocks > 256) {
+		buflen = sizeof (block) * 256;
+		buf = g_malloc0 (buflen);
+	}
+
+	while (nblocks) {
+		if (nblocks > 256) {
+			/* Just write buffer */
+			if (write (fd, buf, buflen) == -1) {
+				msg_info ("cannot write blocks buffer to file %s, error %d, %s",
+					filename,
+					errno,
+					strerror (errno));
+				close (fd);
+				rspamd_mempool_unlock_mutex (pool->lock);
+				g_free (buf);
+				return -1;
+			}
+			nblocks -= 256;
+		}
+		else {
+			if (write (fd, &block, sizeof (block)) == -1) {
+				msg_info ("cannot write block to file %s, error %d, %s",
+					filename,
+					errno,
+					strerror (errno));
+				close (fd);
+				if (buf) {
+					g_free (buf);
+				}
+				rspamd_mempool_unlock_mutex (pool->lock);
+				return -1;
+			}
+			nblocks--;
+		}
+	}
+
+	close (fd);
+	rspamd_mempool_unlock_mutex (pool->lock);
+
+	if (buf) {
+		g_free (buf);
+	}
+
+	return 0;
+}
+
+/* Close all statfiles of the pool and destroy its memory pool */
+void
+statfile_pool_delete (statfile_pool_t * pool)
+{
+	gint i;
+
+	/*
+	 * statfile_pool_close () shifts the following entries down and decrements
+	 * pool->opened, so walk from the tail to visit every statfile once
+	 */
+	for (i = pool->opened - 1; i >= 0; i--) {
+		statfile_pool_close (pool, &pool->files[i], FALSE);
+	}
+	rspamd_mempool_delete (pool->pool);
+}
+
+/* Lock the mutex of a single statfile */
+void
+statfile_pool_lock_file (statfile_pool_t * pool, stat_file_t * file)
+{
+
+	rspamd_mempool_lock_mutex (file->lock);
+}
+
+/* Unlock the mutex of a single statfile */
+void
+statfile_pool_unlock_file (statfile_pool_t * pool, stat_file_t * file)
+{
+
+	rspamd_mempool_unlock_mutex (file->lock);
+}
+
+/*
+ * Look for block (h1, h2) in the current section: the search starts at block
+ * h1 % length and inspects at most CHAIN_LENGTH consecutive blocks.
+ * Returns the stored value or 0 if the block is not found.
+ */
+double
+statfile_pool_get_block (statfile_pool_t * pool,
+	stat_file_t * file,
+	guint32 h1,
+	guint32 h2,
+	time_t now)
+{
+	struct stat_file_block *block;
+	guint i, blocknum;
+	u_char *c;
+
+
+	file->access_time = now;
+	/* An empty section would make the modulo below divide by zero */
+	if (!file->map || file->cur_section.length == 0) {
+		return 0;
+	}
+
+	blocknum = h1 % file->cur_section.length;
+	c = (u_char *) file->map + file->seek_pos + blocknum *
+		sizeof (struct stat_file_block);
+	block = (struct stat_file_block *)c;
+
+	for (i = 0; i < CHAIN_LENGTH; i++) {
+		if (i + blocknum >= file->cur_section.length) {
+			break;
+		}
+		if (block->hash1 == h1 && block->hash2 == h2) {
+			return block->value;
+		}
+		c += sizeof (struct stat_file_block);
+		block = (struct stat_file_block *)c;
+	}
+
+
+	return 0;
+}
+
+/*
+ * Store `value` for block (h1, h2) in the current section. The chain starting
+ * at h1 % length is searched for the same block or for a free one; if none is
+ * found, the block with the minimum value seen in the chain is overwritten.
+ * `t` is saved as access time only when `from_now` is TRUE.
+ */
+static void
+statfile_pool_set_block_common (statfile_pool_t * pool,
+	stat_file_t * file,
+	guint32 h1,
+	guint32 h2,
+	time_t t,
+	double value,
+	gboolean from_now)
+{
+	struct stat_file_block *block, *to_expire = NULL;
+	struct stat_file_header *header;
+	guint i, blocknum;
+	u_char *c;
+	double min = G_MAXDOUBLE;
+
+	if (from_now) {
+		file->access_time = t;
+	}
+	/* An empty section would make the modulo below divide by zero */
+	if (!file->map || file->cur_section.length == 0) {
+		return;
+	}
+
+	blocknum = h1 % file->cur_section.length;
+	header = (struct stat_file_header *)file->map;
+	c = (u_char *) file->map + file->seek_pos + blocknum *
+		sizeof (struct stat_file_block);
+	block = (struct stat_file_block *)c;
+
+	for (i = 0; i < CHAIN_LENGTH; i++) {
+		if (i + blocknum >= file->cur_section.length) {
+			/* Need to expire some block in chain */
+			msg_info ("chain %ud is full in statfile %s, starting expire",
+				blocknum,
+				file->filename);
+			break;
+		}
+		/* First try to find block in chain */
+		if (block->hash1 == h1 && block->hash2 == h2) {
+			block->value = value;
+			return;
+		}
+		/* Check whether we have a free block in chain */
+		if (block->hash1 == 0 && block->hash2 == 0) {
+			/* Write new block here */
+			msg_debug ("found free block %ud in chain %ud, set h1=%ud, h2=%ud",
+				i,
+				blocknum,
+				h1,
+				h2);
+			block->hash1 = h1;
+			block->hash2 = h2;
+			block->value = value;
+			header->used_blocks++;
+
+			return;
+		}
+
+		/* Expire block with minimum value otherwise */
+		if (block->value < min) {
+			to_expire = block;
+			min = block->value;
+		}
+		c += sizeof (struct stat_file_block);
+		block = (struct stat_file_block *)c;
+	}
+
+	/* Try expire some block */
+	if (to_expire) {
+		block = to_expire;
+	}
+	else {
+		/* Expire first block in chain */
+		c = (u_char *) file->map + file->seek_pos + blocknum *
+			sizeof (struct stat_file_block);
+		block = (struct stat_file_block *)c;
+	}
+
+	block->hash1 = h1;
+	block->hash2 = h2;
+	block->value = value;
+}
+
+void
+statfile_pool_set_block (statfile_pool_t * pool,
+	stat_file_t * file,
+	guint32 h1,
+	guint32 h2,
+	time_t now,
+	double value)
+{
+	/* Public variant: always records `now` as the access time */
+	statfile_pool_set_block_common (pool, file, h1, h2, now, value, TRUE);
+}
+
+/*
+ * Find an opened statfile by name (compared case-insensitively).
+ * Returns the pool entry or NULL if the file is not opened.
+ * The search key is a static buffer, so this function is not reentrant.
+ */
+stat_file_t *
+statfile_pool_is_open (statfile_pool_t * pool, gchar *filename)
+{
+	static stat_file_t f;
+	stat_file_t *ret;
+	size_t nfiles;
+
+	/*
+	 * lfind () takes a pointer to size_t, pool->opened is a gint and must not
+	 * be passed through a pointer cast
+	 */
+	nfiles = pool->opened > 0 ? (size_t)pool->opened : 0;
+	rspamd_strlcpy (f.filename, filename, sizeof (f.filename));
+	ret = lfind (&f,
+			pool->files,
+			&nfiles,
+			sizeof (stat_file_t),
+			cmpstatfile);
+	return ret;
+}
+
+/* Return the code of the currently selected section */
+guint32
+statfile_pool_get_section (statfile_pool_t * pool, stat_file_t * file)
+{
+
+	return file->cur_section.code;
+}
+
+/*
+ * Select the section with the given code, searching either from the first
+ * section (`from_begin`) or from the current one.
+ * Returns TRUE if the section is found and selected.
+ */
+gboolean
+statfile_pool_set_section (statfile_pool_t * pool,
+	stat_file_t * file,
+	guint32 code,
+	gboolean from_begin)
+{
+	struct stat_file_section *sec;
+	off_t cur_offset;
+
+	if (file == NULL || file->map == NULL) {
+		return FALSE;
+	}
+
+	/* Try to find section */
+	if (from_begin) {
+		cur_offset = sizeof (struct stat_file_header);
+	}
+	else {
+		cur_offset = file->seek_pos - sizeof (struct stat_file_section);
+	}
+	/* A section header must fit into the file before it is read */
+	while (cur_offset >= 0 &&
+		cur_offset + (off_t)sizeof (struct stat_file_section) <=
+		(off_t)file->len) {
+		sec = (struct stat_file_section *)((gchar *)file->map + cur_offset);
+		if (sec->code == code) {
+			file->cur_section.code = code;
+			file->cur_section.length = sec->length;
+			file->seek_pos = cur_offset + sizeof (struct stat_file_section);
+			return TRUE;
+		}
+		if (sec->length > file->len / sizeof (struct stat_file_block)) {
+			/* Section cannot fit into the file, stop the search */
+			break;
+		}
+		/* Length is counted in blocks: skip the header and all blocks */
+		cur_offset += sizeof (struct stat_file_section) +
+			sec->length * sizeof (struct stat_file_block);
+	}
+
+	return FALSE;
+}
+
+/*
+ * Append a section header and `length` empty blocks to the end of the
+ * statfile and remap it with the new size.
+ * Returns TRUE on success; if remapping fails file->map is set to NULL.
+ */
+gboolean
+statfile_pool_add_section (statfile_pool_t * pool,
+	stat_file_t * file,
+	guint32 code,
+	guint64 length)
+{
+	struct stat_file_section sect;
+	struct stat_file_block block = { 0, 0, 0 };
+	guint64 i;
+	void *map;
+
+	if (lseek (file->fd, 0, SEEK_END) == -1) {
+		msg_info ("cannot lseek file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		return FALSE;
+	}
+
+	sect.code = code;
+	sect.length = length;
+
+	if (write (file->fd, &sect, sizeof (sect)) == -1) {
+		msg_info ("cannot write block to file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		return FALSE;
+	}
+
+	/* Keep `length` intact, it is needed for the new file length below */
+	for (i = 0; i < length; i++) {
+		if (write (file->fd, &block, sizeof (block)) == -1) {
+			msg_info ("cannot write block to file %s, error %d, %s",
+				file->filename,
+				errno,
+				strerror (errno));
+			return FALSE;
+		}
+	}
+
+	/* Lock statfile to remap memory */
+	statfile_pool_lock_file (pool, file);
+	munmap (file->map, file->len);
+	fsync (file->fd);
+	/* Appended data: one section header and `length` blocks */
+	file->len += sizeof (sect) + length * sizeof (block);
+
+	map = mmap (NULL, file->len, PROT_READ | PROT_WRITE, MAP_SHARED, file->fd,
+			0);
+	if (map == MAP_FAILED) {
+		/* mmap () reports errors with MAP_FAILED, not with NULL */
+		file->map = NULL;
+		msg_info ("cannot mmap file %s, error %d, %s",
+			file->filename,
+			errno,
+			strerror (errno));
+		statfile_pool_unlock_file (pool, file);
+		return FALSE;
+	}
+	file->map = map;
+	statfile_pool_unlock_file (pool, file);
+
+	return TRUE;
+
+}
+
+/* Translate a section name to its code, 0 means unknown name */
+guint32
+statfile_get_section_by_name (const gchar *name)
+{
+	if (g_ascii_strcasecmp (name, "common") == 0) {
+		return STATFILE_SECTION_COMMON;
+	}
+	else if (g_ascii_strcasecmp (name, "header") == 0) {
+		return STATFILE_SECTION_HEADERS;
+	}
+	else if (g_ascii_strcasecmp (name, "url") == 0) {
+		return STATFILE_SECTION_URLS;
+	}
+	else if (g_ascii_strcasecmp (name, "regexp") == 0) {
+		return STATFILE_SECTION_REGEXP;
+	}
+
+	return 0;
+}
+
+/* Store revision and revision time in the header of a mapped statfile */
+gboolean
+statfile_set_revision (stat_file_t *file, guint64 rev, time_t time)
+{
+	struct stat_file_header *header;
+
+	if (file == NULL || file->map == NULL) {
+		return FALSE;
+	}
+
+	header = (struct stat_file_header *)file->map;
+
+	header->revision = rev;
+	header->rev_time = time;
+
+	return TRUE;
+}
+
+/* Increment the revision stored in the header; revision time is not changed */
+gboolean
+statfile_inc_revision (stat_file_t *file)
+{
+	struct stat_file_header *header;
+
+	if (file == NULL || file->map == NULL) {
+		return FALSE;
+	}
+
+	header = (struct stat_file_header *)file->map;
+
+	header->revision++;
+
+	return TRUE;
+}
+
+/* Read revision and revision time from the header; both outputs are optional */
+gboolean
+statfile_get_revision (stat_file_t *file, guint64 *rev, time_t *time)
+{
+	struct stat_file_header *header;
+
+	if (file == NULL || file->map == NULL) {
+		return FALSE;
+	}
+
+	header = (struct stat_file_header *)file->map;
+
+	if (rev != NULL) {
+		*rev = header->revision;
+	}
+	if (time != NULL) {
+		*time = header->rev_time;
+	}
+
+	return TRUE;
+}
+
+/* Return the used blocks counter from the header or (guint64)-1 on error */
+guint64
+statfile_get_used_blocks (stat_file_t *file)
+{
+	struct stat_file_header *header;
+
+	if (file == NULL || file->map == NULL) {
+		return (guint64) - 1;
+	}
+
+	header = (struct stat_file_header *)file->map;
+
+	return header->used_blocks;
+}
+
+/* Return the total blocks counter from the header or (guint64)-1 on error */
+guint64
+statfile_get_total_blocks (stat_file_t *file)
+{
+	struct stat_file_header *header;
+
+	if (file == NULL || file->map == NULL) {
+		return (guint64) - 1;
+	}
+
+	header = (struct stat_file_header *)file->map;
+
+	/* If total blocks is 0 we have old version of header, so set total blocks correctly */
+	if (header->total_blocks == 0) {
+		header->total_blocks = file->cur_section.length;
+	}
+
+	return header->total_blocks;
+}
+
+/* Timer callback: schedule asynchronous msync () for every opened statfile */
+static void
+statfile_pool_invalidate_callback (gint fd, short what, void *ud)
+{
+	statfile_pool_t *pool = ud;
+	stat_file_t *file;
+	gint i;
+
+	msg_info ("invalidating %d statfiles", pool->opened);
+
+	for (i = 0; i < pool->opened; i++) {
+		file = &pool->files[i];
+		msync (file->map, file->len, MS_ASYNC);
+	}
+
+}
+
+/*
+ * Delay in seconds with a random addition from [0, jitter).
+ * g_random_int_range () requires a non-empty range, so jitter is skipped
+ * when it is not positive.
+ */
+static time_t
+statfile_pool_jittered_delay (time_t seconds, time_t jitter)
+{
+	if (jitter > 0) {
+		return seconds + g_random_int_range (0, jitter);
+	}
+
+	return seconds;
+}
+
+/*
+ * Plan the invalidation timer: create and add it on the first call, re-add it
+ * with a new delay if it exists and is pending
+ */
+void
+statfile_pool_plan_invalidate (statfile_pool_t *pool,
+	time_t seconds,
+	time_t jitter)
+{
+	gboolean pending;
+
+
+	if (pool->invalidate_event != NULL) {
+		pending = evtimer_pending (pool->invalidate_event, NULL);
+		if (pending) {
+			/* Replan event */
+			pool->invalidate_tv.tv_sec =
+				statfile_pool_jittered_delay (seconds, jitter);
+			pool->invalidate_tv.tv_usec = 0;
+			evtimer_add (pool->invalidate_event, &pool->invalidate_tv);
+		}
+	}
+	else {
+		pool->invalidate_event =
+			rspamd_mempool_alloc (pool->pool, sizeof (struct event));
+		pool->invalidate_tv.tv_sec =
+			statfile_pool_jittered_delay (seconds, jitter);
+		pool->invalidate_tv.tv_usec = 0;
+		evtimer_set (pool->invalidate_event,
+			statfile_pool_invalidate_callback,
+			pool);
+		evtimer_add (pool->invalidate_event, &pool->invalidate_tv);
+		msg_info ("invalidate of statfile pool is planned in %d seconds",
+			(gint)pool->invalidate_tv.tv_sec);
+	}
+}
+
+
+/*
+ * Find the statfile config with the given symbol in the classifier config,
+ * store it in *st and return the corresponding opened statfile; the file is
+ * opened and, if `try_create` is set, created when needed.
+ * Returns NULL and leaves *st == NULL when the symbol is unknown.
+ *
+ * NOTE(review): the path and size members used below are removed from
+ * struct rspamd_statfile_config by the cfg_file.h part of this patch - confirm
+ * where these values should come from.
+ */
+stat_file_t *
+get_statfile_by_symbol (statfile_pool_t *pool,
+	struct rspamd_classifier_config *ccf,
+	const gchar *symbol,
+	struct rspamd_statfile_config **st,
+	gboolean try_create)
+{
+	stat_file_t *res = NULL;
+	GList *cur;
+
+	if (pool == NULL || ccf == NULL || symbol == NULL || st == NULL) {
+		msg_err ("invalid input arguments");
+		return NULL;
+	}
+
+	/* Define the output even if the list of statfiles is empty */
+	*st = NULL;
+	cur = g_list_first (ccf->statfiles);
+	while (cur) {
+		*st = cur->data;
+		if (strcmp (symbol, (*st)->symbol) == 0) {
+			break;
+		}
+		*st = NULL;
+		cur = g_list_next (cur);
+	}
+	if (*st == NULL) {
+		msg_info ("cannot find statfile with symbol %s", symbol);
+		return NULL;
+	}
+
+	if ((res = statfile_pool_is_open (pool, (*st)->path)) == NULL) {
+		if ((res =
+			statfile_pool_open (pool, (*st)->path, (*st)->size,
+			FALSE)) == NULL) {
+			msg_warn ("cannot open %s", (*st)->path);
+			if (try_create) {
+				if (statfile_pool_create (pool, (*st)->path,
+					(*st)->size) == -1) {
+					msg_err ("cannot create statfile %s", (*st)->path);
+					return NULL;
+				}
+				res =
+					statfile_pool_open (pool, (*st)->path, (*st)->size, FALSE);
+				if (res == NULL) {
+					msg_err ("cannot open statfile %s after creation",
+						(*st)->path);
+				}
+			}
+		}
+	}
+
+	return res;
+}
+
+/* Try to mlock () every opened statfile; the first failure disables mlock */
+void
+statfile_pool_lockall (statfile_pool_t *pool)
+{
+	stat_file_t *file;
+	gint i;
+
+	if (pool->mlock_ok) {
+		for (i = 0; i < pool->opened; i++) {
+			file = &pool->files[i];
+			if (mlock (file->map, file->len) == -1) {
+				msg_warn (
+					"mlock of statfile failed, maybe you need to increase RLIMIT_MEMLOCK limit for a process: %s",
+					strerror (errno));
+				pool->mlock_ok = FALSE;
+				return;
+			}
+		}
+	}
+	/* Do not try to lock if mlock failed */
+}
+
diff --git a/src/libstat/backends/mmaped_file.h b/src/libstat/backends/mmaped_file.h
new file mode 100644
index 000000000..f7f632703
--- /dev/null
+++ b/src/libstat/backends/mmaped_file.h
@@ -0,0 +1,310 @@
+/**
+ * @file statfile.h
+ * Describes common methods for accessing statistics files and caching them in memory
+ */
+
+#ifndef RSPAMD_STATFILE_H
+#define RSPAMD_STATFILE_H
+
+#include "config.h"
+#include "mem_pool.h"
+#include
"hash.h" + +#define CHAIN_LENGTH 128 + +/* Section types */ +#define STATFILE_SECTION_COMMON 1 +#define STATFILE_SECTION_HEADERS 2 +#define STATFILE_SECTION_URLS 3 +#define STATFILE_SECTION_REGEXP 4 + +#define DEFAULT_STATFILE_INVALIDATE_TIME 30 +#define DEFAULT_STATFILE_INVALIDATE_JITTER 30 + +/** + * Common statfile header + */ +struct stat_file_header { + u_char magic[3]; /**< magic signature ('r' 's' 'd') */ + u_char version[2]; /**< version of statfile */ + u_char padding[3]; /**< padding */ + guint64 create_time; /**< create time (time_t->guint64) */ + guint64 revision; /**< revision number */ + guint64 rev_time; /**< revision time */ + guint64 used_blocks; /**< used blocks number */ + guint64 total_blocks; /**< total number of blocks */ + u_char unused[239]; /**< some bytes that can be used in future */ +}; + +/** + * Section header + */ +struct stat_file_section { + guint64 code; /**< section's code */ + guint64 length; /**< section's length in blocks */ +}; + +/** + * Block of data in statfile + */ +struct stat_file_block { + guint32 hash1; /**< hash1 (also acts as index) */ + guint32 hash2; /**< hash2 */ + double value; /**< double value */ +}; + +/** + * Statistic file + */ +struct stat_file { + struct stat_file_header header; /**< header */ + struct stat_file_section section; /**< first section */ + struct stat_file_block blocks[1]; /**< first block of data */ +}; + +/** + * Common view of statfile object + */ +typedef struct stat_file_s { +#ifdef HAVE_PATH_MAX + gchar filename[PATH_MAX]; /**< name of file */ +#else + gchar filename[MAXPATHLEN]; /**< name of file */ +#endif + gint fd; /**< descriptor */ + void *map; /**< mmaped area */ + off_t seek_pos; /**< current seek position */ + struct stat_file_section cur_section; /**< current section */ + time_t open_time; /**< time when file was opened */ + time_t access_time; /**< last access time */ + size_t len; /**< length of file(in bytes) */ + rspamd_mempool_mutex_t *lock; /**< mutex */ +} stat_file_t; + 
+/**
+ * Statfiles pool
+ */
+typedef struct statfile_pool_s {
+	stat_file_t *files;                     /**< array of `opened` statfile objects, searched by filename */
+	void **maps;                            /**< shared hash table of mmaped areas indexed by name */
+	gint opened;                            /**< number of opened files */
+	rspamd_mempool_t *pool;                 /**< memory pool object */
+	rspamd_mempool_mutex_t *lock;           /**< mutex */
+	struct event *invalidate_event;         /**< event for pool invalidation */
+	struct timeval invalidate_tv;
+	gboolean mlock_ok;                      /**< whether it is possible to use mlock (2) to avoid statfiles unloading */
+} statfile_pool_t;
+
+/* Forward declarations */
+struct rspamd_classifier_config;
+struct rspamd_statfile_config;
+
+/**
+ * Create new statfile pool
+ * @param pool memory pool object; use_mlock presumably allows mlock (2) of statfiles (TODO confirm)
+ * @return statfile pool object
+ */
+statfile_pool_t * statfile_pool_new (rspamd_mempool_t *pool,
+	gboolean use_mlock);
+
+/**
+ * Open statfile and attach it to pool
+ * @param pool statfile pool object
+ * @param filename name of statfile to open
+ * @return opened statfile object or NULL in case of error
+ */
+stat_file_t * statfile_pool_open (statfile_pool_t *pool,
+	gchar *filename,
+	size_t len,
+	gboolean forced);
+
+/**
+ * Create new statfile but DOES NOT attach it to pool, use @see statfile_pool_open for attaching
+ * @param pool statfile pool object
+ * @param filename name of statfile to create
+ * @param len length of new statfile
+ * @return 0 if file was created and -1 in case of error
+ */
+gint statfile_pool_create (statfile_pool_t *pool, gchar *filename, size_t len);
+
+/**
+ * Close specified statfile
+ * @param pool statfile pool object
+ * @param file statfile object to close
+ * @param keep_sorted NOTE(review): meaning is not visible here (was documented as remove_hash) - confirm
+ * @return 0 if file was closed and -1 if statfile was not opened
+ */
+gint statfile_pool_close (statfile_pool_t *pool,
+	stat_file_t *file,
+	gboolean keep_sorted);
+
+/**
+ * Delete statfile pool and close all attached statfiles
+ * @param pool statfile pool object
+ */
+void statfile_pool_delete (statfile_pool_t *pool);
+
+/**
+ * Try to lock all statfiles in memory
+ * @param pool statfile pool object
+ */
+void statfile_pool_lockall (statfile_pool_t *pool);
+
+/**
+ * Lock specified file for exclusive use (eg. learning)
+ * @param pool statfile pool object
+ * @param file statfile object to lock
+ */
+void statfile_pool_lock_file (statfile_pool_t *pool, stat_file_t *file);
+
+/**
+ * Unlock specified file
+ * @param pool statfile pool object
+ * @param file statfile object to unlock
+ */
+void statfile_pool_unlock_file (statfile_pool_t *pool, stat_file_t *file);
+
+/**
+ * Get block from statfile with h1 and h2 values, use time argument for current time
+ * @param pool statfile pool object
+ * @param file statfile object
+ * @param h1 h1 in file
+ * @param h2 h2 in file
+ * @param now current time
+ * @return block value or 0 if block is not found
+ */
+double statfile_pool_get_block (statfile_pool_t *pool,
+	stat_file_t *file,
+	guint32 h1,
+	guint32 h2,
+	time_t now);
+
+/**
+ * Set specified block in statfile
+ * @param pool statfile pool object
+ * @param file statfile object
+ * @param h1 h1 in file
+ * @param h2 h2 in file
+ * @param now current time
+ * @param value value of block
+ */
+void statfile_pool_set_block (statfile_pool_t *pool,
+	stat_file_t *file,
+	guint32 h1,
+	guint32 h2,
+	time_t now,
+	double value);
+
+/**
+ * Check whether statfile is opened
+ * @param pool statfile pool object
+ * @param filename name of statfile
+ * @return statfile object if specified statfile is opened and NULL otherwise
+ */
+stat_file_t * statfile_pool_is_open (statfile_pool_t *pool, gchar *filename);
+
+/**
+ * Returns current statfile section
+ * @param pool statfile pool object
+ * @param file statfile object
+ * @return code of the current section of file
+ */
+guint32 statfile_pool_get_section (statfile_pool_t *pool, stat_file_t *file);
+
+/**
+ * Go to other section of statfile
+ * @param pool statfile pool object
+ * @param file statfile object
+ * @param code code of section to seek to
+ * @param from_begin search for section from begin of file if true
+ * @return TRUE if section was set and FALSE otherwise
+ */
+gboolean statfile_pool_set_section (statfile_pool_t *pool,
+	stat_file_t *file,
+	guint32 code,
+	gboolean from_begin);
+
+/**
+ * Add new section to statfile
+ * @param pool statfile pool object
+ * @param file statfile object
+ * @param code code of the new section
+ * @param length length in blocks of new section
+ * @return TRUE if section was successfully added and FALSE in case of error
+ */
+gboolean statfile_pool_add_section (statfile_pool_t *pool,
+	stat_file_t *file,
+	guint32 code,
+	guint64 length);
+
+
+/**
+ * Return code of section identified by name
+ * @param name name of section
+ * @return code of section or 0 if name of section is unknown
+ */
+guint32 statfile_get_section_by_name (const gchar *name);
+
+/**
+ * Set statfile revision and revision time
+ * @param file statfile object
+ * @param rev number of revision
+ * @param time time of revision
+ * @return TRUE if revision was set, FALSE if file is NULL or not mapped
+ */
+gboolean statfile_set_revision (stat_file_t *file, guint64 rev, time_t time);
+
+/**
+ * Increment statfile revision (revision time is not modified)
+ * @param file statfile object
+ * NOTE: unlike statfile_set_revision this function takes no time argument
+ * @return TRUE if revision was incremented, FALSE if file is NULL or not mapped
+ */
+gboolean statfile_inc_revision (stat_file_t *file);
+
+/**
+ * Get statfile revision and revision time
+ * @param file statfile object
+ * @param rev place to store number of revision (may be NULL)
+ * @param time place to store time of revision (may be NULL)
+ * @return TRUE if revision was saved in rev and time
+ */
+gboolean statfile_get_revision (stat_file_t *file, guint64 *rev, time_t *time);
+
+/**
+ * Get statfile used blocks
+ * @param file file to get number of used blocks
+ * @return number of used blocks or (guint64)-1 in case of error
+ */
+guint64 statfile_get_used_blocks (stat_file_t *file);
+
+/**
+ * Get statfile total blocks
+ * @param file file to get total number of blocks
+ * @return total number of blocks or (guint64)-1 in case of error
+ */
+guint64 statfile_get_total_blocks (stat_file_t *file);
+
+
+/**
+ * Plan statfile pool invalidation
+ */
+void statfile_pool_plan_invalidate (statfile_pool_t *pool,
+	time_t seconds,
+	time_t jitter);
+
+/**
+ * Get a statfile by symbol
+ * @param pool pool object
+ * @param ccf ccf classifier config
+ * @param symbol symbol to search
+ * @param st place to store config of the found statfile
+ * @param try_create whether we need to create statfile if it is absent
+ */
+stat_file_t * get_statfile_by_symbol (statfile_pool_t *pool,
+	struct rspamd_classifier_config *ccf,
+	const gchar *symbol,
+	struct rspamd_statfile_config **st,
+	gboolean try_create);
+
+#endif
diff --git a/src/libstat/classifiers.h b/src/libstat/classifiers.h
index d13178486..2c2f33449 100644
--- a/src/libstat/classifiers.h
+++ b/src/libstat/classifiers.h
@@ -45,7 +45,7 @@ struct classifier {
 };
 
 /* Get classifier structure by name or return NULL if this name is not found */
-struct classifier * get_classifier (const char *name);
+struct classifier * rspamd_stat_get_classifier (const char *name);
 
 /* Bayes algorithm */
 struct classifier_ctx * bayes_init (rspamd_mempool_t *pool,
diff --git a/src/libstat/classifiers/classifiers.c b/src/libstat/classifiers/classifiers.c
index 6af7d2dc8..a3efb53c1 100644
--- a/src/libstat/classifiers/classifiers.c
+++ b/src/libstat/classifiers/classifiers.c
@@ -40,7 +40,7 @@ struct classifier classifiers[] = {
 };
 
 struct classifier *
-get_classifier (const char *name)
+rspamd_stat_get_classifier (const char *name)
 {
 	guint i;
 
diff --git a/src/libstat/stat_config.c b/src/libstat/stat_config.c
new file mode 100644
index 000000000..fd2c0f165
--- /dev/null
+++ b/src/libstat/stat_config.c
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015, Vsevolod Stakhov
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "stat_api.h"
+#include "main.h"
+#include "cfg_rcl.h"
+
diff --git a/src/libstat/tokenizers.h b/src/libstat/tokenizers.h
index ed47e0add..c0d2e8934 100644
--- a/src/libstat/tokenizers.h
+++ b/src/libstat/tokenizers.h
@@ -33,7 +33,7 @@ struct tokenizer {
 int token_node_compare_func (gconstpointer a, gconstpointer b);
 
 /* Get tokenizer structure by name or return NULL if this name is not found */
-struct tokenizer * get_tokenizer (const char *name);
+struct tokenizer * rspamd_stat_get_tokenizer (const char *name);
 
 /* Get next word from specified f_str_t buf */
 gchar * rspamd_tokenizer_get_word (rspamd_fstring_t *buf,
diff --git a/src/libstat/tokenizers/tokenizers.c b/src/libstat/tokenizers/tokenizers.c
index 3e6c745ec..ce221397d 100644
--- a/src/libstat/tokenizers/tokenizers.c
+++ b/src/libstat/tokenizers/tokenizers.c
@@ -26,7 +26,6 @@
  * Common tokenization functions
  */
 
-#include
 #include "main.h"
 #include "tokenizers.h"
 
@@ -77,7 +76,7 @@ const gchar t_delimiters[255] = {
 };
 
 struct tokenizer *
-get_tokenizer (const char *name)
+rspamd_stat_get_tokenizer (const char *name)
 {
 	guint i;
 
@@ -230,7 +229,7 @@ tokenize_subject (struct rspamd_task *task, GTree ** tree)
 			(rspamd_mempool_destruct_t) g_tree_destroy, *tree);
 	}
 
-	osb_tokenizer = get_tokenizer ("osb-text");
+	osb_tokenizer = rspamd_stat_get_tokenizer ("osb-text");
 
 	/* Try to use pre-defined subject */
 	if (task->subject != NULL) {
diff --git a/src/lua/lua_classifier.c b/src/lua/lua_classifier.c
index 346f5d64b..7adc473ba 100644
--- a/src/lua/lua_classifier.c
+++ b/src/lua/lua_classifier.c
@@ -45,16 +45,12 @@ static const struct luaL_reg classifierlib_m[] = {
 
 LUA_FUNCTION_DEF (statfile, get_symbol);
 LUA_FUNCTION_DEF (statfile, get_label);
-LUA_FUNCTION_DEF (statfile, get_path);
-LUA_FUNCTION_DEF (statfile, get_size);
 LUA_FUNCTION_DEF (statfile, is_spam);
 LUA_FUNCTION_DEF (statfile, get_param);
 
 static const struct luaL_reg statfilelib_m[] = {
 	LUA_INTERFACE_DEF (statfile, get_symbol),
 	LUA_INTERFACE_DEF (statfile, get_label),
-	LUA_INTERFACE_DEF (statfile, get_path),
-	LUA_INTERFACE_DEF (statfile, get_size),
 	LUA_INTERFACE_DEF (statfile, is_spam),
 	LUA_INTERFACE_DEF (statfile, get_param),
 	{"__tostring", rspamd_lua_class_tostring},
@@ -351,36 +347,6 @@ lua_statfile_get_label (lua_State *L)
 	return 1;
 }
 
-static gint
-lua_statfile_get_path (lua_State *L)
-{
-	struct rspamd_statfile_config *st = lua_check_statfile (L);
-
-	if (st != NULL) {
-		lua_pushstring (L, st->path);
-	}
-	else {
-		lua_pushnil (L);
-	}
-
-	return 1;
-}
-
-static gint
-lua_statfile_get_size (lua_State *L)
-{
-	struct rspamd_statfile_config *st = lua_check_statfile (L);
-
-	if (st != NULL) {
-		lua_pushinteger (L, st->size);
-	}
-	else {
-		lua_pushnil (L);
-	}
-
-	return 1;
-}
-
 static gint
 lua_statfile_is_spam (lua_State *L)
 {
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index 8df878585..73471719b 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -355,9 +355,8 @@ gboolean
 rspamd_init_lua_filters (struct rspamd_config *cfg)
 {
 	struct rspamd_config **pcfg;
-	GList *cur, *tmp;
+	GList *cur;
 	struct script_module *module;
-	struct rspamd_statfile_config *st;
 	lua_State *L = cfg->lua_state;
 
 	cur = g_list_first (cfg->script_modules);
@@ -395,24 +394,7 @@ rspamd_init_lua_filters (struct rspamd_config *cfg)
 		}
 		cur = g_list_next (cur);
 	}
-	/* Init statfiles normalizers */
-	cur = g_list_first (cfg->statfiles);
-	while (cur) {
-		st = cur->data;
-		if (st->normalizer == rspamd_lua_normalize) {
-			tmp = st->normalizer_data;
-			if (tmp && (tmp = g_list_next (tmp))) {
-				if (tmp->data) {
-					/* Code must be loaded from data */
-					if (luaL_loadstring (L, tmp->data) != 0) {
-						msg_info ("cannot load normalizer code %s", tmp->data);
-						return FALSE;
-					}
-				}
-			}
-		}
-		cur = g_list_next (cur);
-	}
+
 	/* Assign state */
 	cfg->lua_state = L;
 