struct statfile *st;
char out_buf[BUFSIZ];
int i;
- uint64_t rev, time, len, pos;
+ uint64_t rev, ti, len, pos;
char *out;
struct rspamd_binlog_element log_elt;
struct stat_file_block *stat_elt;
}
/* Begin to copy all blocks into array */
- statfile_get_revision (statfile, &rev, (time_t *)&time);
+ statfile_get_revision (statfile, &rev, (time_t *)&ti);
+ if (ti == 0) {
+ /* Not tracked file */
+ ti = time (NULL);
+ statfile_set_revision (statfile, rev, ti);
+ }
len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element);
- i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)len);
- rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
out = memory_pool_alloc (session->session_pool, len);
for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) {
stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block));
- log_elt.h1 = stat_elt->hash1;
- log_elt.h2 = stat_elt->hash2;
- log_elt.value = stat_elt->value;
- memcpy (out + pos, &log_elt, sizeof (log_elt));
- pos += sizeof (struct rspamd_binlog_element);
+ if (fabs (stat_elt->value) > 0.001) {
+ /* Write only those values which value is not 0 */
+ log_elt.h1 = stat_elt->hash1;
+ log_elt.h2 = stat_elt->hash2;
+ log_elt.value = stat_elt->value;
+
+ memcpy (out + pos, &log_elt, sizeof (log_elt));
+ pos += sizeof (struct rspamd_binlog_element);
+ }
}
- if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) {
+
+ i = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %uL" CRLF, rev, ti, pos);
+ rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
+
+ if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) {
return FALSE;
}
/* XXX: hardcoding this value is not very smart */
#define MAX_SYNC_TIME 60
-#define IO_TIMEOUT 5
+#define IO_TIMEOUT 20
enum rspamd_sync_state {
ctx->is_busy = FALSE;
return TRUE;
}
-
+
+ msg_info ("got string: %V", in);
/* Now try to extract 3 numbers from string: revision, time and length */
p = in->begin;
val = &ctx->new_rev;
statfile_get_revision (ctx->real_statfile, &rev, &ti);
rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
ctx->state = SYNC_STATE_READ_LINE;
- rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
+ return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);
break;
case SYNC_STATE_READ_LINE:
/* Try to parse line from server */
if (!parse_revision_line (ctx, in)) {
- msg_info ("sync_read: cannot parse line: %S", in);
+ msg_info ("sync_read: cannot parse line: %*s", in->len, in->begin);
+ close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
return FALSE;
}
else {
/* Quit this session */
+ msg_info ("sync_read: no sync needed for: %s", ctx->st->symbol);
+ close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
/* Immideately return from callback */
/* In now contains all blocks of specified revision, so we can read them directly */
if (!read_blocks (ctx, in)) {
msg_info ("sync_read: cannot read blocks");
+ close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
return FALSE;
ctx->state = SYNC_STATE_READ_LINE;
break;
case SYNC_STATE_QUIT:
+ close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
return FALSE;
msg_info ("sync_err: abnormally closing connection, error: %s", err->message);
ctx->is_busy = FALSE;
+ close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
}