/*
 * Copyright (c) 2009-2012, Vsevolod Stakhov
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include "util.h"
#include "main.h"
#include "message.h"
#include "protocol.h"
#include "upstream.h"
#include "cfg_file.h"
#include "map.h"
#include "dns.h"
#include "tokenizers/tokenizers.h"
#include "classifiers/classifiers.h"
#include "binlog.h"
#include "statfile_sync.h"
#include "lua/lua_common.h"
#include "dynamic_cfg.h"
#include "rrd.h"

#define END "END" CRLF

/* 120 seconds for controller's IO */
#define CONTROLLER_IO_TIMEOUT 120

/* RRD macroes */
/* Write data each minute */
#define CONTROLLER_RRD_STEP 60

/* Init functions */
gpointer init_controller (struct rspamd_config *cfg);
void start_controller (struct rspamd_worker *worker);

worker_t controller_worker = {
	"controller",               /* Name */
	init_controller,            /* Init function */
	start_controller,           /* Start function */
	TRUE,                       /* Has socket */
	FALSE,                      /* Non unique */
	FALSE,                      /* Non threaded */
	TRUE,                       /* Killable */
	SOCK_STREAM                 /* TCP socket */
};

enum command_type {
	COMMAND_PASSWORD,
	COMMAND_QUIT,
	COMMAND_RELOAD,
	COMMAND_STAT,
	COMMAND_STAT_RESET,
	COMMAND_SHUTDOWN,
	COMMAND_UPTIME,
	COMMAND_LEARN,
	COMMAND_LEARN_SPAM,
	COMMAND_LEARN_HAM,
	COMMAND_HELP,
	COMMAND_COUNTERS,
	COMMAND_SYNC,
	COMMAND_WEIGHTS,
	COMMAND_GET,
	COMMAND_POST,
	COMMAND_ADD_SYMBOL,
	COMMAND_ADD_ACTION
};

struct controller_command {
	gchar *command;
	gboolean privilleged;
	enum command_type type;
};

struct custom_controller_command {
	const gchar *command;
	gboolean privilleged;
	gboolean require_message;
	controller_func_t handler;
};

struct rspamd_controller_ctx {
	char *password;
	guint32 timeout;
	struct rspamd_dns_resolver *resolver;
	struct event_base *ev_base;
	struct event rrd_event;
	struct rspamd_rrd_file *rrd_file;
	struct rspamd_main *srv;
};

static struct controller_command commands[] = {
	{"password", FALSE, COMMAND_PASSWORD},
	{"quit", FALSE, COMMAND_QUIT},
	{"reload", TRUE, COMMAND_RELOAD},
	{"stat", FALSE, COMMAND_STAT},
	{"stat_reset", TRUE, COMMAND_STAT_RESET},
	{"shutdown", TRUE, COMMAND_SHUTDOWN},
	{"uptime", FALSE, COMMAND_UPTIME},
	{"learn", TRUE, COMMAND_LEARN},
	{"weights", FALSE, COMMAND_WEIGHTS},
	{"help", FALSE, COMMAND_HELP},
	{"counters", FALSE, COMMAND_COUNTERS},
	{"sync", FALSE, COMMAND_SYNC},
	{"learn_spam", TRUE, COMMAND_LEARN_SPAM},
	{"learn_ham", TRUE, COMMAND_LEARN_HAM},
	{"get", FALSE, COMMAND_GET},
	{"post", FALSE, COMMAND_POST},
	{"add_symbol", TRUE, COMMAND_ADD_SYMBOL},
	{"add_action", TRUE, COMMAND_ADD_ACTION}
};

static GList *custom_commands = NULL;

static time_t start_time;

static gchar greetingbuf[1024];

static gboolean controller_write_socket (void *arg);

static void
free_session (void *ud)
{
	GList *part;
	struct mime_part *p;
	struct controller_session *session = ud;

	msg_debug ("freeing session %p", session);

	while ((part = g_list_first (session->parts))) {
		session->parts = g_list_remove_link (session->parts, part);
		p = (struct mime_part *)part->data;
		g_byte_array_free (p->content, FALSE);
		g_list_free_1 (part);
	}
	rspamd_remove_dispatcher (session->dispatcher);

	if (session->kwargs) {
		g_hash_table_destroy (session->kwargs);
	}

	close (session->sock);

	rspamd_mempool_delete (session->session_pool);
	g_slice_free1 (sizeof (struct controller_session), session);
}

static gboolean
restful_write_reply (gint error_code, const gchar *err_message,
	const gchar *buf, gsize buflen, rspamd_io_dispatcher_t *d)
{
	static gchar hbuf[256];
	gint r;

	r = rspamd_snprintf (hbuf, sizeof (hbuf),
			"HTTP/1.0 %d %s" CRLF "Version: " RVERSION CRLF,
			error_code, err_message ? err_message : "OK");
	if (buflen > 0) {
		r += rspamd_snprintf (hbuf + r,
				sizeof (hbuf) - r,
				"Content-Length: %z" CRLF,
				buflen);
	}
	r += rspamd_snprintf (hbuf + r, sizeof (hbuf) - r, CRLF);

	if (buf != NULL) {
		if (!rspamd_dispatcher_write (d, hbuf, r, TRUE, TRUE)) {
			return FALSE;
		}
		return rspamd_dispatcher_write (d, buf, buflen, FALSE, FALSE);
	}
	else {
		if (!rspamd_dispatcher_write (d, hbuf, r, FALSE, TRUE)) {
			return FALSE;
		}
	}

	return TRUE;
}

static gboolean
restful_write_reply_string (gint error_code, const gchar *err_message,
	GString *buf, rspamd_io_dispatcher_t *d)
{
	static gchar hbuf[256];
	gint r;

	r = rspamd_snprintf (hbuf, sizeof (hbuf),
			"HTTP/1.0 %d %s" CRLF "Version: " RVERSION CRLF,
			error_code, err_message ? err_message : "OK");
	if (buf->len > 0) {
		r += rspamd_snprintf (hbuf + r,
				sizeof (hbuf) - r,
				"Content-Length: %z" CRLF,
				buf->len);
	}
	r += rspamd_snprintf (hbuf + r, sizeof (hbuf) - r, CRLF);

	if (buf != NULL) {
		if (!rspamd_dispatcher_write (d, hbuf, r, TRUE, TRUE)) {
			return FALSE;
		}
		return rspamd_dispatcher_write_string (d, buf, FALSE, TRUE);
	}
	else {
		if (!rspamd_dispatcher_write (d, hbuf, r, FALSE, TRUE)) {
			return FALSE;
		}
	}

	return TRUE;
}

static gint
check_auth (struct controller_command *cmd, struct controller_session *session)
{
	gchar out_buf[128];
	gint r;

	if (cmd->privilleged && !session->authorized) {
		if (!session->restful) {
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"not authorized" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return 0;
			}
		}
		else {
			(void)restful_write_reply (403,
				"Not authorized",
				NULL,
				0,
				session->dispatcher);
		}
		return 0;
	}

	return 1;
}

static gboolean
write_whole_statfile (struct controller_session *session,
	gchar *symbol,
	struct rspamd_classifier_config *ccf)
{
	stat_file_t *statfile;
	struct rspamd_statfile_config *st;
	gchar out_buf[BUFSIZ];
	guint i;
	guint64 rev, ti, len, pos, blocks;
	gchar *out;
	struct rspamd_binlog_element log_elt;
	struct stat_file_block *stat_elt;

	statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf,
			symbol, &st, FALSE);
	if (statfile == NULL) {
		return FALSE;
	}

	/* Begin to copy all blocks into array */
	statfile_get_revision (statfile, &rev, (time_t *)&ti);
	if (ti == 0) {
		/* Not tracked file */
		ti = time (NULL);
		statfile_set_revision (statfile, rev, ti);
	}
	msg_info ("send a whole statfile %s with version %uL to slave", symbol,
		rev);

	blocks = statfile_get_total_blocks (statfile);
	len = blocks * sizeof (struct rspamd_binlog_element);
	out = rspamd_mempool_alloc (session->session_pool, len);

	for (i = 0, pos = 0; i < blocks; i++) {
		stat_elt =
			(struct stat_file_block *)((u_char *)statfile->map +
			statfile->seek_pos + i * sizeof (struct stat_file_block));
		if (fabs (stat_elt->value) > 0.001) {
			/* Write only those values which value is not 0 */
			log_elt.h1 = stat_elt->hash1;
			log_elt.h2 = stat_elt->hash2;
			log_elt.value = stat_elt->value;

			memcpy (out + pos, &log_elt, sizeof (log_elt));
			pos += sizeof (struct rspamd_binlog_element);
		}
	}

	i = rspamd_snprintf (out_buf,
			sizeof (out_buf),
			"%uL %uL %uL" CRLF,
			rev,
			ti,
			pos);
	if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE,
		FALSE)) {
		return FALSE;
	}

	if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) {
		return FALSE;
	}

	return TRUE;
}

static gboolean
process_sync_command (struct controller_session *session, gchar **args)
{
	gchar out_buf[BUFSIZ], *arg, *err_str, *symbol;
	gint r;
	guint64 rev, time;
	struct rspamd_statfile_config *st = NULL;
	struct rspamd_classifier_config *ccf;
	GList *cur;
	struct rspamd_binlog *binlog;
	GByteArray *data = NULL;

	arg = *args;
	if (!arg || *arg == '\0') {
		msg_info ("bad arguments to sync command, need symbol");
		return FALSE;
	}
	symbol = arg;
	arg = *(args + 1);
	if (!arg || *arg == '\0') {
		msg_info ("bad arguments to sync command, need revision");
		return FALSE;
	}
	rev = strtoull (arg, &err_str, 10);
	if (err_str && *err_str != 0) {
		msg_info ("bad arguments to sync command: %s", arg);
		return FALSE;
	}
	arg = *(args + 2);
	if (!arg || *arg == '\0') {
		msg_info ("bad arguments to sync command, need time");
		return FALSE;
	}
	time = strtoull (arg, &err_str, 10);
	if (err_str && *err_str != 0) {
		msg_info ("bad arguments to sync command: %s", arg);
		return FALSE;
	}

	ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol);
	if (ccf == NULL) {
		msg_info ("bad symbol: %s", symbol);
		return FALSE;
	}

	cur = g_list_first (ccf->statfiles);
	while (cur) {
		st = cur->data;
		if (strcmp (symbol, st->symbol) == 0) {
			break;
		}
		st = NULL;
		cur = g_list_next (cur);
	}
	if (st == NULL) {
		msg_info ("bad symbol: %s", symbol);
		return FALSE;
	}

	binlog = get_binlog_by_statfile (st);
	if (binlog == NULL) {
		msg_info ("cannot open binlog: %s", symbol);
		return FALSE;
	}

	while (binlog_sync (binlog, rev, &time, &data)) {
		rev++;
		r = rspamd_snprintf (out_buf,
				sizeof (out_buf),
				"%uL %uL %z" CRLF,
				rev,
				time,
				data->len);
		if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE,
			FALSE)) {
			if (data != NULL) {
				g_free (data);
			}
			return FALSE;
		}
		if (data->data != NULL) {
			if (!rspamd_dispatcher_write (session->dispatcher, data->data,
				data->len, TRUE, FALSE)) {
				if (data != NULL) {
					g_free (data);
				}
				return FALSE;
			}
		}
	}

	if (time == 0) {
		if (data != NULL) {
			g_free (data);
		}
		return write_whole_statfile (session, symbol, ccf);
	}

	if (data != NULL) {
		g_free (data);
	}

	return TRUE;
}

static gboolean
process_counters_command (struct controller_session *session)
{
	GList *cur;
	struct cache_item *item;
	struct symbols_cache *cache;
	GString *out;

	cache = session->cfg->cache;
	out = g_string_sized_new (BUFSIZ);

	if (!session->restful) {
		rspamd_printf_gstring (out, "Rspamd counters:" CRLF);
	}

	if (cache != NULL) {
		cur = cache->negative_items;
		while (cur) {
			item = cur->data;
			if (!item->is_callback) {
				rspamd_printf_gstring (out, "%s %.2f %d %.3f" CRLF,
					item->s->symbol, item->s->weight,
					item->s->frequency, item->s->avg_time);
			}
			cur = g_list_next (cur);
		}
		cur = cache->static_items;
		while (cur) {
			item = cur->data;
			if (!item->is_callback) {
				rspamd_printf_gstring (out, "%s %.2f %d %.3f" CRLF,
					item->s->symbol, item->s->weight,
					item->s->frequency, item->s->avg_time);
			}
			cur = g_list_next (cur);
		}
	}

	if (!session->restful) {
		return rspamd_dispatcher_write_string (session->dispatcher,
				   out,
				   FALSE,
				   TRUE);
	}
	else {
		return restful_write_reply_string (200, NULL, out, session->dispatcher);
	}
}

