/*
 * Copyright (c) 2009, Rambler media
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include "binlog.h"
#include "cfg_file.h"
#include "tokenizers/tokenizers.h"

#define BINLOG_SUFFIX ".binlog"
#define BACKUP_SUFFIX ".old"
#define VALID_MAGIC { 'r', 's', 'l' }
#define VALID_VERSION { '1', '0' }

static GHashTable *binlog_opened = NULL;
static memory_pool_t *binlog_pool = NULL;

static gboolean 
binlog_write_header (struct rspamd_binlog *log)
{
	struct rspamd_binlog_header header = {
		.magic = VALID_MAGIC,
		.version = VALID_VERSION,
		.padding = { '\0', '\0' },
	};
	
	header.create_time = time (NULL);
	lock_file (log->fd, FALSE);
	
	if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) {
		msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}


	memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header));
	
	/* Metaindex */
	log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
	bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
	/* Offset to metaindex */
	log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header);

	if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))  == -1) {
		g_free (log->metaindex);
		msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
		unlock_file (log->fd, FALSE);
		return FALSE;
	}

	/* Alloc, write, mmap */
	log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
	bzero (log->cur_idx, sizeof (struct rspamd_index_block));
	if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))  == -1) {
		g_free (log->cur_idx);
		msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
		unlock_file (log->fd, FALSE);
		return FALSE;
	}

	unlock_file (log->fd, FALSE);

	return TRUE;
}

static gboolean
binlog_check_file (struct rspamd_binlog *log)
{
	static char valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION;

	if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) {
		msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}

	/* Now check all fields */
	if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || 
			memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) {
		msg_warn ("cannot validate file %s");
		return FALSE;
	}
	/* Now mmap metaindex and current index */
	if (log->metaindex == NULL) {
		log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
	}
	if ((read (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))) != sizeof (struct rspamd_binlog_metaindex)) {
		msg_warn ("cannot read metaindex of file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}
	/* Current index */
	if (log->cur_idx == NULL) {
		log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
	}
	if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) {
		msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
		return FALSE;
	}
	if ((read (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
		msg_warn ("cannot read index in file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}

	log->cur_seq = log->cur_idx->last_index;
	log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time;

	return TRUE;

}

static gboolean
binlog_create (struct rspamd_binlog *log)
{
	if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
		msg_info ("cannot create file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}

	return binlog_write_header (log);
}

static gboolean
binlog_open_real (struct rspamd_binlog *log)
{
	if ((log->fd = open (log->filename, O_RDWR)) == -1) {
		msg_info ("cannot open file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}
	
	return binlog_check_file (log);
}


struct rspamd_binlog* 
binlog_open (memory_pool_t *pool, const char *path, time_t rotate_time, int rotate_jitter)
{
	struct rspamd_binlog *new;
	int len = strlen (path);
	struct stat st;

	new = memory_pool_alloc0 (pool, sizeof (struct rspamd_binlog));
	new->pool = pool;
	new->rotate_time = rotate_time;
	new->fd = -1;

	if (rotate_time) {
		new->rotate_jitter = g_random_int_range (0, rotate_jitter);
	}
	
	new->filename = memory_pool_alloc (pool, len + sizeof (BINLOG_SUFFIX));
	g_strlcpy (new->filename, path, len + 1);
	g_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX));

	if (stat (new->filename, &st) == -1) {
		/* Check errno to check whether we should create this file */
		if (errno != ENOENT) {
			msg_err ("cannot stat file: %s, error %s", new->filename, strerror (errno));
			return NULL;
		}
		else {
			/* In case of ENOENT try to create binlog */
			if (!binlog_create (new)) {
				return NULL;
			}
		}
	}
	else {
		/* Try to open binlog */
		if (!binlog_open_real (new)) {
			return NULL;
		}
	}

	return new;
}

void 
binlog_close (struct rspamd_binlog *log)
{
	if (log) {
		if (log->metaindex) {
			g_free (log->metaindex);
		}
		if (log->cur_idx) {
			g_free (log->cur_idx);
		}
		close (log->fd);
	}
}

static gboolean
binlog_tree_callback (gpointer key, gpointer value, gpointer data)
{
	token_node_t                   *node = key;
	struct rspamd_binlog           *log = data;
	struct rspamd_binlog_element    elt;

	elt.h1 = node->h1;
	elt.h2 = node->h2;
	elt.value = node->value;
	
	if (write (log->fd, &elt, sizeof (elt)) == -1) {
		msg_info ("cannot write token to file: %s, error: %s", log->filename, strerror (errno));
		return TRUE;
	}

	return FALSE;
}

static gboolean
write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
{
	off_t seek;
	struct rspamd_binlog_index *idx;

	lock_file (log->fd, FALSE);
	log->cur_seq ++;	

	/* Seek to end of file */
	if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
		unlock_file (log->fd, FALSE);
		msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
		return FALSE;
	}

	/* Now write all nodes to file */
	g_tree_foreach (nodes, binlog_tree_callback, (gpointer)log);

	/* Write index */
	idx = &log->cur_idx->indexes[log->cur_idx->last_index];
	idx->seek = seek;
	idx->time = (uint64_t)time (NULL);
	log->cur_time = idx->time;
	idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element);
	if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) {
		unlock_file (log->fd, FALSE);
		msg_info ("cannot seek in file: %s, error: %s, seek: %L, op: insert index", log->filename, 
				strerror (errno), log->metaindex->indexes[log->metaindex->last_index]);
		return FALSE;
	}
	if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
		unlock_file (log->fd, FALSE);
		msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno));
		return FALSE;
	}
	log->cur_idx->last_index ++;
	unlock_file (log->fd, FALSE);
	
	return TRUE;
}

