/*-
 * Copyright 2016 Vsevolod Stakhov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * Implementation of map files handling
 */
#include "config.h"
#include "map.h"
#include "http.h"
#include "rspamd.h"
#include "cryptobox.h"
#include "unix-std.h"

static const gchar *hash_fill = "1";

/**
 * Data specific to file maps
 */
struct file_map_data {
	const gchar *filename;
	struct stat st;
};

/**
 * Data specific to HTTP maps
 */
struct http_map_data {
	struct addrinfo *addr;
	guint16 port;
	gchar *path;
	gchar *host;
	time_t last_checked;
	gboolean request_sent;
	struct rspamd_http_connection *conn;
};


struct http_callback_data {
	struct event_base *ev_base;
	struct timeval tv;
	struct rspamd_map *map;
	struct http_map_data *data;
	struct map_cb_data cbdata;

	GString *remain_buf;

	gint fd;
};

/* Value in seconds after whitch we would try to do stat on list file */

/* HTTP timeouts */
#define HTTP_CONNECT_TIMEOUT 2
#define HTTP_READ_TIMEOUT 10

/**
 * Helper for HTTP connection establishment
 */
static gint
connect_http (struct rspamd_map *map,
	struct http_map_data *data,
	gboolean is_async)
{
	gint sock;
	rspamd_mempool_t *pool;

	pool = map->pool;

	if ((sock = rspamd_socket_tcp (data->addr, FALSE, is_async)) == -1) {
		msg_info_pool ("cannot connect to http server %s: %d, %s",
			data->host,
			errno,
			strerror (errno));
		return -1;
	}

	return sock;
}

/**
 * Write HTTP request
 */
static void
write_http_request (struct http_callback_data *cbd)
{
	gchar datebuf[128];
	struct tm *tm;
	struct rspamd_http_message *msg;

	msg = rspamd_http_new_message (HTTP_REQUEST);

	msg->url = rspamd_fstring_new_init (cbd->data->path, strlen (cbd->data->path));
	if (cbd->data->last_checked != 0) {
		tm = gmtime (&cbd->data->last_checked);
		strftime (datebuf, sizeof (datebuf), "%a, %d %b %Y %H:%M:%S %Z", tm);

		rspamd_http_message_add_header (msg, "If-Modified-Since", datebuf);
	}

	rspamd_http_connection_write_message (cbd->data->conn, msg, cbd->data->host,
		NULL, cbd, cbd->fd, &cbd->tv, cbd->ev_base);
}

/**
 * Callback for destroying HTTP callback data
 */
static void
free_http_cbdata (struct http_callback_data *cbd)
{
	g_atomic_int_set (cbd->map->locked, 0);
	if (cbd->remain_buf) {
		g_string_free (cbd->remain_buf, TRUE);
	}

	rspamd_http_connection_reset (cbd->data->conn);
	close (cbd->fd);
	g_slice_free1 (sizeof (struct http_callback_data), cbd);
}

/*
 * HTTP callbacks
 */
static void
http_map_error (struct rspamd_http_connection *conn,
	GError *err)
{
	struct http_callback_data *cbd = conn->ud;
	rspamd_mempool_t *pool;

	pool = cbd->map->pool;

	msg_err_pool ("connection with http server terminated incorrectly: %s",
			err->message);
	free_http_cbdata (cbd);
}

static int
http_map_finish (struct rspamd_http_connection *conn,
		struct rspamd_http_message *msg)
{
	struct http_callback_data *cbd = conn->ud;
	struct rspamd_map *map;
	rspamd_mempool_t *pool;

	map = cbd->map;
	pool = cbd->map->pool;

