/* * Copyright (c) 2009, Rambler media * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "config.h" #include "binlog.h" #include "cfg_file.h" #include "tokenizers/tokenizers.h" #define BINLOG_SUFFIX ".binlog" #define BACKUP_SUFFIX ".old" #define VALID_MAGIC { 'r', 's', 'l' } #define VALID_VERSION { '1', '0' } static GHashTable *binlog_opened = NULL; static memory_pool_t *binlog_pool = NULL; static gboolean binlog_write_header (struct rspamd_binlog *log) { struct rspamd_binlog_header header = { .magic = VALID_MAGIC, .version = VALID_VERSION, .padding = { '\0', '\0' }, }; header.create_time = time (NULL); lock_file (log->fd, FALSE); if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) { msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header)); /* Metaindex */ log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); /* Offset to metaindex */ log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header); if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { g_free (log->metaindex); msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); unlock_file (log->fd, FALSE); return FALSE; } /* Alloc, write, mmap */ log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); bzero (log->cur_idx, sizeof (struct rspamd_index_block)); if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { g_free (log->cur_idx); msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); unlock_file (log->fd, FALSE); return FALSE; } unlock_file (log->fd, FALSE); return TRUE; } static gboolean binlog_check_file (struct rspamd_binlog *log) { static gchar valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION; if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) { msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } /* Now check all fields */ if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) { msg_warn ("cannot validate file %s"); return FALSE; } /* Now mmap metaindex and current index */ if (log->metaindex == NULL) { log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); } if ((read (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))) != sizeof (struct rspamd_binlog_metaindex)) { msg_warn ("cannot read metaindex of file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } /* Current index */ if (log->cur_idx == NULL) { log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); } if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) { msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } if ((read (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { msg_warn ("cannot read index in file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + log->cur_idx->last_index; log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time; return TRUE; } static gboolean binlog_create (struct rspamd_binlog *log) { if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { msg_info ("cannot create file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } return binlog_write_header (log); } static gboolean binlog_open_real (struct rspamd_binlog *log) { if ((log->fd = open (log->filename, O_RDWR)) == -1) { msg_info ("cannot open file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } return binlog_check_file (log); } struct rspamd_binlog* binlog_open (memory_pool_t *pool, const gchar *path, time_t rotate_time, gint rotate_jitter) { struct rspamd_binlog *new; gint len = strlen (path); struct stat st; new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog)); new->pool = pool; new->rotate_time = rotate_time; new->fd = -1; if (rotate_time) { new->rotate_jitter = g_random_int_range (0, rotate_jitter); } new->filename = memory_pool_alloc (pool, len + sizeof (BINLOG_SUFFIX)); rspamd_strlcpy (new->filename, path, len + 1); rspamd_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX)); if (stat (new->filename, &st) == -1) { /* Check errno to check whether we should create this file */ if (errno != ENOENT) { msg_err ("cannot stat file: %s, error %s", new->filename, strerror (errno)); return NULL; } else { /* In case of ENOENT try to create binlog */ if (!binlog_create (new)) { return NULL; } } } else { /* Try to open binlog */ if (!binlog_open_real (new)) { return NULL; } } return new; } void binlog_close (struct rspamd_binlog *log) { if (log) { if (log->metaindex) { g_free (log->metaindex); } if (log->cur_idx) { g_free (log->cur_idx); } close (log->fd); } } static gboolean binlog_tree_callback (gpointer key, gpointer value, gpointer data) { token_node_t *node = key; struct rspamd_binlog *log = data; struct rspamd_binlog_element elt; elt.h1 = node->h1; elt.h2 = node->h2; elt.value = node->value; if (write (log->fd, &elt, sizeof (elt)) == -1) { msg_info ("cannot write token to file: %s, error: %s", log->filename, strerror (errno)); return TRUE; } return FALSE; } static gboolean write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) { off_t seek; struct rspamd_binlog_index *idx; lock_file (log->fd, FALSE); log->cur_seq ++; /* Seek to end of file */ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } /* Now write all nodes to file */ g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log); /* Write index */ idx = &log->cur_idx->indexes[log->cur_idx->last_index]; idx->seek = seek; idx->time = (guint64)time (NULL); log->cur_time = idx->time; idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element); if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot seek in file: %s, error: %s, seek: %L, op: insert index", log->filename, strerror (errno), log->metaindex->indexes[log->metaindex->last_index]); return FALSE; } log->cur_idx->last_index ++; if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } unlock_file (log->fd, FALSE); return TRUE; } static gboolean create_new_metaindex_block (struct rspamd_binlog *log) { off_t seek; lock_file (log->fd, FALSE); log->metaindex->last_index ++; /* Seek to end of file */ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); g_free (log->cur_idx); msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } /* Offset to metaindex */ log->metaindex->indexes[log->metaindex->last_index] = seek; /* Overwrite all metaindexes */ if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot write metaindex in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } bzero (log->cur_idx, sizeof (struct rspamd_index_block)); unlock_file (log->fd, FALSE); return TRUE; } static gboolean maybe_rotate_binlog (struct rspamd_binlog *log) { guint64 now = time (NULL); if (log->rotate_time && ((now - log->header.create_time) > (guint)(log->rotate_time + log->rotate_jitter))) { return TRUE; } return FALSE; } static gboolean rotate_binlog (struct rspamd_binlog *log) { gchar *backup_name; struct stat st; lock_file (log->fd, FALSE); /* Unmap mapped fragments */ if (log->metaindex) { g_free (log->metaindex); log->metaindex = NULL; } if (log->cur_idx) { g_free (log->cur_idx); log->cur_idx = NULL; } /* Format backup name */ backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX); if (stat (backup_name, &st) != -1) { msg_info ("replace old %s", backup_name); unlink (backup_name); } rename (log->filename, backup_name); g_free (backup_name); /* XXX: maybe race condition here */ unlock_file (log->fd, FALSE); close (log->fd); return binlog_create (log); } gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes) { off_t seek; if (!log || !log->metaindex || !log->cur_idx || !nodes) { msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown"); return FALSE; } if (maybe_rotate_binlog (log)) { if (!rotate_binlog (log)) { return FALSE; } } /* First of all try to place new tokens in current index */ if (log->cur_idx->last_index < BINLOG_IDX_LEN) { /* All is ok */ return write_binlog_tree (log, nodes); } /* Current index table is all busy, try to allocate new index */ /* Check metaindex free space */ if (log->metaindex->last_index < METAINDEX_LEN) { /* Create new index block */ if ((seek = lseek (log->fd, 0, SEEK_END)) == (off_t)-1) { msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } if (!create_new_metaindex_block (log)) { return FALSE; } return write_binlog_tree (log, nodes); } /* All binlog is filled, we need to rotate it forcefully */ if (!rotate_binlog (log)) { return FALSE; } return write_binlog_tree (log, nodes); } gboolean binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GByteArray **rep) { guint32 metaindex_num; struct rspamd_index_block *idxb; struct rspamd_binlog_index *idx; gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE; if (!log || !log->metaindex || !log->cur_idx) { msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown"); return FALSE; } if (*rep == NULL) { *rep = g_malloc (sizeof (GByteArray)); is_first = TRUE; } else { /* Unmap old fragment */ g_free ((*rep)->data); } if (from_rev == log->cur_seq) { /* Last record */ *rep = NULL; return FALSE; } else if (from_rev > log->cur_seq) { /* Slave has more actual copy, write this to log and abort sync */ msg_warn ("slave has more recent revision of statfile %s: %uL and our is: %uL", log->filename, from_rev, log->cur_seq); *rep = NULL; *from_time = 0; return FALSE; } metaindex_num = from_rev / BINLOG_IDX_LEN; /* First of all try to find this revision */ if (metaindex_num > log->metaindex->last_index) { return FALSE; } else if (metaindex_num != log->metaindex->last_index) { /* Need to remap index block */ lock_file (log->fd, FALSE); idxb = g_malloc (sizeof (struct rspamd_index_block)); idx_mapped = TRUE; if (lseek (log->fd, log->metaindex->indexes[metaindex_num], SEEK_SET) == -1) { unlock_file (log->fd, FALSE); msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { unlock_file (log->fd, FALSE); msg_warn ("cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } unlock_file (log->fd, FALSE); } else { idxb = log->cur_idx; } /* Now check specified index */ idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN]; if (is_first && idx->time != *from_time) { res = FALSE; *from_time = 0; goto end; } else { *from_time = idx->time; } /* Now fill reply structure */ (*rep)->len = idx->len; /* Read result */ msg_info ("update from binlog '%s' from revision: %uL to revision %uL size is %uL", log->filename, from_rev, log->cur_seq, idx->len); if (lseek (log->fd, idx->seek, SEEK_SET) == -1) { msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } (*rep)->data = g_malloc (idx->len); if ((read (log->fd, (*rep)->data, idx->len)) != idx->len) { msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } end: if (idx_mapped) { g_free (idxb); } return res; } static gboolean maybe_init_static () { if (!binlog_opened) { binlog_opened = g_hash_table_new (g_direct_hash, g_direct_equal); if (!binlog_opened) { return FALSE; } } if (!binlog_pool) { binlog_pool = memory_pool_new (memory_pool_get_size ()); if (!binlog_pool) { return FALSE; } } return TRUE; } gboolean maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_file_t *file, GTree *nodes) { struct rspamd_binlog *log; if (ccf == NULL) { return FALSE; } if (st == NULL || nodes == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { return FALSE; } if (!maybe_init_static ()) { return FALSE; } if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { g_hash_table_insert (binlog_opened, st, log); } else { return FALSE; } } if (binlog_insert (log, nodes)) { msg_info ("set new revision of statfile %s: %uL", st->symbol, log->cur_seq); (void)statfile_set_revision (file, log->cur_seq, log->cur_time); return TRUE; } return FALSE; } struct rspamd_binlog* get_binlog_by_statfile (struct statfile *st) { struct rspamd_binlog *log; if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { return NULL; } if (!maybe_init_static ()) { return NULL; } if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { g_hash_table_insert (binlog_opened, st, log); } else { return NULL; } } return log; }