static gboolean
create_new_metaindex_block (struct rspamd_binlog *log)
{
	off_t seek;

	lock_file (log->fd, FALSE);
	
	log->metaindex->last_index ++;
	/* Seek to end of file */
	if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
		unlock_file (log->fd, FALSE);
		msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
		return FALSE;
	}
	if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))  == -1) {
		unlock_file (log->fd, FALSE);
		g_free (log->cur_idx);
		msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
		return FALSE;
	}
	/* Offset to metaindex */
	log->metaindex->indexes[log->metaindex->last_index] = seek;
	/* Overwrite all metaindexes */
	if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) {
		unlock_file (log->fd, FALSE);
		msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
		return FALSE;
	}
	if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) {
		unlock_file (log->fd, FALSE);
		msg_info ("cannot write metaindex in file: %s, error: %s", log->filename, strerror (errno));
		return FALSE;
	}
	bzero (log->cur_idx, sizeof (struct rspamd_index_block));
	unlock_file (log->fd, FALSE);

	return TRUE;
}

static gboolean
maybe_rotate_binlog (struct rspamd_binlog *log)
{
	uint64_t now = time (NULL);

	if (log->rotate_time && ((now - log->header.create_time) > log->rotate_time + log->rotate_jitter)) {
		return TRUE;
	}
	return FALSE;
}

static gboolean
rotate_binlog (struct rspamd_binlog *log)
{
	char *backup_name;
	struct stat st;

	lock_file (log->fd, FALSE);

	/* Unmap mapped fragments */
	if (log->metaindex) {
		g_free (log->metaindex);
		log->metaindex = NULL;
	}
	if (log->cur_idx) {
		g_free (log->cur_idx);
		log->cur_idx = NULL;
	}
	/* Format backup name */
	backup_name = g_strdup_printf ("%s.%s", log->filename, BACKUP_SUFFIX);

	if (stat (backup_name, &st) != -1) {
		msg_info ("replace old %s", backup_name);
		unlink (backup_name);
	}

	rename (log->filename, backup_name);
	g_free (backup_name);

	/* XXX: maybe race condition here */
	unlock_file (log->fd, FALSE);
	close (log->fd);

	return binlog_create (log);

}

gboolean 
binlog_insert (struct rspamd_binlog *log, GTree *nodes)
{
	off_t seek;

	if (!log || !log->metaindex || !log->cur_idx || !nodes) {
		msg_info ("improperly opened binlog: %s", log->filename);
		return FALSE;
	}

	if (maybe_rotate_binlog (log)) {
		if (!rotate_binlog (log)) {
			return FALSE;
		}
	}
	/* First of all try to place new tokens in current index */
	if (log->cur_idx->last_index < BINLOG_IDX_LEN) {
		/* All is ok */
		return write_binlog_tree (log, nodes);
	}
	/* Current index table is all busy, try to allocate new index */

	/* Check metaindex free space */
	if (log->metaindex->last_index < METAINDEX_LEN) {
		/* Create new index block */
		if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
			msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
			return FALSE;
		}
		if (!create_new_metaindex_block (log)) {
			return FALSE;
		}
		return write_binlog_tree (log, nodes);
	}
	
	/* All binlog is filled, we need to rotate it forcefully */
	if (!rotate_binlog (log)) {
		return FALSE;
	}

	return write_binlog_tree (log, nodes);
}