	if (msg->code == 200) {
		if (cbd->remain_buf != NULL) {
			/* Append \n to avoid issues */
			g_string_append_c (cbd->remain_buf, '\n');
			map->read_callback (map->pool, cbd->remain_buf->str,
					cbd->remain_buf->len, &cbd->cbdata);
		}

		map->fin_callback (map->pool, &cbd->cbdata);
		*map->user_data = cbd->cbdata.cur_data;
		cbd->data->last_checked = msg->date;
		msg_info_pool ("read map data from %s", cbd->data->host);
	}
	else if (msg->code == 304) {
		msg_debug_pool ("data is not modified for server %s",
				cbd->data->host);
		cbd->data->last_checked = msg->date;
	}
	else {
		msg_info_pool ("cannot load map %s from %s: HTTP error %d",
				map->uri, cbd->data->host, msg->code);
	}

	free_http_cbdata (cbd);

	return 0;
}

static int
http_map_read (struct rspamd_http_connection *conn,
	struct rspamd_http_message *msg,
	const gchar *chunk,
	gsize len)
{
	struct http_callback_data *cbd = conn->ud;
	gchar *pos;
	struct rspamd_map *map;

	if (msg->code != 200 || len == 0) {
		/* Ignore not full replies */
		return 0;
	}

	map = cbd->map;
	if (cbd->remain_buf != NULL) {
		/* We need to concatenate incoming buf with the remaining buf */
		g_string_append_len (cbd->remain_buf, chunk, len);

		pos = map->read_callback (map->pool, cbd->remain_buf->str,
				cbd->remain_buf->len, &cbd->cbdata);

		/* All read */
		if (pos == NULL) {
			g_string_free (cbd->remain_buf, TRUE);
			cbd->remain_buf = NULL;
		}
		else {
			/* Need to erase data processed */
			g_string_erase (cbd->remain_buf, 0, pos - cbd->remain_buf->str);
		}
	}
	else {
		pos = map->read_callback (map->pool, (gchar *)chunk, len, &cbd->cbdata);

		if (pos != NULL) {
			/* Store data in remain buf */
			cbd->remain_buf = g_string_new_len (pos, len - (pos - chunk));
		}
	}

	return 0;
}

/**
 * Callback for reading data from file
 */
static void
read_map_file (struct rspamd_map *map, struct file_map_data *data)
{
	struct map_cb_data cbdata;
	gchar buf[BUFSIZ], *remain = NULL;
	ssize_t r;
	gint fd, rlen, tlen;
	rspamd_mempool_t *pool = map->pool;

	if (map->read_callback == NULL || map->fin_callback == NULL) {
		msg_err_pool ("bad callback for reading map file");
		return;
	}

	if ((fd = open (data->filename, O_RDONLY)) == -1) {
		msg_warn_pool ("cannot open file '%s': %s", data->filename,
			strerror (errno));
		return;
	}

	cbdata.state = 0;
	cbdata.prev_data = *map->user_data;
	cbdata.cur_data = NULL;
	cbdata.map = map;

	rlen = 0;
	tlen = 0;
	while ((r = read (fd, buf + rlen, sizeof (buf) - rlen - 2)) > 0) {
		r += rlen;
		tlen += r;
		buf[r] = '\0';
		remain = map->read_callback (map->pool, buf, r, &cbdata);

		if (remain != NULL) {
			/* copy remaining buffer to start of buffer */
			rlen = r - (remain - buf);
			memmove (buf, remain, rlen);
		}
		else {
			rlen = 0;
		}
	}

	if (remain != NULL && remain > buf) {
		g_assert (rlen <= (gint)sizeof (buf) - 2);
		buf[rlen++] = '\n';
		buf[rlen] = '\0';
		tlen += rlen;
		map->read_callback (map->pool, buf, rlen, &cbdata);
	}

	close (fd);

	if (tlen > 0) {
		map->fin_callback (map->pool, &cbdata);
		*map->user_data = cbdata.cur_data;
	}
}

