summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt1
-rw-r--r--perl/lib/Mail/Rspamd/Client.pm3
-rw-r--r--src/statfile_sync.c289
-rw-r--r--src/statfile_sync.h11
4 files changed, 303 insertions, 1 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0803b0ad9..cca84015a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -368,6 +368,7 @@ SET(RSPAMDSRC src/modules.c
src/settings.c
src/spf.c
src/statfile.c
+ src/statfile_sync.c
src/symbols_cache.c
src/upstream.c
src/url.c
diff --git a/perl/lib/Mail/Rspamd/Client.pm b/perl/lib/Mail/Rspamd/Client.pm
index fa42bd5a3..32801b9ab 100644
--- a/perl/lib/Mail/Rspamd/Client.pm
+++ b/perl/lib/Mail/Rspamd/Client.pm
@@ -153,7 +153,8 @@ sub check {
return undef unless $self->_get_io_readiness($remote, 0);
- my $in, $res, $offset = 0;
+ my ($in, $res);
+ my $offset = 0;
do {
$res = sysread($remote, $in, 512, $offset);
if ($res > 0 && $res < 512) {
diff --git a/src/statfile_sync.c b/src/statfile_sync.c
new file mode 100644
index 000000000..71be88733
--- /dev/null
+++ b/src/statfile_sync.c
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "cfg_file.h"
+#include "tokenizers/tokenizers.h"
+#include "classifiers/classifiers.h"
+#include "statfile.h"
+#include "binlog.h"
+#include "buffer.h"
+#include "statfile_sync.h"
+
+/* XXX: hardcoding this value is not very smart */
+#define MAX_SYNC_TIME 60
+#define IO_TIMEOUT 5
+
+
+enum rspamd_sync_state {
+ SYNC_STATE_GREETING,
+ SYNC_STATE_READ_REV,
+ SYNC_STATE_QUIT,
+};
+
+/* Context of sync process */
+struct rspamd_sync_ctx {
+ struct statfile *st;
+ stat_file_t *real_statfile;
+ statfile_pool_t *pool;
+ rspamd_io_dispatcher_t *dispatcher;
+
+ struct event tm_ev;
+
+ struct timeval interval;
+ struct timeval io_tv;
+ int sock;
+ enum rspamd_sync_state state;
+ gboolean is_busy;
+
+ uint64_t new_rev;
+ uint64_t new_time;
+ uint64_t new_len;
+};
+
+
+static gboolean
+parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
+{
+ int i, state = 0;
+ char *p, *c, t;
+ uint64_t *val;
+
+ /* First of all try to find END line */
+ if (in->len >= sizeof ("END") - 1 && memcmp (in->begin, "END", sizeof ("END") - 1) == 0) {
+ ctx->state = SYNC_STATE_QUIT;
+ ctx->is_busy = FALSE;
+ return TRUE;
+ }
+
+ /* Now try to extract 3 numbers from string: revision, time and length */
+ p = in->begin;
+ val = &ctx->new_rev;
+ for (i = 0; i < in->len; i ++, p ++) {
+ if (g_ascii_isspace (*p) || i == in->len - 1) {
+ if (state == 1) {
+ t = *p;
+ *p = '\0';
+ errno = 0;
+ *val = strtoull (c, NULL, 10);
+ *p = t;
+ if (errno != 0) {
+ msg_info ("parse_revision_line: cannot parse number %s", strerror (errno));
+ return FALSE;
+ }
+ state = 2;
+ }
+ }
+ else {
+ if (state == 0) {
+ c = p;
+ state = 1;
+ }
+ else if (state == 2) {
+ if (val == &ctx->new_rev) {
+ val = &ctx->new_time;
+ }
+ else if (val == &ctx->new_time) {
+ val = &ctx->new_len;
+ }
+ c = p;
+ state = 1;
+ }
+ }
+ }
+
+ /* Current value must be len value and its value must not be 0 */
+ return ((val == &ctx->new_len) && *val != 0);
+}
+
+static gboolean
+read_blocks (struct rspamd_sync_ctx *ctx, f_str_t *in)
+{
+ struct rspamd_binlog_element *elt;
+ int i;
+
+ statfile_pool_lock_file (ctx->pool, ctx->real_statfile);
+ elt = (struct rspamd_binlog_element *)in->begin;
+ for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i ++, elt ++) {
+ statfile_pool_set_block (ctx->pool, ctx->real_statfile, elt->h1, elt->h2, ctx->new_time, elt->value);
+ }
+ statfile_pool_unlock_file (ctx->pool, ctx->real_statfile);
+
+ return TRUE;
+}
+
+static gboolean
+sync_read (f_str_t * in, void *arg)
+{
+ struct rspamd_sync_ctx *ctx = arg;
+
+ switch (ctx->state) {
+ case SYNC_STATE_GREETING:
+ /* Try to parse line from server */
+ if (!parse_revision_line (ctx, in)) {
+ msg_info ("sync_read: cannot parse line");
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+ else if (ctx->state != SYNC_STATE_QUIT) {
+ ctx->state = SYNC_STATE_READ_REV;
+ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_CHARACTER, ctx->new_len);
+ }
+ else {
+ /* Quit this session */
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ /* Immideately return from callback */
+ return FALSE;
+ }
+ break;
+ case SYNC_STATE_READ_REV:
+ /* In now contains all blocks of specified revision, so we can read them directly */
+ if (!read_blocks (ctx, in)) {
+ msg_info ("sync_read: cannot read blocks");
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+ statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+ /* Now try to read other revision or END line */
+ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
+ ctx->state = SYNC_STATE_GREETING;
+ break;
+ case SYNC_STATE_QUIT:
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static void
+sync_err (GError *err, void *arg)
+{
+ struct rspamd_sync_ctx *ctx = arg;
+
+ msg_info ("sync_err: abnormally closing connection, error: %s", err->message);
+ ctx->is_busy = FALSE;
+ rspamd_remove_dispatcher (ctx->dispatcher);
+}
+
+
+static void
+sync_timer_callback (int fd, short what, void *ud)
+{
+ struct rspamd_sync_ctx *ctx = ud;
+ char buf[256];
+ uint64_t rev = 0;
+ time_t ti = 0;
+
+ /* Plan new event */
+ evtimer_del (&ctx->tm_ev);
+ ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+ ctx->interval.tv_usec = 0;
+ evtimer_add (&ctx->tm_ev, &ctx->interval);
+
+ if (ctx->is_busy) {
+ /* Sync is in progress */
+ msg_info ("sync_timer_callback: syncronization process is in progress, do not start new one");
+ return;
+ }
+
+ if ((ctx->sock = make_tcp_socket (&ctx->st->binlog->master_addr, ctx->st->binlog->master_port, FALSE, TRUE)) == -1) {
+ msg_info ("sync_timer_callback: cannot connect to %s", inet_ntoa (ctx->st->binlog->master_addr));
+ return;
+ }
+ /* Now create and activate dispatcher */
+ ctx->io_tv.tv_sec = IO_TIMEOUT;
+ ctx->io_tv.tv_usec = 0;
+ ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
+ /* Write initial data */
+ statfile_get_revision (ctx->real_statfile, &rev, &ti);
+ rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
+
+ ctx->state = SYNC_STATE_GREETING;
+ ctx->is_busy = TRUE;
+
+ rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+}
+
+static gboolean
+add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
+{
+ struct rspamd_sync_ctx *ctx;
+
+ ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+ ctx->st = st;
+ /* Add some jittering for synchronization */
+ ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+ ctx->interval.tv_usec = 0;
+ /* Open statfile and attach it to pool */
+ if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
+ if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+ msg_warn ("add_statfile_watch: cannot open %s", st->path);
+ if (statfile_pool_create (pool, st->path, st->size) == -1) {
+ msg_err ("add_statfile_watch: cannot create statfile %s", st->path);
+ return FALSE;
+ }
+ ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
+ }
+ }
+ /* Now plan event for it's future executing */
+ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+ evtimer_add (&ctx->tm_ev, &ctx->interval);
+
+ return TRUE;
+}
+
+gboolean
+start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg)
+{
+ GList *cur, *l;
+ struct classifier_config *cl;
+ struct statfile *st;
+
+ /*
+ * First of all walk through all classifiers and find those statfiles
+ * for which we should do sync (slave affinity)
+ */
+ cur = cfg->classifiers;
+ while (cur) {
+ cl = cur->data;
+ l = cl->statfiles;
+ while (l) {
+ st = l->data;
+ if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
+ if (!add_statfile_watch (pool, st)) {
+ return FALSE;
+ }
+ }
+ l = g_list_next (l);
+ }
+ cur = g_list_next (cur);
+ }
+
+ return TRUE;
+}
diff --git a/src/statfile_sync.h b/src/statfile_sync.h
new file mode 100644
index 000000000..ba3aec2a3
--- /dev/null
+++ b/src/statfile_sync.h
@@ -0,0 +1,11 @@
+#ifndef RSPAMD_STATFILE_SYNC_H
+#define RSPAMD_STATFILE_SYNC_H
+
+#include "config.h"
+#include "main.h"
+#include "statfile.h"
+#include "cfg_file.h"
+
+gboolean start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg);
+
+#endif