]> source.dussan.org Git - rspamd.git/commitdiff
* Many fixes to statfile syncronization system
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 2 Sep 2010 16:24:41 +0000 (20:24 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 2 Sep 2010 16:24:41 +0000 (20:24 +0400)
* Fixed statfile pool initialization and synchronization with disk

src/binlog.c
src/buffer.c
src/controller.c
src/main.c
src/statfile.c
src/statfile.h
src/statfile_sync.c
test/rspamd_statfile_test.c

index 8ff8112b06804bf33d76ca9519a432898646ced5..83a57aacc11387fe8aa99594c25de11b67afffc4 100644 (file)
@@ -257,12 +257,13 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
                                strerror (errno), log->metaindex->indexes[log->metaindex->last_index]);
                return FALSE;
        }
+       log->cur_idx->last_index ++;
        if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
                unlock_file (log->fd, FALSE);
                msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno));
                return FALSE;
        }
-       log->cur_idx->last_index ++;
+
        unlock_file (log->fd, FALSE);
        
        return TRUE;
@@ -466,6 +467,8 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time,
        /* Now fill reply structure */
        (*rep)->len = idx->len;
        /* Read result */
+       msg_info ("update from binlog '%s' from revision: %ul to revision %ul size is %ul",
+                       log->filename, (long unsigned)from_rev, (long unsigned)log->cur_seq, (long unsigned)idx->len);
        if (lseek (log->fd, idx->seek, SEEK_SET) == -1) {
                msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
                res = FALSE;
@@ -535,6 +538,7 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil
        }
 
        if (binlog_insert (log, nodes)) {
+               msg_info ("set new revision of statfile %s: %ul", st->symbol, (long unsigned)log->cur_seq);
                (void)statfile_set_revision (file, log->cur_seq, log->cur_time);
                return TRUE;
        }
index d0c36f8eaf5839f4bd6fd94c2313d32935a08b33..c5b775dfc8d372b57b4eaa4ee21c7b696138f348 100644 (file)
@@ -403,6 +403,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
                                        b = d->in_buf->data->begin;
                                        c = b;
                                }
+                               else {
+                                       d->in_buf->data->len = 0;
+                                       d->in_buf->pos = d->in_buf->data->begin;
+                               }
                                if (d->policy != saved_policy && len != r) {
                                        debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
                                        read_buffers (fd, d, TRUE);
index f3cdc74deffa23c150cfc5c43dda682dc8431715..bbf50d646a8e2863e29126a53588b0980f73ed80 100644 (file)
@@ -87,6 +87,7 @@ static GCompletion             *comp;
 static time_t                   start_time;
 
 static char                     greetingbuf[1024];
+static sig_atomic_t             wanna_die = 0;
 extern rspamd_hash_t           *counters;
 
 static gboolean                 controller_write_socket (void *arg);
@@ -99,13 +100,23 @@ static void
 sig_handler (int signo, siginfo_t *info, void *unused)
 #endif
 {
+       struct timeval                  tv;
        switch (signo) {
        case SIGUSR1:
                reopen_log ();
                break;
        case SIGINT:
        case SIGTERM:
-               _exit (1);
+               if (!wanna_die) {
+                       wanna_die = 1;
+                       tv.tv_sec = 0;
+                       tv.tv_usec = 0;
+                       event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+                       ProfilerStop ();
+#endif
+               }
                break;
        }
 }
@@ -308,17 +319,19 @@ process_sync_command (struct controller_session *session, char **args)
        
        while (binlog_sync (binlog, rev, &time, &data)) {
                r = rspamd_snprintf (out_buf, sizeof (out_buf), "%ul %ul %ul" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)data->len);
-               if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) {
+               if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
                        if (data != NULL) {
                                g_free (data);
                        }
                        return FALSE;
                }
-               if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
-                       if (data != NULL) {
-                               g_free (data);
+               if (data->data != NULL) {
+                       if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
+                               if (data != NULL) {
+                                       g_free (data);
+                               }
+                               return FALSE;
                        }
-                       return FALSE;
                }
                rev ++;
        }