static gboolean
process_stat_command (struct controller_session *session, gboolean do_reset)
{
	GString *out;
	gint i;
	guint64 used, total, rev, ham = 0, spam = 0;
	time_t ti;
	rspamd_mempool_stat_t mem_st;
	struct rspamd_classifier_config *ccf;
	stat_file_t *statfile;
	struct rspamd_statfile_config *st;
	GList *cur_cl, *cur_st;
	struct rspamd_stat *stat, stat_copy;

	rspamd_mempool_stat (&mem_st);
	memcpy (&stat_copy, session->worker->srv->stat, sizeof (stat_copy));
	stat = &stat_copy;
	out = g_string_sized_new (BUFSIZ);
	rspamd_printf_gstring (out,
		"Messages scanned: %ud" CRLF,
		stat->messages_scanned);
	if (stat->messages_scanned > 0) {
		for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
			rspamd_printf_gstring (out,
				"Messages with action %s: %ud, %.2f%%" CRLF,
				rspamd_action_to_str (
					i),
				stat->actions_stat[i],
				(double)stat->actions_stat[i] / (double)stat->messages_scanned *
				100.);
			if (i < METRIC_ACTION_GREYLIST) {
				spam += stat->actions_stat[i];
			}
			else {
				ham += stat->actions_stat[i];
			}
			if (do_reset) {
				session->worker->srv->stat->actions_stat[i] = 0;
			}
		}
		rspamd_printf_gstring (out,
			"Messages treated as spam: %ud, %.2f%%" CRLF,
			spam,
			(double)spam / (double)stat->messages_scanned * 100.);
		rspamd_printf_gstring (out,
			"Messages treated as ham: %ud, %.2f%%" CRLF,
			ham,
			(double)ham / (double)stat->messages_scanned * 100.);
	}
	rspamd_printf_gstring (out,
		"Messages learned: %ud" CRLF,
		stat->messages_learned);
	rspamd_printf_gstring (out,
		"Connections count: %ud" CRLF,
		stat->connections_count);
	rspamd_printf_gstring (out,
		"Control connections count: %ud" CRLF,
		stat->control_connections_count);
	rspamd_printf_gstring (out,
		"Pools allocated: %z" CRLF,
		mem_st.pools_allocated);
	rspamd_printf_gstring (out,
		"Pools freed: %z" CRLF,
		mem_st.pools_freed);
	rspamd_printf_gstring (out,
		"Bytes allocated: %z" CRLF,
		mem_st.bytes_allocated);
	rspamd_printf_gstring (out,
		"Memory chunks allocated: %z" CRLF,
		mem_st.chunks_allocated);
	rspamd_printf_gstring (out,
		"Shared chunks allocated: %z" CRLF,
		mem_st.shared_chunks_allocated);
	rspamd_printf_gstring (out,
		"Chunks freed: %z" CRLF,
		mem_st.chunks_freed);
	rspamd_printf_gstring (out,
		"Oversized chunks: %z" CRLF,
		mem_st.oversized_chunks);
	rspamd_printf_gstring (out,
		"Fuzzy hashes stored: %ud" CRLF,
		stat->fuzzy_hashes);
	rspamd_printf_gstring (out,
		"Fuzzy hashes expired: %ud" CRLF,
		stat->fuzzy_hashes_expired);
	/* Now write statistics for each statfile */
	cur_cl = g_list_first (session->cfg->classifiers);
	while (cur_cl) {
		ccf = cur_cl->data;
		cur_st = g_list_first (ccf->statfiles);
		while (cur_st) {
			st = cur_st->data;
			if ((statfile =
				statfile_pool_is_open (session->worker->srv->statfile_pool,
				st->path)) == NULL) {
				statfile = statfile_pool_open (
					session->worker->srv->statfile_pool,
					st->path,
					st->size,
					FALSE);
			}
			if (statfile) {
				used = statfile_get_used_blocks (statfile);
				total = statfile_get_total_blocks (statfile);
				statfile_get_revision (statfile, &rev, &ti);
				if (total != (guint64) - 1 && used != (guint64) - 1) {
					if (st->label) {
						rspamd_printf_gstring (out,
							"Statfile: %s <%s> (version %uL); length: %Hz; free blocks: %uL; total blocks: %uL; free: %.2f%%" CRLF,
							st->symbol,
							st->label,
							rev,
							st->size,
							(total - used),
							total,
							(double)((double)(total -
							used) / (double)total) * 100.);
					}
					else {
						rspamd_printf_gstring (out,
							"Statfile: %s (version %uL); length: %Hz; free blocks: %uL; total blocks: %uL; free: %.2f%%" CRLF,
							st->symbol,
							rev,
							st->size,
							(total - used),
							total,
							(double)((double)(total -
							used) / (double)total) * 100.);
					}
				}
			}
			cur_st = g_list_next (cur_st);
		}
		cur_cl = g_list_next (cur_cl);
	}

	if (do_reset) {
		session->worker->srv->stat->messages_scanned = 0;
		session->worker->srv->stat->messages_learned = 0;
		session->worker->srv->stat->connections_count = 0;
		session->worker->srv->stat->control_connections_count = 0;
	}

	if (!session->restful) {
		return rspamd_dispatcher_write_string (session->dispatcher,
				   out,
				   FALSE,
				   TRUE);
	}
	else {
		return restful_write_reply_string (200, NULL, out, session->dispatcher);
	}
}

static gboolean
process_dynamic_conf_command (gchar **cmd_args,
	struct controller_session *session,
	gboolean is_action)
{
	struct rspamd_config *cfg = session->cfg;
	gchar *arg, *metric, *name, *err_str;
	gdouble value;
	gboolean res;
	guint real_act;

	if (cfg->dynamic_conf == NULL) {
		if (!session->restful) {
			return rspamd_dispatcher_write (session->dispatcher,
					   "dynamic config is not specified" CRLF,
					   0,
					   FALSE,
					   TRUE);
		}
		else {
			return restful_write_reply (500,
					   "dynamic config is not specified",
					   NULL,
					   0,
					   session->dispatcher);
		}
	}

	if (session->restful) {
		if ((arg = g_hash_table_lookup (session->kwargs, "metric")) == NULL) {
			metric = DEFAULT_METRIC;
		}
		else {
			metric = arg;
		}
		if ((arg = g_hash_table_lookup (session->kwargs, "name")) == NULL) {
			goto invalid_arguments;
		}
		name = arg;
		if ((arg = g_hash_table_lookup (session->kwargs, "value")) == NULL) {
			goto invalid_arguments;
		}
		value = strtod (arg, &err_str);
		if (err_str && *err_str != '\0') {
			msg_info ("double value is invalid: %s", arg);
			goto invalid_arguments;
		}
	}
	else {
		if (cmd_args[0] == NULL || cmd_args[1] == NULL) {
			goto invalid_arguments;
		}
		if (cmd_args[2] == NULL) {
			metric = DEFAULT_METRIC;
			name = cmd_args[0];
			arg = cmd_args[1];
			value = strtod (arg, &err_str);
			if (err_str && *err_str != '\0') {
				msg_info ("double value is invalid: %s", arg);
				goto invalid_arguments;
			}
		}
		else {
			metric = cmd_args[0];
			name = cmd_args[1];
			arg = cmd_args[2];
			value = strtod (arg, &err_str);
			if (err_str && *err_str != '\0') {
				msg_info ("double value is invalid: %s", arg);
				goto invalid_arguments;
			}
		}
	}

	if (is_action) {
		if (!rspamd_action_from_str (name, &real_act)) {
			msg_info ("invalid action string: %s", name);
			res = FALSE;
		}
		else {
			res = add_dynamic_action (cfg, metric, real_act, value);
		}
	}
	else {
		res = add_dynamic_symbol (cfg, metric, name, value);
	}

	if (res) {

		res = dump_dynamic_config (cfg);
		if (res) {
			if (!session->restful) {
				return rspamd_dispatcher_write (session->dispatcher,
						   "OK" CRLF,
						   0,
						   FALSE,
						   TRUE);
			}
			else {
				return restful_write_reply (200,
						   "OK",
						   NULL,
						   0,
						   session->dispatcher);
			}
		}
		else {
			if (!session->restful) {
				return rspamd_dispatcher_write (session->dispatcher,
						   "Error dumping dynamic config" CRLF,
						   0,
						   FALSE,
						   TRUE);
			}
			else {
				return restful_write_reply (500,
						   "Error dumping dynamic config",
						   NULL,
						   0,
						   session->dispatcher);
			}
		}
	}
	else {
		if (!session->restful) {
			return rspamd_dispatcher_write (session->dispatcher,
					   "Cannot add dynamic rule" CRLF,
					   0,
					   FALSE,
					   TRUE);
		}
		else {
			return restful_write_reply (500,
					   "Cannot add dynamic rule",
					   NULL,
					   0,
					   session->dispatcher);
		}
	}

invalid_arguments:
	if (!session->restful) {
		return rspamd_dispatcher_write (session->dispatcher,
				   "Invalid arguments" CRLF,
				   0,
				   FALSE,
				   TRUE);
	}
	else {
		return restful_write_reply (500,
				   "Invalid arguments",
				   NULL,
				   0,
				   session->dispatcher);
	}
}

static gboolean
process_command (struct controller_command *cmd,
	gchar **cmd_args,
	struct controller_session *session)
{
	gchar out_buf[BUFSIZ], *arg, *err_str;
	gint r = 0, days, hours, minutes;
	time_t uptime;
	guint32 size = 0;
	struct rspamd_classifier_config *cl;
	struct rspamd_controller_ctx *ctx = session->worker->ctx;