static void
jitter_timeout_event (struct rspamd_map *map, gboolean locked, gboolean initial)
{
	gdouble jittered_sec;
	gdouble timeout = initial ? 1.0 : map->cfg->map_timeout;

	/* Plan event again with jitter */
	evtimer_del (&map->ev);
	jittered_sec = rspamd_time_jitter (locked ? timeout * 4 : timeout, 0);
	double_to_tv (jittered_sec, &map->tv);

	evtimer_add (&map->ev, &map->tv);
}

/**
 * Common file callback
 */
static void
file_callback (gint fd, short what, void *ud)
{
	struct rspamd_map *map = ud;
	struct file_map_data *data = map->map_data;
	struct stat st;
	rspamd_mempool_t *pool;

	pool = map->pool;

	if (g_atomic_int_get (map->locked)) {
		msg_info_pool (
			"don't try to reread map as it is locked by other process, will reread it later");
		jitter_timeout_event (map, TRUE, FALSE);
		return;
	}

	g_atomic_int_inc (map->locked);
	jitter_timeout_event (map, FALSE, FALSE);
	if (stat (data->filename,
		&st) != -1 &&
		(st.st_mtime > data->st.st_mtime || data->st.st_mtime == -1)) {
		/* File was modified since last check */
		memcpy (&data->st, &st, sizeof (struct stat));
	}
	else {
		g_atomic_int_set (map->locked, 0);
		return;
	}

	msg_info_pool ("rereading map file %s", data->filename);
	read_map_file (map, data);
	g_atomic_int_set (map->locked, 0);
}

/**
 * Async HTTP callback
 */
static void
http_callback (gint fd, short what, void *ud)
{
	struct rspamd_map *map = ud;
	struct http_map_data *data;
	gint sock;
	struct http_callback_data *cbd;
	rspamd_mempool_t *pool;

	data = map->map_data;
	pool = map->pool;

	if (g_atomic_int_get (map->locked)) {
		msg_info_pool (
			"don't try to reread map as it is locked by other process, will reread it later");
		if (data->conn->ud == NULL) {
			jitter_timeout_event (map, TRUE, TRUE);
		}
		else {
			jitter_timeout_event (map, TRUE, FALSE);
		}
		return;
	}

	g_atomic_int_inc (map->locked);
	jitter_timeout_event (map, FALSE, FALSE);
	/* Connect asynced */
	if ((sock = connect_http (map, data, TRUE)) == -1) {
		g_atomic_int_set (map->locked, 0);
		return;
	}
	else {
		/* Plan event */
		cbd = g_slice_alloc (sizeof (struct http_callback_data));
		cbd->ev_base = map->ev_base;
		cbd->map = map;
		cbd->data = data;
		cbd->remain_buf = NULL;
		cbd->cbdata.state = 0;
		cbd->cbdata.prev_data = *cbd->map->user_data;
		cbd->cbdata.cur_data = NULL;
		cbd->cbdata.map = cbd->map;
		cbd->tv.tv_sec = HTTP_CONNECT_TIMEOUT;
		cbd->tv.tv_usec = 0;
		cbd->fd = sock;
		data->conn->ud = cbd;
		msg_debug_pool ("reading map data from %s", data->host);
		write_http_request (cbd);
	}
}

/* Start watching event for all maps */
void
rspamd_map_watch (struct rspamd_config *cfg, struct event_base *ev_base)
{
	GList *cur = cfg->maps;
	struct rspamd_map *map;
	struct file_map_data *fdata;

	/* First of all do synced read of data */
	while (cur) {
		map = cur->data;
		map->ev_base = ev_base;
		event_base_set (map->ev_base, &map->ev);
		if (map->protocol == MAP_PROTO_FILE) {
			evtimer_set (&map->ev, file_callback, map);
			/* Read initial data */
			fdata = map->map_data;
			if (fdata->st.st_mtime != -1) {
				/* Do not try to read non-existent file */
				read_map_file (map, map->map_data);
			}
			/* Plan event with jitter */
			jitter_timeout_event (map, FALSE, TRUE);
		}
		else if (map->protocol == MAP_PROTO_HTTP) {
			evtimer_set (&map->ev, http_callback, map);
			jitter_timeout_event (map, FALSE, TRUE);
		}
		cur = g_list_next (cur);
	}
}

