]> source.dussan.org Git - rspamd.git/commitdiff
* Some fixes to sync
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 1 Dec 2009 21:45:37 +0000 (00:45 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 1 Dec 2009 21:45:37 +0000 (00:45 +0300)
src/controller.c
src/statfile_sync.c
src/util.c

index 2c541b2e96e56d75bf5dd2d36324dc5f9f606c39..caa538823c2219145d5702b0d90b3ff17a421ce0 100644 (file)
@@ -186,7 +186,7 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c
        struct statfile                *st;
        char                            out_buf[BUFSIZ];
        int                             i;
-       uint64_t                        rev, time, len, pos;
+       uint64_t                        rev, ti, len, pos;
        char                           *out;
        struct rspamd_binlog_element    log_elt;
        struct stat_file_block         *stat_elt;
@@ -198,21 +198,32 @@ write_whole_statfile (struct controller_session *session, char *symbol, struct c
        }
        
        /* Begin to copy all blocks into array */
-       statfile_get_revision (statfile, &rev, (time_t *)&time);
+       statfile_get_revision (statfile, &rev, (time_t *)&ti);
+       if (ti == 0) {
+               /* Not tracked file */
+               ti = time (NULL);
+               statfile_set_revision (statfile, rev, ti);
+       }
        len = statfile->cur_section.length * sizeof (struct rspamd_binlog_element);
-       i = snprintf (out_buf, sizeof (out_buf), "%lu %lu %lu" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)len);
-       rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
        out = memory_pool_alloc (session->session_pool, len);
 
        for (i = 0, pos = 0; i < statfile->cur_section.length; i ++) {
                stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block));
-               log_elt.h1 = stat_elt->hash1;
-               log_elt.h2 = stat_elt->hash2;
-               log_elt.value = stat_elt->value;
-               memcpy (out + pos, &log_elt, sizeof (log_elt));
-               pos += sizeof (struct rspamd_binlog_element);
+               if (fabs (stat_elt->value) > 0.001) {
+                       /* Write only those values which value is not 0 */
+                       log_elt.h1 = stat_elt->hash1;
+                       log_elt.h2 = stat_elt->hash2;
+                       log_elt.value = stat_elt->value;
+
+                       memcpy (out + pos, &log_elt, sizeof (log_elt));
+                       pos += sizeof (struct rspamd_binlog_element);
+               }
        }
-       if (!rspamd_dispatcher_write (session->dispatcher, out, len, TRUE, TRUE)) {
+
+       i = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %uL" CRLF, rev, ti, pos);
+       rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE);
+
+       if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) {
                return FALSE;
        }
        
index 300bb215e998c99d395b14b5fed8eb3bcc51642e..5d2e9283df83c72f55004fbafaa28e0dda253466 100644 (file)
@@ -33,7 +33,7 @@
 
 /* XXX: hardcoding this value is not very smart */
 #define MAX_SYNC_TIME 60
-#define IO_TIMEOUT 5
+#define IO_TIMEOUT 20
 
 
 enum rspamd_sync_state {
@@ -103,7 +103,8 @@ parse_revision_line (struct rspamd_sync_ctx *ctx, f_str_t *in)
                ctx->is_busy = FALSE;
                return TRUE;
        }
-
+       
+       msg_info ("got string: %V", in);
        /* Now try to extract 3 numbers from string: revision, time and length */
        p = in->begin;
        val = &ctx->new_rev;
@@ -175,12 +176,13 @@ sync_read (f_str_t * in, void *arg)
                        statfile_get_revision (ctx->real_statfile, &rev, &ti);
                        rev = snprintf (buf, sizeof (buf), "sync %s %ld %ld" CRLF, ctx->st->symbol, (long int)rev, (long int)ti);
                        ctx->state = SYNC_STATE_READ_LINE;
-                       rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);      
+                       return rspamd_dispatcher_write (ctx->dispatcher, buf, rev, FALSE, FALSE);       
                        break;
                case SYNC_STATE_READ_LINE:
                        /* Try to parse line from server */
                        if (!parse_revision_line (ctx, in)) {
-                               msg_info ("sync_read: cannot parse line: %S", in);
+                               msg_info ("sync_read: cannot parse line: %*s", in->len, in->begin);
+                               close (ctx->sock);
                                rspamd_remove_dispatcher (ctx->dispatcher);
                                ctx->is_busy = FALSE;
                                return FALSE;
@@ -191,6 +193,8 @@ sync_read (f_str_t * in, void *arg)
                        }
                        else {
                                /* Quit this session */
+                               msg_info ("sync_read: no sync needed for: %s", ctx->st->symbol);
+                               close (ctx->sock);
                                rspamd_remove_dispatcher (ctx->dispatcher);
                                ctx->is_busy = FALSE;
                                /* Immideately return from callback */ 
@@ -201,6 +205,7 @@ sync_read (f_str_t * in, void *arg)
                        /* In now contains all blocks of specified revision, so we can read them directly */
                        if (!read_blocks (ctx, in)) {
                                msg_info ("sync_read: cannot read blocks");
+                               close (ctx->sock);
                                rspamd_remove_dispatcher (ctx->dispatcher);
                                ctx->is_busy = FALSE;
                                return FALSE;
@@ -211,6 +216,7 @@ sync_read (f_str_t * in, void *arg)
                        ctx->state = SYNC_STATE_READ_LINE;
                        break;
                case SYNC_STATE_QUIT:
+                       close (ctx->sock);
                        rspamd_remove_dispatcher (ctx->dispatcher);
                        ctx->is_busy = FALSE;
                        return FALSE;
@@ -226,6 +232,7 @@ sync_err (GError *err, void *arg)
 
        msg_info ("sync_err: abnormally closing connection, error: %s", err->message);
        ctx->is_busy = FALSE;
+       close (ctx->sock);
        rspamd_remove_dispatcher (ctx->dispatcher);
 }
 
index c549426c572f018bc78b97f08402513661cb6c1c..2e22772c2a9982b2c74b1b23627d13e5e388f79d 100644 (file)
@@ -1344,6 +1344,9 @@ rspamd_vsnprintf (u_char *buf, size_t max, const char *fmt, va_list args)
 
                        case 's':
                                p = va_arg(args, u_char *);
+                               if (p == NULL) {
+                                       p = "(NULL)";
+                               }
 
                                if (slen == (size_t) -1) {
                                        while (*p && buf < last) {