/* Copyright (c) 2010, Vsevolod Stakhov
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *       * Redistributions of source code must retain the above copyright
 *         notice, this list of conditions and the following disclaimer.
 *       * Redistributions in binary form must reproduce the above copyright
 *         notice, this list of conditions and the following disclaimer in the
 *         documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */


#include "config.h"
#include "kvstorage.h"
#include "kvstorage_config.h"
#include "kvstorage_server.h"
#include "cfg_file.h"
#include "cfg_xml.h"
#include "main.h"

#define ERROR_COMMON "ERROR" CRLF
#define ERROR_UNKNOWN_COMMAND "CLIENT_ERROR unknown command" CRLF
#define ERROR_NOT_STORED "NOT_STORED" CRLF
#define ERROR_EXISTS "EXISTS" CRLF
#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF

#define ERROR_REDIS_OK "+OK" CRLF


static sig_atomic_t wanna_die = 0;
static sig_atomic_t do_reopen_log = 0;
static sig_atomic_t soft_wanna_die = 0;

/* Logging functions */
#define thr_err(...)    do {                                                                            \
		g_mutex_lock (thr->log_mtx);                                                                \
		rspamd_common_log_function (rspamd_main->logger, \
			G_LOG_LEVEL_CRITICAL, \
			__FUNCTION__, \
			__VA_ARGS__);   \
		g_mutex_unlock (thr->log_mtx);                                                              \
} while (0)

#define thr_warn(...)   do {                                                                            \
		g_mutex_lock (thr->log_mtx);                                                                \
		rspamd_common_log_function (rspamd_main->logger, \
			G_LOG_LEVEL_WARNING, \
			__FUNCTION__, \
			__VA_ARGS__);    \
		g_mutex_unlock (thr->log_mtx);                                                              \
} while (0)

#define thr_info(...)   do {                                                                            \
		g_mutex_lock (thr->log_mtx);                                                                \
		rspamd_common_log_function (rspamd_main->logger, \
			G_LOG_LEVEL_INFO, \
			__FUNCTION__, \
			__VA_ARGS__);   \
		g_mutex_unlock (thr->log_mtx);                                                              \
} while (0)

/* Init functions */
gpointer init_keystorage (void);
void start_keystorage (struct rspamd_worker *worker);

worker_t keystorage_worker = {
	"keystorage",               /* Name */
	init_keystorage,            /* Init function */
	start_keystorage,           /* Start function */
	TRUE,                       /* Has socket */
	FALSE,                      /* Non unique */
	TRUE,                       /* Non threaded */
	FALSE,                      /* Non killable */
	SOCK_STREAM                 /* TCP socket */
};

#ifndef HAVE_SA_SIGINFO
static void
sig_handler (gint signo)
#else
static void
sig_handler (gint signo, siginfo_t *info, void *unused)
#endif
{
	switch (signo) {
	case SIGUSR1:
		do_reopen_log = 1;
		break;
	case SIGINT:
	case SIGTERM:
		wanna_die = 1;
		break;
	case SIGUSR2:
		soft_wanna_die = 1;
		break;
	}
}

gpointer
init_keystorage (void)
{
	struct kvstorage_worker_ctx *ctx;
	GQuark type;

	type = g_quark_try_string ("keystorage");
	ctx = g_malloc0 (sizeof (struct kvstorage_worker_ctx));
	ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());

	/* Set default values */
	ctx->timeout_raw = 300000;

	register_worker_opt (type, "timeout", xml_handle_seconds, ctx,
		G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw));
	register_worker_opt (type, "redis", xml_handle_boolean, ctx,
		G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis));
	return ctx;
}

/* Make post-init configuration */
static gboolean
config_kvstorage_worker (struct rspamd_worker *worker)
{
	struct kvstorage_worker_ctx *ctx = worker->ctx;

	/* Init timeval */
	msec_to_tv (ctx->timeout_raw, &ctx->io_timeout);

	return TRUE;
}

/*
 * Free kvstorage session
 */
static void
free_kvstorage_session (struct kvstorage_session *session)
{
	rspamd_remove_dispatcher (session->dispather);
	rspamd_mempool_delete (session->pool);
	close (session->sock);
	g_slice_free1 (sizeof (struct kvstorage_session), session);
}

/*
 * Parse kvstorage command
 */