	switch (cmd->type) {
	case COMMAND_GET:
	case COMMAND_POST:
		session->restful = TRUE;
		session->state = STATE_HEADER;
		session->kwargs = g_hash_table_new (rspamd_strcase_hash,
				rspamd_strcase_equal);
		break;
	case COMMAND_PASSWORD:
		arg = *cmd_args;
		if (!arg || *arg == '\0') {
			msg_debug ("empty password passed");
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"password command requires one argument" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;
		}
		if (ctx->password == NULL) {
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"password command disabled in config, authorized access granted" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;
		}
		if (strncmp (arg, ctx->password, strlen (arg)) == 0) {
			session->authorized = 1;
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"password accepted" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
		}
		else {
			session->authorized = 0;
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"password NOT accepted" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
		}
		break;
	case COMMAND_QUIT:
		session->state = STATE_QUIT;
		break;
	case COMMAND_RELOAD:
		if (check_auth (cmd, session)) {
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"reload request sent" CRLF);
			if (!session->restful) {
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
			}
			else {
				if (!restful_write_reply (200, out_buf, NULL, 0,
					session->dispatcher)) {
					return FALSE;
				}
			}
			kill (getppid (), SIGHUP);
		}
		break;
	case COMMAND_STAT:
		if (check_auth (cmd, session)) {
			return process_stat_command (session, FALSE);
		}
		break;
	case COMMAND_STAT_RESET:
		if (check_auth (cmd, session)) {
			return process_stat_command (session, TRUE);
		}
		break;
	case COMMAND_SHUTDOWN:
		if (check_auth (cmd, session)) {
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"shutdown request sent" CRLF);
			if (!session->restful) {
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
			}
			else {
				if (!restful_write_reply (200, out_buf, NULL, 0,
					session->dispatcher)) {
					return FALSE;
				}
			}
			kill (getppid (), SIGTERM);
		}
		break;
	case COMMAND_UPTIME:
		if (check_auth (cmd, session)) {
			uptime = time (NULL) - start_time;
			/* If uptime more than 2 hours, print as a number of days. */
			if (uptime >= 2 * 3600) {
				days = uptime / 86400;
				hours = uptime / 3600 - days * 24;
				minutes = uptime / 60 - hours * 60 - days * 1440;
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"%d day%s %d hour%s %d minute%s" CRLF,
						days,
						days > 1 ? "s" : " ",
						hours,
						hours > 1 ? "s" : " ",
						minutes,
						minutes > 1 ? "s" : " ");
			}
			/* If uptime is less than 1 minute print only seconds */
			else if (uptime / 60 == 0) {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"%d second%s" CRLF,
						(gint)uptime,
						(gint)uptime > 1 ? "s" : " ");
			}
			/* Else print the minutes and seconds. */
			else {
				hours = uptime / 3600;
				minutes = uptime / 60 - hours * 60;
				uptime -= hours * 3600 + minutes * 60;
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"%d hour%s %d minute%s %d second%s" CRLF,
						hours,
						hours > 1 ? "s" : " ",
						minutes,
						minutes > 1 ? "s" : " ",
						(gint)uptime,
						uptime > 1 ? "s" : " ");
			}
			if (!session->restful) {
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
			}
			else {
				if (!restful_write_reply (200, NULL, out_buf, r,
					session->dispatcher)) {
					return FALSE;
				}
			}
		}
		break;
	case COMMAND_LEARN_SPAM:
		if (check_auth (cmd, session)) {
			if (!session->restful) {
				arg = *cmd_args;
				if (!arg || *arg == '\0') {
					msg_debug ("no statfile specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
					return TRUE;
				}
				arg = *(cmd_args + 1);
				if (arg == NULL || *arg == '\0') {
					msg_debug ("no message size specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
					return TRUE;
				}
				size = strtoul (arg, &err_str, 10);
				if (err_str && *err_str != '\0') {
					msg_debug ("message size is invalid: %s", arg);
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn size is invalid" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
					return TRUE;
				}
				cl = rspamd_config_find_classifier (session->cfg, *cmd_args);
			}
			else {
				if ((arg =
					g_hash_table_lookup (session->kwargs,
					"classifier")) == NULL) {
					msg_debug ("no classifier specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!restful_write_reply (500, out_buf, NULL, 0,
						session->dispatcher)) {
						return FALSE;
					}
					return TRUE;
				}
				else {
					cl = rspamd_config_find_classifier (session->cfg, arg);
				}
				if ((arg =
					g_hash_table_lookup (session->kwargs,
					"content-length")) == NULL) {
					msg_debug ("no size specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!restful_write_reply (500, out_buf, NULL, 0,
						session->dispatcher)) {
						return FALSE;
					}
				}
				else {
					size = strtoul (arg, &err_str, 10);
					if (err_str && *err_str != '\0') {
						msg_debug ("message size is invalid: %s", arg);
						r = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"learn size is invalid" CRLF);
						if (!restful_write_reply (500, out_buf, NULL, 0,
							session->dispatcher)) {
							return FALSE;
						}
						return TRUE;
					}
				}
			}

			session->learn_classifier = cl;

			/* By default learn positive */
			session->in_class = TRUE;
			rspamd_set_dispatcher_policy (session->dispatcher,
				BUFFER_CHARACTER,
				size);
			session->state = STATE_LEARN_SPAM_PRE;
		}
		break;
	case COMMAND_LEARN_HAM:
		if (check_auth (cmd, session)) {
			if (!session->restful) {
				arg = *cmd_args;
				if (!arg || *arg == '\0') {
					msg_debug ("no statfile specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
					return TRUE;
				}
				arg = *(cmd_args + 1);
				if (arg == NULL || *arg == '\0') {
					msg_debug ("no message size specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
					return TRUE;
				}
				size = strtoul (arg, &err_str, 10);
				if (err_str && *err_str != '\0') {
					msg_debug ("message size is invalid: %s", arg);
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn size is invalid" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
					return TRUE;
				}
				cl = rspamd_config_find_classifier (session->cfg, *cmd_args);
			}
			else {
				if ((arg =
					g_hash_table_lookup (session->kwargs,
					"classifier")) == NULL) {
					msg_debug ("no classifier specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!restful_write_reply (500, out_buf, NULL, 0,
						session->dispatcher)) {
						return FALSE;
					}
					return TRUE;
				}
				else {
					cl = rspamd_config_find_classifier (session->cfg, arg);
				}
				if ((arg =
					g_hash_table_lookup (session->kwargs,
					"content-length")) == NULL) {
					msg_debug ("no size specified in learn command");
					r = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn_spam command requires at least two arguments: classifier name and a message's size" CRLF);
					if (!restful_write_reply (500, out_buf, NULL, 0,
						session->dispatcher)) {
						return FALSE;
					}
				}
				else {
					size = strtoul (arg, &err_str, 10);
					if (err_str && *err_str != '\0') {
						msg_debug ("message size is invalid: %s", arg);
						r = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"learn size is invalid" CRLF);
						if (!restful_write_reply (500, out_buf, NULL, 0,
							session->dispatcher)) {
							return FALSE;
						}
						return TRUE;
					}
				}
			}

			session->learn_classifier = cl;

			/* By default learn positive */
			session->in_class = FALSE;
			rspamd_set_dispatcher_policy (session->dispatcher,
				BUFFER_CHARACTER,
				size);
			session->state = STATE_LEARN_SPAM_PRE;
		}
		break;
	case COMMAND_LEARN:
		if (check_auth (cmd, session)) {
			/* TODO: remove this command as currenly it should not be used anywhere */
			arg = *cmd_args;
			if (!arg || *arg == '\0') {
				msg_debug ("no statfile specified in learn command");
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"learn command requires at least two arguments: stat filename and its size" CRLF);
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
				return TRUE;
			}
			arg = *(cmd_args + 1);
			if (arg == NULL || *arg == '\0') {
				msg_debug ("no message size specified in learn command");
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"learn command requires at least two arguments: symbol and message size" CRLF);
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
				return TRUE;
			}
			size = strtoul (arg, &err_str, 10);
			if (err_str && *err_str != '\0') {
				msg_debug ("message size is invalid: %s", arg);
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"learn size is invalid" CRLF);
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
				return TRUE;
			}

			session->learn_symbol = rspamd_mempool_strdup (
				session->session_pool,
				*cmd_args);
			cl = g_hash_table_lookup (session->cfg->classifiers_symbols,
					*cmd_args);

			session->learn_classifier = cl;

			/* By default learn positive */
			session->in_class = 1;
			session->learn_multiplier = 1.;
			/* Get all arguments */
			while (*cmd_args++) {
				arg = *cmd_args;
				if (arg && *arg == '-') {
					switch (*(arg + 1)) {
					case 'r':
						arg = *(cmd_args + 1);
						if (!arg || *arg == '\0') {
							r = rspamd_snprintf (out_buf,
									sizeof (out_buf),
									"recipient is not defined" CRLF);
							if (!rspamd_dispatcher_write (session->dispatcher,
								out_buf, r, FALSE, FALSE)) {
								return FALSE;
							}
						}
						session->learn_rcpt = rspamd_mempool_strdup (
							session->session_pool,
							arg);
						break;
					case 'f':
						arg = *(cmd_args + 1);
						if (!arg || *arg == '\0') {
							r = rspamd_snprintf (out_buf,
									sizeof (out_buf),
									"from is not defined" CRLF);
							if (!rspamd_dispatcher_write (session->dispatcher,
								out_buf, r, FALSE, FALSE)) {
								return FALSE;
							}
						}
						session->learn_from = rspamd_mempool_strdup (
							session->session_pool,
							arg);
						break;
					case 'n':
						session->in_class = 0;
						break;
					case 'm':
						arg = *(cmd_args + 1);
						if (!arg || *arg == '\0') {
							r = rspamd_snprintf (out_buf,
									sizeof (out_buf),
									"multiplier is not defined" CRLF);
							if (!rspamd_dispatcher_write (session->dispatcher,
								out_buf, r, FALSE, FALSE)) {
								return FALSE;
							}
						}
						else {
							session->learn_multiplier = strtod (arg, NULL);
						}
						break;
					default:
						r = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"tokenizer is not defined" CRLF);
						if (!rspamd_dispatcher_write (session->dispatcher,
							out_buf, r, FALSE, FALSE)) {
							return FALSE;
						}
						return TRUE;
					}
				}
			}
			rspamd_set_dispatcher_policy (session->dispatcher,
				BUFFER_CHARACTER,
				size);
			session->state = STATE_LEARN;
		}
		break;

	case COMMAND_WEIGHTS:
		/* TODO: remove this command as currenly it should not be used anywhere */
		arg = *cmd_args;
		if (!arg || *arg == '\0') {
			msg_debug ("no statfile specified in weights command");
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"weights command requires two arguments: statfile and message size" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;
		}
		arg = *(cmd_args + 1);
		if (arg == NULL || *arg == '\0') {
			msg_debug ("no message size specified in weights command");
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"weights command requires two arguments: statfile and message size" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;
		}
		size = strtoul (arg, &err_str, 10);
		if (err_str && *err_str != '\0') {
			msg_debug ("message size is invalid: %s", arg);
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"message size is invalid" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;
		}

		cl = g_hash_table_lookup (session->cfg->classifiers_symbols, *cmd_args);
		if (cl == NULL) {
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"statfile %s is not defined" CRLF,
					*cmd_args);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;

		}
		session->learn_classifier = cl;

		rspamd_set_dispatcher_policy (session->dispatcher,
			BUFFER_CHARACTER,
			size);
		session->state = STATE_WEIGHTS;
		break;
	case COMMAND_SYNC:
		if (!process_sync_command (session, cmd_args)) {
			r = rspamd_snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return TRUE;
		}
		break;
	case COMMAND_HELP:
		r = rspamd_snprintf (out_buf,
				sizeof (out_buf),
				"Rspamd CLI commands (* - privileged command):" CRLF
				"    help - this help message" CRLF
				"(*) learn <statfile> <size> [-r recipient] [-m multiplier] [-f from] [-n] - learn message to specified statfile" CRLF
				"    quit - quit CLI session" CRLF
				"    password <password> - authenticate yourself for privileged commands" CRLF
				"(*) reload - reload rspamd" CRLF
				"(*) shutdown - shutdown rspamd" CRLF
				"    stat - show different rspamd stat" CRLF
				"    sync - run synchronization of statfiles" CRLF
				"    counters - show rspamd counters" CRLF
				"    uptime - rspamd uptime" CRLF
				"    weights <statfile> <size> - weight of message in all statfiles" CRLF);
		if (!session->restful) {
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
		}
		else {
			if (!restful_write_reply (200, NULL, out_buf, r,
				session->dispatcher)) {
				return FALSE;
			}
		}
		break;
	case COMMAND_COUNTERS:
		process_counters_command (session);
		break;
	case COMMAND_ADD_ACTION:
		if (check_auth (cmd, session)) {
			return process_dynamic_conf_command (cmd_args, session, TRUE);
		}
		break;
	case COMMAND_ADD_SYMBOL:
		if (check_auth (cmd, session)) {
			return process_dynamic_conf_command (cmd_args, session, FALSE);
		}
		break;
	}
	return TRUE;
}

