aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/statfile_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/statfile_sync.c')
-rw-r--r--src/libserver/statfile_sync.c350
1 files changed, 350 insertions, 0 deletions
diff --git a/src/libserver/statfile_sync.c b/src/libserver/statfile_sync.c
new file mode 100644
index 000000000..6b545af17
--- /dev/null
+++ b/src/libserver/statfile_sync.c
@@ -0,0 +1,350 @@
+/*
+ * Copyright (c) 2009-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "cfg_file.h"
+#include "tokenizers/tokenizers.h"
+#include "classifiers/classifiers.h"
+#include "statfile.h"
+#include "binlog.h"
+#include "buffer.h"
+#include "statfile_sync.h"
+
+enum rspamd_sync_state {
+ SYNC_STATE_GREETING,
+ SYNC_STATE_READ_LINE,
+ SYNC_STATE_READ_REV,
+ SYNC_STATE_QUIT,
+};
+
+/* Context of sync process */
+struct rspamd_sync_ctx {
+ struct statfile *st;
+ stat_file_t *real_statfile;
+ statfile_pool_t *pool;
+ rspamd_io_dispatcher_t *dispatcher;
+ struct event_base *ev_base;
+
+ struct event tm_ev;
+
+ struct timeval interval;
+ struct timeval io_tv;
+ gint sock;
+ guint32 timeout;
+ guint32 sync_interval;
+ enum rspamd_sync_state state;
+ gboolean is_busy;
+
+ guint64 new_rev;
+ guint64 new_time;
+ guint64 new_len;
+};
+
+static void
+log_next_sync (const gchar *symbol, time_t delay)
+{
+ gchar outstr[200];
+ time_t t;
+ struct tm *tmp;
+ gint r;
+
+ t = time(NULL);
+ t += delay;
+ tmp = localtime(&t);
+
+ if (tmp) {
+ r = rspamd_snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol);
+ if ((r = strftime(outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) {
+ msg_info (outstr);
+ }
+ }
+}
+
+static gboolean
+parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
+{
+ guint i, state = 0;
+ gchar *p, *c, numbuf[sizeof("18446744073709551615")];
+ guint64 *val;
+
+ /* First of all try to find END line */
+ if (in->len >= sizeof ("END") - 1 && memcmp (in->begin, "END", sizeof ("END") - 1) == 0) {
+ ctx->state = SYNC_STATE_QUIT;
+ ctx->is_busy = FALSE;
+ return TRUE;
+ }
+
+ /* Next check for error line */
+ if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
+ ctx->state = SYNC_STATE_QUIT;
+ ctx->is_busy = FALSE;
+ return TRUE;
+ }
+
+ /* Now try to extract 3 numbers from string: revision, time and length */
+ p = in->begin;
+ val = &ctx->new_rev;
+ for (i = 0; i < in->len; i ++, p ++) {
+ if (g_ascii_isspace (*p) || i == in->len - 1) {
+ if (state == 1) {
+ if (i == in->len - 1) {
+ /* One more character */
+ p ++;
+ }
+ rspamd_strlcpy (numbuf, c, MIN (p - c + 1, (gint)sizeof (numbuf)));
+ errno = 0;
+ *val = strtoull (numbuf, NULL, 10);
+ if (errno != 0) {
+ msg_info ("cannot parse number %s", strerror (errno));
+ return FALSE;
+ }
+ state = 2;
+ }
+ }
+ else {
+ if (state == 0) {
+ c = p;
+ state = 1;
+ }
+ else if (state == 2) {
+ if (val == &ctx->new_rev) {
+ val = &ctx->new_time;
+ }
+ else if (val == &ctx->new_time) {
+ val = &ctx->new_len;
+ }
+ c = p;
+ state = 1;
+ }
+ }
+ }
+
+ /* Current value must be len value and its value must not be 0 */
+ return ((val == &ctx->new_len));
+}
+
+static gboolean
+read_blocks (struct rspamd_sync_ctx *ctx, f_str_t *in)
+{
+ struct rspamd_binlog_element *elt;
+ guint i;
+
+ statfile_pool_lock_file (ctx->pool, ctx->real_statfile);
+ elt = (struct rspamd_binlog_element *)in->begin;
+ for (i = 0; i < in->len / sizeof (struct rspamd_binlog_element); i ++, elt ++) {
+ statfile_pool_set_block (ctx->pool, ctx->real_statfile, elt->h1, elt->h2, ctx->new_time, elt->value);
+ }
+ statfile_pool_unlock_file (ctx->pool, ctx->real_statfile);
+
+ return TRUE;
+}
+
+static gboolean
+sync_read (f_str_t * in, void *arg)
+{
+ struct rspamd_sync_ctx *ctx = arg;
+ gchar buf[256];
+ guint64 rev = 0;
+ time_t ti = 0;
+
+ if (in->len == 0) {
+ /* Skip empty lines */
+ return TRUE;
+ }
+ switch (ctx->state) {
+ case SYNC_STATE_GREETING:
+ /* Skip greeting line and write sync command */
+ /* Write initial data */
+ statfile_get_revision (ctx->real_statfile, &rev, &ti);
+ rev = rspamd_snprintf (buf, sizeof (buf), "sync %s %uL %T" CRLF, ctx->st->symbol, rev, ti);
+ ctx->state = SYNC_STATE_READ_LINE;
+ return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+ break;
+ case SYNC_STATE_READ_LINE:
+ /* Try to parse line from server */
+ if (!parse_revision_line (ctx, in)) {
+ msg_info ("cannot parse line of length %z: '%*s'", in->len, (gint)in->len, in->begin);
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+ else if (ctx->state != SYNC_STATE_QUIT) {
+ if (ctx->new_len > 0) {
+ ctx->state = SYNC_STATE_READ_REV;
+ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_CHARACTER, ctx->new_len);
+ }
+ }
+ else {
+ /* Quit this session */
+ msg_info ("sync ended for: %s", ctx->st->symbol);
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ /* Immediately return from callback */
+ return FALSE;
+ }
+ break;
+ case SYNC_STATE_READ_REV:
+ /* In now contains all blocks of specified revision, so we can read them directly */
+ if (!read_blocks (ctx, in)) {
+ msg_info ("cannot read blocks");
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+ statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+ msg_info ("set new revision: %uL, readed %z bytes", ctx->new_rev, in->len);
+ /* Now try to read other revision or END line */
+ ctx->state = SYNC_STATE_READ_LINE;
+ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
+ break;
+ case SYNC_STATE_QUIT:
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+ ctx->is_busy = FALSE;
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static void
+sync_err (GError *err, void *arg)
+{
+ struct rspamd_sync_ctx *ctx = arg;
+
+ msg_info ("abnormally closing connection, error: %s", err->message);
+ ctx->is_busy = FALSE;
+ close (ctx->sock);
+ rspamd_remove_dispatcher (ctx->dispatcher);
+}
+
+
+static void
+sync_timer_callback (gint fd, short what, void *ud)
+{
+ struct rspamd_sync_ctx *ctx = ud;
+ guint32 jittered_interval;
+
+ /* Plan new event */
+ evtimer_del (&ctx->tm_ev);
+ /* Add some jittering for synchronization */
+ jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
+ msec_to_tv (jittered_interval, &ctx->interval);
+ evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (ctx->st->symbol, ctx->interval.tv_sec);
+
+ if (ctx->is_busy) {
+ /* Sync is in progress */
+ msg_info ("syncronization process is in progress, do not start new one");
+ return;
+ }
+
+ if ((ctx->sock = make_universal_socket (ctx->st->binlog->master_addr, ctx->st->binlog->master_port,
+ SOCK_STREAM, TRUE, FALSE, TRUE)) == -1) {
+ msg_info ("cannot connect to %s", ctx->st->binlog->master_addr);
+ return;
+ }
+ /* Now create and activate dispatcher */
+ msec_to_tv (ctx->timeout, &ctx->io_tv);
+ ctx->dispatcher = rspamd_create_dispatcher (ctx->ev_base, ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
+
+ ctx->state = SYNC_STATE_GREETING;
+ ctx->is_busy = TRUE;
+
+ msg_info ("starting synchronization of %s", ctx->st->symbol);
+
+}
+
+static gboolean
+add_statfile_watch (statfile_pool_t *pool, struct statfile *st, struct config_file *cfg, struct event_base *ev_base)
+{
+ struct rspamd_sync_ctx *ctx;
+ guint32 jittered_interval;
+
+ if (st->binlog->master_addr != NULL) {
+ ctx = rspamd_mempool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+ ctx->st = st;
+ ctx->timeout = cfg->statfile_sync_timeout;
+ ctx->sync_interval = cfg->statfile_sync_interval;
+ ctx->ev_base = ev_base;
+ /* Add some jittering for synchronization */
+ jittered_interval = g_random_int_range (ctx->sync_interval, ctx->sync_interval * 2);
+ msec_to_tv (jittered_interval, &ctx->interval);
+ /* Open statfile and attach it to pool */
+ if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
+ if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+ msg_warn ("cannot open %s", st->path);
+ if (statfile_pool_create (pool, st->path, st->size) == -1) {
+ msg_err ("cannot create statfile %s", st->path);
+ return FALSE;
+ }
+ ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
+ }
+ }
+ /* Now plan event for it's future executing */
+ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+ event_base_set (ctx->ev_base, &ctx->tm_ev);
+ evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (st->symbol, ctx->interval.tv_sec);
+ }
+ else {
+ msg_err ("cannot add statfile watch for statfile %s: no master defined", st->symbol);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+start_statfile_sync (statfile_pool_t *pool, struct config_file *cfg, struct event_base *ev_base)
+{
+ GList *cur, *l;
+ struct classifier_config *cl;
+ struct statfile *st;
+
+ /*
+ * First of all walk through all classifiers and find those statfiles
+ * for which we should do sync (slave affinity)
+ */
+ cur = cfg->classifiers;
+ while (cur) {
+ cl = cur->data;
+ l = cl->statfiles;
+ while (l) {
+ st = l->data;
+ if (st->binlog != NULL && st->binlog->affinity == AFFINITY_SLAVE) {
+ if (!add_statfile_watch (pool, st, cfg, ev_base)) {
+ return FALSE;
+ }
+ }
+ l = g_list_next (l);
+ }
+ cur = g_list_next (cur);
+ }
+
+ return TRUE;
+}