]> source.dussan.org Git - rspamd.git/commitdiff
* Add binlog API implementation
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 6 Nov 2009 17:31:03 +0000 (20:31 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 6 Nov 2009 17:31:03 +0000 (20:31 +0300)
CMakeLists.txt
src/binlog.c [new file with mode: 0644]
src/binlog.h [new file with mode: 0644]
src/classifiers/winnow.c
src/tokenizers/tokenizers.h

index a8f4b67683348d80b78ee32444b1ae6048122eb8..8c58586d0f532bb216f73aaf7e8cd120824bd511 100644 (file)
@@ -344,36 +344,37 @@ ENDIF(DEBUG_MODE MATCHES "ON")
 ################################ SOURCES SECTION ###########################
 
 SET(RSPAMDSRC  src/modules.c
-                               src/hash.c
-                               src/worker.c
-                               src/util.c
-                               src/url.c
-                               src/upstream.c
-                               src/statfile.c
-                               src/protocol.c
-                               src/message.c
-                               src/fuzzy.c
-                               src/expressions.c
-                               src/mem_pool.c
-                               src/memcached.c
-                               src/main.c
-                               src/fstring.c
-                               src/filter.c
-                               src/controller.c
-                               src/cfg_utils.c
+                               src/binlog.c
+                src/bloom.c
                                src/buffer.c
+                               src/cfg_utils.c
+                               src/controller.c
                                src/events.c
+                               src/expressions.c
+                               src/filter.c
+                               src/fstring.c
+                               src/fuzzy.c
+                               src/fuzzy_storage.c
+                               src/hash.c
                                src/html.c
                                src/lmtp.c
-                               src/spf.c
                                src/lmtp_proto.c
-                               src/radix.c
-                               src/view.c
+                               src/main.c
                                src/map.c
-                src/bloom.c
+                               src/memcached.c
+                               src/mem_pool.c
+                               src/message.c
+                               src/protocol.c
+                               src/radix.c
                                src/settings.c
+                               src/spf.c
+                               src/statfile.c
                                src/symbols_cache.c
-                               src/fuzzy_storage.c)
+                               src/upstream.c
+                               src/url.c
+                               src/util.c
+                               src/view.c
+                               src/worker.c)
 
 IF(ENABLE_PERL MATCHES "ON")
        LIST(APPEND RSPAMDSRC src/perl.c)