static controller_func_t
parse_custom_command (gchar *line,
	gchar **cmd_args,
	struct controller_session *session,
	gsize len)
{
	GList *cur;
	struct custom_controller_command *cmd;

	if (len == 0) {
		len = strlen (line);
	}
	cur = custom_commands;
	while (cur) {
		cmd = cur->data;
		if (g_ascii_strncasecmp (cmd->command, line, len) == 0) {
			return cmd->handler;
		}
		cur = g_list_next (cur);
	}

	return NULL;
}

static struct controller_command *
parse_normal_command (const gchar *line, gsize len)
{
	guint i;
	struct controller_command *c;

	if (len == 0) {
		len = strlen (line);
	}
	for (i = 0; i < G_N_ELEMENTS (commands); i++) {
		c = &commands[i];
		if (g_ascii_strncasecmp (line, c->command, len) == 0) {
			return c;
		}
	}

	return NULL;
}

static gboolean
process_header (f_str_t *line, struct controller_session *session)
{
	gchar *headern;
	struct controller_command *command;
	struct rspamd_controller_ctx *ctx = session->worker->ctx;
	controller_func_t custom_handler;

	/* XXX: temporary workaround */
	headern = NULL;

	if (line == NULL || headern == NULL) {
		msg_warn ("bad header: %V", line);
		return FALSE;
	}
	/* Eat whitespaces */
	g_strstrip (headern);
	fstrstrip (line);

	if (*headern == 'c' || *headern == 'C') {
		if (g_ascii_strcasecmp (headern, "command") == 0) {
			/* This header is actually command */
			command = parse_normal_command (line->begin, line->len);
			if (command == NULL) {
				if ((custom_handler =
					parse_custom_command (line->begin, NULL, session,
					line->len)) == NULL) {
					msg_info ("bad command header: %V", line);
					return FALSE;
				}
				else {
					session->custom_handler = custom_handler;
				}
			}
			session->cmd = command;
			return TRUE;
		}
	}
	else if (*headern == 'p' || *headern == 'P') {
		/* Password header */
		if (g_ascii_strcasecmp (headern, "password") == 0) {
			if (ctx->password == NULL ||
				(line->len == strlen (ctx->password)
				&& memcmp (line->begin, ctx->password, line->len) == 0)) {
				session->authorized = TRUE;
			}
			else {
				msg_info ("wrong password in controller command");
			}
			return TRUE;
		}
	}

	g_hash_table_insert (session->kwargs, headern,
		fstrcstr (line, session->session_pool));

	return TRUE;
}

/*
 * Called if all filters are processed, non-threaded and simple version
 */
static gboolean
fin_learn_task (void *arg)
{
	struct rspamd_task *task = (struct rspamd_task *) arg;

	/* XXX: needs to be reworked */

	return TRUE;
}

/*
 * Called if session was restored inside fin callback
 */
static void
restore_learn_task (void *arg)
{
	struct rspamd_task *task = (struct rspamd_task *) arg;

	/* Special state */
}

