summaryrefslogtreecommitdiffstats
path: root/src/libserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver')
-rw-r--r--src/libserver/CMakeLists.txt2
-rw-r--r--src/libserver/binlog.c688
-rw-r--r--src/libserver/binlog.h102
-rw-r--r--src/libserver/cfg_rcl.c36
-rw-r--r--src/libserver/statfile_sync.c398
-rw-r--r--src/libserver/statfile_sync.h16
6 files changed, 0 insertions, 1242 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt
index 307611301..a35d740d9 100644
--- a/src/libserver/CMakeLists.txt
+++ b/src/libserver/CMakeLists.txt
@@ -1,6 +1,5 @@
# Librspamdserver
SET(LIBRSPAMDSERVERSRC
- binlog.c
buffer.c
cfg_utils.c
cfg_rcl.c
@@ -15,7 +14,6 @@ SET(LIBRSPAMDSERVERSRC
roll_history.c
spf.c
statfile.c
- statfile_sync.c
symbols_cache.c
task.c
url.c
diff --git a/src/libserver/binlog.c b/src/libserver/binlog.c
deleted file mode 100644
index c48016339..000000000
--- a/src/libserver/binlog.c
+++ /dev/null
@@ -1,688 +0,0 @@
-/*
- * Copyright (c) 2009-2012, Vsevolod Stakhov
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "config.h"
-#include "binlog.h"
-#include "cfg_file.h"
-#include "tokenizers.h"
-
-#define BINLOG_SUFFIX ".binlog"
-#define BACKUP_SUFFIX ".old"
-#define VALID_MAGIC { 'r', 's', 'l' }
-#define VALID_VERSION { '1', '0' }
-
-static GHashTable *binlog_opened = NULL;
-static rspamd_mempool_t *binlog_pool = NULL;
-
-static gboolean
-binlog_write_header (struct rspamd_binlog *log)
-{
- struct rspamd_binlog_header header = {
- .magic = VALID_MAGIC,
- .version = VALID_VERSION,
- .padding = { '\0', '\0' },
- };
-
- header.create_time = time (NULL);
- rspamd_file_lock (log->fd, FALSE);
-
- if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) {
- msg_warn ("cannot write file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
-
-
- memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header));
-
- /* Metaindex */
- log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
- bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
- /* Offset to metaindex */
- log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) +
- sizeof (struct rspamd_binlog_header);
-
- if (write (log->fd, log->metaindex,
- sizeof (struct rspamd_binlog_metaindex)) == -1) {
- g_free (log->metaindex);
- msg_warn ("cannot write file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- rspamd_file_unlock (log->fd, FALSE);
- return FALSE;
- }
-
- /* Alloc, write, mmap */
- log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
- bzero (log->cur_idx, sizeof (struct rspamd_index_block));
- if (write (log->fd, log->cur_idx,
- sizeof (struct rspamd_index_block)) == -1) {
- g_free (log->cur_idx);
- msg_warn ("cannot write file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- rspamd_file_unlock (log->fd, FALSE);
- return FALSE;
- }
-
- rspamd_file_unlock (log->fd, FALSE);
-
- return TRUE;
-}
-
-static gboolean
-binlog_check_file (struct rspamd_binlog *log)
-{
- static gchar valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION;
-
- if (read (log->fd, &log->header,
- sizeof (struct rspamd_binlog_header)) !=
- sizeof (struct rspamd_binlog_header)) {
- msg_warn ("cannot read file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
-
- /* Now check all fields */
- if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 ||
- memcmp (&log->header.version, valid_version,
- sizeof (valid_version)) != 0) {
- msg_warn ("cannot validate file %s");
- return FALSE;
- }
- /* Now mmap metaindex and current index */
- if (log->metaindex == NULL) {
- log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
- }
- if ((read (log->fd, log->metaindex,
- sizeof (struct rspamd_binlog_metaindex))) !=
- sizeof (struct rspamd_binlog_metaindex)) {
- msg_warn ("cannot read metaindex of file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
- /* Current index */
- if (log->cur_idx == NULL) {
- log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
- }
- if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index],
- SEEK_SET) == -1) {
- msg_info ("cannot seek in file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
- if ((read (log->fd, log->cur_idx,
- sizeof (struct rspamd_index_block))) !=
- sizeof (struct rspamd_index_block)) {
- msg_warn ("cannot read index in file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
-
- log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN +
- log->cur_idx->last_index;
- log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time;
-
- return TRUE;
-
-}
-
-static gboolean
-binlog_create (struct rspamd_binlog *log)
-{
- if ((log->fd =
- open (log->filename, O_RDWR | O_TRUNC | O_CREAT,
- S_IWUSR | S_IRUSR)) == -1) {
- msg_info ("cannot create file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
-
- return binlog_write_header (log);
-}
-
-static gboolean
-binlog_open_real (struct rspamd_binlog *log)
-{
- if ((log->fd = open (log->filename, O_RDWR)) == -1) {
- msg_info ("cannot open file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
-
- return binlog_check_file (log);
-}
-
-
-struct rspamd_binlog *
-binlog_open (rspamd_mempool_t *pool,
- const gchar *path,
- time_t rotate_time,
- gint rotate_jitter)
-{
- struct rspamd_binlog *new;
- gint len = strlen (path);
- struct stat st;
-
- new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_binlog));
- new->pool = pool;
- new->rotate_time = rotate_time;
- new->fd = -1;
-
- if (rotate_time) {
- new->rotate_jitter = g_random_int_range (0, rotate_jitter);
- }
-
- new->filename = rspamd_mempool_alloc (pool, len + sizeof (BINLOG_SUFFIX));
- rspamd_strlcpy (new->filename, path, len + 1);
- rspamd_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX));
-
- if (stat (new->filename, &st) == -1) {
- /* Check errno to check whether we should create this file */
- if (errno != ENOENT) {
- msg_err ("cannot stat file: %s, error %s", new->filename,
- strerror (errno));
- return NULL;
- }
- else {
- /* In case of ENOENT try to create binlog */
- if (!binlog_create (new)) {
- return NULL;
- }
- }
- }
- else {
- /* Try to open binlog */
- if (!binlog_open_real (new)) {
- return NULL;
- }
- }
-
- return new;
-}
-
-void
-binlog_close (struct rspamd_binlog *log)
-{
- if (log) {
- if (log->metaindex) {
- g_free (log->metaindex);
- }
- if (log->cur_idx) {
- g_free (log->cur_idx);
- }
- close (log->fd);
- }
-}
-
-static gboolean
-binlog_tree_callback (gpointer key, gpointer value, gpointer data)
-{
- token_node_t *node = key;
- struct rspamd_binlog *log = data;
- struct rspamd_binlog_element elt;
-
- elt.h1 = node->h1;
- elt.h2 = node->h2;
- elt.value = node->value;
-
- if (write (log->fd, &elt, sizeof (elt)) == -1) {
- msg_info ("cannot write token to file: %s, error: %s",
- log->filename,
- strerror (errno));
- return TRUE;
- }
-
- return FALSE;
-}
-
-static gboolean
-write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
-{
- off_t seek;
- struct rspamd_binlog_index *idx;
-
- rspamd_file_lock (log->fd, FALSE);
- log->cur_seq++;
-
- /* Seek to end of file */
- if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
-
- /* Now write all nodes to file */
- g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log);
-
- /* Write index */
- idx = &log->cur_idx->indexes[log->cur_idx->last_index];
- idx->seek = seek;
- idx->time = (guint64)time (NULL);
- log->cur_time = idx->time;
- idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element);
- if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index],
- SEEK_SET) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_info (
- "cannot seek in file: %s, error: %s, seek: %L, op: insert index",
- log->filename,
- strerror (errno),
- log->metaindex->indexes[log->metaindex->last_index]);
- return FALSE;
- }
- log->cur_idx->last_index++;
- if (write (log->fd, log->cur_idx,
- sizeof (struct rspamd_index_block)) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_info ("cannot write index to file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
-
- rspamd_file_unlock (log->fd, FALSE);
-
- return TRUE;
-}
-
-static gboolean
-create_new_metaindex_block (struct rspamd_binlog *log)
-{
- off_t seek;
-
- rspamd_file_lock (log->fd, FALSE);
-
- log->metaindex->last_index++;
- /* Seek to end of file */
- if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
- if (write (log->fd, log->cur_idx,
- sizeof (struct rspamd_index_block)) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- g_free (log->cur_idx);
- msg_warn ("cannot write file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- return FALSE;
- }
- /* Offset to metaindex */
- log->metaindex->indexes[log->metaindex->last_index] = seek;
- /* Overwrite all metaindexes */
- if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
- if (write (log->fd, log->metaindex,
- sizeof (struct rspamd_binlog_metaindex)) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_info ("cannot write metaindex in file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
- bzero (log->cur_idx, sizeof (struct rspamd_index_block));
- rspamd_file_unlock (log->fd, FALSE);
-
- return TRUE;
-}
-
-static gboolean
-maybe_rotate_binlog (struct rspamd_binlog *log)
-{
- guint64 now = time (NULL);
-
- if (log->rotate_time &&
- ((now - log->header.create_time) >
- (guint)(log->rotate_time + log->rotate_jitter))) {
- return TRUE;
- }
- return FALSE;
-}
-
-static gboolean
-rotate_binlog (struct rspamd_binlog *log)
-{
- gchar *backup_name;
- struct stat st;
-
- rspamd_file_lock (log->fd, FALSE);
-
- /* Unmap mapped fragments */
- if (log->metaindex) {
- g_free (log->metaindex);
- log->metaindex = NULL;
- }
- if (log->cur_idx) {
- g_free (log->cur_idx);
- log->cur_idx = NULL;
- }
- /* Format backup name */
- backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX);
-
- if (stat (backup_name, &st) != -1) {
- msg_info ("replace old %s", backup_name);
- unlink (backup_name);
- }
-
- rename (log->filename, backup_name);
- g_free (backup_name);
-
- /* XXX: maybe race condition here */
- rspamd_file_unlock (log->fd, FALSE);
- close (log->fd);
-
- return binlog_create (log);
-
-}
-
-gboolean
-binlog_insert (struct rspamd_binlog *log, GTree *nodes)
-{
- off_t seek;
-
- if (!log || !log->metaindex || !log->cur_idx || !nodes) {
- msg_info ("improperly opened binlog: %s",
- log != NULL ? log->filename : "unknown");
- return FALSE;
- }
-
- if (maybe_rotate_binlog (log)) {
- if (!rotate_binlog (log)) {
- return FALSE;
- }
- }
- /* First of all try to place new tokens in current index */
- if (log->cur_idx->last_index < BINLOG_IDX_LEN) {
- /* All is ok */
- return write_binlog_tree (log, nodes);
- }
- /* Current index table is all busy, try to allocate new index */
-
- /* Check metaindex free space */
- if (log->metaindex->last_index < METAINDEX_LEN) {
- /* Create new index block */
- if ((seek = lseek (log->fd, 0, SEEK_END)) == (off_t)-1) {
- msg_info ("cannot seek in file: %s, error: %s",
- log->filename,
- strerror (errno));
- return FALSE;
- }
- if (!create_new_metaindex_block (log)) {
- return FALSE;
- }
- return write_binlog_tree (log, nodes);
- }
-
- /* All binlog is filled, we need to rotate it forcefully */
- if (!rotate_binlog (log)) {
- return FALSE;
- }
-
- return write_binlog_tree (log, nodes);
-}
-
-gboolean
-binlog_sync (struct rspamd_binlog *log,
- guint64 from_rev,
- guint64 *from_time,
- GByteArray **rep)
-{
- guint32 metaindex_num;
- struct rspamd_index_block *idxb;
- struct rspamd_binlog_index *idx;
- gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE;
-
- if (!log || !log->metaindex || !log->cur_idx) {
- msg_info ("improperly opened binlog: %s",
- log != NULL ? log->filename : "unknown");
- return FALSE;
- }
-
- if (*rep == NULL) {
- *rep = g_malloc (sizeof (GByteArray));
- is_first = TRUE;
- }
- else {
- /* Unmap old fragment */
- g_free ((*rep)->data);
- }
-
- if (from_rev == log->cur_seq) {
- /* Last record */
- *rep = NULL;
- return FALSE;
- }
- else if (from_rev > log->cur_seq) {
- /* Slave has more actual copy, write this to log and abort sync */
- msg_warn (
- "slave has more recent revision of statfile %s: %uL and our is: %uL",
- log->filename,
- from_rev,
- log->cur_seq);
- *rep = NULL;
- *from_time = 0;
- return FALSE;
- }
-
- metaindex_num = from_rev / BINLOG_IDX_LEN;
- /* First of all try to find this revision */
- if (metaindex_num > log->metaindex->last_index) {
- return FALSE;
- }
- else if (metaindex_num != log->metaindex->last_index) {
- /* Need to remap index block */
- rspamd_file_lock (log->fd, FALSE);
- idxb = g_malloc (sizeof (struct rspamd_index_block));
- idx_mapped = TRUE;
- if (lseek (log->fd, log->metaindex->indexes[metaindex_num],
- SEEK_SET) == -1) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_warn ("cannot seek file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- res = FALSE;
- goto end;
- }
- if ((read (log->fd, idxb,
- sizeof (struct rspamd_index_block))) !=
- sizeof (struct rspamd_index_block)) {
- rspamd_file_unlock (log->fd, FALSE);
- msg_warn ("cannot read index from file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- res = FALSE;
- goto end;
- }
- rspamd_file_unlock (log->fd, FALSE);
- }
- else {
- idxb = log->cur_idx;
- }
- /* Now check specified index */
- idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN];
- if (is_first && idx->time != *from_time) {
- res = FALSE;
- *from_time = 0;
- goto end;
- }
- else {
- *from_time = idx->time;
- }
-
- /* Now fill reply structure */
- (*rep)->len = idx->len;
- /* Read result */
- msg_info (
- "update from binlog '%s' from revision: %uL to revision %uL size is %uL",
- log->filename,
- from_rev,
- log->cur_seq,
- idx->len);
- if (lseek (log->fd, idx->seek, SEEK_SET) == -1) {
- msg_warn ("cannot seek file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- res = FALSE;
- goto end;
- }
-
- (*rep)->data = g_malloc (idx->len);
- if ((read (log->fd, (*rep)->data, idx->len)) != (ssize_t)idx->len) {
- msg_warn ("cannot read file %s, error %d, %s",
- log->filename,
- errno,
- strerror (errno));
- res = FALSE;
- goto end;
- }
-
-end:
- if (idx_mapped) {
- g_free (idxb);
- }
-
- return res;
-}
-
-static gboolean
-maybe_init_static (void)
-{
- if (!binlog_opened) {
- binlog_opened = g_hash_table_new (g_direct_hash, g_direct_equal);
- if (!binlog_opened) {
- return FALSE;
- }
- }
-
- if (!binlog_pool) {
- binlog_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
- if (!binlog_pool) {
- return FALSE;
- }
- }
-
- return TRUE;
-}
-
-gboolean
-maybe_write_binlog (struct rspamd_classifier_config *ccf,
- struct rspamd_statfile_config *st,
- stat_file_t *file,
- GTree *nodes)
-{
- struct rspamd_binlog *log;
-
- if (ccf == NULL) {
- return FALSE;
- }
-
-
- if (st == NULL || nodes == NULL || st->binlog == NULL ||
- st->binlog->affinity != AFFINITY_MASTER) {
- return FALSE;
- }
-
- if (!maybe_init_static ()) {
- return FALSE;
- }
-
- if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
- if ((log =
- binlog_open (binlog_pool, st->path, st->binlog->rotate_time,
- st->binlog->rotate_time / 2)) != NULL) {
- g_hash_table_insert (binlog_opened, st, log);
- }
- else {
- return FALSE;
- }
- }
-
- if (binlog_insert (log, nodes)) {
- msg_info ("set new revision of statfile %s: %uL",
- st->symbol,
- log->cur_seq);
- (void)statfile_set_revision (file, log->cur_seq, log->cur_time);
- return TRUE;
- }
-
- return FALSE;
-}
-
-struct rspamd_binlog *
-get_binlog_by_statfile (struct rspamd_statfile_config *st)
-{
- struct rspamd_binlog *log;
-
- if (st == NULL || st->binlog == NULL || st->binlog->affinity !=
- AFFINITY_MASTER) {
- return NULL;
- }
-
- if (!maybe_init_static ()) {
- return NULL;
- }
-
- if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
- if ((log =
- binlog_open (binlog_pool, st->path, st->binlog->rotate_time,
- st->binlog->rotate_time / 2)) != NULL) {
- g_hash_table_insert (binlog_opened, st, log);
- }
- else {
- return NULL;
- }
- }
-
- return log;
-}
diff --git a/src/libserver/binlog.h b/src/libserver/binlog.h
deleted file mode 100644
index c18a201f2..000000000
--- a/src/libserver/binlog.h
+++ /dev/null
@@ -1,102 +0,0 @@
-#ifndef RSPAMD_BINLOG_H
-#define RSPAMD_BINLOG_H
-
-#include "config.h"
-#include "main.h"
-#include "statfile.h"
-
-/* How much records are in a single index */
-#define BINLOG_IDX_LEN 200
-#define METAINDEX_LEN 1024
-
-/* Assume 8 bytes words */
-struct rspamd_binlog_header {
- gchar magic[3];
- gchar version[2];
- gchar padding[3];
- guint64 create_time;
-};
-
-struct rspamd_binlog_index {
- guint64 time;
- guint64 seek;
- guint32 len;
-};
-
-struct rspamd_index_block {
- struct rspamd_binlog_index indexes[BINLOG_IDX_LEN];
- guint32 last_index;
-};
-
-struct rspamd_binlog_metaindex {
- guint64 indexes[METAINDEX_LEN];
- guint64 last_index;
-};
-
-struct rspamd_binlog_element {
- guint32 h1;
- guint32 h2;
- float value;
-} __attribute__((__packed__));
-
-struct rspamd_binlog {
- gchar *filename;
- time_t rotate_time;
- gint rotate_jitter;
- guint64 cur_seq;
- guint64 cur_time;
- gint fd;
- rspamd_mempool_t *pool;
-
- struct rspamd_binlog_header header;
- struct rspamd_binlog_metaindex *metaindex;
- struct rspamd_index_block *cur_idx;
-};
-
-struct rspamd_classifier_config;
-
-/*
- * Open binlog at specified path with specified rotate params
- */
-struct rspamd_binlog * binlog_open (rspamd_mempool_t *pool,
- const gchar *path,
- time_t rotate_time,
- gint rotate_jitter);
-
-/*
- * Get and open binlog for specified statfile
- */
-struct rspamd_binlog * get_binlog_by_statfile (struct rspamd_statfile_config *st);
-
-/*
- * Close binlog
- */
-void binlog_close (struct rspamd_binlog *log);
-
-/*
- * Insert new nodes inside binlog
- */
-gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes);
-
-/*
- * Sync binlog from specified revision
- * @param log binlog structure
- * @param from_rev from revision
- * @param from_time from time
- * @param rep a portion of changes for revision is stored here
- * @return TRUE if there are more revisions to get and FALSE if synchronization is complete
- */
-gboolean binlog_sync (struct rspamd_binlog *log,
- guint64 from_rev,
- guint64 *from_time,
- GByteArray **rep);
-
-/*
- * Conditional write to a binlog for specified statfile
- */
-gboolean maybe_write_binlog (struct rspamd_classifier_config *ccf,
- struct rspamd_statfile_config *st,
- stat_file_t *file,
- GTree *nodes);
-
-#endif
diff --git a/src/libserver/cfg_rcl.c b/src/libserver/cfg_rcl.c
index 44db06a0b..6c77292aa 100644
--- a/src/libserver/cfg_rcl.c
+++ b/src/libserver/cfg_rcl.c
@@ -866,42 +866,6 @@ rspamd_rcl_statfile_handler (struct rspamd_config *cfg, const ucl_object_t *obj,
st = rspamd_config_new_statfile (cfg, NULL);
-#if 0
- const gchar *data;
- gdouble binlog_rotate;
- val = ucl_object_find_key (obj, "binlog");
- if (val != NULL && ucl_object_tostring_safe (val, &data)) {
- if (st->binlog == NULL) {
- st->binlog =
- rspamd_mempool_alloc0 (cfg->cfg_pool,
- sizeof (struct statfile_binlog_params));
- }
- if (g_ascii_strcasecmp (data, "master") == 0) {
- st->binlog->affinity = AFFINITY_MASTER;
- }
- else if (g_ascii_strcasecmp (data, "slave") == 0) {
- st->binlog->affinity = AFFINITY_SLAVE;
- }
- else {
- st->binlog->affinity = AFFINITY_NONE;
- }
- /* Parse remaining binlog attributes */
- val = ucl_object_find_key (obj, "binlog_rotate");
- if (val != NULL && ucl_object_todouble_safe (val, &binlog_rotate)) {
- st->binlog->rotate_time = binlog_rotate;
- }
- val = ucl_object_find_key (obj, "binlog_master");
- if (val != NULL && ucl_object_tostring_safe (val, &data)) {
- if (!rspamd_parse_host_port (cfg->cfg_pool, data,
- &st->binlog->master_addr, &st->binlog->master_port)) {
- msg_err ("cannot parse master address: %s", data);
- return FALSE;
- }
- }
- }
-#endif
-
-
if (rspamd_rcl_section_parse_defaults (section, cfg, obj, st, err)) {
ccf->statfiles = g_list_prepend (ccf->statfiles, st);
if (st->label != NULL) {
diff --git a/src/libserver/statfile_sync.c b/src/libserver/statfile_sync.c
deleted file mode 100644
index 62f848059..000000000
--- a/src/libserver/statfile_sync.c
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Copyright (c) 2009-2012, Vsevolod Stakhov
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "config.h"
-#include "cfg_file.h"
-#include "tokenizers.h"
-#include "classifiers.h"
-#include "statfile.h"
-#include "binlog.h"
-#include "buffer.h"
-#include "statfile_sync.h"
-
-enum rspamd_sync_state {
- SYNC_STATE_GREETING,
- SYNC_STATE_READ_LINE,
- SYNC_STATE_READ_REV,
- SYNC_STATE_QUIT,
-};
-
-/* Context of sync process */
-struct rspamd_sync_ctx {
- struct rspamd_statfile_config *st;
- stat_file_t *real_statfile;
- statfile_pool_t *pool;
- rspamd_io_dispatcher_t *dispatcher;
- struct event_base *ev_base;
-
- struct event tm_ev;
-
- struct timeval interval;
- struct timeval io_tv;
- gint sock;
- guint32 timeout;
- guint32 sync_interval;
- enum rspamd_sync_state state;
- gboolean is_busy;
-
- guint64 new_rev;
- guint64 new_time;
- guint64 new_len;
-};
-
-static void
-log_next_sync (const gchar *symbol, time_t delay)
-{
- gchar outstr[200];
- time_t t;
- struct tm *tmp;
- gint r;
-
- t = time (NULL);
- t += delay;
- tmp = localtime (&t);
-
- if (tmp) {
- r = rspamd_snprintf (outstr,
- sizeof (outstr),
- "statfile_sync: next sync of %s at ",
- symbol);
- if ((r = strftime (outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) {
- msg_info (outstr);
- }
- }
-}
-
-static gboolean
-parse_revision_line (struct rspamd_sync_ctx *ctx, rspamd_fstring_t *in)
-{
- guint i, state = 0;
- gchar *p, *c, numbuf[sizeof("18446744073709551615")];
- guint64 *val;
-
- /* First of all try to find END line */
- if (in->len >= sizeof ("END") - 1 &&
- memcmp (in->begin, "END", sizeof ("END") - 1) == 0) {
- ctx->state = SYNC_STATE_QUIT;
- ctx->is_busy = FALSE;
- return TRUE;
- }
-
- /* Next check for error line */
- if (in->len >= sizeof ("FAIL") - 1 &&
- memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
- ctx->state = SYNC_STATE_QUIT;
- ctx->is_busy = FALSE;
- return TRUE;
- }
-
- /* Now try to extract 3 numbers from string: revision, time and length */
- p = in->begin;
- val = &ctx->new_rev;
- for (i = 0; i < in->len; i++, p++) {
- if (g_ascii_isspace (*p) || i == in->len - 1) {
- if (state == 1) {
- if (i == in->len - 1) {
- /* One more character */
- p++;
- }
- rspamd_strlcpy (numbuf, c, MIN (p - c + 1,
- (gint)sizeof (numbuf)));
- errno = 0;
- *val = strtoull (numbuf, NULL, 10);
- if (errno != 0) {
- msg_info ("cannot parse number %s", strerror (errno));
- return FALSE;
- }
- state = 2;
- }
- }
- else {
- if (state == 0) {
- c = p;
- state = 1;
- }
- else if (state == 2) {
- if (val == &ctx->new_rev) {
- val = &ctx->new_time;
- }
- else if (val == &ctx->new_time) {
- val = &ctx->new_len;
- }
- c = p;
- state = 1;
- }
- }
- }
-
- /* Current value must be len value and its value must not be 0 */
- return ((val == &ctx->new_len));
-}
-
-static gboolean
-read_blocks (struct rspamd_sync_ctx *ctx, rspamd_fstring_t *in)
-{
- struct rspamd_binlog_element *elt;
- guint i;
-
- statfile_pool_lock_file (ctx->pool, ctx->real_statfile);
- elt = (struct rspamd_binlog_element *)in->begin;
- for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i++,
- elt++) {
- statfile_pool_set_block (ctx->pool,
- ctx->real_statfile,
- elt->h1,
- elt->h2,
- ctx->new_time,
- elt->value);
- }
- statfile_pool_unlock_file (ctx->pool, ctx->real_statfile);
-
- return TRUE;
-}
-
-static gboolean
-sync_read (rspamd_fstring_t * in, void *arg)
-{
- struct rspamd_sync_ctx *ctx = arg;
- gchar buf[256];
- guint64 rev = 0;
- time_t ti = 0;
-
- if (in->len == 0) {
- /* Skip empty lines */
- return TRUE;
- }
- switch (ctx->state) {
- case SYNC_STATE_GREETING:
- /* Skip greeting line and write sync command */
- /* Write initial data */
- statfile_get_revision (ctx->real_statfile, &rev, &ti);
- rev = rspamd_snprintf (buf,
- sizeof (buf),
- "sync %s %uL %T" CRLF,
- ctx->st->symbol,
- rev,
- ti);
- ctx->state = SYNC_STATE_READ_LINE;
- return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE,
- FALSE);
- break;
- case SYNC_STATE_READ_LINE:
- /* Try to parse line from server */
- if (!parse_revision_line (ctx, in)) {
- msg_info ("cannot parse line of length %z: '%*s'",
- in->len,
- (gint)in->len,
- in->begin);
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- return FALSE;
- }
- else if (ctx->state != SYNC_STATE_QUIT) {
- if (ctx->new_len > 0) {
- ctx->state = SYNC_STATE_READ_REV;
- rspamd_set_dispatcher_policy (ctx->dispatcher,
- BUFFER_CHARACTER,
- ctx->new_len);
- }
- }
- else {
- /* Quit this session */
- msg_info ("sync ended for: %s", ctx->st->symbol);
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- /* Immediately return from callback */
- return FALSE;
- }
- break;
- case SYNC_STATE_READ_REV:
- /* In now contains all blocks of specified revision, so we can read them directly */
- if (!read_blocks (ctx, in)) {
- msg_info ("cannot read blocks");
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- return FALSE;
- }
- statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
- msg_info ("set new revision: %uL, readed %z bytes",
- ctx->new_rev,
- in->len);
- /* Now try to read other revision or END line */
- ctx->state = SYNC_STATE_READ_LINE;
- rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
- break;
- case SYNC_STATE_QUIT:
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
- ctx->is_busy = FALSE;
- return FALSE;
- }
-
- return TRUE;
-}
-
-static void
-sync_err (GError *err, void *arg)
-{
- struct rspamd_sync_ctx *ctx = arg;
-
- msg_info ("abnormally closing connection, error: %s", err->message);
- ctx->is_busy = FALSE;
- close (ctx->sock);
- rspamd_remove_dispatcher (ctx->dispatcher);
-}
-
-
-static void
-sync_timer_callback (gint fd, short what, void *ud)
-{
- struct rspamd_sync_ctx *ctx = ud;
- guint32 jittered_interval;
-
- /* Plan new event */
- evtimer_del (&ctx->tm_ev);
- /* Add some jittering for synchronization */
- jittered_interval = g_random_int_range (ctx->sync_interval,
- ctx->sync_interval * 2);
- msec_to_tv (jittered_interval, &ctx->interval);
- evtimer_add (&ctx->tm_ev, &ctx->interval);
- log_next_sync (ctx->st->symbol, ctx->interval.tv_sec);
-
- if (ctx->is_busy) {
- /* Sync is in progress */
- msg_info ("syncronization process is in progress, do not start new one");
- return;
- }
-
- if ((ctx->sock =
- rspamd_socket (ctx->st->binlog->master_addr,
- ctx->st->binlog->master_port,
- SOCK_STREAM, TRUE, FALSE, TRUE)) == -1) {
- msg_info ("cannot connect to %s", ctx->st->binlog->master_addr);
- return;
- }
- /* Now create and activate dispatcher */
- msec_to_tv (ctx->timeout, &ctx->io_tv);
- ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base,
- ctx->sock,
- BUFFER_LINE,
- sync_read,
- NULL,
- sync_err,
- &ctx->io_tv,
- ctx);
-
- ctx->state = SYNC_STATE_GREETING;
- ctx->is_busy = TRUE;
-
- msg_info ("starting synchronization of %s", ctx->st->symbol);
-
-}
-
-static gboolean
-add_statfile_watch (statfile_pool_t *pool,
- struct rspamd_statfile_config *st,
- struct rspamd_config *cfg,
- struct event_base *ev_base)
-{
- struct rspamd_sync_ctx *ctx;
- guint32 jittered_interval;
-
- if (st->binlog->master_addr != NULL) {
- ctx =
- rspamd_mempool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
- ctx->st = st;
- ctx->timeout = cfg->statfile_sync_timeout;
- ctx->sync_interval = cfg->statfile_sync_interval;
- ctx->ev_base = ev_base;
- /* Add some jittering for synchronization */
- jittered_interval = g_random_int_range (ctx->sync_interval,
- ctx->sync_interval * 2);
- msec_to_tv (jittered_interval, &ctx->interval);
- /* Open statfile and attach it to pool */
- if ((ctx->real_statfile =
- statfile_pool_is_open (pool, st->path)) == NULL) {
- if ((ctx->real_statfile =
- statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
- msg_warn ("cannot open %s", st->path);
- if (statfile_pool_create (pool, st->path, st->size) == -1) {
- msg_err ("cannot create statfile %s", st->path);
- return FALSE;
- }
- ctx->real_statfile = statfile_pool_open (pool,
- st->path,
- st->size,
- FALSE);
- }
- }
- /* Now plan event for it's future executing */
- evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
- event_base_set (ctx->ev_base, &ctx->tm_ev);
- evtimer_add (&ctx->tm_ev, &ctx->interval);
- log_next_sync (st->symbol, ctx->interval.tv_sec);
- }
- else {
- msg_err ("cannot add statfile watch for statfile %s: no master defined",
- st->symbol);
- return FALSE;
- }
-
- return TRUE;
-}
-
-gboolean
-start_statfile_sync (statfile_pool_t *pool,
- struct rspamd_config *cfg,
- struct event_base *ev_base)
-{
- GList *cur, *l;
- struct rspamd_classifier_config *cl;
- struct rspamd_statfile_config *st;
-
- /*
- * First of all walk through all classifiers and find those statfiles
- * for which we should do sync (slave affinity)
- */
- cur = cfg->classifiers;
- while (cur) {
- cl = cur->data;
- l = cl->statfiles;
- while (l) {
- st = l->data;
- if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
- if (!add_statfile_watch (pool, st, cfg, ev_base)) {
- return FALSE;
- }
- }
- l = g_list_next (l);
- }
- cur = g_list_next (cur);
- }
-
- return TRUE;
-}
diff --git a/src/libserver/statfile_sync.h b/src/libserver/statfile_sync.h
deleted file mode 100644
index be2550d7b..000000000
--- a/src/libserver/statfile_sync.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#ifndef RSPAMD_STATFILE_SYNC_H
-#define RSPAMD_STATFILE_SYNC_H
-
-#include "config.h"
-#include "main.h"
-#include "statfile.h"
-#include "cfg_file.h"
-
-/*
- * Start synchronization of statfiles. Must be called after event_init as it adds events
- */
-gboolean start_statfile_sync (statfile_pool_t *pool,
- struct rspamd_config *cfg,
- struct event_base *ev_base);
-
-#endif