diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-11-27 19:15:48 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-11-27 19:15:48 +0300 |
commit | 569df8dd24eb159b069ca7f5efa6a6ba3336d63d (patch) | |
tree | 54d5c6200e5ea121e9fba909b6e7c40b7e164258 /src/statfile_sync.c | |
parent | 893d6efc979fa9e23d93a43c6ab1e46bd9f7da16 (diff) | |
download | rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.tar.gz rspamd-569df8dd24eb159b069ca7f5efa6a6ba3336d63d.zip |
* Initial release of synchronization server in controller
Diffstat (limited to 'src/statfile_sync.c')
-rw-r--r-- | src/statfile_sync.c | 51 |
1 files changed, 43 insertions, 8 deletions
diff --git a/src/statfile_sync.c b/src/statfile_sync.c index 71be88733..d28e0d208 100644 --- a/src/statfile_sync.c +++ b/src/statfile_sync.c @@ -38,6 +38,7 @@ enum rspamd_sync_state { SYNC_STATE_GREETING, + SYNC_STATE_READ_LINE, SYNC_STATE_READ_REV, SYNC_STATE_QUIT, }; @@ -62,6 +63,25 @@ struct rspamd_sync_ctx { uint64_t new_len; }; +static void +log_next_sync (const char *symbol, time_t delay) +{ + char outstr[200]; + time_t t; + struct tm *tmp; + int r; + + t = time(NULL); + t += delay; + tmp = localtime(&t); + + if (tmp) { + r = snprintf (outstr, sizeof (outstr), "statfile_sync: next sync of %s at ", symbol); + if ((r = strftime(outstr + r, sizeof(outstr) - r, "%T", tmp)) != 0) { + msg_info (outstr); + } + } +} static gboolean parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) @@ -77,6 +97,13 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) return TRUE; } + /* Next check for error line */ + if (in->len >= sizeof ("FAIL") - 1 && memcmp (in->begin, "FAIL", sizeof ("FAIL") - 1) == 0) { + ctx->state = SYNC_STATE_QUIT; + ctx->is_busy = FALSE; + return TRUE; + } + /* Now try to extract 3 numbers from string: revision, time and length */ p = in->begin; val = &ctx->new_rev; @@ -137,9 +164,20 @@ static gboolean sync_read (f_str_t * in, void *arg) { struct rspamd_sync_ctx *ctx = arg; + char buf[256]; + uint64_t rev = 0; + time_t ti = 0; switch (ctx->state) { case SYNC_STATE_GREETING: + /* Skip greeting line and write sync command */ + /* Write initial data */ + statfile_get_revision (ctx->real_statfile, &rev, &ti); + rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); + ctx->state = SYNC_STATE_READ_LINE; + rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); + break; + case SYNC_STATE_READ_LINE: /* Try to parse line from server */ if (!parse_revision_line (ctx, in)) { msg_info ("sync_read: cannot parse line"); @@ -170,7 +208,7 @@ sync_read (f_str_t * in, void *arg) statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time); /* Now try to read other revision or END line */ rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0); - ctx->state = SYNC_STATE_GREETING; + ctx->state = SYNC_STATE_READ_LINE; break; case SYNC_STATE_QUIT: rspamd_remove_dispatcher (ctx->dispatcher); @@ -196,15 +234,13 @@ static void sync_timer_callback (int fd, short what, void *ud) { struct rspamd_sync_ctx *ctx = ud; - char buf[256]; - uint64_t rev = 0; - time_t ti = 0; /* Plan new event */ evtimer_del (&ctx->tm_ev); ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2); ctx->interval.tv_usec = 0; evtimer_add (&ctx->tm_ev, &ctx->interval); + log_next_sync (ctx->st->symbol, ctx->interval.tv_sec); if (ctx->is_busy) { /* Sync is in progress */ @@ -220,14 +256,12 @@ sync_timer_callback (int fd, short what, void *ud) ctx->io_tv.tv_sec = IO_TIMEOUT; ctx->io_tv.tv_usec = 0; ctx->dispatcher = rspamd_create_dispatcher (ctx->sock, BUFFER_LINE, sync_read, NULL, sync_err, &ctx->io_tv, ctx); - /* Write initial data */ - statfile_get_revision (ctx->real_statfile, &rev, &ti); - rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); ctx->state = SYNC_STATE_GREETING; ctx->is_busy = TRUE; - rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); + msg_info ("sync_timer_callback: starting synchronization of %s", ctx->st->symbol); + } static gboolean @@ -254,6 +288,7 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st) /* Now plan event for it's future executing */ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); evtimer_add (&ctx->tm_ev, &ctx->interval); + log_next_sync (st->symbol, ctx->interval.tv_sec); return TRUE; } |