static gboolean
controller_read_socket (f_str_t * in, void *arg)
{
	struct controller_session *session = (struct controller_session *)arg;
	struct classifier_ctx *cls_ctx;
	gint len, i, r;
	gchar *s, **params, *cmd, out_buf[128];
	struct controller_command *command;
	struct rspamd_task *task;
	struct mime_text_part *part;
	GList *cur = NULL;
	GTree *tokens = NULL;
	GError *err = NULL;
	f_str_t c;
	controller_func_t custom_handler;

	switch (session->state) {
	case STATE_COMMAND:
		s = fstrcstr (in, session->session_pool);
		params = g_strsplit_set (s, " ", -1);

		rspamd_mempool_add_destructor (session->session_pool,
			(rspamd_mempool_destruct_t) g_strfreev, params);

		len = g_strv_length (params);
		if (len > 0) {
			cmd = g_strstrip (params[0]);

			command = parse_normal_command (cmd, 0);
			if (command != NULL) {
				if (!process_command (command, &params[1], session)) {
					return FALSE;
				}
			}
			else {
				if ((custom_handler =
					parse_custom_command (cmd, &params[1], session,
					0)) == NULL) {
					msg_debug ("'%s'", cmd);
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"Unknown command" CRLF);
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						i, FALSE, FALSE)) {
						return FALSE;
					}
				}
				else {
					custom_handler (&params[1], session);
				}
			}
		}
		if (session->state != STATE_LEARN && session->state !=
			STATE_LEARN_SPAM_PRE
			&& session->state != STATE_WEIGHTS && session->state !=
			STATE_OTHER && session->state != STATE_HEADER) {
			if (!rspamd_dispatcher_write (session->dispatcher, END,
				sizeof (END) - 1, FALSE, TRUE)) {
				return FALSE;
			}
			if (session->state == STATE_QUIT) {
				destroy_session (session->s);
				return FALSE;
			}
		}

		break;
	case STATE_HEADER:
		if (in->len == 0) {
			/* End of headers */
			if (session->cmd == NULL && session->custom_handler == NULL) {
				i = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"HTTP/1.0 500 Bad command" CRLF CRLF);
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i,
					FALSE, FALSE)) {
					return FALSE;
				}
				destroy_session (session->s);
				return FALSE;
			}
			/* Perform command */
			else if (session->cmd != NULL) {
				if (!process_command (session->cmd, NULL, session)) {
					msg_debug ("process command failed");
					return FALSE;
				}
			}
			else {
				session->custom_handler (NULL, session);
			}
			if (session->state != STATE_LEARN && session->state !=
				STATE_LEARN_SPAM_PRE
				&& session->state != STATE_WEIGHTS && session->state !=
				STATE_OTHER) {
				msg_debug ("closing restful connection");
				destroy_session (session->s);
				return FALSE;
			}
		}
		else if (!process_header (in, session)) {
			i = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"HTTP/1.0 500 Bad header" CRLF CRLF);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i,
				FALSE, FALSE)) {
				return FALSE;
			}
			destroy_session (session->s);
			return FALSE;
		}
		break;
	case STATE_LEARN:
		session->learn_buf = in;
		task = rspamd_task_new (session->worker);

		task->msg = g_string_new_len (in->begin, in->len);
		task->ev_base = session->ev_base;

		r = process_message (task);
		if (r == -1) {
			msg_warn ("processing of message failed");
			rspamd_task_free (task, FALSE);
			session->state = STATE_REPLY;
			if (session->restful) {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"HTTP/1.0 500 Cannot process message" CRLF CRLF);
			}
			else {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"cannot process message" CRLF);
			}
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
				FALSE, FALSE)) {
				return FALSE;
			}
			return FALSE;
		}

		if (!rspamd_learn_task (session->learn_symbol, task, &err)) {
			rspamd_task_free (task, FALSE);
			if (err) {
				if (session->restful) {
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"HTTP/1.0 500 Learn classifier error: %s" CRLF CRLF,
							err->message);
				}
				else {
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn failed, learn classifier error: %s" CRLF END,
							err->message);
				}
				g_error_free (err);
			}
			else {
				if (session->restful) {
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"HTTP/1.0 500 Learn classifier error: unknown" CRLF CRLF);
				}
				else {
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn failed, unknown learn classifier error" CRLF END);
				}
			}

			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i,
				FALSE, FALSE)) {
				return FALSE;
			}
			session->state = STATE_REPLY;
			return TRUE;
		}

		rspamd_task_free (task, FALSE);
		if (session->restful) {
			i = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"HTTP/1.0 200 Learn OK" CRLF CRLF);
		}
		else {
			i =
				rspamd_snprintf (out_buf, sizeof (out_buf),
					"learn ok" CRLF END);
		}
		session->state = STATE_REPLY;
		if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE,
			FALSE)) {
			return FALSE;
		}
		break;
	case STATE_LEARN_SPAM_PRE:
		session->learn_buf = in;
		task = rspamd_task_new (session->worker);

		task->msg = g_string_new_len (in->begin, in->len);

		task->resolver = session->resolver;
		task->ev_base = session->ev_base;

		r = process_message (task);
		if (r == -1) {
			msg_warn ("processing of message failed");
			session->state = STATE_REPLY;
			if (session->restful) {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"HTTP/1.0 500 Cannot process message" CRLF CRLF);
			}
			else {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"cannot process message" CRLF);
			}
			if (!session->restful) {
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
			}
			else {
				if (!restful_write_reply (500, out_buf, NULL, 0,
					session->dispatcher)) {
					return FALSE;
				}
			}
			return FALSE;
		}
		/* Set up async session */
		task->s = new_async_session (task->task_pool,
				fin_learn_task,
				restore_learn_task,
				rspamd_task_free_hard,
				task);
		session->learn_task = task;
		session->state = STATE_LEARN_SPAM;
		rspamd_dispatcher_pause (session->dispatcher);
		r = rspamd_process_filters (task);
		if (r == -1) {
			rspamd_dispatcher_restore (session->dispatcher);
			session->state = STATE_REPLY;
			if (session->restful) {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"HTTP/1.0 500 Cannot process message" CRLF CRLF);
			}
			else {
				r = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"cannot process message" CRLF);
			}
			destroy_session (task->s);
			if (!session->restful) {
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
			}
			else {
				if (!restful_write_reply (500, out_buf, NULL, 0,
					session->dispatcher)) {
					return FALSE;
				}
			}
		}
		break;
	case STATE_WEIGHTS:
		session->learn_buf = in;
		task = rspamd_task_new (session->worker);

		task->msg = g_string_new_len (in->begin, in->len);
		task->ev_base = session->ev_base;

		r = process_message (task);
		if (r == -1) {
			msg_warn ("processing of message failed");
			rspamd_task_free (task, FALSE);
			session->state = STATE_REPLY;
			r = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"cannot process message" CRLF END);
			if (!session->restful) {
				if (!rspamd_dispatcher_write (session->dispatcher, out_buf, r,
					FALSE, FALSE)) {
					return FALSE;
				}
			}
			else {
				if (!restful_write_reply (500, out_buf, NULL, 0,
					session->dispatcher)) {
					return FALSE;
				}
			}
			return FALSE;
		}

		cur = g_list_first (task->text_parts);
		while (cur) {
			part = cur->data;
			if (part->is_empty) {
				cur = g_list_next (cur);
				continue;
			}

			c.begin = part->content->data;
			c.len = part->content->len;
			if (!session->learn_classifier->tokenizer->tokenize_func (session->
				learn_classifier->tokenizer,
				session->session_pool, &c, &tokens, FALSE, part->is_utf,
				part->urls_offset)) {
				i = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"weights failed, tokenizer error" CRLF END);
				rspamd_task_free (task, FALSE);
				if (!session->restful) {
					if (!rspamd_dispatcher_write (session->dispatcher, out_buf,
						r, FALSE, FALSE)) {
						return FALSE;
					}
				}
				else {
					if (!restful_write_reply (500, out_buf, NULL, 0,
						session->dispatcher)) {
						return FALSE;
					}
				}
				session->state = STATE_REPLY;
				return TRUE;
			}
			cur = g_list_next (cur);
		}

		/* Handle messages without text */
		if (tokens == NULL) {
			i = rspamd_snprintf (out_buf,
					sizeof (out_buf),
					"weights failed, no tokens can be extracted (no text data)" CRLF END);
			rspamd_task_free (task, FALSE);
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i,
				FALSE, FALSE)) {
				return FALSE;
			}
			session->state = STATE_REPLY;
			return TRUE;
		}


		/* Init classifier */
		cls_ctx = session->learn_classifier->classifier->init_func (
			session->session_pool,
			session->learn_classifier);

		cur = session->learn_classifier->classifier->weights_func (cls_ctx,
				session->worker->srv->statfile_pool,
				tokens,
				task);
		i = 0;
		struct classify_weight *w;

		while (cur) {
			w = cur->data;
			i += rspamd_snprintf (out_buf + i,
					sizeof (out_buf) - i,
					"%s: %G" CRLF,
					w->name,
					w->weight);
			cur = g_list_next (cur);
		}
		i += rspamd_snprintf (out_buf + i, sizeof (out_buf) - i, END);
		if (i != 0) {
			if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i,
				FALSE, FALSE)) {
				return FALSE;
			}
		}
		else {
			if (!rspamd_dispatcher_write (session->dispatcher,
				"weights failed: classifier error" CRLF END, 0, FALSE, TRUE)) {
				return FALSE;
			}
		}

		rspamd_task_free (task, FALSE);

		session->state = STATE_REPLY;
		break;
	case STATE_OTHER:
		rspamd_dispatcher_pause (session->dispatcher);
		if (session->other_handler) {
			if (!session->other_handler (session, in)) {
				return FALSE;
			}
		}
		break;
	case STATE_WAIT:
		rspamd_dispatcher_pause (session->dispatcher);
		break;
	default:
		msg_debug ("unknown state while reading %d", session->state);
		break;
	}

	if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
		/* In case of normal session we restore read state, for restful session we need to terminate immediately */
		if (!session->restful) {
			rspamd_dispatcher_restore (session->dispatcher);
		}
		else {
			return FALSE;
		}
	}
	return TRUE;
}

