From: Vsevolod Stakhov Date: Thu, 2 Sep 2010 16:24:41 +0000 (+0400) Subject: * Many fixes to statfile syncronization system X-Git-Tag: 0.3.2~17 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=07a2e0b3c8ac8249f35423ab79b23b6b5ab11c51;p=rspamd.git * Many fixes to statfile syncronization system * Fixed statfile pool initialization and synchronization with disk --- diff --git a/src/binlog.c b/src/binlog.c index 8ff8112b0..83a57aacc 100644 --- a/src/binlog.c +++ b/src/binlog.c @@ -257,12 +257,13 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) strerror (errno), log->metaindex->indexes[log->metaindex->last_index]); return FALSE; } + log->cur_idx->last_index ++; if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } - log->cur_idx->last_index ++; + unlock_file (log->fd, FALSE); return TRUE; @@ -466,6 +467,8 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, /* Now fill reply structure */ (*rep)->len = idx->len; /* Read result */ + msg_info ("update from binlog '%s' from revision: %ul to revision %ul size is %ul", + log->filename, (long unsigned)from_rev, (long unsigned)log->cur_seq, (long unsigned)idx->len); if (lseek (log->fd, idx->seek, SEEK_SET) == -1) { msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; @@ -535,6 +538,7 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil } if (binlog_insert (log, nodes)) { + msg_info ("set new revision of statfile %s: %ul", st->symbol, (long unsigned)log->cur_seq); (void)statfile_set_revision (file, log->cur_seq, log->cur_time); return TRUE; } diff --git a/src/buffer.c b/src/buffer.c index d0c36f8ea..c5b775dfc 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -403,6 +403,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read) b = d->in_buf->data->begin; c = b; } + else { + d->in_buf->data->len = 0; + d->in_buf->pos = d->in_buf->data->begin; + } if (d->policy != saved_policy && len != r) { debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing"); read_buffers (fd, d, TRUE); diff --git a/src/controller.c b/src/controller.c index f3cdc74de..bbf50d646 100644 --- a/src/controller.c +++ b/src/controller.c @@ -87,6 +87,7 @@ static GCompletion *comp; static time_t start_time; static char greetingbuf[1024]; +static sig_atomic_t wanna_die = 0; extern rspamd_hash_t *counters; static gboolean controller_write_socket (void *arg); @@ -99,13 +100,23 @@ static void sig_handler (int signo, siginfo_t *info, void *unused) #endif { + struct timeval tv; switch (signo) { case SIGUSR1: reopen_log (); break; case SIGINT: case SIGTERM: - _exit (1); + if (!wanna_die) { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + event_loopexit (&tv); + +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif + } break; } } @@ -308,17 +319,19 @@ process_sync_command (struct controller_session *session, char **args) while (binlog_sync (binlog, rev, &time, &data)) { r = rspamd_snprintf (out_buf, sizeof (out_buf), "%ul %ul %ul" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)data->len); - if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) { + if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) { if (data != NULL) { g_free (data); } return FALSE; } - if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) { - if (data != NULL) { - g_free (data); + if (data->data != NULL) { + if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) { + if (data != NULL) { + g_free (data); + } + return FALSE; } - return FALSE; } rev ++; } @@ -645,6 +658,12 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control } return TRUE; } + else { + if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) { + return FALSE; + } + return TRUE; + } break; case COMMAND_HELP: r = rspamd_snprintf (out_buf, sizeof (out_buf), diff --git a/src/main.c b/src/main.c index 9db7df75d..b196bd3ea 100644 --- a/src/main.c +++ b/src/main.c @@ -903,7 +903,7 @@ main (int argc, char **argv, char **env) setproctitle ("main process"); /* Init statfile pool */ - rspamd->statfile_pool = statfile_pool_new (rspamd->cfg->max_statfile_size); + rspamd->statfile_pool = statfile_pool_new (rspamd->server_pool, rspamd->cfg->max_statfile_size); event_init (); g_mime_init (0); @@ -1002,6 +1002,8 @@ main (int argc, char **argv, char **env) msg_info ("terminating..."); + statfile_pool_delete (rspamd->statfile_pool); + close_log (); free_config (rspamd->cfg); diff --git a/src/statfile.c b/src/statfile.c index 3a0ff5171..0460a43c8 100644 --- a/src/statfile.c +++ b/src/statfile.c @@ -205,15 +205,15 @@ statfile_pool_expire (statfile_pool_t * pool) } statfile_pool_t * -statfile_pool_new (size_t max_size) +statfile_pool_new (memory_pool_t *pool, size_t max_size) { statfile_pool_t *new; - new = g_malloc (sizeof (statfile_pool_t)); + new = memory_pool_alloc_shared (pool, sizeof (statfile_pool_t)); bzero (new, sizeof (statfile_pool_t)); new->pool = memory_pool_new (memory_pool_get_size ()); new->max = max_size; - new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t)); + new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t)); new->lock = memory_pool_get_mutex (new->pool); return new; @@ -382,13 +382,17 @@ statfile_pool_close (statfile_pool_t * pool, stat_file_t * file, gboolean keep_s } if (file->map) { + msg_info ("syncing statfile %s", file->filename); + msync (file->map, file->len, MS_INVALIDATE | MS_SYNC); munmap (file->map, file->len); } if (file->fd != -1) { close (file->fd); } + /* Move the remain statfiles */ + memmove (pos, ((guint8 *)pos) + sizeof (stat_file_t), + (--pool->opened - (pos - pool->files)) * sizeof (stat_file_t)); pool->occupied -= file->len; - pool->opened--; memory_pool_unlock_mutex (pool->lock); @@ -496,7 +500,6 @@ statfile_pool_delete (statfile_pool_t * pool) statfile_pool_close (pool, &pool->files[i], FALSE); } memory_pool_delete (pool->pool); - g_free (pool); } void diff --git a/src/statfile.h b/src/statfile.h index a43000534..95f800ce6 100644 --- a/src/statfile.h +++ b/src/statfile.h @@ -96,7 +96,7 @@ typedef struct statfile_pool_s { * @param max_size maximum size * @return statfile pool object */ -statfile_pool_t* statfile_pool_new (size_t max_size); +statfile_pool_t* statfile_pool_new (memory_pool_t *pool, size_t max_size); /** * Open statfile and attach it to pool diff --git a/src/statfile_sync.c b/src/statfile_sync.c index e2f62aca2..56328c77b 100644 --- a/src/statfile_sync.c +++ b/src/statfile_sync.c @@ -170,6 +170,10 @@ sync_read (f_str_t * in, void *arg) uint64_t rev = 0; time_t ti = 0; + if (in->len == 0) { + /* Skip empty lines */ + return TRUE; + } switch (ctx->state) { case SYNC_STATE_GREETING: /* Skip greeting line and write sync command */ @@ -182,7 +186,7 @@ sync_read (f_str_t * in, void *arg) case SYNC_STATE_READ_LINE: /* Try to parse line from server */ if (!parse_revision_line (ctx, in)) { - msg_info ("cannot parse line: %*s", in->len, in->begin); + msg_info ("cannot parse line of length %z: '%*s'", in->len, (int)in->len, in->begin); close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); ctx->is_busy = FALSE; @@ -198,7 +202,7 @@ sync_read (f_str_t * in, void *arg) close (ctx->sock); rspamd_remove_dispatcher (ctx->dispatcher); ctx->is_busy = FALSE; - /* Immideately return from callback */ + /* Immediately return from callback */ return FALSE; } break; @@ -212,6 +216,7 @@ sync_read (f_str_t * in, void *arg) return FALSE; } statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time); + msg_info ("set new revision: %ul, readed %ul bytes", (long unsigned)ctx->new_rev, (long unsigned)in->len); /* Now try to read other revision or END line */ ctx->state = SYNC_STATE_READ_LINE; rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0); @@ -277,26 +282,33 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st) { struct rspamd_sync_ctx *ctx; - ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx)); - ctx->st = st; - /* Add some jittering for synchronization */ - ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2); - ctx->interval.tv_usec = 0; - /* Open statfile and attach it to pool */ - if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) { - if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) { - msg_warn ("cannot open %s", st->path); - if (statfile_pool_create (pool, st->path, st->size) == -1) { - msg_err ("cannot create statfile %s", st->path); - return FALSE; + if (st->binlog->master_addr.s_addr != INADDR_NONE && + st->binlog->master_addr.s_addr != INADDR_ANY) { + ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx)); + ctx->st = st; + /* Add some jittering for synchronization */ + ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2); + ctx->interval.tv_usec = 0; + /* Open statfile and attach it to pool */ + if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) { + if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) { + msg_warn ("cannot open %s", st->path); + if (statfile_pool_create (pool, st->path, st->size) == -1) { + msg_err ("cannot create statfile %s", st->path); + return FALSE; + } + ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE); } - ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE); } + /* Now plan event for it's future executing */ + evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); + evtimer_add (&ctx->tm_ev, &ctx->interval); + log_next_sync (st->symbol, ctx->interval.tv_sec); + } + else { + msg_err ("cannot add statfile watch for statfile %s: no master defined", st->symbol); + return FALSE; } - /* Now plan event for it's future executing */ - evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx); - evtimer_add (&ctx->tm_ev, &ctx->interval); - log_next_sync (st->symbol, ctx->interval.tv_sec); return TRUE; } diff --git a/test/rspamd_statfile_test.c b/test/rspamd_statfile_test.c index 2ada1836e..7a36a848d 100644 --- a/test/rspamd_statfile_test.c +++ b/test/rspamd_statfile_test.c @@ -10,12 +10,14 @@ void rspamd_statfile_test_func () { statfile_pool_t *pool; + memory_pool_t *p; stat_file_t *st; uint32_t random_hashes[HASHES_NUM], i, v; time_t now; + p = memory_pool_new (memory_pool_get_size ()); umask (S_IWGRP | S_IWOTH); - pool = statfile_pool_new (10 * 1024 * 1024); + pool = statfile_pool_new (p, 10 * 1024 * 1024); now = time (NULL); /* Fill random array */