@@ -186,7 +186,7 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c | |||
struct statfile *st; | |||
char out_buf[BUFSIZ]; | |||
int i; | |||
uint64_t rev, time, len, pos; | |||
uint64_t rev, ti, len, pos; | |||
char *out; | |||
struct rspamd_binlog_element log_elt; | |||
struct stat_file_block *stat_elt; | |||
@@ -198,21 +198,32 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c | |||
} | |||
/* Begin to copy all blocks into array */ | |||
statfile_get_revision (statfile, &rev, (time_t *)&time); | |||
statfile_get_revision (statfile, &rev, (time_t *)&ti); | |||
if (ti == 0) { | |||
/* Not tracked file */ | |||
ti = time (NULL); | |||
statfile_set_revision (statfile, rev, ti); | |||
} | |||
len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element); | |||
i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)len); | |||
rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); | |||
out = memory_pool_alloc (session->session_pool, len); | |||
for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) { | |||
stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block)); | |||
log_elt.h1 = stat_elt->hash1; | |||
log_elt.h2 = stat_elt->hash2; | |||
log_elt.value = stat_elt->value; | |||
memcpy (out + pos, &log_elt, sizeof (log_elt)); | |||
pos += sizeof (struct rspamd_binlog_element); | |||
if (fabs (stat_elt->value) > 0.001) { | |||
/* Write only those values which value is not 0 */ | |||
log_elt.h1 = stat_elt->hash1; | |||
log_elt.h2 = stat_elt->hash2; | |||
log_elt.value = stat_elt->value; | |||
memcpy (out + pos, &log_elt, sizeof (log_elt)); | |||
pos += sizeof (struct rspamd_binlog_element); | |||
} | |||
} | |||
if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) { | |||
i = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %uL" CRLF, rev, ti, pos); | |||
rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE); | |||
if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) { | |||
return FALSE; | |||
} | |||
@@ -33,7 +33,7 @@ | |||
/* XXX: hardcoding this value is not very smart */ | |||
#define MAX_SYNC_TIME 60 | |||
#define IO_TIMEOUT 5 | |||
#define IO_TIMEOUT 20 | |||
enum rspamd_sync_state { | |||
@@ -103,7 +103,8 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in) | |||
ctx->is_busy = FALSE; | |||
return TRUE; | |||
} | |||
msg_info ("got string: %V", in); | |||
/* Now try to extract 3 numbers from string: revision, time and length */ | |||
p = in->begin; | |||
val = &ctx->new_rev; | |||
@@ -175,12 +176,13 @@ sync_read (f_str_t * in, void *arg) | |||
statfile_get_revision (ctx->real_statfile, &rev, &ti); | |||
rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti); | |||
ctx->state = SYNC_STATE_READ_LINE; | |||
rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); | |||
return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE); | |||
break; | |||
case SYNC_STATE_READ_LINE: | |||
/* Try to parse line from server */ | |||
if (!parse_revision_line (ctx, in)) { | |||
msg_info ("sync_read: cannot parse line: %S", in); | |||
msg_info ("sync_read: cannot parse line: %*s", in->len, in->begin); | |||
close (ctx->sock); | |||
rspamd_remove_dispatcher (ctx->dispatcher); | |||
ctx->is_busy = FALSE; | |||
return FALSE; | |||
@@ -191,6 +193,8 @@ sync_read (f_str_t * in, void *arg) | |||
} | |||
else { | |||
/* Quit this session */ | |||
msg_info ("sync_read: no sync needed for: %s", ctx->st->symbol); | |||
close (ctx->sock); | |||
rspamd_remove_dispatcher (ctx->dispatcher); | |||
ctx->is_busy = FALSE; | |||
/* Immideately return from callback */ | |||
@@ -201,6 +205,7 @@ sync_read (f_str_t * in, void *arg) | |||
/* In now contains all blocks of specified revision, so we can read them directly */ | |||
if (!read_blocks (ctx, in)) { | |||
msg_info ("sync_read: cannot read blocks"); | |||
close (ctx->sock); | |||
rspamd_remove_dispatcher (ctx->dispatcher); | |||
ctx->is_busy = FALSE; | |||
return FALSE; | |||
@@ -211,6 +216,7 @@ sync_read (f_str_t * in, void *arg) | |||
ctx->state = SYNC_STATE_READ_LINE; | |||
break; | |||
case SYNC_STATE_QUIT: | |||
close (ctx->sock); | |||
rspamd_remove_dispatcher (ctx->dispatcher); | |||
ctx->is_busy = FALSE; | |||
return FALSE; | |||
@@ -226,6 +232,7 @@ sync_err (GError *err, void *arg) | |||
msg_info ("sync_err: abnormally closing connection, error: %s", err->message); | |||
ctx->is_busy = FALSE; | |||
close (ctx->sock); | |||
rspamd_remove_dispatcher (ctx->dispatcher); | |||
} | |||
@@ -1344,6 +1344,9 @@ rspamd_vsnprintf (u_char *buf, size_t max, const char *fmt, va_list args) | |||
case 's': | |||
p = va_arg(args, u_char *); | |||
if (p == NULL) { | |||
p = "(NULL)"; | |||
} | |||
if (slen == (size_t) -1) { | |||
while (*p && buf < last) { |