@@ -645,6 +658,12 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
                        }
                        return TRUE;
                }
+               else {
+                       if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
+                               return FALSE;
+                       }
+                       return TRUE;
+               }
                break;
        case COMMAND_HELP:
                r = rspamd_snprintf (out_buf, sizeof (out_buf),
index 9db7df75d80c31f3bc42fd9d322c29267c336c55..b196bd3ea2830c62698eae7f6babc7adca234938 100644 (file)
@@ -903,7 +903,7 @@ main (int argc, char **argv, char **env)
        setproctitle ("main process");
 
        /* Init statfile pool */
-       rspamd->statfile_pool = statfile_pool_new (rspamd->cfg->max_statfile_size);
+       rspamd->statfile_pool = statfile_pool_new (rspamd->server_pool, rspamd->cfg->max_statfile_size);
 
        event_init ();
        g_mime_init (0);
@@ -1002,6 +1002,8 @@ main (int argc, char **argv, char **env)
 
        msg_info ("terminating...");
 
+       statfile_pool_delete (rspamd->statfile_pool);
+
        close_log ();
 
        free_config (rspamd->cfg);
index 3a0ff51713f658616ec518b8525e18960b154be2..0460a43c8ebcbdca3393e6e8bd29c5e17a42541f 100644 (file)
@@ -205,15 +205,15 @@ statfile_pool_expire (statfile_pool_t * pool)
 }
 
 statfile_pool_t                *
-statfile_pool_new (size_t max_size)
+statfile_pool_new (memory_pool_t *pool, size_t max_size)
 {
        statfile_pool_t                *new;
 
-       new = g_malloc (sizeof (statfile_pool_t));
+       new = memory_pool_alloc_shared (pool, sizeof (statfile_pool_t));
        bzero (new, sizeof (statfile_pool_t));
        new->pool = memory_pool_new (memory_pool_get_size ());
        new->max = max_size;
-       new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t));
+       new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t));
        new->lock = memory_pool_get_mutex (new->pool);
 
        return new;
@@ -382,13 +382,17 @@ statfile_pool_close (statfile_pool_t * pool, stat_file_t * file, gboolean keep_s
        }
 
        if (file->map) {
+               msg_info ("syncing statfile %s", file->filename);
+               msync (file->map, file->len, MS_INVALIDATE | MS_SYNC);
                munmap (file->map, file->len);
        }
        if (file->fd != -1) {
                close (file->fd);
        }
+       /* Move the remain statfiles */
+       memmove (pos, ((guint8 *)pos) + sizeof (stat_file_t),
+                       (--pool->opened - (pos - pool->files)) * sizeof (stat_file_t));
        pool->occupied -= file->len;
-       pool->opened--;
 
        memory_pool_unlock_mutex (pool->lock);
 
@@ -496,7 +500,6 @@ statfile_pool_delete (statfile_pool_t * pool)
                statfile_pool_close (pool, &pool->files[i], FALSE);
        }
        memory_pool_delete (pool->pool);
-       g_free (pool);
 }
 
 void
