diff options
Diffstat (limited to 'src/binlog.c')
-rw-r--r-- | src/binlog.c | 458 |
1 files changed, 458 insertions, 0 deletions
diff --git a/src/binlog.c b/src/binlog.c new file mode 100644 index 000000000..457e56a42 --- /dev/null +++ b/src/binlog.c @@ -0,0 +1,458 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "binlog.h" +#include "tokenizers/tokenizers.h" + +#define BINLOG_SUFFIX ".binlog" +#define BACKUP_SUFFIX ".old" +#define VALID_MAGIC { 'r', 's', 'l' } +#define VALID_VERSION { '1', '0' } + +static gboolean +binlog_write_header (struct rspamd_binlog *log) +{ + struct rspamd_binlog_header header = { + .magic = VALID_MAGIC, + .version = VALID_VERSION, + .padding = { '\0', '\0' }, + }; + + header.create_time = time (NULL); + lock_file (log->fd, FALSE); + + if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) { + msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + + memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header)); + + /* Metaindex */ + log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); + bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); + if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { + g_free (log->metaindex); + msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + g_free (log->metaindex); + /* Now mmap metaindex to memory */ + if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) { + msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + + /* First index block */ + + /* Offset to metaindex */ + log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header); + /* Alloc, write, mmap */ + log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); + bzero (log->cur_idx, sizeof (struct rspamd_index_block)); + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + g_free (log->cur_idx); + msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + g_free (log->cur_idx); + /* Now mmap it to memory */ + if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[0])) == MAP_FAILED) { + msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + unlock_file (log->fd, FALSE); + return FALSE; + } + + unlock_file (log->fd, FALSE); + + return TRUE; +} + +static gboolean +binlog_check_file (struct rspamd_binlog *log) +{ + static char valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION; + + if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) { + msg_warn ("binlog_check_file: cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + /* Now check all fields */ + if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || + memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) { + msg_warn ("binlog_check_file: cannot validate file %s"); + return FALSE; + } + /* Now mmap metaindex and current index */ + if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) { + msg_warn ("binlog_check file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + /* Current index */ + if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[log->metaindex->last_index])) == MAP_FAILED) { + msg_warn ("binlog_check_file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + return TRUE; + +} + +static gboolean +binlog_create (struct rspamd_binlog *log) +{ + if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { + msg_info ("binlog_create: cannot create file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + return binlog_write_header (log); +} + +static gboolean +binlog_open_real (struct rspamd_binlog *log) +{ + if ((log->fd = open (log->filename, O_RDWR)) == -1) { + msg_info ("binlog_open: cannot open file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + + return binlog_check_file (log); +} + + +struct rspamd_binlog* +binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter) +{ + struct rspamd_binlog *new; + int len = strlen (path); + struct stat st; + + new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog)); + new->pool = pool; + new->rotate_time = rotate_time; + new->rotate_jitter = g_random_int_range (0, rotate_jitter); + new->fd = -1; + + new->filename = memory_pool_alloc (pool, len + sizeof (BINLOG_SUFFIX)); + g_strlcpy (new->filename, path, len + 1); + g_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX)); + + if (stat (new->filename, &st) == -1) { + /* Check errno to check whether we should create this file */ + if (errno != ENOENT) { + msg_err ("binlog_open: cannot stat file: %s, error %s", new->filename, strerror (errno)); + return NULL; + } + else { + /* In case of ENOENT try to create binlog */ + if (!binlog_create (new)) { + return NULL; + } + } + } + else { + /* Try to open binlog */ + if (!binlog_open_real (new)) { + return NULL; + } + } + + return new; +} + +void +binlog_close (struct rspamd_binlog *log) +{ + if (log) { + if (log->metaindex) { + munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); + } + if (log->cur_idx) { + munmap (log->cur_idx, sizeof (struct rspamd_index_block)); + } + close (log->fd); + } +} + +static gboolean +binlog_tree_callback (gpointer key, gpointer value, gpointer data) +{ + token_node_t *node = key; + struct rspamd_binlog *log = data; + struct rspamd_binlog_element elt; + + elt.h1 = node->h1; + elt.h2 = node->h2; + elt.value = node->value; + + if (write (log->fd, &elt, sizeof (elt)) == -1) { + msg_info ("write_binlog_tree: cannot write token to file: %s, error: %s", log->filename, strerror (errno)); + return TRUE; + } + + return FALSE; +} + +static gboolean +write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) +{ + off_t seek; + struct rspamd_binlog_index *idx; + + /* Seek to end of file */ + if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { + msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + return FALSE; + } + + lock_file (log->fd, FALSE); + /* Write index */ + log->cur_idx->last_index ++; + log->cur_seq ++; + + idx = &log->cur_idx->indexes[log->cur_idx->last_index]; + idx->seek = seek; + idx->time = (uint64_t)time (NULL); + idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element); + + /* Now write all nodes to file */ + g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log); + + unlock_file (log->fd, FALSE); + + return TRUE; +} + +static gboolean +create_new_metaindex_block (struct rspamd_binlog *log) +{ + off_t seek; + + /* Seek to end of file */ + if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { + msg_info ("create_new_metaindex_block: cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + return FALSE; + } + + lock_file (log->fd, FALSE); + + log->metaindex->last_index ++; + /* Offset to metaindex */ + log->metaindex->indexes[log->metaindex->last_index] = seek; + munmap (log->cur_idx, sizeof (struct rspamd_index_block)); + /* Alloc, write, mmap */ + log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); + bzero (log->cur_idx, sizeof (struct rspamd_index_block)); + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + unlock_file (log->fd, FALSE); + g_free (log->cur_idx); + msg_warn ("create_new_metaindex_block: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + g_free (log->cur_idx); + /* Now mmap it to memory */ + if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, seek)) == MAP_FAILED) { + log->cur_idx = NULL; + unlock_file (log->fd, FALSE); + msg_warn ("create_new_metaindex_block: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + unlock_file (log->fd, FALSE); + + return TRUE; +} + +static gboolean +maybe_rotate_binlog (struct rspamd_binlog *log) +{ + uint64_t now = time (NULL); + + if ((now - log->header.create_time) > log->rotate_time + log->rotate_jitter) { + return TRUE; + } + return FALSE; +} + +static gboolean +rotate_binlog (struct rspamd_binlog *log) +{ + char *backup_name; + struct stat st; + + lock_file (log->fd, FALSE); + + /* Unmap mapped fragments */ + if (log->metaindex) { + munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); + log->metaindex = NULL; + } + if (log->cur_idx) { + munmap (log->cur_idx, sizeof (struct rspamd_index_block)); + log->cur_idx = NULL; + } + /* Format backup name */ + backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX); + + if (stat (backup_name, &st) != -1) { + msg_info ("rotate_binlog: replace old %s", backup_name); + unlink (backup_name); + } + + rename (log->filename, backup_name); + g_free (backup_name); + + /* XXX: maybe race condition here */ + unlock_file (log->fd, FALSE); + close (log->fd); + + return binlog_create (log); + +} + +gboolean +binlog_insert (struct rspamd_binlog *log, GTree *nodes) +{ + off_t seek; + + if (!log || !log->metaindex || !log->cur_idx || !nodes) { + msg_info ("binlog_insert: improperly opened binlog: %s", log->filename); + return FALSE; + } + + if (maybe_rotate_binlog (log)) { + if (!rotate_binlog (log)) { + return FALSE; + } + } + /* First of all try to place new tokens in current index */ + if (log->cur_idx->last_index < BINLOG_IDX_LEN) { + /* All is ok */ + return write_binlog_tree (log, nodes); + } + /* Current index table is all busy, try to allocate new index */ + + /* Check metaindex free space */ + if (log->metaindex->last_index < METAINDEX_LEN) { + /* Create new index block */ + if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { + msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + return FALSE; + } + if (!create_new_metaindex_block (log)) { + return FALSE; + } + return write_binlog_tree (log, nodes); + } + + /* All binlog is filled, we need to rotate it forcefully */ + if (!rotate_binlog (log)) { + return FALSE; + } + + return write_binlog_tree (log, nodes); +} + +gboolean +binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep) +{ + uint32_t metaindex_num; + struct rspamd_index_block *idxb; + struct rspamd_binlog_index *idx; + gboolean idx_mapped = FALSE, res = TRUE; + + if (!log || !log->metaindex || !log->cur_idx) { + msg_info ("binlog_sync: improperly opened binlog: %s", log->filename); + return FALSE; + } + + if (*rep == NULL) { + *rep = g_malloc (sizeof (GByteArray)); + } + else { + /* Unmap old fragment */ + munmap ((*rep)->data, (*rep)->len); + } + + if (from_rev == log->cur_seq) { + /* Last record */ + *rep = NULL; + return FALSE; + } + + metaindex_num = from_rev / BINLOG_IDX_LEN; + /* First of all try to find this revision */ + if (metaindex_num > log->metaindex->last_index) { + return FALSE; + } + else if (metaindex_num != log->metaindex->last_index) { + /* Need to remap index block */ + lock_file (log->fd, FALSE); + if ((idxb = mmap (NULL, sizeof (struct rspamd_index_block), + PROT_READ | PROT_WRITE, MAP_SHARED, + log->fd, log->metaindex->indexes[metaindex_num])) == MAP_FAILED) { + unlock_file (log->fd, FALSE); + msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + return FALSE; + } + idx_mapped = TRUE; + } + else { + idxb = log->cur_idx; + } + /* Now check specified index */ + idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN]; + if (idx->time != from_time) { + res = FALSE; + goto end; + } + + /* Now fill reply structure */ + (*rep)->len = idx->len; + /* MMap result */ + if (((*rep)->data = mmap (NULL, idx->len, PROT_READ, MAP_SHARED, log->fd, idx->seek)) == MAP_FAILED) { + msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno)); + res = FALSE; + goto end; + } + +end: + if (idx_mapped) { + munmap (idxb, sizeof (struct rspamd_index_block)); + } + + return res; +} + |