aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt45
-rw-r--r--src/binlog.c458
-rw-r--r--src/binlog.h59
-rw-r--r--src/classifiers/winnow.c4
-rw-r--r--src/tokenizers/tokenizers.h1
5 files changed, 544 insertions, 23 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a8f4b6768..8c58586d0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -344,36 +344,37 @@ ENDIF(DEBUG_MODE MATCHES "ON")
################################ SOURCES SECTION ###########################
SET(RSPAMDSRC src/modules.c
- src/hash.c
- src/worker.c
- src/util.c
- src/url.c
- src/upstream.c
- src/statfile.c
- src/protocol.c
- src/message.c
- src/fuzzy.c
- src/expressions.c
- src/mem_pool.c
- src/memcached.c
- src/main.c
- src/fstring.c
- src/filter.c
- src/controller.c
- src/cfg_utils.c
+ src/binlog.c
+ src/bloom.c
src/buffer.c
+ src/cfg_utils.c
+ src/controller.c
src/events.c
+ src/expressions.c
+ src/filter.c
+ src/fstring.c
+ src/fuzzy.c
+ src/fuzzy_storage.c
+ src/hash.c
src/html.c
src/lmtp.c
- src/spf.c
src/lmtp_proto.c
- src/radix.c
- src/view.c
+ src/main.c
src/map.c
- src/bloom.c
+ src/memcached.c
+ src/mem_pool.c
+ src/message.c
+ src/protocol.c
+ src/radix.c
src/settings.c
+ src/spf.c
+ src/statfile.c
src/symbols_cache.c
- src/fuzzy_storage.c)
+ src/upstream.c
+ src/url.c
+ src/util.c
+ src/view.c
+ src/worker.c)
IF(ENABLE_PERL MATCHES "ON")
LIST(APPEND RSPAMDSRC src/perl.c)
diff --git a/src/binlog.c b/src/binlog.c
new file mode 100644
index 000000000..457e56a42
--- /dev/null
+++ b/src/binlog.c
@@ -0,0 +1,458 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "binlog.h"
+#include "tokenizers/tokenizers.h"
+
+#define BINLOG_SUFFIX ".binlog"
+#define BACKUP_SUFFIX ".old"
+#define VALID_MAGIC { 'r', 's', 'l' }
+#define VALID_VERSION { '1', '0' }
+
+static gboolean
+binlog_write_header (struct rspamd_binlog *log)
+{
+ struct rspamd_binlog_header header = {
+ .magic = VALID_MAGIC,
+ .version = VALID_VERSION,
+ .padding = { '\0', '\0' },
+ };
+
+ header.create_time = time (NULL);
+ lock_file (log->fd, FALSE);
+
+ if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) {
+ msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+
+
+ memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header));
+
+ /* Metaindex */
+ log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
+ bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
+ if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) {
+ g_free (log->metaindex);
+ msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ unlock_file (log->fd, FALSE);
+ return FALSE;
+ }
+ g_free (log->metaindex);
+ /* Now mmap metaindex to memory */
+ if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex),
+ PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) {
+ msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ unlock_file (log->fd, FALSE);
+ return FALSE;
+ }
+
+ /* First index block */
+
+ /* Offset to metaindex */
+ log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header);
+ /* Alloc, write, mmap */
+ log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
+ bzero (log->cur_idx, sizeof (struct rspamd_index_block));
+ if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
+ g_free (log->cur_idx);
+ msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ unlock_file (log->fd, FALSE);
+ return FALSE;
+ }
+ g_free (log->cur_idx);
+ /* Now mmap it to memory */
+ if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block),
+ PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[0])) == MAP_FAILED) {
+ msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ unlock_file (log->fd, FALSE);
+ return FALSE;
+ }
+
+ unlock_file (log->fd, FALSE);
+
+ return TRUE;
+}
+
+static gboolean
+binlog_check_file (struct rspamd_binlog *log)
+{
+ static char valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION;
+
+ if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) {
+ msg_warn ("binlog_check_file: cannot read file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+
+ /* Now check all fields */
+ if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 ||
+ memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) {
+ msg_warn ("binlog_check_file: cannot validate file %s");
+ return FALSE;
+ }
+ /* Now mmap metaindex and current index */
+ if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex),
+ PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) {
+ msg_warn ("binlog_check file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+ /* Current index */
+ if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block),
+ PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[log->metaindex->last_index])) == MAP_FAILED) {
+ msg_warn ("binlog_check_file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+
+ return TRUE;
+
+}
+
+static gboolean
+binlog_create (struct rspamd_binlog *log)
+{
+ if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
+ msg_info ("binlog_create: cannot create file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+
+ return binlog_write_header (log);
+}
+
+static gboolean
+binlog_open_real (struct rspamd_binlog *log)
+{
+ if ((log->fd = open (log->filename, O_RDWR)) == -1) {
+ msg_info ("binlog_open: cannot open file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+
+ return binlog_check_file (log);
+}
+
+
+struct rspamd_binlog*
+binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter)
+{
+ struct rspamd_binlog *new;
+ int len = strlen (path);
+ struct stat st;
+
+ new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog));
+ new->pool = pool;
+ new->rotate_time = rotate_time;
+ new->rotate_jitter = g_random_int_range (0, rotate_jitter);
+ new->fd = -1;
+
+ new->filename = memory_pool_alloc (pool, len + sizeof (BINLOG_SUFFIX));
+ g_strlcpy (new->filename, path, len + 1);
+ g_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX));
+
+ if (stat (new->filename, &st) == -1) {
+ /* Check errno to check whether we should create this file */
+ if (errno != ENOENT) {
+ msg_err ("binlog_open: cannot stat file: %s, error %s", new->filename, strerror (errno));
+ return NULL;
+ }
+ else {
+ /* In case of ENOENT try to create binlog */
+ if (!binlog_create (new)) {
+ return NULL;
+ }
+ }
+ }
+ else {
+ /* Try to open binlog */
+ if (!binlog_open_real (new)) {
+ return NULL;
+ }
+ }
+
+ return new;
+}
+
+void
+binlog_close (struct rspamd_binlog *log)
+{
+ if (log) {
+ if (log->metaindex) {
+ munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
+ }
+ if (log->cur_idx) {
+ munmap (log->cur_idx, sizeof (struct rspamd_index_block));
+ }
+ close (log->fd);
+ }
+}
+
+static gboolean
+binlog_tree_callback (gpointer key, gpointer value, gpointer data)
+{
+ token_node_t *node = key;
+ struct rspamd_binlog *log = data;
+ struct rspamd_binlog_element elt;
+
+ elt.h1 = node->h1;
+ elt.h2 = node->h2;
+ elt.value = node->value;
+
+ if (write (log->fd, &elt, sizeof (elt)) == -1) {
+ msg_info ("write_binlog_tree: cannot write token to file: %s, error: %s", log->filename, strerror (errno));
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+static gboolean
+write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
+{
+ off_t seek;
+ struct rspamd_binlog_index *idx;
+
+ /* Seek to end of file */
+ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
+ msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ return FALSE;
+ }
+
+ lock_file (log->fd, FALSE);
+ /* Write index */
+ log->cur_idx->last_index ++;
+ log->cur_seq ++;
+
+ idx = &log->cur_idx->indexes[log->cur_idx->last_index];
+ idx->seek = seek;
+ idx->time = (uint64_t)time (NULL);
+ idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element);
+
+ /* Now write all nodes to file */
+ g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log);
+
+ unlock_file (log->fd, FALSE);
+
+ return TRUE;
+}
+
+static gboolean
+create_new_metaindex_block (struct rspamd_binlog *log)
+{
+ off_t seek;
+
+ /* Seek to end of file */
+ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
+ msg_info ("create_new_metaindex_block: cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ return FALSE;
+ }
+
+ lock_file (log->fd, FALSE);
+
+ log->metaindex->last_index ++;
+ /* Offset to metaindex */
+ log->metaindex->indexes[log->metaindex->last_index] = seek;
+ munmap (log->cur_idx, sizeof (struct rspamd_index_block));
+ /* Alloc, write, mmap */
+ log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
+ bzero (log->cur_idx, sizeof (struct rspamd_index_block));
+ if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
+ unlock_file (log->fd, FALSE);
+ g_free (log->cur_idx);
+ msg_warn ("create_new_metaindex_block: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+ g_free (log->cur_idx);
+ /* Now mmap it to memory */
+ if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block),
+ PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, seek)) == MAP_FAILED) {
+ log->cur_idx = NULL;
+ unlock_file (log->fd, FALSE);
+ msg_warn ("create_new_metaindex_block: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+ unlock_file (log->fd, FALSE);
+
+ return TRUE;
+}
+
+static gboolean
+maybe_rotate_binlog (struct rspamd_binlog *log)
+{
+ uint64_t now = time (NULL);
+
+ if ((now - log->header.create_time) > log->rotate_time + log->rotate_jitter) {
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static gboolean
+rotate_binlog (struct rspamd_binlog *log)
+{
+ char *backup_name;
+ struct stat st;
+
+ lock_file (log->fd, FALSE);
+
+ /* Unmap mapped fragments */
+ if (log->metaindex) {
+ munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
+ log->metaindex = NULL;
+ }
+ if (log->cur_idx) {
+ munmap (log->cur_idx, sizeof (struct rspamd_index_block));
+ log->cur_idx = NULL;
+ }
+ /* Format backup name */
+ backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX);
+
+ if (stat (backup_name, &st) != -1) {
+ msg_info ("rotate_binlog: replace old %s", backup_name);
+ unlink (backup_name);
+ }
+
+ rename (log->filename, backup_name);
+ g_free (backup_name);
+
+ /* XXX: maybe race condition here */
+ unlock_file (log->fd, FALSE);
+ close (log->fd);
+
+ return binlog_create (log);
+
+}
+
+gboolean
+binlog_insert (struct rspamd_binlog *log, GTree *nodes)
+{
+ off_t seek;
+
+ if (!log || !log->metaindex || !log->cur_idx || !nodes) {
+ msg_info ("binlog_insert: improperly opened binlog: %s", log->filename);
+ return FALSE;
+ }
+
+ if (maybe_rotate_binlog (log)) {
+ if (!rotate_binlog (log)) {
+ return FALSE;
+ }
+ }
+ /* First of all try to place new tokens in current index */
+ if (log->cur_idx->last_index < BINLOG_IDX_LEN) {
+ /* All is ok */
+ return write_binlog_tree (log, nodes);
+ }
+ /* Current index table is all busy, try to allocate new index */
+
+ /* Check metaindex free space */
+ if (log->metaindex->last_index < METAINDEX_LEN) {
+ /* Create new index block */
+ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
+ msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ return FALSE;
+ }
+ if (!create_new_metaindex_block (log)) {
+ return FALSE;
+ }
+ return write_binlog_tree (log, nodes);
+ }
+
+ /* All binlog is filled, we need to rotate it forcefully */
+ if (!rotate_binlog (log)) {
+ return FALSE;
+ }
+
+ return write_binlog_tree (log, nodes);
+}
+
+gboolean
+binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep)
+{
+ uint32_t metaindex_num;
+ struct rspamd_index_block *idxb;
+ struct rspamd_binlog_index *idx;
+ gboolean idx_mapped = FALSE, res = TRUE;
+
+ if (!log || !log->metaindex || !log->cur_idx) {
+ msg_info ("binlog_sync: improperly opened binlog: %s", log->filename);
+ return FALSE;
+ }
+
+ if (*rep == NULL) {
+ *rep = g_malloc (sizeof (GByteArray));
+ }
+ else {
+ /* Unmap old fragment */
+ munmap ((*rep)->data, (*rep)->len);
+ }
+
+ if (from_rev == log->cur_seq) {
+ /* Last record */
+ *rep = NULL;
+ return FALSE;
+ }
+
+ metaindex_num = from_rev / BINLOG_IDX_LEN;
+ /* First of all try to find this revision */
+ if (metaindex_num > log->metaindex->last_index) {
+ return FALSE;
+ }
+ else if (metaindex_num != log->metaindex->last_index) {
+ /* Need to remap index block */
+ lock_file (log->fd, FALSE);
+ if ((idxb = mmap (NULL, sizeof (struct rspamd_index_block),
+ PROT_READ | PROT_WRITE, MAP_SHARED,
+ log->fd, log->metaindex->indexes[metaindex_num])) == MAP_FAILED) {
+ unlock_file (log->fd, FALSE);
+ msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ return FALSE;
+ }
+ idx_mapped = TRUE;
+ }
+ else {
+ idxb = log->cur_idx;
+ }
+ /* Now check specified index */
+ idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN];
+ if (idx->time != from_time) {
+ res = FALSE;
+ goto end;
+ }
+
+ /* Now fill reply structure */
+ (*rep)->len = idx->len;
+ /* MMap result */
+ if (((*rep)->data = mmap (NULL, idx->len, PROT_READ, MAP_SHARED, log->fd, idx->seek)) == MAP_FAILED) {
+ msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+ res = FALSE;
+ goto end;
+ }
+
+end:
+ if (idx_mapped) {
+ munmap (idxb, sizeof (struct rspamd_index_block));
+ }
+
+ return res;
+}
+
diff --git a/src/binlog.h b/src/binlog.h
new file mode 100644
index 000000000..bf2666840
--- /dev/null
+++ b/src/binlog.h
@@ -0,0 +1,59 @@
+#ifndef RSPAMD_BINLOG_H
+#define RSPAMD_BINLOG_H
+
+#include "config.h"
+#include "main.h"
+#include "statfile.h"
+
+/* How much records are in a single index */
+#define BINLOG_IDX_LEN 200
+#define METAINDEX_LEN 1024
+
+struct rspamd_binlog_header {
+ char magic[3];
+ char version[2];
+ char padding[3];
+ uint64_t create_time;
+} __attribute__((__packed__));
+
+struct rspamd_binlog_index {
+ uint64_t time;
+ uint64_t seek;
+ uint32_t len;
+};
+
+struct rspamd_index_block {
+ struct rspamd_binlog_index indexes[BINLOG_IDX_LEN];
+ uint32_t last_index;
+};
+
+struct rspamd_binlog_metaindex {
+ uint64_t indexes[METAINDEX_LEN];
+ uint64_t last_index;
+};
+
+struct rspamd_binlog_element {
+ uint32_t h1;
+ uint32_t h2;
+ float value;
+} __attribute__((__packed__));
+
+struct rspamd_binlog {
+ char *filename;
+ time_t rotate_time;
+ int rotate_jitter;
+ uint64_t cur_seq;
+ int fd;
+ memory_pool_t *pool;
+
+ struct rspamd_binlog_header header;
+ struct rspamd_binlog_metaindex *metaindex;
+ struct rspamd_index_block *cur_idx;
+};
+
+struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter);
+void binlog_close (struct rspamd_binlog *log);
+gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes);
+gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep);
+
+#endif
diff --git a/src/classifiers/winnow.c b/src/classifiers/winnow.c
index acf12a462..cc2a0cc23 100644
--- a/src/classifiers/winnow.c
+++ b/src/classifiers/winnow.c
@@ -77,11 +77,13 @@ learn_callback (gpointer key, gpointer value, gpointer data)
/* Consider that not found blocks have value 1 */
if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) {
statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, c);
+ node->value = c;
}
else {
statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, v * c);
+ node->value = v * c;
}
-
+
cd->count++;
return FALSE;
diff --git a/src/tokenizers/tokenizers.h b/src/tokenizers/tokenizers.h
index ed3b66fcf..fda5bded3 100644
--- a/src/tokenizers/tokenizers.h
+++ b/src/tokenizers/tokenizers.h
@@ -17,6 +17,7 @@
typedef struct token_node_s {
uint32_t h1;
uint32_t h2;
+ float value;
} token_node_t;
/* Common tokenizer structure */