index a43000534ff5c3c1a8575904226ea6df5cec410f..95f800ce6973ff6060f3c62251d4a2b1435ab25e 100644 (file)
@@ -96,7 +96,7 @@ typedef struct statfile_pool_s {
  * @param max_size maximum size
  * @return statfile pool object
  */
-statfile_pool_t* statfile_pool_new (size_t max_size);
+statfile_pool_t* statfile_pool_new (memory_pool_t *pool, size_t max_size);
 
 /**
  * Open statfile and attach it to pool
index e2f62aca26dd7946b4ca774340ec4eacec769329..56328c77b73dd0aadda2fe36b765a99b9fca2b8a 100644 (file)
@@ -170,6 +170,10 @@ sync_read (f_str_t * in, void *arg)
        uint64_t                rev = 0;
        time_t                  ti = 0;
 
+       if (in->len == 0) {
+               /* Skip empty lines */
+               return TRUE;
+       }
        switch (ctx->state) {
                case SYNC_STATE_GREETING:
                        /* Skip greeting line and write sync command */
@@ -182,7 +186,7 @@ sync_read (f_str_t * in, void *arg)
                case SYNC_STATE_READ_LINE:
                        /* Try to parse line from server */
                        if (!parse_revision_line (ctx, in)) {
-                               msg_info ("cannot parse line: %*s", in->len, in->begin);
+                               msg_info ("cannot parse line of length %z: '%*s'", in->len, (int)in->len, in->begin);
                                close (ctx->sock);
                                rspamd_remove_dispatcher (ctx->dispatcher);
                                ctx->is_busy = FALSE;
@@ -198,7 +202,7 @@ sync_read (f_str_t * in, void *arg)
                                close (ctx->sock);
                                rspamd_remove_dispatcher (ctx->dispatcher);
                                ctx->is_busy = FALSE;
-                               /* Immideately return from callback */ 
+                               /* Immediately return from callback */
                                return FALSE;
                        }
                        break;
@@ -212,6 +216,7 @@ sync_read (f_str_t * in, void *arg)
                                return FALSE;
                        }
                        statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+                       msg_info ("set new revision: %ul, readed %ul bytes", (long unsigned)ctx->new_rev, (long unsigned)in->len);
                        /* Now try to read other revision or END line */
                        ctx->state = SYNC_STATE_READ_LINE;
                        rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
@@ -277,26 +282,33 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
 {
        struct rspamd_sync_ctx *ctx;
        
-       ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
-       ctx->st = st;
-       /* Add some jittering for synchronization */
-       ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
-       ctx->interval.tv_usec = 0;
-       /* Open statfile and attach it to pool */
-       if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
-               if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
-                       msg_warn ("cannot open %s", st->path);
-                       if (statfile_pool_create (pool, st->path, st->size) == -1) {
-                               msg_err ("cannot create statfile %s", st->path);
-                               return FALSE;
+       if (st->binlog->master_addr.s_addr != INADDR_NONE &&
+                       st->binlog->master_addr.s_addr != INADDR_ANY) {
+               ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+               ctx->st = st;
+               /* Add some jittering for synchronization */
+               ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+               ctx->interval.tv_usec = 0;
+               /* Open statfile and attach it to pool */
+               if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
+                       if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+                               msg_warn ("cannot open %s", st->path);
+                               if (statfile_pool_create (pool, st->path, st->size) == -1) {
+                                       msg_err ("cannot create statfile %s", st->path);
+                                       return FALSE;
+                               }
+                               ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
                        }
-                       ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
                }
+               /* Now plan event for it's future executing */
+               evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+               evtimer_add (&ctx->tm_ev, &ctx->interval);
+               log_next_sync (st->symbol, ctx->interval.tv_sec);
+       }
+       else {
+               msg_err ("cannot add statfile watch for statfile %s: no master defined", st->symbol);
+               return FALSE;
        }
-       /* Now plan event for it's future executing */
-       evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
-       evtimer_add (&ctx->tm_ev, &ctx->interval);
-       log_next_sync (st->symbol, ctx->interval.tv_sec);
 
        return TRUE;
 }
index 2ada1836efee6f4cf087119a6d1d725d2d49f528..7a36a848d728a7cabcfcde48eaada6caf88924cc 100644 (file)
@@ -10,12 +10,14 @@ void
 rspamd_statfile_test_func ()
 {
        statfile_pool_t *pool;
+       memory_pool_t *p;
        stat_file_t *st;
        uint32_t random_hashes[HASHES_NUM], i, v;
        time_t now;
        
+       p = memory_pool_new (memory_pool_get_size ());
        umask (S_IWGRP | S_IWOTH);
-       pool = statfile_pool_new (10 * 1024 * 1024);
+       pool = statfile_pool_new (p, 10 * 1024 * 1024);
 
        now = time (NULL);
        /* Fill random array */