diff options
Diffstat (limited to 'src/libserver')
-rw-r--r-- | src/libserver/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/libserver/binlog.c | 688 | ||||
-rw-r--r-- | src/libserver/binlog.h | 102 | ||||
-rw-r--r-- | src/libserver/cfg_rcl.c | 36 | ||||
-rw-r--r-- | src/libserver/statfile_sync.c | 398 | ||||
-rw-r--r-- | src/libserver/statfile_sync.h | 16 |
6 files changed, 0 insertions, 1242 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt index 307611301..a35d740d9 100644 --- a/src/libserver/CMakeLists.txt +++ b/src/libserver/CMakeLists.txt @@ -1,6 +1,5 @@ # Librspamdserver SET(LIBRSPAMDSERVERSRC - binlog.c buffer.c cfg_utils.c cfg_rcl.c @@ -15,7 +14,6 @@ SET(LIBRSPAMDSERVERSRC roll_history.c spf.c statfile.c - statfile_sync.c symbols_cache.c task.c url.c diff --git a/src/libserver/binlog.c b/src/libserver/binlog.c deleted file mode 100644 index c48016339..000000000 --- a/src/libserver/binlog.c +++ /dev/null @@ -1,688 +0,0 @@ -/* - * Copyright (c) 2009-2012, Vsevolod Stakhov - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include "config.h" -#include "binlog.h" -#include "cfg_file.h" -#include "tokenizers.h" - -#define BINLOG_SUFFIX ".binlog" -#define BACKUP_SUFFIX ".old" -#define VALID_MAGIC { 'r', 's', 'l' } -#define VALID_VERSION { '1', '0' } - -static GHashTable *binlog_opened = NULL; -static rspamd_mempool_t *binlog_pool = NULL; - -static gboolean -binlog_write_header (struct rspamd_binlog *log) -{ - struct rspamd_binlog_header header = { - .magic = VALID_MAGIC, - .version = VALID_VERSION, - .padding = { '\0', '\0' }, - }; - - header.create_time = time (NULL); - rspamd_file_lock (log->fd, FALSE); - - if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) { - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - - - memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header)); - - /* Metaindex */ - log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); - bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); - /* Offset to metaindex */ - log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + - sizeof (struct rspamd_binlog_header); - - if (write (log->fd, log->metaindex, - sizeof (struct rspamd_binlog_metaindex)) == -1) { - g_free (log->metaindex); - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - rspamd_file_unlock (log->fd, FALSE); - return FALSE; - } - - /* Alloc, write, mmap */ - log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); - bzero (log->cur_idx, sizeof (struct rspamd_index_block)); - if (write (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block)) == -1) { - g_free (log->cur_idx); - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - rspamd_file_unlock (log->fd, FALSE); - return FALSE; - } - - rspamd_file_unlock (log->fd, FALSE); - - return TRUE; -} - -static gboolean -binlog_check_file (struct rspamd_binlog *log) -{ - static gchar valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION; - - if (read (log->fd, &log->header, - sizeof (struct rspamd_binlog_header)) != - sizeof (struct rspamd_binlog_header)) { - msg_warn ("cannot read file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - - /* Now check all fields */ - if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || - memcmp (&log->header.version, valid_version, - sizeof (valid_version)) != 0) { - msg_warn ("cannot validate file %s"); - return FALSE; - } - /* Now mmap metaindex and current index */ - if (log->metaindex == NULL) { - log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); - } - if ((read (log->fd, log->metaindex, - sizeof (struct rspamd_binlog_metaindex))) != - sizeof (struct rspamd_binlog_metaindex)) { - msg_warn ("cannot read metaindex of file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - /* Current index */ - if (log->cur_idx == NULL) { - log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); - } - if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], - SEEK_SET) == -1) { - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - if ((read (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block))) != - sizeof (struct rspamd_index_block)) { - msg_warn ("cannot read index in file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - - log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + - log->cur_idx->last_index; - log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time; - - return TRUE; - -} - -static gboolean -binlog_create (struct rspamd_binlog *log) -{ - if ((log->fd = - open (log->filename, O_RDWR | O_TRUNC | O_CREAT, - S_IWUSR | S_IRUSR)) == -1) { - msg_info ("cannot create file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - - return binlog_write_header (log); -} - -static gboolean -binlog_open_real (struct rspamd_binlog *log) -{ - if ((log->fd = open (log->filename, O_RDWR)) == -1) { - msg_info ("cannot open file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - - return binlog_check_file (log); -} - - -struct rspamd_binlog * -binlog_open (rspamd_mempool_t *pool, - const gchar *path, - time_t rotate_time, - gint rotate_jitter) -{ - struct rspamd_binlog *new; - gint len = strlen (path); - struct stat st; - - new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_binlog)); - new->pool = pool; - new->rotate_time = rotate_time; - new->fd = -1; - - if (rotate_time) { - new->rotate_jitter = g_random_int_range (0, rotate_jitter); - } - - new->filename = rspamd_mempool_alloc (pool, len + sizeof (BINLOG_SUFFIX)); - rspamd_strlcpy (new->filename, path, len + 1); - rspamd_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX)); - - if (stat (new->filename, &st) == -1) { - /* Check errno to check whether we should create this file */ - if (errno != ENOENT) { - msg_err ("cannot stat file: %s, error %s", new->filename, - strerror (errno)); - return NULL; - } - else { - /* In case of ENOENT try to create binlog */ - if (!binlog_create (new)) { - return NULL; - } - } - } - else { - /* Try to open binlog */ - if (!binlog_open_real (new)) { - return NULL; - } - } - - return new; -} - -void -binlog_close (struct rspamd_binlog *log) -{ - if (log) { - if (log->metaindex) { - g_free (log->metaindex); - } - if (log->cur_idx) { - g_free (log->cur_idx); - } - close (log->fd); - } -} - -static gboolean -binlog_tree_callback (gpointer key, gpointer value, gpointer data) -{ - token_node_t *node = key; - struct rspamd_binlog *log = data; - struct rspamd_binlog_element elt; - - elt.h1 = node->h1; - elt.h2 = node->h2; - elt.value = node->value; - - if (write (log->fd, &elt, sizeof (elt)) == -1) { - msg_info ("cannot write token to file: %s, error: %s", - log->filename, - strerror (errno)); - return TRUE; - } - - return FALSE; -} - -static gboolean -write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) -{ - off_t seek; - struct rspamd_binlog_index *idx; - - rspamd_file_lock (log->fd, FALSE); - log->cur_seq++; - - /* Seek to end of file */ - if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - - /* Now write all nodes to file */ - g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log); - - /* Write index */ - idx = &log->cur_idx->indexes[log->cur_idx->last_index]; - idx->seek = seek; - idx->time = (guint64)time (NULL); - log->cur_time = idx->time; - idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element); - if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], - SEEK_SET) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_info ( - "cannot seek in file: %s, error: %s, seek: %L, op: insert index", - log->filename, - strerror (errno), - log->metaindex->indexes[log->metaindex->last_index]); - return FALSE; - } - log->cur_idx->last_index++; - if (write (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block)) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_info ("cannot write index to file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - - rspamd_file_unlock (log->fd, FALSE); - - return TRUE; -} - -static gboolean -create_new_metaindex_block (struct rspamd_binlog *log) -{ - off_t seek; - - rspamd_file_lock (log->fd, FALSE); - - log->metaindex->last_index++; - /* Seek to end of file */ - if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - if (write (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block)) == -1) { - rspamd_file_unlock (log->fd, FALSE); - g_free (log->cur_idx); - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - return FALSE; - } - /* Offset to metaindex */ - log->metaindex->indexes[log->metaindex->last_index] = seek; - /* Overwrite all metaindexes */ - if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - if (write (log->fd, log->metaindex, - sizeof (struct rspamd_binlog_metaindex)) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_info ("cannot write metaindex in file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - bzero (log->cur_idx, sizeof (struct rspamd_index_block)); - rspamd_file_unlock (log->fd, FALSE); - - return TRUE; -} - -static gboolean -maybe_rotate_binlog (struct rspamd_binlog *log) -{ - guint64 now = time (NULL); - - if (log->rotate_time && - ((now - log->header.create_time) > - (guint)(log->rotate_time + log->rotate_jitter))) { - return TRUE; - } - return FALSE; -} - -static gboolean -rotate_binlog (struct rspamd_binlog *log) -{ - gchar *backup_name; - struct stat st; - - rspamd_file_lock (log->fd, FALSE); - - /* Unmap mapped fragments */ - if (log->metaindex) { - g_free (log->metaindex); - log->metaindex = NULL; - } - if (log->cur_idx) { - g_free (log->cur_idx); - log->cur_idx = NULL; - } - /* Format backup name */ - backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX); - - if (stat (backup_name, &st) != -1) { - msg_info ("replace old %s", backup_name); - unlink (backup_name); - } - - rename (log->filename, backup_name); - g_free (backup_name); - - /* XXX: maybe race condition here */ - rspamd_file_unlock (log->fd, FALSE); - close (log->fd); - - return binlog_create (log); - -} - -gboolean -binlog_insert (struct rspamd_binlog *log, GTree *nodes) -{ - off_t seek; - - if (!log || !log->metaindex || !log->cur_idx || !nodes) { - msg_info ("improperly opened binlog: %s", - log != NULL ? log->filename : "unknown"); - return FALSE; - } - - if (maybe_rotate_binlog (log)) { - if (!rotate_binlog (log)) { - return FALSE; - } - } - /* First of all try to place new tokens in current index */ - if (log->cur_idx->last_index < BINLOG_IDX_LEN) { - /* All is ok */ - return write_binlog_tree (log, nodes); - } - /* Current index table is all busy, try to allocate new index */ - - /* Check metaindex free space */ - if (log->metaindex->last_index < METAINDEX_LEN) { - /* Create new index block */ - if ((seek = lseek (log->fd, 0, SEEK_END)) == (off_t)-1) { - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); - return FALSE; - } - if (!create_new_metaindex_block (log)) { - return FALSE; - } - return write_binlog_tree (log, nodes); - } - - /* All binlog is filled, we need to rotate it forcefully */ - if (!rotate_binlog (log)) { - return FALSE; - } - - return write_binlog_tree (log, nodes); -} - -gboolean -binlog_sync (struct rspamd_binlog *log, - guint64 from_rev, - guint64 *from_time, - GByteArray **rep) -{ - guint32 metaindex_num; - struct rspamd_index_block *idxb; - struct rspamd_binlog_index *idx; - gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE; - - if (!log || !log->metaindex || !log->cur_idx) { - msg_info ("improperly opened binlog: %s", - log != NULL ? log->filename : "unknown"); - return FALSE; - } - - if (*rep == NULL) { - *rep = g_malloc (sizeof (GByteArray)); - is_first = TRUE; - } - else { - /* Unmap old fragment */ - g_free ((*rep)->data); - } - - if (from_rev == log->cur_seq) { - /* Last record */ - *rep = NULL; - return FALSE; - } - else if (from_rev > log->cur_seq) { - /* Slave has more actual copy, write this to log and abort sync */ - msg_warn ( - "slave has more recent revision of statfile %s: %uL and our is: %uL", - log->filename, - from_rev, - log->cur_seq); - *rep = NULL; - *from_time = 0; - return FALSE; - } - - metaindex_num = from_rev / BINLOG_IDX_LEN; - /* First of all try to find this revision */ - if (metaindex_num > log->metaindex->last_index) { - return FALSE; - } - else if (metaindex_num != log->metaindex->last_index) { - /* Need to remap index block */ - rspamd_file_lock (log->fd, FALSE); - idxb = g_malloc (sizeof (struct rspamd_index_block)); - idx_mapped = TRUE; - if (lseek (log->fd, log->metaindex->indexes[metaindex_num], - SEEK_SET) == -1) { - rspamd_file_unlock (log->fd, FALSE); - msg_warn ("cannot seek file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - res = FALSE; - goto end; - } - if ((read (log->fd, idxb, - sizeof (struct rspamd_index_block))) != - sizeof (struct rspamd_index_block)) { - rspamd_file_unlock (log->fd, FALSE); - msg_warn ("cannot read index from file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - res = FALSE; - goto end; - } - rspamd_file_unlock (log->fd, FALSE); - } - else { - idxb = log->cur_idx; - } - /* Now check specified index */ - idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN]; - if (is_first && idx->time != *from_time) { - res = FALSE; - *from_time = 0; - goto end; - } - else { - *from_time = idx->time; - } - - /* Now fill reply structure */ - (*rep)->len = idx->len; - /* Read result */ - msg_info ( - "update from binlog '%s' from revision: %uL to revision %uL size is %uL", - log->filename, - from_rev, - log->cur_seq, - idx->len); - if (lseek (log->fd, idx->seek, SEEK_SET) == -1) { - msg_warn ("cannot seek file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - res = FALSE; - goto end; - } - - (*rep)->data = g_malloc (idx->len); - if ((read (log->fd, (*rep)->data, idx->len)) != (ssize_t)idx->len) { - msg_warn ("cannot read file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); - res = FALSE; - goto end; - } - -end: - if (idx_mapped) { - g_free (idxb); - } - - return res; -} - -static gboolean -maybe_init_static (void) -{ - if (!binlog_opened) { - binlog_opened = g_hash_table_new (g_direct_hash, g_direct_equal); - if (!binlog_opened) { - return FALSE; - } - } - - if (!binlog_pool) { - binlog_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); - if (!binlog_pool) { - return FALSE; - } - } - - return TRUE; -} - -gboolean -maybe_write_binlog (struct rspamd_classifier_config *ccf, - struct rspamd_statfile_config *st, - stat_file_t *file, - GTree *nodes) -{ - struct rspamd_binlog *log; - - if (ccf == NULL) { - return FALSE; - } - - - if (st == NULL || nodes == NULL || st->binlog == NULL || - st->binlog->affinity != AFFINITY_MASTER) { - return FALSE; - } - - if (!maybe_init_static ()) { - return FALSE; - } - - if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { - if ((log = - binlog_open (binlog_pool, st->path, st->binlog->rotate_time, - st->binlog->rotate_time / 2)) != NULL) { - g_hash_table_insert (binlog_opened, st, log); - } - else { - return FALSE; - } - } - - if (binlog_insert (log, nodes)) { - msg_info ("set new revision of statfile %s: %uL", - st->symbol, - log->cur_seq); - (void)statfile_set_revision (file, log->cur_seq, log->cur_time); - return TRUE; - } - - return FALSE; -} - -struct rspamd_binlog * -get_binlog_by_statfile (struct rspamd_statfile_config *st) -{ - struct rspamd_binlog *log; - - if (st == NULL || st->binlog == NULL || st->binlog->affinity != - AFFINITY_MASTER) { - return NULL; - } - - if (!maybe_init_static ()) { - return NULL; - } - - if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { - if ((log = - binlog_open (binlog_pool, st->path, st->binlog->rotate_time, - st->binlog->rotate_time / 2)) != NULL) { - g_hash_table_insert (binlog_opened, st, log); - } - else { - return NULL; - } - } - - return log; -} diff --git a/src/libserver/binlog.h b/src/libserver/binlog.h deleted file mode 100644 index c18a201f2..000000000 --- a/src/libserver/binlog.h +++ /dev/null @@ -1,102 +0,0 @@ -#ifndef RSPAMD_BINLOG_H -#define RSPAMD_BINLOG_H - -#include "config.h" -#include "main.h" -#include "statfile.h" - -/* How much records are in a single index */ -#define BINLOG_IDX_LEN 200 -#define METAINDEX_LEN 1024 - -/* Assume 8 bytes words */ -struct rspamd_binlog_header { - gchar magic[3]; - gchar version[2]; - gchar padding[3]; - guint64 create_time; -}; - -struct rspamd_binlog_index { - guint64 time; - guint64 seek; - guint32 len; -}; - -struct rspamd_index_block { - struct rspamd_binlog_index indexes[BINLOG_IDX_LEN]; - guint32 last_index; -}; - -struct rspamd_binlog_metaindex { - guint64 indexes[METAINDEX_LEN]; - guint64 last_index; -}; - -struct rspamd_binlog_element { - guint32 h1; - guint32 h2; - float value; -} __attribute__((__packed__)); - -struct rspamd_binlog { - gchar *filename; - time_t rotate_time; - gint rotate_jitter; - guint64 cur_seq; - guint64 cur_time; - gint fd; - rspamd_mempool_t *pool; - - struct rspamd_binlog_header header; - struct rspamd_binlog_metaindex *metaindex; - struct rspamd_index_block *cur_idx; -}; - -struct rspamd_classifier_config; - -/* - * Open binlog at specified path with specified rotate params - */ -struct rspamd_binlog * binlog_open (rspamd_mempool_t *pool, - const gchar *path, - time_t rotate_time, - gint rotate_jitter); - -/* - * Get and open binlog for specified statfile - */ -struct rspamd_binlog * get_binlog_by_statfile (struct rspamd_statfile_config *st); - -/* - * Close binlog - */ -void binlog_close (struct rspamd_binlog *log); - -/* - * Insert new nodes inside binlog - */ -gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes); - -/* - * Sync binlog from specified revision - * @param log binlog structure - * @param from_rev from revision - * @param from_time from time - * @param rep a portion of changes for revision is stored here - * @return TRUE if there are more revisions to get and FALSE if synchronization is complete - */ -gboolean binlog_sync (struct rspamd_binlog *log, - guint64 from_rev, - guint64 *from_time, - GByteArray **rep); - -/* - * Conditional write to a binlog for specified statfile - */ -gboolean maybe_write_binlog (struct rspamd_classifier_config *ccf, - struct rspamd_statfile_config *st, - stat_file_t *file, - GTree *nodes); - -#endif diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c index 44db06a0b..6c77292aa 100644 --- a/src/libserver/cfg_rcl.c +++ b/src/libserver/cfg_rcl.c @@ -866,42 +866,6 @@ rspamd_rcl_statfile_handler (struct rspamd_config *cfg, const ucl_object_t *obj, st = rspamd_config_new_statfile (cfg, NULL); -#if 0 - const gchar *data; - gdouble binlog_rotate; - val = ucl_object_find_key (obj, "binlog"); - if (val != NULL && ucl_object_tostring_safe (val, &data)) { - if (st->binlog == NULL) { - st->binlog = - rspamd_mempool_alloc0 (cfg->cfg_pool, - sizeof (struct statfile_binlog_params)); - } - if (g_ascii_strcasecmp (data, "master") == 0) { - st->binlog->affinity = AFFINITY_MASTER; - } - else if (g_ascii_strcasecmp (data, "slave") == 0) { - st->binlog->affinity = AFFINITY_SLAVE; - } - else { - st->binlog->affinity = AFFINITY_NONE; - } - /* Parse remaining binlog attributes */ - val = ucl_object_find_key (obj, "binlog_rotate"); - if (val != NULL && ucl_object_todouble_safe (val, &binlog_rotate)) { - st->binlog->rotate_time = binlog_rotate; - } - val = ucl_object_find_key (obj, "binlog_master"); - if (val != NULL && ucl_object_tostring_safe (val, &data)) { - if (!rspamd_parse_host_port (cfg->cfg_pool, data, - &st->binlog->master_addr, &st->binlog->master_port)) { - msg_err ("cannot parse master address: %s", data); - return FALSE; - } - } - } -#endif - - if (rspamd_rcl_section_parse_defaults (section, cfg, obj, st, err)) { ccf->statfiles = g_list_prepend (ccf->statfiles, st); if (st->label != NULL) { diff --git a/src/libserver/statfile_sync.c b/src/libserver/statfile_sync.c deleted file mode 100644 index 62f848059..000000000 --- a/src/libserver/statfile_sync.c +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Copyright (c) 2009-2012, Vsevolod Stakhov - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include "config.h" -#include "cfg_file.h" -#include "tokenizers.h" -#include "classifiers.h" -#include "statfile.h" -#include "binlog.h" -#include "buffer.h" -#include "statfile_sync.h" - -enum rspamd_sync_state { - SYNC_STATE_GREETING, - SYNC_STATE_READ_LINE, - SYNC_STATE_READ_REV, - SYNC_STATE_QUIT, -}; - -/* Context of sync process */ -struct rspamd_sync_ctx { - struct rspamd_statfile_config *st; - stat_file_t *real_statfile; - statfile_pool_t *pool; - rspamd_io_dispatcher_t *dispatcher; - struct event_base *ev_base; - - struct event tm_ev; - - struct timeval interval; - struct timeval io_tv; - gint sock; - guint32 timeout; - guint32 sync_interval; - enum rspamd_sync_state state; - gboolean is_busy; - - guint64 new_rev; - guint64 new_time; - guint64 new_len; -}; - -static void -log_next_sync (const gchar *symbol, time_t delay) -{ - gchar outstr[200]; - time_t t; - struct tm *tmp; - gint r; - - t = time (NULL); - t += delay; - tmp = localtime (&t); - - if (tmp) { - r = rspamd_snprintf (outstr, - sizeof (outstr), - "statfile_sync: next sync of %s at ", - symbol); - if ((r = strftime (outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) { - msg_info (outstr); - } - } -} - -static gboolean -parse_revision_line (struct rspamd_sync_ctx *ctx, rspamd_fstring_t *in) -{ - guint i, state = 0; - gchar *p, *c, numbuf[sizeof("18446744073709551615")]; - guint64 *val; - - /* First of all try to find END line */ - if (in->len >= sizeof ("END") - 1 && - memcmp (in->begin, "END", sizeof ("END") - 1) == 0) { - ctx->state = SYNC_STATE_QUIT; - ctx->is_busy = FALSE; - return TRUE; - } - - /* Next check for error line */ - if (in->len >= sizeof ("FAIL") - 1 && - memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) { - ctx->state = SYNC_STATE_QUIT; - ctx->is_busy = FALSE; - return TRUE; - } - - /* Now try to extract 3 numbers from string: revision, time and length */ - p = in->begin; - val = &ctx->new_rev; - for (i = 0; i < in->len; i++, p++) { - if (g_ascii_isspace (*p) || i == in->len - 1) { - if (state == 1) { - if (i == in->len - 1) { - /* One more character */ - p++; - } - rspamd_strlcpy (numbuf, c, MIN (p - c + 1, - (gint)sizeof (numbuf))); - errno = 0; - *val = strtoull (numbuf, NULL, 10); - if (errno != 0) { - msg_info ("cannot parse number %s", strerror (errno)); - return FALSE; - } - state = 2; - } - } - else { - if (state == 0) { - c = p; - state = 1; - } - else if (state == 2) { - if (val == &ctx->new_rev) { - val = &ctx->new_time; - } - else if (val == &ctx->new_time) { - val = &ctx->new_len; - } - c = p; - state = 1; - } - } - } - - /* Current value must be len value and its value must not be 0 */ - return ((val == &ctx->new_len)); -} - -static gboolean -read_blocks (struct rspamd_sync_ctx *ctx, rspamd_fstring_t *in) -{ - struct rspamd_binlog_element *elt; - guint i; - - statfile_pool_lock_file (ctx->pool, ctx->real_statfile); - elt = (struct rspamd_binlog_element *)in->begin; - for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i++, - elt++) { - statfile_pool_set_block (ctx->pool, - ctx->real_statfile, - elt->h1, - elt->h2, - ctx->new_time, - elt->value); - } - statfile_pool_unlock_file (ctx->pool, ctx->real_statfile); - - return TRUE; -} - -static gboolean -sync_read (rspamd_fstring_t * in, void *arg) -{ - struct rspamd_sync_ctx *ctx = arg; - gchar buf[256]; - guint64 rev = 0; - time_t ti = 0; - - if (in->len == 0) { - /* Skip empty lines */ - return TRUE; - } - switch (ctx->state) { - case SYNC_STATE_GREETING: - /* Skip greeting line and write sync command */ - /* Write initial data */ - statfile_get_revision (ctx->real_statfile, &rev, &ti); - rev = rspamd_snprintf (buf, - sizeof (buf), - "sync %s %uL %T" CRLF, - ctx->st->symbol, - rev, - ti); - ctx->state = SYNC_STATE_READ_LINE; - return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, - FALSE); - break; - case SYNC_STATE_READ_LINE: - /* Try to parse line from server */ - if (!parse_revision_line (ctx, in)) { - msg_info ("cannot parse line of length %z: '%*s'", - in->len, - (gint)in->len, - in->begin); - close (ctx->sock); - rspamd_remove_dispatcher (ctx->dispatcher); - ctx->is_busy = FALSE; - return FALSE; - } - else if (ctx->state != SYNC_STATE_QUIT) { - if (ctx->new_len > 0) { - ctx->state = SYNC_STATE_READ_REV; - rspamd_set_dispatcher_policy (ctx->dispatcher, - BUFFER_CHARACTER, - ctx->new_len); - } - } - else { - /* Quit this session */ - msg_info ("sync ended for: %s", ctx->st->symbol); - close (ctx->sock); - rspamd_remove_dispatcher (ctx->dispatcher); - ctx->is_busy = FALSE; - /* Immediately return from callback */ - return FALSE; - } - break; - case SYNC_STATE_READ_REV: - /* In now contains all blocks of specified revision, so we can read them directly */ - if (!read_blocks (ctx, in)) { - msg_info ("cannot read blocks"); - close (ctx->sock); - rspamd_remove_dispatcher (ctx->dispatcher); - ctx->is_busy = FALSE; - return FALSE; - } - statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time); - msg_info ("set new revision: %uL, readed %z bytes", - ctx->new_rev, - in->len); - /* Now try to read other revision or END line */ - ctx->state = SYNC_STATE_READ_LINE; - rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0); - break; - case SYNC_STATE_QUIT: - close (ctx->sock); - rspamd_remove_dispatcher (ctx->dispatcher); - ctx->is_busy = FALSE; - return FALSE; - } - - return TRUE; -} - -static void -sync_err (GError *err, void *arg) -{ - struct rspamd_sync_ctx *ctx = arg; - - msg_info ("abnormally closing connection, error: %s", err->message); - ctx->is_busy = FALSE; - close (ctx->sock); - rspamd_remove_dispatcher (ctx->dispatcher); -} - - -static void -sync_timer_callback (gint fd, short what, void *ud) -{ - struct rspamd_sync_ctx *ctx = ud; - guint32 jittered_interval; - - /* Plan new event */ - evtimer_del (&ctx->tm_ev); - /* Add some jittering for synchronization */ - jittered_interval = g_random_int_range (ctx->sync_interval, - ctx->sync_interval * 2); - msec_to_tv (jittered_interval, &ctx->interval); - evtimer_add (&ctx->tm_ev, &ctx->interval); - log_next_sync (ctx->st->symbol, ctx->interval.tv_sec); - - if (ctx->is_busy) { - /* Sync is in progress */ - msg_info ("syncronization process is in progress, do not start new one"); - return; - } - - if ((ctx->sock = - rspamd_socket (ctx->st->binlog->master_addr, - ctx->st->binlog->master_port, - SOCK_STREAM, TRUE, FALSE, TRUE)) == -1) { - msg_info ("cannot connect to %s", ctx->st->binlog->master_addr); - return; - } - /* Now create and activate dispatcher */ - msec_to_tv (ctx->timeout, &ctx->io_tv); - ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, - ctx->sock, - BUFFER_LINE, - sync_read, - NULL, - sync_err, - &ctx->io_tv, - ctx); - - ctx->state = SYNC_STATE_GREETING; - ctx->is_busy = TRUE; - - msg_info ("starting synchronization of %s", ctx->st->symbol); - -} - -static gboolean -add_statfile_watch (statfile_pool_t *pool, - struct rspamd_statfile_config *st, - struct rspamd_config *cfg, - struct event_base *ev_base) -{ - struct rspamd_sync_ctx *ctx; - guint32 jittered_interval; - - if (st->binlog->master_addr != NULL) { - ctx = - rspamd_mempool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx)); - ctx->st = st; - ctx->timeout = cfg->statfile_sync_timeout; - ctx->sync_interval = cfg->statfile_sync_interval; - ctx->ev_base = ev_base; - /* Add some jittering for synchronization */ - jittered_interval = g_random_int_range (ctx->sync_interval, - ctx->sync_interval * 2); - msec_to_tv (jittered_interval, &ctx->interval); - /* Open statfile and attach it to pool */ - if ((ctx->real_statfile = - statfile_pool_is_open (pool, st->path)) == NULL) { - if ((ctx->real_statfile = - statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) { - msg_warn ("cannot open %s", st->path); - if (statfile_pool_create (pool, st->path, st->size) == -1) { - msg_err ("cannot create statfile %s", st->path); - return FALSE; - } - ctx->real_statfile = statfile_pool_open (pool, - st->path, - st->size, - FALSE); - } - } - /* Now plan event for it's future executing */ - evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); - event_base_set (ctx->ev_base, &ctx->tm_ev); - evtimer_add (&ctx->tm_ev, &ctx->interval); - log_next_sync (st->symbol, ctx->interval.tv_sec); - } - else { - msg_err ("cannot add statfile watch for statfile %s: no master defined", - st->symbol); - return FALSE; - } - - return TRUE; -} - -gboolean -start_statfile_sync (statfile_pool_t *pool, - struct rspamd_config *cfg, - struct event_base *ev_base) -{ - GList *cur, *l; - struct rspamd_classifier_config *cl; - struct rspamd_statfile_config *st; - - /* - * First of all walk through all classifiers and find those statfiles - * for which we should do sync (slave affinity) - */ - cur = cfg->classifiers; - while (cur) { - cl = cur->data; - l = cl->statfiles; - while (l) { - st = l->data; - if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) { - if (!add_statfile_watch (pool, st, cfg, ev_base)) { - return FALSE; - } - } - l = g_list_next (l); - } - cur = g_list_next (cur); - } - - return TRUE; -} diff --git a/src/libserver/statfile_sync.h b/src/libserver/statfile_sync.h deleted file mode 100644 index be2550d7b..000000000 --- a/src/libserver/statfile_sync.h +++ /dev/null @@ -1,16 +0,0 @@ -#ifndef RSPAMD_STATFILE_SYNC_H -#define RSPAMD_STATFILE_SYNC_H - -#include "config.h" -#include "main.h" -#include "statfile.h" -#include "cfg_file.h" - -/* - * Start synchronization of statfiles. Must be called after event_init as it adds events - */ -gboolean start_statfile_sync (statfile_pool_t *pool, - struct rspamd_config *cfg, - struct event_base *ev_base); - -#endif |