static gboolean
parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
{
	if (len == 3) {
		/* Set or get command */
		if ((c[0] == 'g' || c[0] == 'G') &&
			(c[1] == 'e' || c[1] == 'E') &&
			(c[2] == 't' || c[2] == 'T')) {
			session->command = KVSTORAGE_CMD_GET;
		}
		else if ((c[0] == 's' || c[0] == 'S') &&
			(c[1] == 'e' || c[1] == 'E') &&
			(c[2] == 't' || c[2] == 'T')) {
			session->command = KVSTORAGE_CMD_SET;
		}
		else if ((c[0] == 'd' || c[0] == 'D') &&
			(c[1] == 'e' || c[1] == 'E') &&
			(c[2] == 'l' || c[2] == 'L')) {
			session->command = KVSTORAGE_CMD_DELETE;
		}
		else {
			/* Error */
			return FALSE;
		}
	}
	else if (len == 4) {
		if ((c[0] == 'i' || c[0] == 'I')     &&
			(c[1] == 'n' || c[1] == 'N') &&
			(c[2] == 'c' || c[2] == 'C') &&
			(c[3] == 'r' || c[3] == 'R')) {
			session->command = KVSTORAGE_CMD_INCR;
			session->arg_data.value = 1;
		}
		else if ((c[0] == 'd' || c[0] == 'D') &&
			(c[1] == 'e' || c[1] == 'E')  &&
			(c[2] == 'c' || c[2] == 'C')  &&
			(c[3] == 'r' || c[3] == 'R')) {
			session->command = KVSTORAGE_CMD_DECR;
			session->arg_data.value = -1;
		}
		else if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
			session->command = KVSTORAGE_CMD_QUIT;
		}
		else if (g_ascii_strncasecmp (c, "sync",
			4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
			session->command = KVSTORAGE_CMD_SYNC;
		}
	}
	else if (len == 6) {
		if ((c[0] == 'i' || c[0] == 'I')     &&
			(c[1] == 'n' || c[1] == 'N') &&
			(c[2] == 'c' || c[2] == 'C') &&
			(c[3] == 'r' || c[3] == 'R') &&
			(c[4] == 'b' || c[4] == 'B') &&
			(c[5] == 'y' || c[5] == 'Y')) {
			session->command = KVSTORAGE_CMD_INCR;
			session->arg_data.value = 1;
		}
		else if ((c[0] == 'd' || c[0] == 'D') &&
			(c[1] == 'e' || c[1] == 'E')  &&
			(c[2] == 'c' || c[2] == 'C')  &&
			(c[3] == 'r' || c[3] == 'R')  &&
			(c[4] == 'b' || c[4] == 'B')  &&
			(c[5] == 'y' || c[5] == 'Y')) {
			session->command = KVSTORAGE_CMD_DECR;
			session->arg_data.value = -1;
		}
		else if ((c[0] == 'd' || c[0] == 'D') &&
			(c[1] == 'e' || c[1] == 'E')  &&
			(c[2] == 'l' || c[2] == 'L')  &&
			(c[3] == 'e' || c[3] == 'E')  &&
			(c[4] == 't' || c[4] == 'T')  &&
			(c[5] == 'e' || c[5] == 'E')) {
			session->command = KVSTORAGE_CMD_DELETE;
		}
		else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
			session->command = KVSTORAGE_CMD_SELECT;
		}
		else {
			return FALSE;
		}
	}

	return TRUE;
}

/**
 * Parse kvstorage line
 */
