aboutsummaryrefslogtreecommitdiffstats
path: root/src/statfile_sync.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-11-27 19:15:48 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-11-27 19:15:48 +0300
commit569df8dd24eb159b069ca7f5efa6a6ba3336d63d (patch)
tree54d5c6200e5ea121e9fba909b6e7c40b7e164258 /src/statfile_sync.c
parent893d6efc979fa9e23d93a43c6ab1e46bd9f7da16 (diff)
downloadrspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.tar.gz
rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.zip
* Initial release of synchronization server in controller
Diffstat (limited to 'src/statfile_sync.c')
-rw-r--r--src/statfile_sync.c51
1 files changed, 43 insertions, 8 deletions
diff --git a/src/statfile_sync.c b/src/statfile_sync.c
index 71be88733..d28e0d208 100644
--- a/src/statfile_sync.c
+++ b/src/statfile_sync.c
@@ -38,6 +38,7 @@
enum rspamd_sync_state {
SYNC_STATE_GREETING,
+ SYNC_STATE_READ_LINE,
SYNC_STATE_READ_REV,
SYNC_STATE_QUIT,
};
@@ -62,6 +63,25 @@ struct rspamd_sync_ctx {
uint64_t new_len;
};
+static void
+log_next_sync (const char *symbol, time_t delay)
+{
+ char outstr[200];
+ time_t t;
+ struct tm *tmp;
+ int r;
+
+ t = time(NULL);
+ t += delay;
+ tmp = localtime(&t);
+
+ if (tmp) {
+ r = snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol);
+ if ((r = strftime(outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) {
+ msg_info (outstr);
+ }
+ }
+}
static gboolean
parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
@@ -77,6 +97,13 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
return TRUE;
}
+ /* Next check for error line */
+ if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) {
+ ctx->state = SYNC_STATE_QUIT;
+ ctx->is_busy = FALSE;
+ return TRUE;
+ }
+
/* Now try to extract 3 numbers from string: revision, time and length */
p = in->begin;
val = &ctx->new_rev;
@@ -137,9 +164,20 @@ static gboolean
sync_read (f_str_t * in, void *arg)
{
struct rspamd_sync_ctx *ctx = arg;
+ char buf[256];
+ uint64_t rev = 0;
+ time_t ti = 0;
switch (ctx->state) {
case SYNC_STATE_GREETING:
+ /* Skip greeting line and write sync command */
+ /* Write initial data */
+ statfile_get_revision (ctx->real_statfile, &rev, &ti);
+ rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
+ ctx->state = SYNC_STATE_READ_LINE;
+ rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+ break;
+ case SYNC_STATE_READ_LINE:
/* Try to parse line from server */
if (!parse_revision_line (ctx, in)) {
msg_info ("sync_read: cannot parse line");
@@ -170,7 +208,7 @@ sync_read (f_str_t * in, void *arg)
statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
/* Now try to read other revision or END line */
rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
- ctx->state = SYNC_STATE_GREETING;
+ ctx->state = SYNC_STATE_READ_LINE;
break;
case SYNC_STATE_QUIT:
rspamd_remove_dispatcher (ctx->dispatcher);
@@ -196,15 +234,13 @@ static void
sync_timer_callback (int fd, short what, void *ud)
{
struct rspamd_sync_ctx *ctx = ud;
- char buf[256];
- uint64_t rev = 0;
- time_t ti = 0;
/* Plan new event */
evtimer_del (&ctx->tm_ev);
ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
ctx->interval.tv_usec = 0;
evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (ctx->st->symbol, ctx->interval.tv_sec);
if (ctx->is_busy) {
/* Sync is in progress */
@@ -220,14 +256,12 @@ sync_timer_callback (int fd, short what, void *ud)
ctx->io_tv.tv_sec = IO_TIMEOUT;
ctx->io_tv.tv_usec = 0;
ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx);
- /* Write initial data */
- statfile_get_revision (ctx->real_statfile, &rev, &ti);
- rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
ctx->state = SYNC_STATE_GREETING;
ctx->is_busy = TRUE;
- rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+ msg_info ("sync_timer_callback: starting synchronization of %s", ctx->st->symbol);
+
}
static gboolean
@@ -254,6 +288,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
/* Now plan event for it's future executing */
evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (st->symbol, ctx->interval.tv_sec);
return TRUE;
}