aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2010-09-02 20:24:41 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2010-09-02 20:24:41 +0400
commit07a2e0b3c8ac8249f35423ab79b23b6b5ab11c51 (patch)
tree231437d521e228e95de289f15b1fcc4878db4b03
parent5086821ae43d0283ed8b839c847ca267c3c81d2c (diff)
downloadrspamd-07a2e0b3c8ac8249f35423ab79b23b6b5ab11c51.tar.gz
rspamd-07a2e0b3c8ac8249f35423ab79b23b6b5ab11c51.zip
* Many fixes to statfile syncronization system
* Fixed statfile pool initialization and synchronization with disk
-rw-r--r--src/binlog.c6
-rw-r--r--src/buffer.c4
-rw-r--r--src/controller.c31
-rw-r--r--src/main.c4
-rw-r--r--src/statfile.c13
-rw-r--r--src/statfile.h2
-rw-r--r--src/statfile_sync.c50
-rw-r--r--test/rspamd_statfile_test.c4
8 files changed, 80 insertions, 34 deletions
diff --git a/src/binlog.c b/src/binlog.c
index 8ff8112b0..83a57aacc 100644
--- a/src/binlog.c
+++ b/src/binlog.c
@@ -257,12 +257,13 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
strerror (errno), log->metaindex->indexes[log->metaindex->last_index]);
return FALSE;
}
+ log->cur_idx->last_index ++;
if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
unlock_file (log->fd, FALSE);
msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno));
return FALSE;
}
- log->cur_idx->last_index ++;
+
unlock_file (log->fd, FALSE);
return TRUE;
@@ -466,6 +467,8 @@ binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time,
/* Now fill reply structure */
(*rep)->len = idx->len;
/* Read result */
+ msg_info ("update from binlog '%s' from revision: %ul to revision %ul size is %ul",
+ log->filename, (long unsigned)from_rev, (long unsigned)log->cur_seq, (long unsigned)idx->len);
if (lseek (log->fd, idx->seek, SEEK_SET) == -1) {
msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
res = FALSE;
@@ -535,6 +538,7 @@ maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_fil
}
if (binlog_insert (log, nodes)) {
+ msg_info ("set new revision of statfile %s: %ul", st->symbol, (long unsigned)log->cur_seq);
(void)statfile_set_revision (file, log->cur_seq, log->cur_time);
return TRUE;
}
diff --git a/src/buffer.c b/src/buffer.c
index d0c36f8ea..c5b775dfc 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -403,6 +403,10 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
b = d->in_buf->data->begin;
c = b;
}
+ else {
+ d->in_buf->data->len = 0;
+ d->in_buf->pos = d->in_buf->data->begin;
+ }
if (d->policy != saved_policy && len != r) {
debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
read_buffers (fd, d, TRUE);
diff --git a/src/controller.c b/src/controller.c
index f3cdc74de..bbf50d646 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -87,6 +87,7 @@ static GCompletion *comp;
static time_t start_time;
static char greetingbuf[1024];
+static sig_atomic_t wanna_die = 0;
extern rspamd_hash_t *counters;
static gboolean controller_write_socket (void *arg);
@@ -99,13 +100,23 @@ static void
sig_handler (int signo, siginfo_t *info, void *unused)
#endif
{
+ struct timeval tv;
switch (signo) {
case SIGUSR1:
reopen_log ();
break;
case SIGINT:
case SIGTERM:
- _exit (1);
+ if (!wanna_die) {
+ wanna_die = 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+ ProfilerStop ();
+#endif
+ }
break;
}
}
@@ -308,17 +319,19 @@ process_sync_command (struct controller_session *session, char **args)
while (binlog_sync (binlog, rev, &time, &data)) {
r = rspamd_snprintf (out_buf, sizeof (out_buf), "%ul %ul %ul" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)data->len);
- if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) {
+ if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
if (data != NULL) {
g_free (data);
}
return FALSE;
}
- if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
- if (data != NULL) {
- g_free (data);
+ if (data->data != NULL) {
+ if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
+ if (data != NULL) {
+ g_free (data);
+ }
+ return FALSE;
}
- return FALSE;
}
rev ++;
}
@@ -645,6 +658,12 @@ process_command (struct controller_command *cmd, char **cmd_args, struct control
}
return TRUE;
}
+ else {
+ if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
+ return FALSE;
+ }
+ return TRUE;
+ }
break;
case COMMAND_HELP:
r = rspamd_snprintf (out_buf, sizeof (out_buf),
diff --git a/src/main.c b/src/main.c
index 9db7df75d..b196bd3ea 100644
--- a/src/main.c
+++ b/src/main.c
@@ -903,7 +903,7 @@ main (int argc, char **argv, char **env)
setproctitle ("main process");
/* Init statfile pool */
- rspamd->statfile_pool = statfile_pool_new (rspamd->cfg->max_statfile_size);
+ rspamd->statfile_pool = statfile_pool_new (rspamd->server_pool, rspamd->cfg->max_statfile_size);
event_init ();
g_mime_init (0);
@@ -1002,6 +1002,8 @@ main (int argc, char **argv, char **env)
msg_info ("terminating...");
+ statfile_pool_delete (rspamd->statfile_pool);
+
close_log ();
free_config (rspamd->cfg);
diff --git a/src/statfile.c b/src/statfile.c
index 3a0ff5171..0460a43c8 100644
--- a/src/statfile.c
+++ b/src/statfile.c
@@ -205,15 +205,15 @@ statfile_pool_expire (statfile_pool_t * pool)
}
statfile_pool_t *
-statfile_pool_new (size_t max_size)
+statfile_pool_new (memory_pool_t *pool, size_t max_size)
{
statfile_pool_t *new;
- new = g_malloc (sizeof (statfile_pool_t));
+ new = memory_pool_alloc_shared (pool, sizeof (statfile_pool_t));
bzero (new, sizeof (statfile_pool_t));
new->pool = memory_pool_new (memory_pool_get_size ());
new->max = max_size;
- new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t));
+ new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t));
new->lock = memory_pool_get_mutex (new->pool);
return new;
@@ -382,13 +382,17 @@ statfile_pool_close (statfile_pool_t * pool, stat_file_t * file, gboolean keep_s
}
if (file->map) {
+ msg_info ("syncing statfile %s", file->filename);
+ msync (file->map, file->len, MS_INVALIDATE | MS_SYNC);
munmap (file->map, file->len);
}
if (file->fd != -1) {
close (file->fd);
}
+ /* Move the remain statfiles */
+ memmove (pos, ((guint8 *)pos) + sizeof (stat_file_t),
+ (--pool->opened - (pos - pool->files)) * sizeof (stat_file_t));
pool->occupied -= file->len;
- pool->opened--;
memory_pool_unlock_mutex (pool->lock);
@@ -496,7 +500,6 @@ statfile_pool_delete (statfile_pool_t * pool)
statfile_pool_close (pool, &pool->files[i], FALSE);
}
memory_pool_delete (pool->pool);
- g_free (pool);
}
void
diff --git a/src/statfile.h b/src/statfile.h
index a43000534..95f800ce6 100644
--- a/src/statfile.h
+++ b/src/statfile.h
@@ -96,7 +96,7 @@ typedef struct statfile_pool_s {
* @param max_size maximum size
* @return statfile pool object
*/
-statfile_pool_t* statfile_pool_new (size_t max_size);
+statfile_pool_t* statfile_pool_new (memory_pool_t *pool, size_t max_size);
/**
* Open statfile and attach it to pool
diff --git a/src/statfile_sync.c b/src/statfile_sync.c
index e2f62aca2..56328c77b 100644
--- a/src/statfile_sync.c
+++ b/src/statfile_sync.c
@@ -170,6 +170,10 @@ sync_read (f_str_t * in, void *arg)
uint64_t rev = 0;
time_t ti = 0;
+ if (in->len == 0) {
+ /* Skip empty lines */
+ return TRUE;
+ }
switch (ctx->state) {
case SYNC_STATE_GREETING:
/* Skip greeting line and write sync command */
@@ -182,7 +186,7 @@ sync_read (f_str_t * in, void *arg)
case SYNC_STATE_READ_LINE:
/* Try to parse line from server */
if (!parse_revision_line (ctx, in)) {
- msg_info ("cannot parse line: %*s", in->len, in->begin);
+ msg_info ("cannot parse line of length %z: '%*s'", in->len, (int)in->len, in->begin);
close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
@@ -198,7 +202,7 @@ sync_read (f_str_t * in, void *arg)
close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
- /* Immideately return from callback */
+ /* Immediately return from callback */
return FALSE;
}
break;
@@ -212,6 +216,7 @@ sync_read (f_str_t * in, void *arg)
return FALSE;
}
statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+ msg_info ("set new revision: %ul, readed %ul bytes", (long unsigned)ctx->new_rev, (long unsigned)in->len);
/* Now try to read other revision or END line */
ctx->state = SYNC_STATE_READ_LINE;
rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
@@ -277,26 +282,33 @@ add_statfile_watch (statfile_pool_t *pool, struct statfile *st)
{
struct rspamd_sync_ctx *ctx;
- ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
- ctx->st = st;
- /* Add some jittering for synchronization */
- ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
- ctx->interval.tv_usec = 0;
- /* Open statfile and attach it to pool */
- if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
- if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
- msg_warn ("cannot open %s", st->path);
- if (statfile_pool_create (pool, st->path, st->size) == -1) {
- msg_err ("cannot create statfile %s", st->path);
- return FALSE;
+ if (st->binlog->master_addr.s_addr != INADDR_NONE &&
+ st->binlog->master_addr.s_addr != INADDR_ANY) {
+ ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+ ctx->st = st;
+ /* Add some jittering for synchronization */
+ ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+ ctx->interval.tv_usec = 0;
+ /* Open statfile and attach it to pool */
+ if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
+ if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+ msg_warn ("cannot open %s", st->path);
+ if (statfile_pool_create (pool, st->path, st->size) == -1) {
+ msg_err ("cannot create statfile %s", st->path);
+ return FALSE;
+ }
+ ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
}
- ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
}
+ /* Now plan event for it's future executing */
+ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+ evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (st->symbol, ctx->interval.tv_sec);
+ }
+ else {
+ msg_err ("cannot add statfile watch for statfile %s: no master defined", st->symbol);
+ return FALSE;
}
- /* Now plan event for it's future executing */
- evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
- evtimer_add (&ctx->tm_ev, &ctx->interval);
- log_next_sync (st->symbol, ctx->interval.tv_sec);
return TRUE;
}
diff --git a/test/rspamd_statfile_test.c b/test/rspamd_statfile_test.c
index 2ada1836e..7a36a848d 100644
--- a/test/rspamd_statfile_test.c
+++ b/test/rspamd_statfile_test.c
@@ -10,12 +10,14 @@ void
rspamd_statfile_test_func ()
{
statfile_pool_t *pool;
+ memory_pool_t *p;
stat_file_t *st;
uint32_t random_hashes[HASHES_NUM], i, v;
time_t now;
+ p = memory_pool_new (memory_pool_get_size ());
umask (S_IWGRP | S_IWOTH);
- pool = statfile_pool_new (10 * 1024 * 1024);
+ pool = statfile_pool_new (p, 10 * 1024 * 1024);
now = time (NULL);
/* Fill random array */