static gboolean
parse_kvstorage_line (struct kvstorage_session *session, rspamd_fstring_t *in)
{
	gchar *p, *c, *end;
	gint state = 0, next_state = 0;
	gboolean is_redis;

	p = in->begin;
	end = in->begin + in->len;
	c = p;
	is_redis = session->thr->ctx->is_redis;

	/* State machine for parsing */
	while (p <= end) {
		switch (state) {
		case 0:
			/* At this state we try to read identifier of storage */
			if (g_ascii_isdigit (*p)) {
				p++;
			}
			else {
				if (g_ascii_isspace (*p) && p != c) {
					/* We have some digits, so parse id */
					session->id = strtoul (c, NULL, 10);
					state = 99;
					next_state = 1;
				}
				else if (c == p) {
					if (*p != '*') {
						/* We have some character, so assume id as 0 and parse command */
						session->id = 0;
						state = 1;
					}
					else {
						/* In fact it is redis number of commands */
						c = ++p;
						state = 7;
						session->id = 0;
					}
				}
				else {
					/* We have something wrong here (like some digits and then come non-digits) */
					return FALSE;
				}
			}
			break;
		case 1:
			/* At this state we parse command */
			if (g_ascii_isalpha (*p) && p != end) {
				p++;
			}
			else {
				if (parse_kvstorage_command (session, c, p - c)) {
					switch (session->command) {

					case KVSTORAGE_CMD_QUIT:
					case KVSTORAGE_CMD_SYNC:
						/* Single argument command */
						state = 100;
						break;
					case KVSTORAGE_CMD_SELECT:
						/* Select command, read id next */
						state = 99;
						next_state = 6;
						break;
					default:
						/* Normal command, read key */
						state = 99;
						next_state = 2;
					}
				}
				else {
					/* Some error */
					return FALSE;
				}
			}
			break;
		case 2:
			/* Read and store key */
			if (!g_ascii_isspace (*p) && end != p) {
				p++;
			}
			else {
				if (p == c) {
					return FALSE;
				}
				else {
					session->key = rspamd_mempool_alloc (session->pool,
							p - c + 1);
					rspamd_strlcpy (session->key, c, p - c + 1);
					session->keylen = p - c;
					/* Now we must select next state based on command */
					if (session->command == KVSTORAGE_CMD_SET ||
						session->command == KVSTORAGE_CMD_INCR ||
						session->command == KVSTORAGE_CMD_DECR) {
						/* Read flags */
						state = 99;
						if (is_redis) {
							next_state = 5;
							session->flags = 0;
							session->expire = 0;
						}
						else {
							if (session->command == KVSTORAGE_CMD_SET) {
								next_state = 3;
							}
							else {
								next_state = 5;
							}
						}
					}
					else {
						/* Nothing to read for other commands */
						state = 100;
					}
				}
			}
			break;
		case 3:
			/* Read flags */
			if (g_ascii_isdigit (*p)) {
				p++;
			}
			else {
				if (g_ascii_isspace (*p)) {
					session->flags = strtoul (c, NULL, 10);
					state = 99;
					if (session->command == KVSTORAGE_CMD_SET) {
						next_state = 4;
					}
					else {
						/* INCR and DECR */
						next_state = 5;
					}
				}
				else {
					return FALSE;
				}
			}
			break;
		case 4:
			/* Read exptime */
			if (g_ascii_isdigit (*p)) {
				p++;
			}
			else {
				if (g_ascii_isspace (*p)) {
					session->expire = strtoul (c, NULL, 10);
					state = 99;
					next_state = 5;
				}
				else {
					return FALSE;
				}
			}
			break;
		case 5:
			/* Read size or incr/decr values */
			if (g_ascii_isdigit (*p)) {
				p++;
			}
			else {
				if (g_ascii_isspace (*p) || p >= end - 1) {
					if (session->command == KVSTORAGE_CMD_SET) {
						session->arg_data.length = strtoul (c, NULL, 10);
					}
					else {
						if (p != c) {
							session->arg_data.value = strtoul (c, NULL, 10);
							if (session->command == KVSTORAGE_CMD_DECR) {
								session->arg_data.value =
									-session->arg_data.value;
							}
						}
						else if (session->command == KVSTORAGE_CMD_INCR) {
							session->arg_data.value = 1;
						}
						else {
							session->arg_data.value = -1;
						}
					}
					state = 100;
				}
				else {
					return FALSE;
				}
			}
			break;
		case 6:
			/* Read index of storage */
			if (g_ascii_isdigit (*p)) {
				p++;
			}
			else {
				if (g_ascii_isspace (*p) || end == p) {
					session->id = strtoul (c, NULL, 10);
					state = 100;
				}
				else {
					return FALSE;
				}
			}
			break;
		case 7:
			/* Read arguments count */
			if (g_ascii_isdigit (*p)) {
				p++;
			}
			else {
				if (g_ascii_isspace (*p) || end == p) {
					session->argc = strtoul (c, NULL, 10);
					session->argnum = 0;
					state = 100;
					/* Switch to arglen state */
					session->state = KVSTORAGE_STATE_READ_ARGLEN;
				}
				else {
					return FALSE;
				}
			}
			break;
		case 99:
			/* Skip spaces state */
			if (g_ascii_isspace (*p)) {
				p++;
			}
			else {
				c = p;
				state = next_state;
			}
			break;
		case 100:
			/* Successful state */
			return TRUE;
			break;
		}
	}

	return state == 100;
}