diff --git a/src/binlog.c b/src/binlog.c
new file mode 100644 (file)
index 0000000..457e56a
--- /dev/null
@@ -0,0 +1,458 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "binlog.h"
+#include "tokenizers/tokenizers.h"
+
+#define BINLOG_SUFFIX ".binlog"
+#define BACKUP_SUFFIX ".old"
+#define VALID_MAGIC { 'r', 's', 'l' }
+#define VALID_VERSION { '1', '0' }
+
+static gboolean 
+binlog_write_header (struct rspamd_binlog *log)
+{
+       struct rspamd_binlog_header header = {
+               .magic = VALID_MAGIC,
+               .version = VALID_VERSION,
+               .padding = { '\0', '\0' },
+       };
+       
+       header.create_time = time (NULL);
+       lock_file (log->fd, FALSE);
+       
+       if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) {
+               msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+
+
+       memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header));
+       
+       /* Metaindex */
+       log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
+       bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
+       if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))  == -1) {
+               g_free (log->metaindex);
+               msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+               unlock_file (log->fd, FALSE);
+               return FALSE;
+       }
+       g_free (log->metaindex);
+       /* Now mmap metaindex to memory */
+       if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex), 
+                                       PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) {
+               msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+               unlock_file (log->fd, FALSE);
+               return FALSE;
+       }
+
+       /* First index block */
+
+       /* Offset to metaindex */
+       log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header);
+       /* Alloc, write, mmap */
+       log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
+       bzero (log->cur_idx, sizeof (struct rspamd_index_block));
+       if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))  == -1) {
+               g_free (log->cur_idx);
+               msg_warn ("binlog_write_header: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+               unlock_file (log->fd, FALSE);
+               return FALSE;
+       }
+       g_free (log->cur_idx);
+       /* Now mmap it to memory */
+       if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), 
+                                       PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[0])) == MAP_FAILED) {
+               msg_warn ("binlog_write_header: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+               unlock_file (log->fd, FALSE);
+               return FALSE;
+       }
+
+       unlock_file (log->fd, FALSE);
+
+       return TRUE;
+}
+
+static gboolean
+binlog_check_file (struct rspamd_binlog *log)
+{
+       static char valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION;
+
+       if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) {
+               msg_warn ("binlog_check_file: cannot read file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+
+       /* Now check all fields */
+       if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || 
+                       memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) {
+               msg_warn ("binlog_check_file: cannot validate file %s");
+               return FALSE;
+       }
+       /* Now mmap metaindex and current index */
+       if ((log->metaindex = mmap (NULL, sizeof (struct rspamd_binlog_metaindex), 
+                                       PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, sizeof (struct rspamd_binlog_header))) == MAP_FAILED) {
+               msg_warn ("binlog_check file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+       /* Current index */
+       if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), 
+                                       PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, log->metaindex->indexes[log->metaindex->last_index])) == MAP_FAILED) {
+               msg_warn ("binlog_check_file: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+
+       return TRUE;
+
+}
+
+static gboolean
+binlog_create (struct rspamd_binlog *log)
+{
+       if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
+               msg_info ("binlog_create: cannot create file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+
+       return binlog_write_header (log);
+}
+
+static gboolean
+binlog_open_real (struct rspamd_binlog *log)
+{
+       if ((log->fd = open (log->filename, O_RDWR)) == -1) {
+               msg_info ("binlog_open: cannot open file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+       
+       return binlog_check_file (log);
+}
+
+
+struct rspamd_binlog* 
+binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter)
+{
+       struct rspamd_binlog *new;
+       int len = strlen (path);
+       struct stat st;
+       
+       new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog));
+       new->pool = pool;
+       new->rotate_time = rotate_time;
+       new->rotate_jitter = g_random_int_range (0, rotate_jitter);
+       new->fd = -1;
+       
+       new->filename = memory_pool_alloc (pool, len + sizeof (BINLOG_SUFFIX));
+       g_strlcpy (new->filename, path, len + 1);
+       g_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX));
+
+       if (stat (new->filename, &st) == -1) {
+               /* Check errno to check whether we should create this file */
+               if (errno != ENOENT) {
+                       msg_err ("binlog_open: cannot stat file: %s, error %s", new->filename, strerror (errno));
+                       return NULL;
+               }
+               else {
+                       /* In case of ENOENT try to create binlog */
+                       if (!binlog_create (new)) {
+                               return NULL;
+                       }
+               }
+       }
+       else {
+               /* Try to open binlog */
+               if (!binlog_open_real (new)) {
+                       return NULL;
+               }
+       }
+
+       return new;
+}
+
+void 
+binlog_close (struct rspamd_binlog *log)
+{
+       if (log) {
+               if (log->metaindex) {
+                       munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
+               }
+               if (log->cur_idx) {
+                       munmap (log->cur_idx, sizeof (struct rspamd_index_block));
+               }
+               close (log->fd);
+       }
+}
+
+static gboolean
+binlog_tree_callback (gpointer key, gpointer value, gpointer data)
+{
+       token_node_t                   *node = key;
+       struct rspamd_binlog           *log = data;
+       struct rspamd_binlog_element    elt;
+
+       elt.h1 = node->h1;
+       elt.h2 = node->h2;
+       elt.value = node->value;
+       
+       if (write (log->fd, &elt, sizeof (elt)) == -1) {
+               msg_info ("write_binlog_tree: cannot write token to file: %s, error: %s", log->filename, strerror (errno));
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+static gboolean
+write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
+{
+       off_t seek;
+       struct rspamd_binlog_index *idx;
+       
+       /* Seek to end of file */
+       if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
+               msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+               return FALSE;
+       }
+       
+       lock_file (log->fd, FALSE);
+       /* Write index */
+       log->cur_idx->last_index ++;
+       log->cur_seq ++;        
+
+       idx = &log->cur_idx->indexes[log->cur_idx->last_index];
+       idx->seek = seek;
+       idx->time = (uint64_t)time (NULL);
+       idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element);
+       
+       /* Now write all nodes to file */
+       g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log);
+
+       unlock_file (log->fd, FALSE);
+       
+       return TRUE;
+}
+
+static gboolean
+create_new_metaindex_block (struct rspamd_binlog *log)
+{
+       off_t seek;
+       
+       /* Seek to end of file */
+       if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
+               msg_info ("create_new_metaindex_block: cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+               return FALSE;
+       }
+
+       lock_file (log->fd, FALSE);
+       
+       log->metaindex->last_index ++;
+       /* Offset to metaindex */
+       log->metaindex->indexes[log->metaindex->last_index] = seek;
+       munmap (log->cur_idx, sizeof (struct rspamd_index_block));
+       /* Alloc, write, mmap */
+       log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
+       bzero (log->cur_idx, sizeof (struct rspamd_index_block));
+       if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))  == -1) {
+               unlock_file (log->fd, FALSE);
+               g_free (log->cur_idx);
+               msg_warn ("create_new_metaindex_block: cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+       g_free (log->cur_idx);
+       /* Now mmap it to memory */
+       if ((log->cur_idx = mmap (NULL, sizeof (struct rspamd_index_block), 
+                                       PROT_READ | PROT_WRITE, MAP_SHARED, log->fd, seek)) == MAP_FAILED) {
+               log->cur_idx = NULL;
+               unlock_file (log->fd, FALSE);
+               msg_warn ("create_new_metaindex_block: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+               return FALSE;
+       }
+       unlock_file (log->fd, FALSE);
+
+       return TRUE;
+}
+
+static gboolean
+maybe_rotate_binlog (struct rspamd_binlog *log)
+{
+       uint64_t now = time (NULL);
+
+       if ((now - log->header.create_time) > log->rotate_time + log->rotate_jitter) {
+               return TRUE;
+       }
+       return FALSE;
+}
+
+static gboolean
+rotate_binlog (struct rspamd_binlog *log)
+{
+       char *backup_name;
+       struct stat st;
+
+       lock_file (log->fd, FALSE);
+
+       /* Unmap mapped fragments */
+       if (log->metaindex) {
+               munmap (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
+               log->metaindex = NULL;
+       }
+       if (log->cur_idx) {
+               munmap (log->cur_idx, sizeof (struct rspamd_index_block));
+               log->cur_idx = NULL;
+       }
+       /* Format backup name */
+       backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX);
+
+       if (stat (backup_name, &st) != -1) {
+               msg_info ("rotate_binlog: replace old %s", backup_name);
+               unlink (backup_name);
+       }
+
+       rename (log->filename, backup_name);
+       g_free (backup_name);
+
+       /* XXX: maybe race condition here */
+       unlock_file (log->fd, FALSE);
+       close (log->fd);
+
+       return binlog_create (log);
+
+}
+
+gboolean 
+binlog_insert (struct rspamd_binlog *log, GTree *nodes)
+{
+       off_t seek;
+
+       if (!log || !log->metaindex || !log->cur_idx || !nodes) {
+               msg_info ("binlog_insert: improperly opened binlog: %s", log->filename);
+               return FALSE;
+       }
+
+       if (maybe_rotate_binlog (log)) {
+               if (!rotate_binlog (log)) {
+                       return FALSE;
+               }
+       }
+       /* First of all try to place new tokens in current index */
+       if (log->cur_idx->last_index < BINLOG_IDX_LEN) {
+               /* All is ok */
+               return write_binlog_tree (log, nodes);
+       }
+       /* Current index table is all busy, try to allocate new index */
+
+       /* Check metaindex free space */
+       if (log->metaindex->last_index < METAINDEX_LEN) {
+               /* Create new index block */
+               if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
+                       msg_info ("binlog_insert: cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+                       return FALSE;
+               }
+               if (!create_new_metaindex_block (log)) {
+                       return FALSE;
+               }
+               return write_binlog_tree (log, nodes);
+       }
+       
+       /* All binlog is filled, we need to rotate it forcefully */
+       if (!rotate_binlog (log)) {
+               return FALSE;
+       }
+
+       return write_binlog_tree (log, nodes);
+}
+
+gboolean 
+binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep)
+{
+       uint32_t metaindex_num;
+       struct rspamd_index_block *idxb;
+       struct rspamd_binlog_index *idx;
+       gboolean idx_mapped = FALSE, res = TRUE;
+
+       if (!log || !log->metaindex || !log->cur_idx) {
+               msg_info ("binlog_sync: improperly opened binlog: %s", log->filename);
+               return FALSE;
+       }
+
+       if (*rep == NULL) {
+               *rep = g_malloc (sizeof (GByteArray));
+       }
+       else {
+               /* Unmap old fragment */
+               munmap ((*rep)->data, (*rep)->len);
+       }
+       
+       if (from_rev == log->cur_seq) {
+               /* Last record */
+               *rep = NULL;
+               return FALSE;
+       }
+
+       metaindex_num = from_rev / BINLOG_IDX_LEN;
+       /* First of all try to find this revision */
+       if (metaindex_num > log->metaindex->last_index) {
+               return FALSE;
+       }
+       else if (metaindex_num != log->metaindex->last_index) {
+               /* Need to remap index block */
+               lock_file (log->fd, FALSE);
+               if ((idxb = mmap (NULL, sizeof (struct rspamd_index_block), 
+                                       PROT_READ | PROT_WRITE, MAP_SHARED, 
+                                       log->fd, log->metaindex->indexes[metaindex_num])) == MAP_FAILED) {
+                       unlock_file (log->fd, FALSE);
+                       msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+                       return FALSE;
+               }
+               idx_mapped = TRUE;
+       }
+       else {
+               idxb = log->cur_idx;
+       }
+       /* Now check specified index */
+       idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN];
+       if (idx->time != from_time) {
+               res = FALSE;
+               goto end;
+       }
+
+       /* Now fill reply structure */
+       (*rep)->len = idx->len;
+       /* MMap result */
+       if (((*rep)->data = mmap (NULL, idx->len, PROT_READ, MAP_SHARED, log->fd, idx->seek)) == MAP_FAILED) {
+               msg_warn ("binlog_sync: cannot mmap file %s, error %d, %s", log->filename, errno, strerror (errno));
+               res = FALSE;
+               goto end;
+       }
+
+end:
+       if (idx_mapped) {
+               munmap (idxb, sizeof (struct rspamd_index_block));
+       }
+
+       return res;
+}
+
diff --git a/src/binlog.h b/src/binlog.h
new file mode 100644 (file)
index 0000000..bf26668
--- /dev/null
@@ -0,0 +1,59 @@
+#ifndef RSPAMD_BINLOG_H
+#define RSPAMD_BINLOG_H
+
+#include "config.h"
+#include "main.h"
+#include "statfile.h"
+
+/* How much records are in a single index */
+#define BINLOG_IDX_LEN 200
+#define METAINDEX_LEN 1024
+
+struct rspamd_binlog_header {
+       char magic[3];
+       char version[2];
+       char padding[3];
+       uint64_t create_time;
+} __attribute__((__packed__));
+
+struct rspamd_binlog_index {
+       uint64_t time;
+       uint64_t seek;
+       uint32_t len;
+};
+
+struct rspamd_index_block {
+       struct rspamd_binlog_index indexes[BINLOG_IDX_LEN];
+       uint32_t last_index;
+};
+
+struct rspamd_binlog_metaindex {
+       uint64_t indexes[METAINDEX_LEN];
+       uint64_t last_index;
+};
+
+struct rspamd_binlog_element {
+       uint32_t h1;
+       uint32_t h2;
+       float value;
+} __attribute__((__packed__));
+
+struct rspamd_binlog {
+       char *filename;
+       time_t rotate_time;
+       int rotate_jitter;
+       uint64_t cur_seq;
+       int fd;
+       memory_pool_t *pool;
+
+       struct rspamd_binlog_header header;
+       struct rspamd_binlog_metaindex *metaindex;
+       struct rspamd_index_block *cur_idx;
+};
+
+struct rspamd_binlog* binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter);
+void binlog_close (struct rspamd_binlog *log);
+gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes);
+gboolean binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t from_time, GByteArray **rep);
+
+#endif
index acf12a462c6933b2d81e04574c60b5f34ed111b3..cc2a0cc23133c46ecddc2be6621f4abb8c7e3236 100644 (file)
@@ -77,11 +77,13 @@ learn_callback (gpointer key, gpointer value, gpointer data)
        /* Consider that not found blocks have value 1 */
        if ((v = statfile_pool_get_block (cd->pool, cd->file, node->h1, node->h2, cd->now)) < 0.00001) {
                statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, c);
+               node->value = c;
        }
        else {
                statfile_pool_set_block (cd->pool, cd->file, node->h1, node->h2, cd->now, v * c);
+               node->value = v * c;
        }
-
+       
        cd->count++;
 
        return FALSE;
index ed3b66fcfbd0d2c84cbde224fb25139b3cd9d821..fda5bded340c9e38aedf9ee31594f4a1fdc934f9 100644 (file)
@@ -17,6 +17,7 @@
 typedef struct token_node_s {
        uint32_t h1;
        uint32_t h2;
+       float value;
 } token_node_t;
 
 /* Common tokenizer structure */