From 064948fca0ca6b579c4494c18223c1ea2d68208a Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 2 Dec 2009 00:45:37 +0300 Subject: [PATCH] * Some fixes to sync --- src/controller.c | 31 +++++++++++++++++++++---------- src/statfile_sync.c | 15 +++++++++++---- src/util.c | 3 +++ 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/controller.c b/src/controller.c index 2c541b2e9..caa538823 100644 --- a/src/controller.c +++ b/src/controller.c @@ -186,7 +186,7 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c struct statfile *st; char out_buf[BUFSIZ]; int i; - uint64_t rev, time, len, pos; + uint64_t rev, ti, len, pos; char *out; struct rspamd_binlog_element log_elt; struct stat_file_block *stat_elt; @@ -198,21 +198,32 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c } /* Begin to copy all blocks into array */ - statfile_get_revision (statfile, &rev, (time_t *)&time); + statfile_get_revision (statfile, &rev, (time_t *)&ti); + if (ti == 0) { + /* Not tracked file */ + ti = time (NULL); + statfile_set_revision (statfile, rev, ti); + } len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element); - i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)len); - rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); out = memory_pool_alloc (session->session_pool, len); for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) { stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block)); - log_elt.h1 = stat_elt->hash1; - log_elt.h2 = stat_elt->hash2; - log_elt.value = stat_elt->value; - memcpy (out + pos, &log_elt, sizeof (log_elt)); - pos += sizeof (struct rspamd_binlog_element); + if (fabs (stat_elt->value) > 0.001) { + /* Write only those values which value is not 0 */ + log_elt.h1 = stat_elt->hash1; + log_elt.h2 = stat_elt->hash2; + log_elt.value = stat_elt->value; + + memcpy (out + pos, &log_elt, sizeof (log_elt)); + pos += sizeof (struct rspamd_binlog_element); + } } - if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) { + + i = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %uL" CRLF, rev, ti, pos); + rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); + + if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) { return FALSE; } diff --git a/src/statfile_sync.c b/src/statfile_sync.c index 300bb215e..5d2e9283d 100644 --- a/src/statfile_sync.c +++ b/src/statfile_sync.c @@ -33,7 +33,7 @@ /* XXX: hardcoding this value is not very smart */ #define MAX_SYNC_TIME 60 -#define IO_TIMEOUT 5 +#define IO_TIMEOUT 20 enum rspamd_sync_state { @@ -103,7 +103,8 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) ctx->is_busy = FALSE; return TRUE; } - + + msg_info ("got string: %V", in); /* Now try to extract 3 numbers from string: revision, time and length */ p = in->begin; val = &ctx->new_rev; @@ -175,12 +176,13 @@ sync_read (f_str_t * in, void *arg) statfile_get_revision (ctx->real_statfile, &rev, &ti); rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); ctx->state = SYNC_STATE_READ_LINE; - rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); + return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); break; case SYNC_STATE_READ_LINE: /* Try to parse line from server */ if (!parse_revision_line (ctx, in)) { - msg_info ("sync_read: cannot parse line: %S", in); + msg_info ("sync_read: cannot parse line: %*s", in->len, in->begin); + close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); ctx->is_busy = FALSE; return FALSE; @@ -191,6 +193,8 @@ sync_read (f_str_t * in, void *arg) } else { /* Quit this session */ + msg_info ("sync_read: no sync needed for: %s", ctx->st->symbol); + close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); ctx->is_busy = FALSE; /* Immideately return from callback */ @@ -201,6 +205,7 @@ sync_read (f_str_t * in, void *arg) /* In now contains all blocks of specified revision, so we can read them directly */ if (!read_blocks (ctx, in)) { msg_info ("sync_read: cannot read blocks"); + close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); ctx->is_busy = FALSE; return FALSE; @@ -211,6 +216,7 @@ sync_read (f_str_t * in, void *arg) ctx->state = SYNC_STATE_READ_LINE; break; case SYNC_STATE_QUIT: + close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); ctx->is_busy = FALSE; return FALSE; @@ -226,6 +232,7 @@ sync_err (GError *err, void *arg) msg_info ("sync_err: abnormally closing connection, error: %s", err->message); ctx->is_busy = FALSE; + close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); } diff --git a/src/util.c b/src/util.c index c549426c5..2e22772c2 100644 --- a/src/util.c +++ b/src/util.c @@ -1344,6 +1344,9 @@ rspamd_vsnprintf (u_char *buf, size_t max, const char *fmt, va_list args) case 's': p = va_arg(args, u_char *); + if (p == NULL) { + p = "(NULL)"; + } if (slen == (size_t) -1) { while (*p && buf < last) { -- 2.39.5