void
rspamd_map_remove_all (struct rspamd_config *cfg)
{
	g_list_free (cfg->maps);
	cfg->maps = NULL;
	if (cfg->map_pool != NULL) {
		rspamd_mempool_delete (cfg->map_pool);
		cfg->map_pool = NULL;
	}
}

gboolean
rspamd_map_check_proto (const gchar *map_line, gint *res, const gchar **pos)
{
	g_assert (res != NULL);
	g_assert (pos != NULL);

	if (g_ascii_strncasecmp (map_line, "http://",
			sizeof ("http://") - 1) == 0) {
		*res = MAP_PROTO_HTTP;
		*pos = map_line + sizeof ("http://") - 1;
	}
	else if (g_ascii_strncasecmp (map_line, "file://", sizeof ("file://") -
			1) == 0) {
		*res = MAP_PROTO_FILE;
		*pos = map_line + sizeof ("file://") - 1;
	}
	else if (*map_line == '/') {
		/* Trivial file case */
		*res = MAP_PROTO_FILE;
		*pos = map_line;
	}
	else {
		msg_debug ("invalid map fetching protocol: %s", map_line);
		return FALSE;
	}

	return TRUE;
}

gboolean
rspamd_map_add (struct rspamd_config *cfg,
	const gchar *map_line,
	const gchar *description,
	map_cb_t read_callback,
	map_fin_cb_t fin_callback,
	void **user_data)
{
	struct rspamd_map *new_map;
	enum fetch_proto proto;
	const gchar *def, *p, *hostend;
	struct file_map_data *fdata;
	struct http_map_data *hdata;
	gchar portbuf[6], *cksum_encoded, cksum[rspamd_cryptobox_HASHBYTES];
	gint i, s, r;
	struct addrinfo hints, *res;
	rspamd_mempool_t *pool;

	/* First of all detect protocol line */
	if (!rspamd_map_check_proto (map_line, (int *)&proto, &def)) {
		return FALSE;
	}
	/* Constant pool */
	if (cfg->map_pool == NULL) {
		cfg->map_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
				"map");
		memcpy (cfg->map_pool->tag.uid, cfg->cfg_pool->tag.uid,
				sizeof (cfg->map_pool->tag.uid));
	}

	new_map = rspamd_mempool_alloc0 (cfg->map_pool, sizeof (struct rspamd_map));
	new_map->read_callback = read_callback;
	new_map->fin_callback = fin_callback;
	new_map->user_data = user_data;
	new_map->protocol = proto;
	new_map->cfg = cfg;
	new_map->id = g_random_int ();
	new_map->locked =
		rspamd_mempool_alloc0_shared (cfg->cfg_pool, sizeof (gint));

	if (proto == MAP_PROTO_FILE) {
		new_map->uri = rspamd_mempool_strdup (cfg->cfg_pool, def);
		def = new_map->uri;
	}
	else {
		new_map->uri = rspamd_mempool_strdup (cfg->cfg_pool, map_line);
	}
	if (description != NULL) {
		new_map->description =
			rspamd_mempool_strdup (cfg->cfg_pool, description);
	}

	/* Now check for each proto separately */
	if (proto == MAP_PROTO_FILE) {
		fdata =
			rspamd_mempool_alloc0 (cfg->map_pool,
				sizeof (struct file_map_data));
		if (access (def, R_OK) == -1) {
			if (errno != ENOENT) {
				msg_err_config ("cannot open file '%s': %s", def, strerror
						(errno));
				return FALSE;

			}
			msg_info_config (
				"map '%s' is not found, but it can be loaded automatically later",
				def);
			/* We still can add this file */
			fdata->st.st_mtime = -1;
		}
		else {
			stat (def, &fdata->st);
		}
		fdata->filename = rspamd_mempool_strdup (cfg->map_pool, def);
		new_map->map_data = fdata;
	}
	else if (proto == MAP_PROTO_HTTP) {
		hdata =
			rspamd_mempool_alloc0 (cfg->map_pool,
				sizeof (struct http_map_data));
		/* Try to search port */
		if ((p = strchr (def, ':')) != NULL) {
			hostend = p;
			i = 0;
			p++;
			while (g_ascii_isdigit (*p) && i < (gint)sizeof (portbuf) - 1) {
				portbuf[i++] = *p++;
			}
			if (*p != '/') {
				msg_info_config ("bad http map definition: %s", def);
				return FALSE;
			}
			portbuf[i] = '\0';
			hdata->port = atoi (portbuf);
		}
		else {
			/* Default http port */
			rspamd_snprintf (portbuf, sizeof (portbuf), "80");
			hdata->port = 80;
			/* Now separate host from path */
			if ((p = strchr (def, '/')) == NULL) {
				msg_info_config ("bad http map definition: %s", def);
				return FALSE;
			}
			hostend = p;
		}
		hdata->host = rspamd_mempool_alloc (cfg->map_pool, hostend - def + 1);
		rspamd_strlcpy (hdata->host, def, hostend - def + 1);
		hdata->path = rspamd_mempool_strdup (cfg->map_pool, p);
		/* Now try to resolve */
		memset (&hints, 0, sizeof (hints));
		hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
		hints.ai_socktype = SOCK_STREAM; /* Stream socket */
		hints.ai_flags = 0;
		hints.ai_protocol = 0;           /* Any protocol */
		hints.ai_canonname = NULL;
		hints.ai_addr = NULL;
		hints.ai_next = NULL;

		if ((r = getaddrinfo (hdata->host, portbuf, &hints, &res)) == 0) {
			hdata->addr = res;
			rspamd_mempool_add_destructor (cfg->cfg_pool,
				(rspamd_mempool_destruct_t)freeaddrinfo, hdata->addr);
		}
		else {
			msg_err_config ("address resolution for %s failed: %s",
				hdata->host,
				gai_strerror (r));
			return FALSE;
		}
		/* Now try to connect */
		if ((s = rspamd_socket_tcp (hdata->addr, FALSE, FALSE)) == -1) {
			msg_info_config ("cannot connect to http server %s: %d, %s",
				hdata->host,
				errno,
				strerror (errno));
			return FALSE;
		}
		close (s);
		hdata->conn = rspamd_http_connection_new (http_map_read, http_map_error,
			http_map_finish,
			RSPAMD_HTTP_BODY_PARTIAL | RSPAMD_HTTP_CLIENT_SIMPLE,
			RSPAMD_HTTP_CLIENT, NULL);
		new_map->map_data = hdata;
	}
	/* Temp pool */
	rspamd_cryptobox_hash (cksum, new_map->uri, strlen (new_map->uri), NULL, 0);
	cksum_encoded = rspamd_encode_base32 (cksum, sizeof (cksum));
	new_map->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "map");
	memcpy (new_map->pool->tag.uid, cksum_encoded,
			sizeof (new_map->pool->tag.uid));
	g_free (cksum_encoded);
	pool = new_map->pool;
	msg_info_pool ("added map %s", new_map->uri);


	cfg->maps = g_list_prepend (cfg->maps, new_map);

	return TRUE;
}