/* Process normal kvstorage command */
static gboolean
kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
{
	gint r;
	gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")];
	gboolean res;
	struct rspamd_kv_element *elt;
	guint eltlen;
	glong longval;

	if (session->command == KVSTORAGE_CMD_SET) {
		session->state = KVSTORAGE_STATE_READ_DATA;
		rspamd_set_dispatcher_policy (session->dispather,
			BUFFER_CHARACTER,
			session->arg_data.length);
	}
	else if (session->command == KVSTORAGE_CMD_GET) {
		elt = rspamd_kv_storage_lookup (session->cf->storage,
				session->key,
				session->keylen,
				session->now);
		if (elt == NULL) {
			RW_R_UNLOCK (&session->cf->storage->rwlock);
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   ERROR_NOT_FOUND,
						   sizeof (ERROR_NOT_FOUND) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
						   sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
			}
		}
		else {
			if (elt->flags & KV_ELT_INTEGER) {
				eltlen = rspamd_snprintf (intbuf,
						sizeof (intbuf),
						"%l",
						ELT_LONG (elt));

			}
			else {
				eltlen = elt->size;
			}

			if (!is_redis) {
				r = rspamd_snprintf (outbuf,
						sizeof (outbuf),
						"VALUE %s %ud %ud" CRLF,
						ELT_KEY (elt),
						elt->flags,
						eltlen);
			}
			else {
				r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
						eltlen);
			}
			if (!rspamd_dispatcher_write (session->dispather, outbuf,
				r, TRUE, FALSE)) {
				RW_R_UNLOCK (&session->cf->storage->rwlock);
				return FALSE;
			}
			if (elt->flags & KV_ELT_INTEGER) {
				if (!rspamd_dispatcher_write (session->dispather, intbuf,
					eltlen, TRUE, TRUE)) {
					RW_R_UNLOCK (&session->cf->storage->rwlock);
					return FALSE;
				}
			}
			else {
				if (!rspamd_dispatcher_write (session->dispather,
					ELT_DATA (elt), eltlen, TRUE, TRUE)) {
					RW_R_UNLOCK (&session->cf->storage->rwlock);
					return FALSE;
				}
			}
			session->elt = elt;
			if (!is_redis) {
				res = rspamd_dispatcher_write (session->dispather,
						CRLF "END" CRLF,
						sizeof (CRLF "END" CRLF) - 1,
						FALSE,
						TRUE);
			}
			else {
				res = rspamd_dispatcher_write (session->dispather, CRLF,
						sizeof (CRLF) - 1, FALSE, TRUE);
			}
			if (!res) {
				RW_R_UNLOCK (&session->cf->storage->rwlock);
			}

			return res;
		}
	}
	else if (session->command == KVSTORAGE_CMD_DELETE) {
		elt = rspamd_kv_storage_delete (session->cf->storage,
				session->key,
				session->keylen);
		if (elt != NULL) {
			if ((elt->flags & KV_ELT_DIRTY) == 0) {
				/* Free memory if backend has deleted this element */
				g_slice_free1 (ELT_SIZE (elt), elt);
			}
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   "DELETED" CRLF,
						   sizeof ("DELETED" CRLF) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
						   sizeof (":1" CRLF) - 1, FALSE, TRUE);
			}
		}
		else {
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   ERROR_NOT_FOUND,
						   sizeof (ERROR_NOT_FOUND) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
						   sizeof (":0" CRLF) - 1, FALSE, TRUE);
			}
		}
	}
	else if (session->command == KVSTORAGE_CMD_INCR || session->command ==
		KVSTORAGE_CMD_DECR) {
		longval = session->arg_data.value;
		if (!rspamd_kv_storage_increment (session->cf->storage, session->key,
			session->keylen, &longval)) {
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   ERROR_NOT_FOUND,
						   sizeof (ERROR_NOT_FOUND) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather,
						   "-ERR not found" CRLF,
						   sizeof ("-ERR not found" CRLF) - 1,
						   FALSE,
						   TRUE);
			}
		}
		else {
			if (!is_redis) {
				r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
						longval);
			}
			else {
				r = rspamd_snprintf (outbuf, sizeof (outbuf), ":%l" CRLF,
						longval);
			}
			if (!rspamd_dispatcher_write (session->dispather, outbuf,
				r, FALSE, FALSE)) {
				return FALSE;
			}
		}
	}
	else if (session->command == KVSTORAGE_CMD_SYNC) {
		if (session->cf->storage->backend == NULL ||
			session->cf->storage->backend->sync_func == NULL) {
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   ERROR_COMMON,
						   sizeof (ERROR_COMMON) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather,
						   "-ERR unsupported" CRLF,
						   sizeof ("-ERR unsupported" CRLF) - 1,
						   FALSE,
						   TRUE);
			}
		}
		else {
			if (session->cf->storage->backend->sync_func (session->cf->storage->
				backend)) {
				if (!is_redis) {
					return rspamd_dispatcher_write (session->dispather,
							   "SYNCED" CRLF,
							   sizeof ("SYNCED" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
				else {
					return rspamd_dispatcher_write (session->dispather,
							   "+OK" CRLF,
							   sizeof ("+OK" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
			}
			else {
				if (!is_redis) {
					return rspamd_dispatcher_write (session->dispather,
							   "NOT_SYNCED" CRLF,
							   sizeof ("NOT_SYNCED" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
				else {
					return rspamd_dispatcher_write (session->dispather,
							   "-ERR not synced" CRLF,
							   sizeof ("-ERR not synced" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
			}
		}
	}
	else if (session->command == KVSTORAGE_CMD_SELECT) {
		if (!is_redis) {
			return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF,
					   sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
		}
		else {
			return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
					   sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
		}
	}
	else if (session->command == KVSTORAGE_CMD_QUIT) {
		/* Quit session */
		free_kvstorage_session (session);
		return FALSE;
	}

	return TRUE;
}

static gboolean
kvstorage_read_arglen (rspamd_fstring_t *in, guint *len)
{
	gchar *p = in->begin, *end = in->begin + in->len, *c;
	gint state = 0;

	c = p;
	while (p < end) {
		switch (state) {
		case 0:
			if (*p != '$') {
				return FALSE;
			}
			else {
				p++;
				c = p;
				state = 1;
			}
			break;
		case 1:
			if (g_ascii_isdigit (*p) && p != end - 1) {
				p++;
			}
			else {
				if (p != end - 1) {
					return FALSE;
				}
				else {
					*len = strtoul (c, NULL, 10);
					return TRUE;
				}
			}
			break;
		}
	}

	return TRUE;
}

/*
 * Check number of arguments for a command
 */
static gboolean
kvstorage_check_argnum (struct kvstorage_session *session)
{
	switch (session->command) {
	case KVSTORAGE_CMD_QUIT:
	case KVSTORAGE_CMD_SYNC:
		return session->argc == 1;
	case KVSTORAGE_CMD_SET:
		return session->argc == 3 || session->argc == 4;
	case KVSTORAGE_CMD_INCR:
	case KVSTORAGE_CMD_DECR:
		return session->argc == 2 || session->argc == 3;
	default:
		return session->argc == 2;
	}

	/* Unreachable */
	return FALSE;
}

/**
 * Dispatcher callbacks
 */

/*
 * Callback that is called when there is data to read in buffer
 */
static gboolean
kvstorage_read_socket (rspamd_fstring_t * in, void *arg)
{
	struct kvstorage_session *session = (struct kvstorage_session *) arg;
	struct kvstorage_worker_thread *thr;
	gint r;
	guint arglen = 0;
	gchar outbuf[BUFSIZ];
	gboolean is_redis;

	if (in->len == 0) {
		/* Skip empty commands */
		return TRUE;
	}
	thr = session->thr;
	is_redis = thr->ctx->is_redis;

	switch (session->state) {
	case KVSTORAGE_STATE_READ_CMD:
		/* Update timestamp */
		session->now = time (NULL);
		if (!parse_kvstorage_line (session, in)) {
			thr_info ("%ud: unknown command: %V", thr->id, in);
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   ERROR_UNKNOWN_COMMAND,
						   sizeof (ERROR_UNKNOWN_COMMAND) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				r = rspamd_snprintf (outbuf,
						sizeof (outbuf),
						"-ERR unknown command '%V'" CRLF,
						in);
				return rspamd_dispatcher_write (session->dispather, outbuf,
						   r, FALSE, TRUE);
			}
		}
		else {
			session->cf = get_kvstorage_config (session->id);
			if (session->cf == NULL) {
				thr_info ("%ud: bad keystorage: %ud", thr->id, session->id);
				if (!is_redis) {
					return rspamd_dispatcher_write (session->dispather,
							   ERROR_INVALID_KEYSTORAGE,
							   sizeof (ERROR_INVALID_KEYSTORAGE) - 1,
							   FALSE,
							   TRUE);
				}
				else {
					return rspamd_dispatcher_write (session->dispather,
							   "-ERR unknown keystorage" CRLF,
							   sizeof ("-ERR unknown keystorage" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
			}
			if (session->state != KVSTORAGE_STATE_READ_ARGLEN) {
				return kvstorage_process_command (session, is_redis);
			}
		}
		break;
	case KVSTORAGE_STATE_READ_ARGLEN:
		if (!kvstorage_read_arglen (in, &arglen)) {
			session->state = KVSTORAGE_STATE_READ_CMD;
			r = rspamd_snprintf (outbuf,
					sizeof (outbuf),
					"-ERR unknown arglen '%V'" CRLF,
					in);
			return rspamd_dispatcher_write (session->dispather, outbuf,
					   r, FALSE, TRUE);
		}
		else {
			session->state = KVSTORAGE_STATE_READ_ARG;
			rspamd_set_dispatcher_policy (session->dispather,
				BUFFER_CHARACTER,
				arglen);
		}
		break;
	case KVSTORAGE_STATE_READ_ARG:
		if (session->argnum == 0) {
			/* Read command */
			if (!parse_kvstorage_command (session, in->begin, in->len)) {
				session->state = KVSTORAGE_STATE_READ_CMD;
				r = rspamd_snprintf (outbuf,
						sizeof (outbuf),
						"-ERR unknown command '%V'" CRLF,
						in);
				return rspamd_dispatcher_write (session->dispather, outbuf,
						   r, FALSE, TRUE);
			}
			else {
				if (!kvstorage_check_argnum (session)) {
					session->state = KVSTORAGE_STATE_READ_CMD;
					r = rspamd_snprintf (outbuf,
							sizeof (outbuf),
							"-ERR invalid argnum for command '%V': %ud" CRLF,
							in,
							session->argc);
					return rspamd_dispatcher_write (session->dispather, outbuf,
							   r, FALSE, TRUE);
				}
				else {
					if (session->argnum == session->argc - 1) {
						session->state = KVSTORAGE_STATE_READ_CMD;
						rspamd_set_dispatcher_policy (session->dispather,
							BUFFER_LINE,
							-1);
						return kvstorage_process_command (session, TRUE);
					}
					else {
						session->argnum++;
						session->state = KVSTORAGE_STATE_READ_ARGLEN;
						rspamd_set_dispatcher_policy (session->dispather,
							BUFFER_LINE,
							-1);
					}
				}
			}
		}
		else if (session->argnum == 1) {
			if (session->command != KVSTORAGE_CMD_SELECT) {
				/* This argument is a key for normal command */
				session->key = rspamd_mempool_fstrdup (session->pool, in);
				session->keylen = in->len;
				if (session->argnum == session->argc - 1) {
					session->state = KVSTORAGE_STATE_READ_CMD;
					rspamd_set_dispatcher_policy (session->dispather,
						BUFFER_LINE,
						-1);
					return kvstorage_process_command (session, TRUE);
				}
				else {
					session->argnum++;
					session->state = KVSTORAGE_STATE_READ_ARGLEN;
					rspamd_set_dispatcher_policy (session->dispather,
						BUFFER_LINE,
						-1);
				}
			}
			else {
				/* Special case for select command */
				session->state = KVSTORAGE_STATE_READ_CMD;
				rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf),
					in->len));
				session->id = strtoul (outbuf, NULL, 10);
				rspamd_set_dispatcher_policy (session->dispather,
					BUFFER_LINE,
					-1);
				return kvstorage_process_command (session, TRUE);
			}
		}
		else if (session->argnum == 2) {
			/* We get datablock for set command */
			if (session->command == KVSTORAGE_CMD_SET && session->argc == 3) {
				session->state = KVSTORAGE_STATE_READ_CMD;
				rspamd_set_dispatcher_policy (session->dispather,
					BUFFER_LINE,
					-1);
				if (rspamd_kv_storage_insert (session->cf->storage,
					session->key, session->keylen,
					in->begin, in->len,
					session->flags, session->expire)) {
					return rspamd_dispatcher_write (session->dispather,
							   "+OK" CRLF,
							   sizeof ("+OK" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
				else {
					return rspamd_dispatcher_write (session->dispather,
							   "-ERR not stored" CRLF,
							   sizeof ("-ERR not stored" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
			}
			else if (session->command == KVSTORAGE_CMD_SET && session->argc ==
				4) {
				/* It is expire argument */
				session->state = KVSTORAGE_STATE_READ_CMD;
				rspamd_strtol (in->begin, in->len, (glong *)&session->expire);
				session->argnum++;
				session->state = KVSTORAGE_STATE_READ_ARGLEN;
				rspamd_set_dispatcher_policy (session->dispather,
					BUFFER_LINE,
					-1);
			}
			else {
				session->state = KVSTORAGE_STATE_READ_CMD;
				rspamd_strtol (in->begin, in->len, &session->arg_data.value);
				if (session->command == KVSTORAGE_CMD_DECR) {
					session->arg_data.value = -session->arg_data.value;
				}
				rspamd_set_dispatcher_policy (session->dispather,
					BUFFER_LINE,
					-1);
				return kvstorage_process_command (session, TRUE);
			}
		}
		else if (session->argnum == 3) {
			/* We get datablock for set command */
			if (session->command == KVSTORAGE_CMD_SET && session->argc == 4) {
				session->state = KVSTORAGE_STATE_READ_CMD;
				rspamd_set_dispatcher_policy (session->dispather,
					BUFFER_LINE,
					-1);
				if (rspamd_kv_storage_insert (session->cf->storage,
					session->key, session->keylen,
					in->begin, in->len,
					session->flags, session->expire)) {
					return rspamd_dispatcher_write (session->dispather,
							   "+OK" CRLF,
							   sizeof ("+OK" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
				else {
					return rspamd_dispatcher_write (session->dispather,
							   "-ERR not stored" CRLF,
							   sizeof ("-ERR not stored" CRLF) - 1,
							   FALSE,
							   TRUE);
				}
			}
		}
		break;
	case KVSTORAGE_STATE_READ_DATA:
		session->state = KVSTORAGE_STATE_READ_CMD;
		rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
		if (rspamd_kv_storage_insert (session->cf->storage, session->key,
			session->keylen,
			in->begin, in->len,
			session->flags, session->expire)) {
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   "STORED" CRLF,
						   sizeof ("STORED" CRLF) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
						   sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
			}
		}
		else {
			if (!is_redis) {
				return rspamd_dispatcher_write (session->dispather,
						   ERROR_NOT_STORED,
						   sizeof (ERROR_NOT_STORED) - 1,
						   FALSE,
						   TRUE);
			}
			else {
				return rspamd_dispatcher_write (session->dispather,
						   "-ERR not stored" CRLF,
						   sizeof ("-ERR not stored" CRLF) - 1,
						   FALSE,
						   TRUE);
			}
		}

		break;
	}

	return TRUE;
}

/*
 * Called if buffers were written
 */
static gboolean
kvstorage_write_socket (void *arg)
{
	struct kvstorage_session *session = (struct kvstorage_session *) arg;

	if (session->elt) {
		if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
			/* Insert to cache and free element */
			session->elt->flags &= ~KV_ELT_NEED_INSERT;
			RW_R_UNLOCK (&session->cf->storage->rwlock);
			rspamd_kv_storage_insert_cache (session->cf->storage,
				ELT_KEY (session->elt),
				session->elt->keylen, ELT_DATA (session->elt),
				session->elt->size, session->elt->flags,
				session->elt->expire, NULL);
			g_free (session->elt);
			session->elt = NULL;
			return TRUE;
		}
		RW_R_UNLOCK (&session->cf->storage->rwlock);
		session->elt = NULL;

	}

	return TRUE;
}

/*
 * Called if something goes wrong
 */
static void
kvstorage_err_socket (GError * err, void *arg)
{
	struct kvstorage_session *session = (struct kvstorage_session *) arg;
	struct kvstorage_worker_thread *thr;

	thr = session->thr;
	if (err->code != -1) {
		thr_info ("%ud: abnormally closing connection from: %s, error: %s",
			thr->id, inet_ntoa (session->client_addr), err->message);
	}

	if (session->elt) {
		RW_R_UNLOCK (&session->cf->storage->rwlock);
		session->elt = NULL;
	}

	g_error_free (err);
	free_kvstorage_session (session);
}

/**
 * Accept function
 */
static void
thr_accept_socket (gint fd, short what, void *arg)
{
	struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
	union sa_union su;
	socklen_t addrlen = sizeof (su.ss);
	gint nfd;
	struct kvstorage_session *session;

	g_mutex_lock (thr->accept_mtx);
	if ((nfd =
		accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
		thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
		g_mutex_unlock (thr->accept_mtx);
		return;
	}

	/* Check for EAGAIN */
	if (nfd == 0) {
		g_mutex_unlock (thr->accept_mtx);
		return;
	}

	session = g_slice_alloc0 (sizeof (struct kvstorage_session));
	session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ());
	session->state = KVSTORAGE_STATE_READ_CMD;
	session->thr = thr;
	session->sock = nfd;
	session->dispather = rspamd_create_dispatcher (thr->ev_base,
			nfd,
			BUFFER_LINE,
			kvstorage_read_socket,
			kvstorage_write_socket,
			kvstorage_err_socket,
			thr->tv,
			session);

	g_mutex_unlock (thr->accept_mtx);
	session->elt = NULL;

	if (su.ss.ss_family == AF_UNIX) {
		session->client_addr.s_addr = INADDR_NONE;
	}
	else if (su.ss.ss_family == AF_INET) {
		memcpy (&session->client_addr, &su.s4.sin_addr,
			sizeof (struct in_addr));
	}
}

/**
 * Handle termination
 */
static void
thr_term_socket (gint fd, short what, void *arg)
{
	struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
	struct timeval tv;

	if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) {
		thr_err ("cannot read data from socket: %s", strerror (errno));
		tv.tv_sec = 0;
		tv.tv_usec = 0;
	}

	event_base_loopexit (thr->ev_base, &tv);
	event_del (&thr->bind_ev);
}

/**
 * Thread main worker function
 */
static gpointer
kvstorage_thread (gpointer ud)
{
	struct kvstorage_worker_thread *thr = ud;

	/* Block signals as it is dispatcher deity */
	sigprocmask (SIG_BLOCK, thr->signals, NULL);
	/* Init thread specific events */
	thr->ev_base = event_init ();

	event_set (&thr->bind_ev,
		thr->worker->cf->listen_sock,
		EV_READ | EV_PERSIST,
		thr_accept_socket,
		(void *)thr);
	event_base_set (thr->ev_base, &thr->bind_ev);
	event_add (&thr->bind_ev, NULL);

	event_set (&thr->term_ev,
		thr->term_sock[0],
		EV_READ | EV_PERSIST,
		thr_term_socket,
		(void *)thr);
	event_base_set (thr->ev_base, &thr->term_ev);
	event_add (&thr->term_ev, NULL);

	event_base_loop (thr->ev_base, 0);

	return NULL;
}

/**
 * Create new thread, set it detached
 */
static struct kvstorage_worker_thread *
create_kvstorage_thread (struct rspamd_worker *worker,
	struct kvstorage_worker_ctx *ctx,
	guint id,
	sigset_t *signals)
{
	struct kvstorage_worker_thread *new;
	GError *err = NULL;

	new =
		rspamd_mempool_alloc (ctx->pool,
			sizeof (struct kvstorage_worker_thread));
	new->ctx = ctx;
	new->worker = worker;
	new->tv = &ctx->io_timeout;
	new->log_mtx = ctx->log_mtx;
	new->accept_mtx = ctx->accept_mtx;
	new->id = id;

	/* Create and setup terminating socket */
	if (rspamd_socketpair (new->term_sock) == -1) {
		msg_err ("socket failed: %s", strerror (errno));
		return NULL;
	}
	rspamd_socket_nonblocking (new->term_sock[0]);

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
	new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
#else
	gchar *name;

	name = rspamd_mempool_alloc (ctx->pool,
			sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1);
	rspamd_snprintf (name,
		sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1,
		"kvstorage_thread%d",
		id);

	new->thr = g_thread_new (name, kvstorage_thread, new);
#endif
	new->ev_base = NULL;
	new->signals = signals;

	if (new->thr == NULL) {
		msg_err ("cannot create thread: %s", err->message);
	}

	return new;
}

/*
 * Start worker process
 */
void
start_keystorage (struct rspamd_worker *worker)
{
	struct sigaction signals;
	struct kvstorage_worker_ctx *ctx = worker->ctx;
	guint i;
	struct kvstorage_worker_thread *thr;
	struct timeval tv;
	GList *cur;

	gperf_profiler_init (worker->srv->cfg, "kvstorage");

	if (!g_thread_supported ()) {
		msg_err (
			"threads support is not supported on your system so kvstorage is not functionable");
		exit (EXIT_SUCCESS);
	}
	/* Create socketpair */
	if (rspamd_socketpair (ctx->s_pair) == -1) {
		msg_err ("cannot create socketpair, exiting");
		exit (EXIT_SUCCESS);
	}
	worker->srv->pid = getpid ();
	ctx->threads = NULL;

#if _EVENT_NUMERIC_VERSION > 0x02000000
	if (evthread_use_pthreads () == -1) {
		msg_err (
			"threads support is not supported in your libevent so kvstorage is not functionable");
		exit (EXIT_SUCCESS);
	}
#endif

	/* Set kvstorage options */
	if ( !config_kvstorage_worker (worker)) {
		msg_err ("cannot configure kvstorage worker, exiting");
		exit (EXIT_SUCCESS);
	}

	rspamd_signals_init (&signals, sig_handler);

	/* Set umask */
	umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);

	/* Init mutexes */
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
	ctx->log_mtx = g_mutex_new ();
	ctx->accept_mtx = g_mutex_new ();
#else
	ctx->log_mtx = rspamd_mempool_alloc (ctx->pool, sizeof (GMutex));
	ctx->accept_mtx = rspamd_mempool_alloc (ctx->pool, sizeof (GMutex));
	g_mutex_init (ctx->log_mtx);
	g_mutex_init (ctx->accept_mtx);
#endif

	/* Start workers threads */
	for (i = 0; i < worker->cf->count; i++) {
		thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask);
		if (thr != NULL) {
			ctx->threads = g_list_prepend (ctx->threads, thr);
		}
	}

	sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
	/* Signal processing cycle */
	for (;; ) {
		msg_debug ("calling sigsuspend");
		sigemptyset (&signals.sa_mask);
		sigsuspend (&signals.sa_mask);
		if (wanna_die == 1) {
			wanna_die = 0;
			tv.tv_sec = 0;
			tv.tv_usec = 0;
			msg_info ("worker's immediately shutdown is requested");
			cur = ctx->threads;
			while (cur) {
				thr = cur->data;
				while (write (thr->term_sock[1], &tv,
					sizeof (struct timeval)) == -1) {
					if (errno != EAGAIN) {
						msg_err ("write to term socket failed: %s",
							strerror (errno));
						abort ();
					}
				}
				cur = g_list_next (cur);
			}
			break;
		}
		else if (soft_wanna_die == 1) {
			soft_wanna_die = 0;
			tv.tv_sec = SOFT_SHUTDOWN_TIME;
			tv.tv_usec = 0;
			msg_info ("worker's shutdown is pending in %d sec",
				SOFT_SHUTDOWN_TIME);
			cur = ctx->threads;
			while (cur) {
				thr = cur->data;
				while (write (thr->term_sock[1], &tv,
					sizeof (struct timeval)) == -1) {
					if (errno != EAGAIN) {
						msg_err ("write to term socket failed: %s",
							strerror (errno));
						abort ();
					}
				}
				cur = g_list_next (cur);
			}
			break;
		}
		else if (do_reopen_log == 1) {
			do_reopen_log = 0;
			rspamd_log_reopen (rspamd_main->logger);
		}
	}

	msg_info ("syncing storages");
	/* Wait for threads in the recent glib */
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
	cur = ctx->threads;
	while (cur) {
		thr = cur->data;
		(void)g_thread_join (thr->thr);
		cur = g_list_next (cur);
	}
#endif
	destroy_kvstorage_config ();
	rspamd_log_close (rspamd_main->logger);
	exit (EXIT_SUCCESS);
}