gboolean 
binlog_sync (struct rspamd_binlog *log, uint64_t from_rev, uint64_t *from_time, GByteArray **rep)
{
	uint32_t metaindex_num;
	struct rspamd_index_block *idxb;
	struct rspamd_binlog_index *idx;
	gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE;

	if (!log || !log->metaindex || !log->cur_idx) {
		msg_info ("improperly opened binlog: %s", log->filename);
		return FALSE;
	}

	if (*rep == NULL) {
		*rep = g_malloc (sizeof (GByteArray));
		is_first = TRUE;
	}
	else {
		/* Unmap old fragment */
		g_free ((*rep)->data);
	}
	
	if (from_rev == log->cur_seq) {
		/* Last record */
		*rep = NULL;
		return FALSE;
	}

	metaindex_num = from_rev / BINLOG_IDX_LEN;
	/* First of all try to find this revision */
	if (metaindex_num > log->metaindex->last_index) {
		return FALSE;
	}
	else if (metaindex_num != log->metaindex->last_index) {
		/* Need to remap index block */
		lock_file (log->fd, FALSE);
		idxb = g_malloc (sizeof (struct rspamd_index_block));
		idx_mapped = TRUE;
		if (lseek (log->fd, log->metaindex->indexes[metaindex_num], SEEK_SET) == -1) {
			unlock_file (log->fd, FALSE);
			msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
			res = FALSE;
			goto end;
		}
		if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
			unlock_file (log->fd, FALSE);
			msg_warn ("cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno));
			res = FALSE;
			goto end;
		}
		unlock_file (log->fd, FALSE);
	}
	else {
		idxb = log->cur_idx;
	}
	/* Now check specified index */
	idx = &idxb->indexes[from_rev % BINLOG_IDX_LEN];
	if (is_first && idx->time != *from_time) {
		res = FALSE;
		*from_time = 0;
		goto end;
	}
	else {
		*from_time = idx->time;
	}

	/* Now fill reply structure */
	(*rep)->len = idx->len;
	/* Read result */
	if (lseek (log->fd, idx->seek, SEEK_SET) == -1) {
		msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
		res = FALSE;
		goto end;
	}
	
	(*rep)->data = g_malloc (idx->len);
	if ((read (log->fd, (*rep)->data, idx->len)) != idx->len) {
		msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno));
		res = FALSE;
		goto end;
	}

end:
	if (idx_mapped) {
		g_free (idxb);
	}

	return res;
}

static gboolean
maybe_init_static ()
{
	if (!binlog_opened) {
		binlog_opened = g_hash_table_new (g_direct_hash, g_direct_equal);
		if (!binlog_opened) {
			return FALSE;
		}
	}

	if (!binlog_pool) {
		binlog_pool = memory_pool_new (memory_pool_get_size ());
		if (!binlog_pool) {
			return FALSE;
		}
	}

	return TRUE;
}

gboolean 
maybe_write_binlog (struct classifier_config *ccf, struct statfile *st, stat_file_t *file, GTree *nodes)
{
	struct rspamd_binlog *log;

	if (ccf == NULL) {
		return FALSE;
	}

	
	if (st == NULL || nodes == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) {
		return FALSE;
	}

	if (!maybe_init_static ()) {
		return FALSE;
	}

	if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
		if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) {
			g_hash_table_insert (binlog_opened, st, log);
		}
		else {
			return FALSE;
		}
	}

	if (binlog_insert (log, nodes)) {
		(void)statfile_set_revision (file, log->cur_seq, log->cur_time);
		return TRUE;
	}

	return FALSE;
}

struct rspamd_binlog* 
get_binlog_by_statfile (struct statfile *st)
{
	struct rspamd_binlog *log;

	if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) {
		return NULL;
	}

	if (!maybe_init_static ()) {
		return NULL;
	}

	if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
		if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) {
			g_hash_table_insert (binlog_opened, st, log);
		}
		else {
			return NULL;
		}
	}
	
	return log;
}