]> source.dussan.org Git - rspamd.git/commitdiff
* Add initial version of rspamd binlog syncronization client
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 25 Nov 2009 17:32:15 +0000 (20:32 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 25 Nov 2009 17:32:15 +0000 (20:32 +0300)
* Style fix to perl module

CMakeLists.txt
perl/lib/Mail/Rspamd/Client.pm
src/statfile_sync.c [new file with mode: 0644]
src/statfile_sync.h [new file with mode: 0644]

index 0803b0ad9b5aeb6b4ffee80189cfb503a7a6b0fb..cca84015a382a7355aee19528a292b1157314ecd 100644 (file)
@@ -368,6 +368,7 @@ SET(RSPAMDSRC       src/modules.c
                                src/settings.c
                                src/spf.c
                                src/statfile.c
+                               src/statfile_sync.c
                                src/symbols_cache.c
                                src/upstream.c
                                src/url.c
index fa42bd5a32f711058e9c7b329240645fa4f23041..32801b9abb27b4de670ea34eb91c8216c3a5d0a8 100644 (file)
@@ -153,7 +153,8 @@ sub check {
   
   return undef unless $self->_get_io_readiness($remote, 0);
                
-  my $in, $res, $offset = 0;
+  my ($in, $res);
+  my $offset = 0;
   do {
                $res = sysread($remote, $in, 512, $offset);
                if ($res > 0 && $res < 512) {
diff --git a/src/statfile_sync.c b/src/statfile_sync.c
new file mode 100644 (file)
index 0000000..71be887
--- /dev/null
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *      * Redistributions of source code must retain the above copyright
+ *        notice, this list of conditions and the following disclaimer.
+ *      * Redistributions in binary form must reproduce the above copyright
+ *        notice, this list of conditions and the following disclaimer in the
+ *        documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "cfg_file.h"
+#include "tokenizers/tokenizers.h"
+#include "classifiers/classifiers.h"
+#include "statfile.h"
+#include "binlog.h"
+#include "buffer.h"
+#include "statfile_sync.h"
+
+/* XXX: hardcoding this value is not very smart */
+#define MAX_SYNC_TIME 60
+#define IO_TIMEOUT 5
+
+
+enum rspamd_sync_state {
+       SYNC_STATE_GREETING,
+       SYNC_STATE_READ_REV,
+       SYNC_STATE_QUIT,
+};
+
+/* Context of sync process */
+struct rspamd_sync_ctx {
+       struct statfile *st;
+       stat_file_t *real_statfile;
+       statfile_pool_t *pool;
+       rspamd_io_dispatcher_t *dispatcher;
+
+       struct event tm_ev;
+
+       struct timeval interval;
+       struct timeval io_tv;
+       int sock;
+       enum rspamd_sync_state state;
+       gboolean is_busy;
+
+       uint64_t new_rev;
+       uint64_t new_time;
+       uint64_t new_len;
+};
+
+
+static                          gboolean
+parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
+{
+       int i, state = 0;
+       char *p, *c, t;
+       uint64_t *val;
+
+       /* First of all try to find END line */
+       if (in->len >= sizeof ("END") - 1 && memcmp (in->begin, "END", sizeof ("END") - 1) == 0) {
+               ctx->state = SYNC_STATE_QUIT;
+               ctx->is_busy = FALSE;
+               return TRUE;
+       }
+
+       /* Now try to extract 3 numbers from string: revision, time and length */
+       p = in->begin;
+       val = &ctx->new_rev;
+       for (i = 0; i < in->len; i ++, p ++) {
+               if (g_ascii_isspace (*p) || i == in->len - 1) {
+                       if (state == 1) {
+                               t = *p;
+                               *p = '\0';
+                               errno = 0;
+                               *val = strtoull (c, NULL, 10);
+                               *p = t;
+                               if (errno != 0) {
+                                       msg_info ("parse_revision_line: cannot parse number %s", strerror (errno));
+                                       return FALSE;
+                               }
+                               state = 2;
+                       }
+               }
+               else {
+                       if (state == 0) {
+                               c = p;
+                               state = 1;
+                       }
+                       else if (state == 2) {
+                               if (val == &ctx->new_rev) {
+                                       val = &ctx->new_time;
+                               }
+                               else if (val == &ctx->new_time) {
+                                       val = &ctx->new_len;
+                               }
+                               c = p;
+                               state = 1;
+                       }
+               }
+       }
+       
+       /* Current value must be len value and its value must not be 0 */
+       return ((val == &ctx->new_len) && *val != 0);
+}
+
+static                          gboolean
+read_blocks (struct rspamd_sync_ctx *ctx, f_str_t *in)
+{
+       struct rspamd_binlog_element *elt;
+       int                           i;
+       
+       statfile_pool_lock_file (ctx->pool, ctx->real_statfile);
+       elt = (struct rspamd_binlog_element *)in->begin;
+       for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i ++, elt ++) {
+               statfile_pool_set_block (ctx->pool, ctx->real_statfile, elt->h1, elt->h2, ctx->new_time, elt->value);
+       }
+       statfile_pool_unlock_file (ctx->pool, ctx->real_statfile);
+
+       return TRUE;
+}
+
+static                          gboolean
+sync_read (f_str_t * in, void *arg)
+{
+       struct rspamd_sync_ctx *ctx = arg;
+
+       switch (ctx->state) {
+               case SYNC_STATE_GREETING:
+                       /* Try to parse line from server */
+                       if (!parse_revision_line (ctx, in)) {
+                               msg_info ("sync_read: cannot parse line");
+                               rspamd_remove_dispatcher (ctx->dispatcher);
+                               ctx->is_busy = FALSE;
+                               return FALSE;
+                       }
+                       else if (ctx->state != SYNC_STATE_QUIT) {
+                               ctx->state = SYNC_STATE_READ_REV;
+                               rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_CHARACTER, ctx->new_len);
+                       }
+                       else {
+                               /* Quit this session */
+                               rspamd_remove_dispatcher (ctx->dispatcher);
+                               ctx->is_busy = FALSE;
+                               /* Immideately return from callback */ 
+                               return FALSE;
+                       }
+                       break;
+               case SYNC_STATE_READ_REV:
+                       /* In now contains all blocks of specified revision, so we can read them directly */
+                       if (!read_blocks (ctx, in)) {
+                               msg_info ("sync_read: cannot read blocks");
+                               rspamd_remove_dispatcher (ctx->dispatcher);
+                               ctx->is_busy = FALSE;
+                               return FALSE;
+                       }
+                       statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+                       /* Now try to read other revision or END line */
+                       rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
+                       ctx->state = SYNC_STATE_GREETING;
+                       break;
+               case SYNC_STATE_QUIT:
+                       rspamd_remove_dispatcher (ctx->dispatcher);
+                       ctx->is_busy = FALSE;
+                       return FALSE;
+       }
+
+       return TRUE;
+}
+
+static                         void 
+sync_err (GError *err, void *arg)
+{
+       struct rspamd_sync_ctx *ctx = arg;
+
+       msg_info ("sync_err: abnormally closing connection, error: %s", err->message);
+       ctx->is_busy = FALSE;
+       rspamd_remove_dispatcher (ctx->dispatcher);
+}
+
+
+static void
+sync_timer_callback (int fd, short what, void *ud)
+{
+       struct rspamd_sync_ctx *ctx = ud;
+       char                    buf[256];
+       uint64_t                rev = 0;
+       time_t                  ti = 0;
+       
+       /* Plan new event */
+       evtimer_del (&ctx->tm_ev);
+       ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+       ctx->interval.tv_usec = 0;
+       evtimer_add (&ctx->tm_ev, &ctx->interval);
+       
+       if (ctx->is_busy) {
+               /* Sync is in progress */
+               msg_info ("sync_timer_callback: syncronization process is in progress, do not start new one");
+               return;
+       }
+
+       if ((ctx->sock = make_tcp_socket (&ctx->st->binlog->master_addr, ctx->st->binlog->master_port, FALSE, TRUE)) == -1) {
+               msg_info ("sync_timer_callback: cannot connect to %s", inet_ntoa (ctx->st->binlog->master_addr));
+               return;
+       }
+       /* Now create and activate dispatcher */
+       ctx->io_tv.tv_sec = IO_TIMEOUT;
+       ctx->io_tv.tv_usec = 0;
+       ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
+       /* Write initial data */
+       statfile_get_revision (ctx->real_statfile, &rev, &ti);
+       rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
+       
+       ctx->state = SYNC_STATE_GREETING;
+       ctx->is_busy = TRUE;
+
+       rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);      
+}
+
+static gboolean
+add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
+{
+       struct rspamd_sync_ctx *ctx;
+       
+       ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+       ctx->st = st;
+       /* Add some jittering for synchronization */
+       ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+       ctx->interval.tv_usec = 0;
+       /* Open statfile and attach it to pool */
+       if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
+               if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+                       msg_warn ("add_statfile_watch: cannot open %s", st->path);
+                       if (statfile_pool_create (pool, st->path, st->size) == -1) {
+                               msg_err ("add_statfile_watch: cannot create statfile %s", st->path);
+                               return FALSE;
+                       }
+                       ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
+               }
+       }
+       /* Now plan event for it's future executing */
+       evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+       evtimer_add (&ctx->tm_ev, &ctx->interval);
+
+       return TRUE;
+}
+
+gboolean 
+start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
+{
+       GList *cur, *l;
+       struct classifier_config *cl;
+       struct statfile *st;
+
+       /* 
+        * First of all walk through all classifiers and find those statfiles
+        * for which we should do sync (slave affinity)
+        */
+       cur = cfg->classifiers;
+       while (cur) {
+               cl = cur->data;
+               l = cl->statfiles;
+               while (l) {
+                       st = l->data;
+                       if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
+                               if (!add_statfile_watch (pool, st)) {
+                                       return FALSE;
+                               }
+                       }
+                       l = g_list_next (l);
+               }
+               cur = g_list_next (cur);
+       }
+
+       return TRUE;
+}
diff --git a/src/statfile_sync.h b/src/statfile_sync.h
new file mode 100644 (file)
index 0000000..ba3aec2
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef RSPAMD_STATFILE_SYNC_H
+#define RSPAMD_STATFILE_SYNC_H
+
+#include "config.h"
+#include "main.h"
+#include "statfile.h"
+#include "cfg_file.h"
+
+gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg);
+
+#endif