static gboolean
controller_write_socket (void *arg)
{
	struct controller_session *session = (struct controller_session *)arg;
	gint i;
	gchar out_buf[1024];
	GError *err = NULL;

	if (session->state == STATE_QUIT) {
		/* Free buffers */
		destroy_session (session->s);
		return FALSE;
	}
	else if (session->state == STATE_LEARN_SPAM) {
		/* Perform actual learn here */
		if (session->learn_classifier == NULL) {
			if (session->restful) {
				i = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"HTTP/1.0 500 Learn classifier error: %s" CRLF CRLF,
						"unknown classifier");
			}
			else {
				i = rspamd_snprintf (out_buf,
						sizeof (out_buf),
						"learn failed, learn classifier error: %s" CRLF END,
						"unknown classifier");
			}
		}
		else {
			if (!rspamd_learn_task_spam (session->learn_classifier,
				session->learn_task, session->in_class, &err)) {
				if (err) {
					if (session->restful) {
						i = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"HTTP/1.0 500 Learn classifier error: %s" CRLF CRLF,
								err->message);
					}
					else {
						i = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"learn failed, learn classifier error: %s" CRLF END,
								err->message);
					}
					g_error_free (err);
				}
				else {
					if (session->restful) {
						i = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"HTTP/1.0 500 Learn classifier error: unknown" CRLF CRLF);
					}
					else {
						i = rspamd_snprintf (out_buf,
								sizeof (out_buf),
								"learn failed, learn classifier error: unknown" CRLF END);
					}
				}
			}
			else {
				if (session->restful) {
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"HTTP/1.0 200 Learn OK" CRLF CRLF);
				}
				else {
					i = rspamd_snprintf (out_buf,
							sizeof (out_buf),
							"learn ok" CRLF END);
				}
			}
		}
		destroy_session (session->learn_task->s);
		session->state = STATE_REPLY;
		if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE,
			FALSE)) {
			return FALSE;
		}
		if (session->restful) {
			destroy_session (session->s);
			return FALSE;
		}
		return TRUE;
	}
	else if (session->state == STATE_REPLY) {
		if (session->restful) {
			destroy_session (session->s);
			return FALSE;
		}
		else {
			session->state = STATE_COMMAND;
			rspamd_set_dispatcher_policy (session->dispatcher,
				BUFFER_LINE,
				BUFSIZ);
		}
	}
	rspamd_dispatcher_restore (session->dispatcher);
	return TRUE;
}

static void
controller_err_socket (GError * err, void *arg)
{
	struct controller_session *session = (struct controller_session *)arg;

	if (err->code != EOF) {
		msg_info ("abnormally closing control connection, error: %s",
			err->message);
	}
	g_error_free (err);
	/* Free buffers */
	destroy_session (session->s);
}

static void
accept_socket (gint fd, short what, void *arg)
{
	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
	union sa_union su;
	struct controller_session *new_session;
	struct timeval *io_tv;
	gint nfd;
	struct rspamd_controller_ctx *ctx;
	rspamd_inet_addr_t addr;

	ctx = worker->ctx;

	if ((nfd = rspamd_accept_from_socket (fd, &addr)) == -1) {
		return;
	}

	new_session = g_slice_alloc0 (sizeof (struct controller_session));
	if (new_session == NULL) {
		msg_err ("cannot allocate memory for task, %s", strerror (errno));
		return;
	}

	new_session->worker = worker;
	new_session->sock = nfd;
	new_session->cfg = worker->srv->cfg;
	new_session->state = STATE_COMMAND;
	new_session->session_pool = rspamd_mempool_new (
		rspamd_mempool_suggest_size () - 1);
	new_session->resolver = ctx->resolver;
	new_session->ev_base = ctx->ev_base;
	if (ctx->password == NULL) {
		new_session->authorized = TRUE;
	}
	worker->srv->stat->control_connections_count++;

	/* Set up dispatcher */
	io_tv =
		rspamd_mempool_alloc (new_session->session_pool,
			sizeof (struct timeval));
	io_tv->tv_sec = ctx->timeout / 1000;
	io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;

	new_session->s = new_async_session (new_session->session_pool,
			NULL,
			NULL,
			free_session,
			new_session);

	new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base,
			nfd,
			BUFFER_LINE,
			controller_read_socket,
			controller_write_socket,
			controller_err_socket,
			io_tv,
			(void *)new_session);

	if (su.ss.ss_family == AF_UNIX) {
		msg_info ("accepted connection from unix socket");
		new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
	}
	else if (su.ss.ss_family == AF_INET) {
		msg_info ("accepted connection from %s port %d",
			inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
		memcpy (&new_session->dispatcher->peer_addr, &su.s4.sin_addr,
			sizeof (guint32));
	}
#if 0
	/* As we support now restful HTTP like connections, this should not be printed anymore */
	if (!rspamd_dispatcher_write (new_session->dispatcher, greetingbuf,
		strlen (greetingbuf), FALSE, FALSE)) {
		msg_warn ("cannot write greeting");
	}
#endif
}

