strerror (errno), log->metaindex->indexes[log->metaindex->last_index]);
return FALSE;
}
+ log->cur_idx->last_index ++;
if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
unlock_file (log->fd, FALSE);
msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno));
return FALSE;
}
- log->cur_idx->last_index ++;
+
unlock_file (log->fd, FALSE);
return TRUE;
/* Now fill reply structure */
(*rep)->len = idx->len;
/* Read result */
+ msg_info ("update from binlog '%s' from revision: %ul to revision %ul size is %ul",
+ log->filename, (long unsigned)from_rev, (long unsigned)log->cur_seq, (long unsigned)idx->len);
if (lseek (log->fd, idx->seek, SEEK_SET) == -1) {
msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
res = FALSE;
}
if (binlog_insert (log, nodes)) {
+ msg_info ("set new revision of statfile %s: %ul", st->symbol, (long unsigned)log->cur_seq);
(void)statfile_set_revision (file, log->cur_seq, log->cur_time);
return TRUE;
}
b = d->in_buf->data->begin;
c = b;
}
+ else {
+ d->in_buf->data->len = 0;
+ d->in_buf->pos = d->in_buf->data->begin;
+ }
if (d->policy != saved_policy && len != r) {
debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
read_buffers (fd, d, TRUE);
static time_t start_time;
static char greetingbuf[1024];
+static sig_atomic_t wanna_die = 0;
extern rspamd_hash_t *counters;
static gboolean controller_write_socket (void *arg);
sig_handler (int signo, siginfo_t *info, void *unused)
#endif
{
+ struct timeval tv;
switch (signo) {
case SIGUSR1:
reopen_log ();
break;
case SIGINT:
case SIGTERM:
- _exit (1);
+ if (!wanna_die) {
+ wanna_die = 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+ ProfilerStop ();
+#endif
+ }
break;
}
}
while (binlog_sync (binlog, rev, &time, &data)) {
r = rspamd_snprintf (out_buf, sizeof (out_buf), "%ul %ul %ul" CRLF, (long unsigned)rev, (long unsigned)time, (long unsigned)data->len);
- if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) {
+ if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
if (data != NULL) {
g_free (data);
}
return FALSE;
}
- if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
- if (data != NULL) {
- g_free (data);
+ if (data->data != NULL) {
+ if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
+ if (data != NULL) {
+ g_free (data);
+ }
+ return FALSE;
}
- return FALSE;
}
rev ++;
}
}
return TRUE;
}
+ else {
+ if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) {
+ return FALSE;
+ }
+ return TRUE;
+ }
break;
case COMMAND_HELP:
r = rspamd_snprintf (out_buf, sizeof (out_buf),
setproctitle ("main process");
/* Init statfile pool */
- rspamd->statfile_pool = statfile_pool_new (rspamd->cfg->max_statfile_size);
+ rspamd->statfile_pool = statfile_pool_new (rspamd->server_pool, rspamd->cfg->max_statfile_size);
event_init ();
g_mime_init (0);
msg_info ("terminating...");
+ statfile_pool_delete (rspamd->statfile_pool);
+
close_log ();
free_config (rspamd->cfg);
}
statfile_pool_t *
-statfile_pool_new (size_t max_size)
+statfile_pool_new (memory_pool_t *pool, size_t max_size)
{
statfile_pool_t *new;
- new = g_malloc (sizeof (statfile_pool_t));
+ new = memory_pool_alloc_shared (pool, sizeof (statfile_pool_t));
bzero (new, sizeof (statfile_pool_t));
new->pool = memory_pool_new (memory_pool_get_size ());
new->max = max_size;
- new->files = memory_pool_alloc (new->pool, STATFILES_MAX * sizeof (stat_file_t));
+ new->files = memory_pool_alloc_shared (new->pool, STATFILES_MAX * sizeof (stat_file_t));
new->lock = memory_pool_get_mutex (new->pool);
return new;
}
if (file->map) {
+ msg_info ("syncing statfile %s", file->filename);
+ msync (file->map, file->len, MS_INVALIDATE | MS_SYNC);
munmap (file->map, file->len);
}
if (file->fd != -1) {
close (file->fd);
}
+ /* Move the remain statfiles */
+ memmove (pos, ((guint8 *)pos) + sizeof (stat_file_t),
+ (--pool->opened - (pos - pool->files)) * sizeof (stat_file_t));
pool->occupied -= file->len;
- pool->opened--;
memory_pool_unlock_mutex (pool->lock);
statfile_pool_close (pool, &pool->files[i], FALSE);
}
memory_pool_delete (pool->pool);
- g_free (pool);
}
void
* @param max_size maximum size
* @return statfile pool object
*/
-statfile_pool_t* statfile_pool_new (size_t max_size);
+statfile_pool_t* statfile_pool_new (memory_pool_t *pool, size_t max_size);
/**
* Open statfile and attach it to pool
uint64_t rev = 0;
time_t ti = 0;
+ if (in->len == 0) {
+ /* Skip empty lines */
+ return TRUE;
+ }
switch (ctx->state) {
case SYNC_STATE_GREETING:
/* Skip greeting line and write sync command */
case SYNC_STATE_READ_LINE:
/* Try to parse line from server */
if (!parse_revision_line (ctx, in)) {
- msg_info ("cannot parse line: %*s", in->len, in->begin);
+ msg_info ("cannot parse line of length %z: '%*s'", in->len, (int)in->len, in->begin);
close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
close (ctx->sock);
rspamd_remove_dispatcher (ctx->dispatcher);
ctx->is_busy = FALSE;
- /* Immideately return from callback */
+ /* Immediately return from callback */
return FALSE;
}
break;
return FALSE;
}
statfile_set_revision (ctx->real_statfile, ctx->new_rev, ctx->new_time);
+ msg_info ("set new revision: %ul, readed %ul bytes", (long unsigned)ctx->new_rev, (long unsigned)in->len);
/* Now try to read other revision or END line */
ctx->state = SYNC_STATE_READ_LINE;
rspamd_set_dispatcher_policy (ctx->dispatcher, BUFFER_LINE, 0);
{
struct rspamd_sync_ctx *ctx;
- ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
- ctx->st = st;
- /* Add some jittering for synchronization */
- ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
- ctx->interval.tv_usec = 0;
- /* Open statfile and attach it to pool */
- if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
- if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
- msg_warn ("cannot open %s", st->path);
- if (statfile_pool_create (pool, st->path, st->size) == -1) {
- msg_err ("cannot create statfile %s", st->path);
- return FALSE;
+ if (st->binlog->master_addr.s_addr != INADDR_NONE &&
+ st->binlog->master_addr.s_addr != INADDR_ANY) {
+ ctx = memory_pool_alloc (pool->pool, sizeof (struct rspamd_sync_ctx));
+ ctx->st = st;
+ /* Add some jittering for synchronization */
+ ctx->interval.tv_sec = g_random_int_range (MAX_SYNC_TIME, MAX_SYNC_TIME * 2);
+ ctx->interval.tv_usec = 0;
+ /* Open statfile and attach it to pool */
+ if ((ctx->real_statfile = statfile_pool_is_open (pool, st->path)) == NULL) {
+ if ((ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE)) == NULL) {
+ msg_warn ("cannot open %s", st->path);
+ if (statfile_pool_create (pool, st->path, st->size) == -1) {
+ msg_err ("cannot create statfile %s", st->path);
+ return FALSE;
+ }
+ ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
}
- ctx->real_statfile = statfile_pool_open (pool, st->path, st->size, FALSE);
}
+ /* Now plan event for it's future executing */
+ evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
+ evtimer_add (&ctx->tm_ev, &ctx->interval);
+ log_next_sync (st->symbol, ctx->interval.tv_sec);
+ }
+ else {
+ msg_err ("cannot add statfile watch for statfile %s: no master defined", st->symbol);
+ return FALSE;
}
- /* Now plan event for it's future executing */
- evtimer_set (&ctx->tm_ev, sync_timer_callback, ctx);
- evtimer_add (&ctx->tm_ev, &ctx->interval);
- log_next_sync (st->symbol, ctx->interval.tv_sec);
return TRUE;
}
rspamd_statfile_test_func ()
{
statfile_pool_t *pool;
+ memory_pool_t *p;
stat_file_t *st;
uint32_t random_hashes[HASHES_NUM], i, v;
time_t now;
+ p = memory_pool_new (memory_pool_get_size ());
umask (S_IWGRP | S_IWOTH);
- pool = statfile_pool_new (10 * 1024 * 1024);
+ pool = statfile_pool_new (p, 10 * 1024 * 1024);
now = time (NULL);
/* Fill random array */