From 61d89e0791f1d1a2d590ee7c05e6ad5066d1ee3d Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 6 Nov 2009 20:31:03 +0300 Subject: [PATCH] * Add binlog API implementation --- CMakeLists.txt | 45 ++-- src/binlog.c | 458 ++++++++++++++++++++++++++++++++++++ src/binlog.h | 59 +++++ src/classifiers/winnow.c | 4 +- src/tokenizers/tokenizers.h | 1 + 5 files changed, 544 insertions(+), 23 deletions(-) create mode 100644 src/binlog.c create mode 100644 src/binlog.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a8f4b6768..8c58586d0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -344,36 +344,37 @@ ENDIF(DEBUG_MODE MATCHES "ON") ################################ SOURCES SECTION ########################### SET(RSPAMDSRC src/modules.c - src/hash.c - src/worker.c - src/util.c - src/url.c - src/upstream.c - src/statfile.c - src/protocol.c - src/message.c - src/fuzzy.c - src/expressions.c - src/mem_pool.c - src/memcached.c - src/main.c - src/fstring.c - src/filter.c - src/controller.c - src/cfg_utils.c + src/binlog.c + src/bloom.c src/buffer.c + src/cfg_utils.c + src/controller.c src/events.c + src/expressions.c + src/filter.c + src/fstring.c + src/fuzzy.c + src/fuzzy_storage.c + src/hash.c src/html.c src/lmtp.c - src/spf.c src/lmtp_proto.c - src/radix.c - src/view.c + src/main.c src/map.c - src/bloom.c + src/memcached.c + src/mem_pool.c + src/message.c + src/protocol.c + src/radix.c src/settings.c + src/spf.c + src/statfile.c src/symbols_cache.c - src/fuzzy_storage.c) + src/upstream.c + src/url.c + src/util.c + src/view.c + src/worker.c) IF(ENABLE_PERL MATCHES "ON") LIST(APPEND RSPAMDSRC src/perl.c) diff --git a/src/binlog.c b/src/binlog.c new file mode 100644 index 000000000..457e56a42 --- /dev/null +++ b/src/binlog.c @@ -0,0 +1,458 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "binlog.h" +#include "tokenizers/tokenizers.h" + +#define BINLOG_SUFFIX ".binlog" +#define BACKUP_SUFFIX ".old" +#define VALID_MAGIC { 'r', 's', 'l' } +#define VALID_VERSION { '1', '0' } + +static gboolean +binlog_write_header (struct rspamd_binlog *log) +{ + struct rspamd_binlog_header header = { + .magic = VALID_MAGIC, + .version = VALID_VERSION, + .padding = { '\0', '\0' }, + }; + + header.create_time = time (NULL); + lock_file (log->fd, FALSE); + + if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) { + msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + + memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header)); + + /* Metaindex */ + log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); + bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); + if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { + g_free (log->metaindex); + msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + g_free (log->metaindex); + /* Now mmap metaindex to memory */ + if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) { + msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + + /* First index block */ + + /* Offset to metaindex */ + log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header); + /* Alloc, write, mmap */ + log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); + bzero (log->cur_idx, sizeof (struct rspamd_index_block)); + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + g_free (log->cur_idx); + msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + g_free (log->cur_idx); + /* Now mmap it to memory */ + if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[0])) == MAP_FAILED) { + msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + + unlock_file (log->fd, FALSE); + + return TRUE; +} + +static gboolean +binlog_check_file (struct rspamd_binlog *log) +{ + static char valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION; + + if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) { + msg_warn ("binlog_check_file: cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + /* Now check all fields */ + if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || + memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) { + msg_warn ("binlog_check_file: cannot validate file %s"); + return FALSE; + } + /* Now mmap metaindex and current index */ + if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) { + msg_warn ("binlog_check file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + /* Current index */ + if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[log->metaindex->last_index])) == MAP_FAILED) { + msg_warn ("binlog_check_file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + return TRUE; + +} + +static gboolean +binlog_create (struct rspamd_binlog *log) +{ + if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { + msg_info ("binlog_create: cannot create file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + return binlog_write_header (log); +} + +static gboolean +binlog_open_real (struct rspamd_binlog *log) +{ + if ((log->fd = open (log->filename, O_RDWR)) == -1) { + msg_info ("binlog_open: cannot open file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + return binlog_check_file (log); +} + + +struct rspamd_binlog* +binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter) +{ + struct rspamd_binlog *new; + int len = strlen (path); + struct stat st; + + new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog)); + new->pool = pool; + new->rotate_time = rotate_time; + new->rotate_jitter = g_random_int_range (0, rotate_jitter); + new->fd = -1; + + new->filename = memory_pool_alloc (pool, len + sizeof (BINLOG_SUFFIX)); + g_strlcpy (new->filename, path, len + 1); + g_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX)); + + if (stat (new->filename, &st) == -1) { + /* Check errno to check whether we should create this file */ + if (errno != ENOENT) { + msg_err ("binlog_open: cannot stat file: %s, error %s", new->filename, strerror (errno)); + return NULL; + } + else { + /* In case of ENOENT try to create binlog */ + if (!binlog_create (new)) { + return NULL; + } + } + } + else { + /* Try to open binlog */ + if (!binlog_open_real (new)) { + return NULL; + } + } + + return new; +} + +void +binlog_close (struct rspamd_binlog *log) +{ + if (log) { + if (log->metaindex) { + munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); + } + if (log->cur_idx) { + munmap (log->cur_idx, sizeof (struct rspamd_index_block)); + } + close (log->fd); + } +} + +static gboolean +binlog_tree_callback (gpointer key, gpointer value, gpointer data) +{ + token_node_t *node = key; + struct rspamd_binlog *log = data; + struct rspamd_binlog_element elt; + + elt.h1 = node->h1; + elt.h2 = node->h2; + elt.value = node->value; + + if (write (log->fd, &elt, sizeof (elt)) == -1) { + msg_info ("write_binlog_tree: cannot write token to file: %s, error: %s", log->filename, strerror (errno)); + return TRUE; + } + + return FALSE; +} + +static gboolean +write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) +{ + off_t seek; + struct rspamd_binlog_index *idx; + + /* Seek to end of file */ + if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { + msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + return FALSE; + } + + lock_file (log->fd, FALSE); + /* Write index */ + log->cur_idx->last_index ++; + log->cur_seq ++; + + idx = &log->cur_idx->indexes[log->cur_idx->last_index]; + idx->seek = seek; + idx->time = (uint64_t)time (NULL); + idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element); + + /* Now write all nodes to file */ + g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log); + + unlock_file (log->fd, FALSE); + + return TRUE; +} + +static gboolean +create_new_metaindex_block (struct rspamd_binlog *log) +{ + off_t seek; + + /* Seek to end of file */ + if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { + msg_info ("create_new_metaindex_block: cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + return FALSE; + } + + lock_file (log->fd, FALSE); + + log->metaindex->last_index ++; + /* Offset to metaindex */ + log->metaindex->indexes[log->metaindex->last_index] = seek; + munmap (log->cur_idx, sizeof (struct rspamd_index_block)); + /* Alloc, write, mmap */ + log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); + bzero (log->cur_idx, sizeof (struct rspamd_index_block)); + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + unlock_file (log->fd, FALSE); + g_free (log->cur_idx); + msg_warn ("create_new_metaindex_block: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + g_free (log->cur_idx); + /* Now mmap it to memory */ + if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, seek)) == MAP_FAILED) { + log->cur_idx = NULL; + unlock_file (log->fd, FALSE); + msg_warn ("create_new_metaindex_block: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + unlock_file (log->fd, FALSE); + + return TRUE; +} + +static gboolean +maybe_rotate_binlog (struct rspamd_binlog *log) +{ + uint64_t now = time (NULL); + + if ((now - log->header.create_time) > log->rotate_time + log->rotate_jitter) { + return TRUE; + } + return FALSE; +} + +static gboolean +rotate_binlog (struct rspamd_binlog *log) +{ + char *backup_name; + struct stat st; + + lock_file (log->fd, FALSE); + + /* Unmap mapped fragments */ + if (log->metaindex) { + munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); + log->metaindex = NULL; + } + if (log->cur_idx) { + munmap (log->cur_idx, sizeof (struct rspamd_index_block)); + log->cur_idx = NULL; + } + /* Format backup name */ + backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX); + + if (stat (backup_name, &st) != -1) { + msg_info ("rotate_binlog: replace old %s", backup_name); + unlink (backup_name); + } + + rename (log->filename, backup_name); + g_free (backup_name); + + /* XXX: maybe race condition here */ + unlock_file (log->fd, FALSE); + close (log->fd); + + return binlog_create (log); + +} + +gboolean +binlog_insert (struct rspamd_binlog *log, GTree *nodes) +{ + off_t seek; + + if (!log || !log->metaindex || !log->cur_idx || !nodes) { + msg_info ("binlog_insert: improperly opened binlog: %s", log->filename); + return FALSE; + } + + if (maybe_rotate_binlog (log)) { + if (!rotate_binlog (log)) { + return FALSE; + } + } + /* First of all try to place new tokens in current index */ + if (log->cur_idx->last_index < BINLOG_IDX_LEN) { + /* All is ok */ + return write_binlog_tree (log, nodes); + } + /* Current index table is all busy, try to allocate new index */ + + /* Check metaindex free space */ + if (log->metaindex->last_index < METAINDEX_LEN) { + /* Create new index block */ + if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { + msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + return FALSE; + } + if (!create_new_metaindex_block (log)) { + return FALSE; + } + return write_binlog_tree (log, nodes); + } + + /* All binlog is filled, we need to rotate it forcefully */ + if (!rotate_binlog (log)) { + return FALSE; + } + + return write_binlog_tree (log, nodes); +} + +gboolean +binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep) +{ + uint32_t metaindex_num; + struct rspamd_index_block *idxb; + struct rspamd_binlog_index *idx; + gboolean idx_mapped = FALSE, res = TRUE; + + if (!log || !log->metaindex || !log->cur_idx) { + msg_info ("binlog_sync: improperly opened binlog: %s", log->filename); + return FALSE; + } + + if (*rep == NULL) { + *rep = g_malloc (sizeof (GByteArray)); + } + else { + /* Unmap old fragment */ + munmap ((*rep)->data, (*rep)->len); + } + + if (from_rev == log->cur_seq) { + /* Last record */ + *rep = NULL; + return FALSE; + } + + metaindex_num = from_rev / BINLOG_IDX_LEN; + /* First of all try to find this revision */ + if (metaindex_num > log->metaindex->last_index) { + return FALSE; + } + else if (metaindex_num != log->metaindex->last_index) { + /* Need to remap index block */ + lock_file (log->fd, FALSE); + if ((idxb = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, + log->fd, log->metaindex->indexes[metaindex_num])) == MAP_FAILED) { + unlock_file (log->fd, FALSE); + msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + idx_mapped = TRUE; + } + else { + idxb = log->cur_idx; + } + /* Now check specified index */ + idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN]; + if (idx->time != from_time) { + res = FALSE; + goto end; + } + + /* Now fill reply structure */ + (*rep)->len = idx->len; + /* MMap result */ + if (((*rep)->data = mmap (NULL, idx->len, PROT_READ, MAP_SHARED, log->fd, idx->seek)) == MAP_FAILED) { + msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + res = FALSE; + goto end; + } + +end: + if (idx_mapped) { + munmap (idxb, sizeof (struct rspamd_index_block)); + } + + return res; +} + diff --git a/src/binlog.h b/src/binlog.h new file mode 100644 index 000000000..bf2666840 --- /dev/null +++ b/src/binlog.h @@ -0,0 +1,59 @@ +#ifndef RSPAMD_BINLOG_H +#define RSPAMD_BINLOG_H + +#include "config.h" +#include "main.h" +#include "statfile.h" + +/* How much records are in a single index */ +#define BINLOG_IDX_LEN 200 +#define METAINDEX_LEN 1024 + +struct rspamd_binlog_header { + char magic[3]; + char version[2]; + char padding[3]; + uint64_t create_time; +} __attribute__((__packed__)); + +struct rspamd_binlog_index { + uint64_t time; + uint64_t seek; + uint32_t len; +}; + +struct rspamd_index_block { + struct rspamd_binlog_index indexes[BINLOG_IDX_LEN]; + uint32_t last_index; +}; + +struct rspamd_binlog_metaindex { + uint64_t indexes[METAINDEX_LEN]; + uint64_t last_index; +}; + +struct rspamd_binlog_element { + uint32_t h1; + uint32_t h2; + float value; +} __attribute__((__packed__)); + +struct rspamd_binlog { + char *filename; + time_t rotate_time; + int rotate_jitter; + uint64_t cur_seq; + int fd; + memory_pool_t *pool; + + struct rspamd_binlog_header header; + struct rspamd_binlog_metaindex *metaindex; + struct rspamd_index_block *cur_idx; +}; + +struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter); +void binlog_close (struct rspamd_binlog *log); +gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes); +gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep); + +#endif diff --git a/src/classifiers/winnow.c b/src/classifiers/winnow.c index acf12a462..cc2a0cc23 100644 --- a/src/classifiers/winnow.c +++ b/src/classifiers/winnow.c @@ -77,11 +77,13 @@ learn_callback (gpointer key, gpointer value, gpointer data) /* Consider that not found blocks have value 1 */ if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) { statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, c); + node->value = c; } else { statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, v * c); + node->value = v * c; } - + cd->count++; return FALSE; diff --git a/src/tokenizers/tokenizers.h b/src/tokenizers/tokenizers.h index ed3b66fcf..fda5bded3 100644 --- a/src/tokenizers/tokenizers.h +++ b/src/tokenizers/tokenizers.h @@ -17,6 +17,7 @@ typedef struct token_node_s { uint32_t h1; uint32_t h2; + float value; } token_node_t; /* Common tokenizer structure */ -- 2.39.5