static gchar*
strip_map_elt (rspamd_mempool_t *pool, const gchar *start,
		size_t len)
{
	gchar *res = NULL;
	const gchar *c = start, *p = start + len - 1;

	/* Strip starting spaces */
	while (g_ascii_isspace (*c)) {
		c ++;
	}

	/* Strip ending spaces */
	while (g_ascii_isspace (*p) && p >= c) {
		p --;
	}

	/* One symbol up */
	p ++;

	if (p - c > 0) {
		res = rspamd_mempool_alloc (pool, p - c + 1);
		rspamd_strlcpy (res, c, p - c + 1);
	}

	return res;
}

/**
 * FSM for parsing lists
 */
gchar *
abstract_parse_kv_list (rspamd_mempool_t * pool,
	gchar * chunk,
	gint len,
	struct map_cb_data *data,
	insert_func func)
{
	gchar *c, *p, *key = NULL, *value = NULL, *end;

	p = chunk;
	c = p;
	end = p + len;

	while (p < end) {
		switch (data->state) {
		case 0:
			/* read key */
			/* Check here comments, eol and end of buffer */
			if (*p == '#') {
				if (key != NULL && p - c  >= 0) {
					value = rspamd_mempool_alloc (pool, p - c + 1);
					memcpy (value, c, p - c);
					value[p - c] = '\0';
					value = g_strstrip (value);
					func (data->cur_data, key, value);
					msg_debug_pool ("insert kv pair: %s -> %s", key, value);
				}
				data->state = 99;
			}
			else if (*p == '\r' || *p == '\n') {
				if (key != NULL && p - c >= 0) {
					value = rspamd_mempool_alloc (pool, p - c + 1);
					memcpy (value, c, p - c);
					value[p - c] = '\0';

					value = g_strstrip (value);
					func (data->cur_data, key, value);
					msg_debug_pool ("insert kv pair: %s -> %s", key, value);
				}
				else if (key == NULL && p - c > 0) {
					/* Key only line */
					key = rspamd_mempool_alloc (pool, p - c + 1);
					memcpy (key, c, p - c);
					key[p - c] = '\0';
					value = rspamd_mempool_alloc (pool, 1);
					*value = '\0';
					func (data->cur_data, key, value);
					msg_debug_pool ("insert kv pair: %s -> %s", key, value);
				}
				data->state = 100;
				key = NULL;
			}
			else if (g_ascii_isspace (*p)) {
				if (p - c > 0) {
					key = rspamd_mempool_alloc (pool, p - c + 1);
					memcpy (key, c, p - c);
					key[p - c] = '\0';
					data->state = 2;
				}
				else {
					key = NULL;
				}
			}
			else {
				p++;
			}
			break;
		case 2:
			/* Skip spaces before value */
			if (!g_ascii_isspace (*p)) {
				c = p;
				data->state = 0;
			}
			else {
				p++;
			}
			break;
		case 99:
			/* SKIP_COMMENT */
			/* Skip comment till end of line */
			if (*p == '\r' || *p == '\n') {
				while ((*p == '\r' || *p == '\n') && p < end) {
					p++;
				}
				c = p;
				key = NULL;
				data->state = 0;
			}
			else {
				p++;
			}
			break;
		case 100:
			/* Skip \r\n and whitespaces */
			if (*p == '\r' || *p == '\n' || g_ascii_isspace (*p)) {
				p++;
			}
			else {
				c = p;
				key = NULL;
				data->state = 0;
			}
			break;
		}
	}

	return c;
}