static gboolean
create_rrd_file (const gchar *filename, struct rspamd_controller_ctx *ctx)
{
	GError *err = NULL;
	GArray ar;
	struct rrd_rra_def rra[5];
	struct rrd_ds_def ds[4];

	/*
	 * DS:
	 * 1) reject as spam
	 * 2) mark as spam
	 * 3) greylist
	 * 4) pass
	 */
	/*
	 * RRA:
	 * 1) per minute AVERAGE
	 * 2) per 5 minutes AVERAGE
	 * 3) per 30 minutes AVERAGE
	 * 4) per 2 hours AVERAGE
	 * 5) per day AVERAGE
	 */
	ctx->rrd_file =
		rspamd_rrd_create (filename, 4, 5, CONTROLLER_RRD_STEP, &err);
	if (ctx->rrd_file == NULL) {
		msg_err ("cannot create rrd file %s, error: %s", filename,
			err->message);
		g_error_free (err);
		return FALSE;
	}

	/* Add all ds and rra */
	rrd_make_default_ds ("spam",		  CONTROLLER_RRD_STEP, &ds[0]);
	rrd_make_default_ds ("possible spam", CONTROLLER_RRD_STEP, &ds[1]);
	rrd_make_default_ds ("greylist",	  CONTROLLER_RRD_STEP, &ds[2]);
	rrd_make_default_ds ("ham",			  CONTROLLER_RRD_STEP, &ds[3]);

	rrd_make_default_rra ("AVERAGE", 1,	   600, &rra[0]);
	rrd_make_default_rra ("AVERAGE", 5,	   600, &rra[1]);
	rrd_make_default_rra ("AVERAGE", 30,   700, &rra[2]);
	rrd_make_default_rra ("AVERAGE", 120,  775, &rra[3]);
	rrd_make_default_rra ("AVERAGE", 1440, 797, &rra[4]);

	ar.data = (gchar *)ds;
	ar.len = sizeof (ds);
	if (!rspamd_rrd_add_ds (ctx->rrd_file, &ar, &err)) {
		msg_err ("cannot create rrd file %s, error: %s", filename,
			err->message);
		g_error_free (err);
		rspamd_rrd_close (ctx->rrd_file);
		return FALSE;
	}

	ar.data = (gchar *)rra;
	ar.len = sizeof (rra);
	if (!rspamd_rrd_add_rra (ctx->rrd_file, &ar, &err)) {
		msg_err ("cannot create rrd file %s, error: %s", filename,
			err->message);
		g_error_free (err);
		rspamd_rrd_close (ctx->rrd_file);
		return FALSE;
	}

	/* Finalize */
	if (!rspamd_rrd_finalize (ctx->rrd_file, &err)) {
		msg_err ("cannot create rrd file %s, error: %s", filename,
			err->message);
		g_error_free (err);
		rspamd_rrd_close (ctx->rrd_file);
		return FALSE;
	}

	return TRUE;
}

static void
controller_update_rrd (gint fd, short what, void *arg)
{
	struct rspamd_controller_ctx *ctx = arg;
	struct timeval tv;
	GArray ar;
	gdouble data[4];
	GError *err = NULL;

	/*
	 * Data:
	 * 1) reject as spam
	 * 2) mark as spam
	 * 3) greylist
	 * 4) pass
	 */

	tv.tv_sec = CONTROLLER_RRD_STEP;
	tv.tv_usec = 0;

	/* Fill data */
	data[0] = ctx->srv->stat->actions_stat[METRIC_ACTION_REJECT];
	data[1] = ctx->srv->stat->actions_stat[METRIC_ACTION_ADD_HEADER] +
		ctx->srv->stat->actions_stat[METRIC_ACTION_REWRITE_SUBJECT];
	data[2] = ctx->srv->stat->actions_stat[METRIC_ACTION_GREYLIST];
	data[3] = ctx->srv->stat->actions_stat[METRIC_ACTION_NOACTION];

	ar.data = (gchar *)data;
	ar.len = sizeof (data);
	if (!rspamd_rrd_add_record (ctx->rrd_file, &ar, &err)) {
		msg_err ("cannot add record to rrd database: %s, stop rrd update",
			err->message);
		g_error_free (err);
	}
	else {
		evtimer_add (&ctx->rrd_event, &tv);
	}
}

gpointer
init_controller (struct rspamd_config *cfg)
{
	struct rspamd_controller_ctx *ctx;
	GQuark type;

	type = g_quark_try_string ("controller");
	ctx = g_malloc0 (sizeof (struct rspamd_controller_ctx));

	ctx->timeout = CONTROLLER_IO_TIMEOUT * 1000;

	rspamd_rcl_register_worker_option (cfg, type, "password",
		rspamd_rcl_parse_struct_string, ctx,
		G_STRUCT_OFFSET (struct rspamd_controller_ctx, password), 0);

	rspamd_rcl_register_worker_option (cfg, type, "timeout",
		rspamd_rcl_parse_struct_time, ctx,
		G_STRUCT_OFFSET (struct rspamd_controller_ctx,
		timeout), RSPAMD_CL_FLAG_TIME_UINT_32);

	return ctx;
}

void
start_controller (struct rspamd_worker *worker)
{
	gchar *hostbuf;
	gsize hostmax;
	struct rspamd_controller_ctx *ctx = worker->ctx;
	GError *err = NULL;
	struct timeval tv;

	ctx->ev_base = rspamd_prepare_worker (worker, "controller", accept_socket);
	g_mime_init (0);

	start_time = time (NULL);

	/* Start statfile synchronization */
	if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg,
		ctx->ev_base)) {
		msg_info (
			"cannot start statfile synchronization, statfiles would not be synchronized");
	}

	/* Check for rrd */
	tv.tv_sec = CONTROLLER_RRD_STEP;
	tv.tv_usec = 0;
	ctx->srv = worker->srv;
	if (worker->srv->cfg->rrd_file) {
		ctx->rrd_file = rspamd_rrd_open (worker->srv->cfg->rrd_file, &err);
		if (ctx->rrd_file == NULL) {
			msg_info ("cannot open rrd file: %s, error: %s, trying to create",
				worker->srv->cfg->rrd_file,
				err->message);
			g_error_free (err);
			/* Try to create rrd file */
			if (create_rrd_file (worker->srv->cfg->rrd_file, ctx)) {
				evtimer_set (&ctx->rrd_event, controller_update_rrd, ctx);
				event_base_set (ctx->ev_base, &ctx->rrd_event);
				evtimer_add (&ctx->rrd_event, &tv);
			}
		}
		else {
			evtimer_set (&ctx->rrd_event, controller_update_rrd, ctx);
			event_base_set (ctx->ev_base, &ctx->rrd_event);
			evtimer_add (&ctx->rrd_event, &tv);
		}
	}

	/* Fill hostname buf */
	hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
	hostbuf = alloca (hostmax);
	gethostname (hostbuf, hostmax);
	hostbuf[hostmax - 1] = '\0';
	rspamd_snprintf (greetingbuf,
		sizeof (greetingbuf),
		"Rspamd version %s is running on %s" CRLF,
		RVERSION,
		hostbuf);

	start_map_watch (worker->srv->cfg, ctx->ev_base);
	ctx->resolver = dns_resolver_init (worker->srv->logger,
			ctx->ev_base,
			worker->srv->cfg);

	event_base_loop (ctx->ev_base, 0);

	close_log (worker->srv->logger);
	if (ctx->rrd_file) {
		rspamd_rrd_close (ctx->rrd_file);
	}

	exit (EXIT_SUCCESS);
}

void
register_custom_controller_command (const gchar *name,
	controller_func_t handler,
	gboolean privilleged,
	gboolean require_message)
{
	struct custom_controller_command *cmd;

	cmd = g_malloc (sizeof (struct custom_controller_command));
	cmd->command = name;
	cmd->handler = handler;
	cmd->privilleged = privilleged;
	cmd->require_message = require_message;

	custom_commands = g_list_prepend (custom_commands, cmd);
}

/*
 * vi:ts=4
 */