gchar *
rspamd_parse_abstract_list (rspamd_mempool_t * pool,
	gchar * chunk,
	gint len,
	struct map_cb_data *data,
	insert_func func)
{
	gchar *p, *c, *end, *s;

	p = chunk;
	c = p;
	end = p + len;

	while (p < end) {
		switch (data->state) {
		/* READ_SYMBOL */
		case 0:
			if (*p == '#') {
				/* Got comment */
				if (p > c) {
					/* Save previous string in lines like: "127.0.0.1 #localhost" */
					s = strip_map_elt (pool, c, p - c);

					if (s) {
						func (data->cur_data, s, hash_fill);
						msg_debug_pool ("insert element (before comment): %s", s);
					}
				}
				c = p;
				data->state = 1;
			}
			else if (*p == '\r' || *p == '\n') {
				/* Got EOL marker, save stored string */
				s = strip_map_elt (pool, c, p - c);

				if (s) {
					func (data->cur_data, s, hash_fill);
					msg_debug_pool ("insert element (before EOL): %s", s);
				}
				/* Skip EOL symbols */
				while ((*p == '\r' || *p == '\n') && p < end) {
					p++;
				}

				if (p == end) {
					p ++;
					c = NULL;
				}
				else {
					c = p;
				}
			}
			else {
				p++;
			}
			break;
		/* SKIP_COMMENT */
		case 1:
			/* Skip comment till end of line */
			if (*p == '\r' || *p == '\n') {
				while ((*p == '\r' || *p == '\n') && p < end) {
					p++;
				}

				if (p == end) {
					p ++;
					c = NULL;
				}
				else {
					c = p;
				}
				data->state = 0;
			}
			else {
				p++;
			}
			break;
		}
	}

	if (c >= end) {
		c = NULL;
	}

	return c;
}

/**
 * Radix tree helper function
 */
static void
radix_tree_insert_helper (gpointer st, gconstpointer key, gpointer value)
{
	radix_compressed_t *tree = (radix_compressed_t *)st;

	rspamd_radix_add_iplist ((gchar *)key, " ,;", tree);
}

/* Helpers */
gchar *
rspamd_hosts_read (rspamd_mempool_t * pool,
	gchar * chunk,
	gint len,
	struct map_cb_data *data)
{
	if (data->cur_data == NULL) {
		data->cur_data = g_hash_table_new (rspamd_strcase_hash,
				rspamd_strcase_equal);
	}
	return rspamd_parse_abstract_list (pool,
			   chunk,
			   len,
			   data,
			   (insert_func) g_hash_table_insert);
}

void
rspamd_hosts_fin (rspamd_mempool_t * pool, struct map_cb_data *data)
{
	if (data->prev_data) {
		g_hash_table_destroy (data->prev_data);
	}
	if (data->cur_data) {
		msg_info_pool ("read hash of %d elements", g_hash_table_size
				(data->cur_data));
	}
}

gchar *
rspamd_kv_list_read (rspamd_mempool_t * pool,
	gchar * chunk,
	gint len,
	struct map_cb_data *data)
{
	if (data->cur_data == NULL) {
		data->cur_data = g_hash_table_new (rspamd_strcase_hash,
				rspamd_strcase_equal);
	}
	return abstract_parse_kv_list (pool,
			   chunk,
			   len,
			   data,
			   (insert_func) g_hash_table_insert);
}

void
rspamd_kv_list_fin (rspamd_mempool_t * pool, struct map_cb_data *data)
{
	if (data->prev_data) {
		g_hash_table_destroy (data->prev_data);
	}
	if (data->cur_data) {
		msg_info_pool ("read hash of %d elements", g_hash_table_size
				(data->cur_data));
	}
}

gchar *
rspamd_radix_read (rspamd_mempool_t * pool,
	gchar * chunk,
	gint len,
	struct map_cb_data *data)
{
	radix_compressed_t *tree;
	rspamd_mempool_t *rpool;

	if (data->cur_data == NULL) {
		tree = radix_create_compressed ();
		rpool = radix_get_pool (tree);
		memcpy (rpool->tag.uid, pool->tag.uid, sizeof (rpool->tag.uid));
		data->cur_data = tree;
	}
	return rspamd_parse_abstract_list (pool,
			   chunk,
			   len,
			   data,
			   (insert_func) radix_tree_insert_helper);
}

void
rspamd_radix_fin (rspamd_mempool_t * pool, struct map_cb_data *data)
{
	if (data->prev_data) {
		radix_destroy_compressed (data->prev_data);
	}
	if (data->cur_data) {
		msg_info_pool ("read radix trie of %z elements", radix_get_size
				(data->cur_data));
	}
}