/*-
 * Copyright 2016 Vsevolod Stakhov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "config.h"
#include "utlist.h"
#include "libserver/proxy.h"
#include "rspamd.h"
#include "smtp.h"
#include "libserver/worker_util.h"
#include "unix-std.h"

/*
 * The SMTP proxy is a simple proxy worker that performs DNS resolving and
 * load balancing. It uses the XCLIENT command and is designed for MTAs
 * that support it (Postfix and Exim).
 */

/* Upstream timeouts */
#define DEFAULT_UPSTREAM_ERROR_TIME 10
#define DEFAULT_UPSTREAM_DEAD_TIME 300
#define DEFAULT_UPSTREAM_MAXERRORS 10

/*
 * Default size of the proxy relay buffer, in bytes. The expansion is
 * parenthesized so that it stays a single value when the macro is used
 * inside a larger expression (e.g. as a divisor).
 */
#define DEFAULT_PROXY_BUF_LEN (100 * 1024)

/* Maximum number of rejected commands tolerated in an emulated session */
#define SMTP_MAXERRORS 15
/* Init functions */
/* Allocates the worker context and registers the worker's config options */
gpointer init_smtp_proxy (struct rspamd_config *cfg);
/* Worker entry point: configures the worker and runs its event loop */
void start_smtp_proxy (struct rspamd_worker *worker);

/*
 * Worker descriptor for the SMTP proxy; it ties the worker name to the
 * init and start functions declared above.
 */
worker_t smtp_proxy_worker = {
	"smtp_proxy",               /* Name */
	init_smtp_proxy,            /* Init function */
	start_smtp_proxy,           /* Start function */
	RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, /* Worker flags */
	RSPAMD_WORKER_SOCKET_TCP,   /* TCP socket */
	RSPAMD_WORKER_VER           /* Version info */
};

/* Magic value stored in smtp_proxy_ctx.magic when the context is created */
static guint64 rspamd_smtp_proxy_magic = 0xf3d849189c85f12dULL;

/*
 * Per-worker configuration and shared state of the SMTP proxy.
 * Created by init_smtp_proxy () and finalized by config_smtp_proxy_worker ().
 */
struct smtp_proxy_ctx {
	/* Set to rspamd_smtp_proxy_magic on creation */
	guint64 magic;
	/* Upstream list used to select a backend for each session */
	struct upstream_list *upstreams;
	/* Raw value of the "upstreams" option, parsed at configuration time */
	gchar *upstreams_str;

	/* Memory pool created together with the context */
	rspamd_mempool_t *pool;
	/* "delay" option: pause before contacting upstream (0 disables it) */
	guint32 smtp_delay;
	/* "jitter" option: upper bound of the random addition to smtp_delay */
	guint32 delay_jitter;
	/* "timeout" option as parsed from the configuration */
	guint32 smtp_timeout_raw;
	/* smtp_timeout_raw converted with msec_to_tv () */
	struct timeval smtp_timeout;

	/* "xclient" option: send XCLIENT to upstream after its greeting */
	gboolean use_xclient;

	/*
	 * "instant_reject" option (TRUE by default): close a rejected session
	 * as soon as the reply is written instead of emulating an SMTP dialogue
	 */
	gboolean instant_reject;

	/* "proxy_buffer" option; DEFAULT_PROXY_BUF_LEN is used when it is 0 */
	gsize proxy_buf_len;

	/* DNS resolver and event base shared by all sessions of the worker */
	struct rspamd_dns_resolver *resolver;
	struct event_base *ev_base;

	/* "dnsbl" option: list of DNS blacklist zones (strings) */
	GList *rbls;
};

/* States of a proxied SMTP session */
enum rspamd_smtp_proxy_state {
	/* Waiting for the PTR reply for the client's address */
	SMTP_PROXY_STATE_RESOLVE_REVERSE = 0,
	/* Waiting for the A reply for the name obtained from the PTR lookup */
	SMTP_PROXY_STATE_RESOLVE_NORMAL,
	/* NOTE(review): not assigned anywhere in the visible code */
	SMTP_PROXY_STATE_RESOLVE_RBL,
	/* DNS stage finished; the optional greeting delay is in effect */
	SMTP_PROXY_STATE_DELAY,
	/* Connected to upstream, reading its greeting */
	SMTP_PROXY_STATE_GREETING,
	/* XCLIENT has been sent, reading the upstream reply */
	SMTP_PROXY_STATE_XCLIENT,
	/* Data is relayed between client and upstream */
	SMTP_PROXY_STATE_PROXY,
	/* Session is being rejected or terminated */
	SMTP_PROXY_STATE_REJECT,
	/* A fake SMTP dialogue is emulated for a rejected client */
	SMTP_PROXY_STATE_REJECT_EMULATE
};

/*
 * State of a single client connection handled by the proxy.
 * Allocated in accept_socket () and released by free_smtp_proxy_session ().
 */
struct smtp_proxy_session {
	/* Worker context this session belongs to */
	struct smtp_proxy_ctx *ctx;
	/* Per-session memory pool, deleted when the session is freed */
	rspamd_mempool_t *pool;

	/* Current stage of the session */
	enum rspamd_smtp_proxy_state state;
	struct rspamd_worker *worker;
	/* Client IPv4 address, used for logging, XCLIENT and DNSBL queries */
	struct in_addr client_addr;
	/* PTR query name for the client; released with free () */
	gchar *ptr_str;
	/* Client hostname (or an XCLIENT placeholder) sent in XCLIENT NAME= */
	gchar *hostname;
	gchar *error;
	gchar *temp_name;
	/* Client socket */
	gint sock;
	/* Upstream socket, -1 while no upstream connection exists */
	gint upstream_sock;

	/* Async session that owns pending events and the cleanup callback */
	struct rspamd_async_session *s;
	/* IO dispatcher attached to the client socket */
	rspamd_io_dispatcher_t *dispatcher;

	/* Relay object created once the upstream handshake is done */
	rspamd_proxy_t *proxy;

	/* Upstream selected for this session */
	struct upstream *upstream;

	/* Timer used for the greeting delay */
	struct event *delay_timer;
	/* Event watching the upstream socket during the handshake */
	struct event upstream_ev;

	/* TRUE when the forward lookup returned the client's own address */
	gboolean resolved;
	struct rspamd_dns_resolver *resolver;
	struct event_base *ev_base;

	/* Accumulates upstream replies during the handshake */
	GString *upstream_greeting;

	/* Number of DNSBL requests still in flight */
	guint rbl_requests;
	/* Name of the DNSBL zone that listed the client, if any */
	gchar *dnsbl_applied;

	/* Envelope sender and recipient captured from an emulated session */
	gchar *from;
	gchar *rcpt;

	/* Number of rejected commands in an emulated session */
	guint errors;
};

/*
 * Session cleanup callback: releases everything owned by a session.
 *
 * If the session ends before reaching the proxy or one of the reject
 * states, a "521" reply is written to the client first. For rejected
 * sessions with a known sender, recipient and DNSBL the rejection is logged.
 *
 * @param arg the session (struct smtp_proxy_session *), may be NULL
 */
static void
free_smtp_proxy_session (gpointer arg)
{
	struct smtp_proxy_session *session = arg;
	static const char fatal_smtp_error[] = "521 5.2.1 Internal error" CRLF;

	if (session == NULL) {
		return;
	}

	if (session->dispatcher) {
		rspamd_remove_dispatcher (session->dispatcher);
	}

	if (session->upstream_greeting) {
		g_string_free (session->upstream_greeting, TRUE);
	}

	if (session->state != SMTP_PROXY_STATE_PROXY && session->state !=
		SMTP_PROXY_STATE_REJECT &&
		session->state != SMTP_PROXY_STATE_REJECT_EMULATE) {
		/*
		 * Send 521 fatal error; the terminating NUL of the array is not a
		 * part of the SMTP reply, hence sizeof - 1
		 */
		if (write (session->sock, fatal_smtp_error,
			sizeof (fatal_smtp_error) - 1) == -1) {
			msg_err ("write error to client failed: %s", strerror (errno));
		}
	}
	else if ((session->state == SMTP_PROXY_STATE_REJECT || session->state ==
		SMTP_PROXY_STATE_REJECT_EMULATE) &&
		session->from && session->rcpt && session->dnsbl_applied) {
		msg_info ("reject by %s mail from <%s> to <%s>, ip: %s",
			session->dnsbl_applied,
			session->from,
			session->rcpt,
			inet_ntoa (session->client_addr));
	}

	close (session->sock);

	if (session->proxy) {
		rspamd_proxy_close (session->proxy);
	}

	/* free (NULL) is a no-op, so no guard is required */
	free (session->ptr_str);

	if (session->upstream_sock != -1) {
		event_del (&session->upstream_ev);
		close (session->upstream_sock);
	}

	rspamd_mempool_delete (session->pool);
	g_slice_free1 (sizeof (struct smtp_proxy_session), session);
}

/*
 * Error callback of the relay created by rspamd_create_proxy ().
 * Logs the error (if any), marks the session as rejected and destroys it.
 *
 * @param err error reported by the relay, may be NULL; freed here
 * @param arg the session (struct smtp_proxy_session *)
 */
static void
smtp_proxy_err_proxy (GError * err, void *arg)
{
	struct smtp_proxy_session *session = arg;

	if (err) {
		/* The message must be logged before the error object is released */
		msg_info ("abnormally closing connection, error: %s", err->message);
		g_error_free (err);
	}
	/* Free buffers */
	session->state = SMTP_PROXY_STATE_REJECT;
	rspamd_session_destroy (session->s);
}

/**
 * Check whether the accumulated upstream reply is a usable SMTP greeting.
 *
 * The buffer is scanned backwards from its end for a "220 " line or for a
 * 3xx/4xx/5xx reply code followed by a space.
 *
 * @param s accumulated reply data
 * @return 1 if the reply is accepted ("220 " at the start of a line, or a
 *         complete buffer that contains neither "220 " nor an error code),
 *         0 if more data has to be read (buffer too short, not terminated
 *         by CRLF, or "220 " found in the middle of a line),
 *         -1 if an error reply code was found
 */
static gint
check_valid_smtp_greeting (GString *s)
{
	gchar *p;

	/* The shortest possible reply is "220 " CRLF, i.e. 6 bytes */
	if (s->len < 6) {
		return 0;
	}

	p = s->str + s->len - 1;
	if (*p != '\n' || *(p - 1) != '\r') {
		/* The last line is still incomplete, wait for the rest of it */
		return 0;
	}
	p -= 5;

	while (p >= s->str) {
		/* It is fast to use memcmp here as we compare only 4 bytes */
		if (memcmp (p, "220 ", 4) == 0) {
			/* Check position */
			if (p == s->str || *(p - 1) == '\n') {
				return 1;
			}
			return 0;
		}
		else if ((*p == '5' || *p == '4' || *p == '3') &&
			g_ascii_isdigit (p[1]) && g_ascii_isdigit (p[2]) && p[3] == ' ') {
			return -1;
		}
		p--;
	}

	return 1;
}

/*
 * Handle upstream greeting
 */

static void
smtp_proxy_greeting_handler (gint fd, short what, void *arg)
{
	struct smtp_proxy_session *session = arg;
	gint r;
	gchar read_buf[BUFSIZ];

	if (what == EV_READ) {
		if (session->state == SMTP_PROXY_STATE_GREETING) {
			/* Fill greeting buffer with new portion of data */
			r = read (fd, read_buf, sizeof (read_buf) - 1);
			if (r > 0) {
				g_string_append_len (session->upstream_greeting, read_buf, r);
				/* Now search line with 220 */
				r = check_valid_smtp_greeting (session->upstream_greeting);
				if (r == 1) {
					/* Send xclient */
					if (session->ctx->use_xclient) {
						r = rspamd_snprintf (read_buf,
								sizeof (read_buf),
								"XCLIENT NAME=%s ADDR=%s" CRLF,
								session->hostname,
								inet_ntoa (session->client_addr));
						r = write (session->upstream_sock, read_buf, r);

						if (r < 0 && errno == EAGAIN) {
							/* Add write event */
							event_del (&session->upstream_ev);
							event_set (&session->upstream_ev,
								session->upstream_sock,
								EV_WRITE,
								smtp_proxy_greeting_handler,
								session);
							event_base_set (session->ev_base,
								&session->upstream_ev);
							event_add (&session->upstream_ev, NULL);
						}
						else if (r > 0) {
							session->upstream_greeting->len = 0;
							session->state = SMTP_PROXY_STATE_XCLIENT;
						}
						else {
							msg_info ("connection with %s got write error: %s",
								inet_ntoa (session->client_addr),
								strerror (errno));
							rspamd_session_destroy (session->s);
						}
					}
					else {
						event_del (&session->upstream_ev);
						/* Start direct proxy */
						r = write (session->sock,
								session->upstream_greeting->str,
								session->upstream_greeting->len);
						/* TODO: handle client's error here */
						if (r > 0) {
							session->proxy = rspamd_create_proxy (session->sock,
									session->upstream_sock,
									session->pool,
									session->ev_base,
									session->ctx->proxy_buf_len,
									&session->ctx->smtp_timeout,
									smtp_proxy_err_proxy,
									session);
							session->state = SMTP_PROXY_STATE_PROXY;
						}
						else {
							msg_info ("connection with %s got write error: %s",
								inet_ntoa (session->client_addr),
								strerror (errno));
							rspamd_session_destroy (session->s);
						}
					}
				}
				else if (r == -1) {
					/* Proxy sent 500 error */
					msg_info ("connection with %s got smtp error for greeting",
						rspamd_upstream_name (session->upstream));
					rspamd_session_destroy (session->s);
				}
			}
			else {
				msg_info ("connection with %s got read error: %s",
					rspamd_upstream_name (session->upstream),
					strerror (errno));
				rspamd_session_destroy (session->s);
			}
		}
		else if (session->state == SMTP_PROXY_STATE_XCLIENT) {
			/* Fill greeting buffer with new portion of data */
			r = read (fd, read_buf, sizeof (read_buf) - 1);
			if (r > 0) {
				g_string_append_len (session->upstream_greeting, read_buf, r);
				/* Now search line with 220 */
				r = check_valid_smtp_greeting (session->upstream_greeting);
				if (r == 1) {
					event_del (&session->upstream_ev);
					/* Start direct proxy */
					r = write (session->sock,
							session->upstream_greeting->str,
							session->upstream_greeting->len);
					/* TODO: handle client's error here */
					if (r > 0) {
						session->proxy = rspamd_create_proxy (session->sock,
								session->upstream_sock,
								session->pool,
								session->ev_base,
								session->ctx->proxy_buf_len,
								&session->ctx->smtp_timeout,
								smtp_proxy_err_proxy,
								session);
						session->state = SMTP_PROXY_STATE_PROXY;
					}
					else {
						msg_info ("connection with %s got write error: %s",
							inet_ntoa (session->client_addr),
							strerror (errno));
						rspamd_session_destroy (session->s);
					}
				}
				else if (r == -1) {
					/* Proxy sent 500 error */
					msg_info ("connection with %s got smtp error for xclient",
							rspamd_upstream_name (session->upstream));
					rspamd_session_destroy (session->s);
				}
			}
		}
		else {
			msg_info ("connection with %s got read event at improper state: %d",
				rspamd_upstream_name (session->upstream),
				session->state);
			rspamd_session_destroy (session->s);
		}
	}
	else if (what == EV_WRITE) {
		if (session->state == SMTP_PROXY_STATE_GREETING) {
			/* Send xclient again */
			r = rspamd_snprintf (read_buf,
					sizeof (read_buf),
					"XCLIENT NAME=%s ADDR=%s" CRLF,
					session->hostname,
					inet_ntoa (session->client_addr));
			r = write (session->upstream_sock, read_buf, r);

			if (r < 0 && errno == EAGAIN) {
				/* Add write event */
				event_del (&session->upstream_ev);
				event_set (&session->upstream_ev, session->upstream_sock,
					EV_WRITE, smtp_proxy_greeting_handler, session);
				event_base_set (session->ev_base, &session->upstream_ev);
				event_add (&session->upstream_ev, NULL);
			}
			else if (r > 0) {
				session->upstream_greeting->len = 0;
				session->state = SMTP_PROXY_STATE_XCLIENT;
				event_del (&session->upstream_ev);
				event_set (&session->upstream_ev, session->upstream_sock,
					EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session);
				event_base_set (session->ev_base, &session->upstream_ev);
				event_add (&session->upstream_ev, NULL);
			}
			else {
				msg_info ("connection with %s got write error: %s",
					rspamd_upstream_name (session->upstream),
					strerror (errno));
				rspamd_session_destroy (session->s);
			}
		}
		else {
			msg_info (
				"connection with %s got write event at improper state: %d",
				rspamd_upstream_name (session->upstream),
				session->state);
			rspamd_session_destroy (session->s);
		}
	}
	else {
		/* Timeout */
		msg_info ("connection with %s timed out",
				rspamd_upstream_name (session->upstream));
		rspamd_session_destroy (session->s);
	}
}

static gboolean
create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
{
	struct upstream *selected;

	/* Try to select upstream */
	selected = rspamd_upstream_get (session->ctx->upstreams,
			RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
	if (selected == NULL) {
		msg_err ("no upstreams suitable found");
		return FALSE;
	}

	session->upstream = selected;

	/* Now try to create socket */
	session->upstream_sock = rspamd_inet_address_connect (
			rspamd_upstream_addr (selected), SOCK_STREAM, TRUE);
	if (session->upstream_sock == -1) {
		msg_err ("cannot make a connection to %s",
				rspamd_upstream_name (session->upstream));
		rspamd_upstream_fail (selected);
		return FALSE;
	}
	/* Create a proxy for upstream connection */
	rspamd_dispatcher_pause (session->dispatcher);
	/* First of all get upstream's greeting */
	session->state = SMTP_PROXY_STATE_GREETING;

	event_set (&session->upstream_ev,
		session->upstream_sock,
		EV_READ | EV_PERSIST,
		smtp_proxy_greeting_handler,
		session);
	event_base_set (session->ev_base, &session->upstream_ev);
	event_add (&session->upstream_ev, &session->ctx->smtp_timeout);

	session->upstream_greeting = g_string_sized_new (BUFSIZ);

	return TRUE;
}

static void
smtp_dnsbl_cb (struct rdns_reply *reply, void *arg)
{
	struct smtp_proxy_session *session = arg;
	const gchar *p;
	gint dots = 0;
	const struct rdns_request_name *req_name;

	session->rbl_requests--;

	req_name = rdns_request_get_name (reply->request, NULL);

	msg_debug ("got reply for %s: %s", req_name[0].name,
		rdns_strerror (reply->code));

	if (session->state != SMTP_PROXY_STATE_REJECT) {

		if (reply->code == RDNS_RC_NOERROR) {
			/* This means that address is in dnsbl */
			p = req_name[0].name;
			while (*p) {
				if (*p == '.') {
					dots++;
				}
				if (dots == 4) {
					session->dnsbl_applied = (gchar *)p + 1;
					break;
				}
				p++;
			}
			session->state = SMTP_PROXY_STATE_REJECT;
		}
	}

	if (session->rbl_requests == 0) {
		if (session->state != SMTP_PROXY_STATE_REJECT) {
			/* Make proxy */
			if (!create_smtp_proxy_upstream_connection (session)) {
				rspamd_dispatcher_restore (session->dispatcher);
			}
		}
		else {
			if (session->ctx->instant_reject) {
				msg_info ("reject %s is denied by dnsbl: %s",
					inet_ntoa (session->client_addr), session->dnsbl_applied);
				if (!rspamd_dispatcher_write (session->dispatcher,
					make_smtp_error (session->pool, 521,
					"%s Client denied by %s", "5.2.1", session->dnsbl_applied),
					0, FALSE, TRUE)) {
					msg_err ("cannot write smtp error");
				}
			}
			else {
				/* Emulate fake smtp session */
				rspamd_set_dispatcher_policy (session->dispatcher,
					BUFFER_LINE,
					0);
				if (!rspamd_dispatcher_write (session->dispatcher,
					make_smtp_error (session->pool, 220, "smtp ready"),
					0, FALSE, TRUE)) {
					msg_err ("cannot write smtp reply");
				}
			}
			rspamd_dispatcher_restore (session->dispatcher);
		}
	}
}

/*
 * Create requests to all rbls
 */
static void
make_rbl_requests (struct smtp_proxy_session *session)
{
	GList *cur;
	gchar *p, *dst;
	guint len;

	cur = session->ctx->rbls;
	while (cur) {
		len = INET_ADDRSTRLEN + strlen (cur->data) + 1;
		dst = rspamd_mempool_alloc (session->pool, len);
		/* Print ipv4 addr */
		p = (gchar *)&session->client_addr.s_addr;
		rspamd_snprintf (dst, len, "%ud.%ud.%ud.%ud.%s", (guint)p[3],
			(guint)p[2], (guint)p[1], (guint)p[0], cur->data);
		if (make_dns_request (session->resolver, session->s, session->pool,
			smtp_dnsbl_cb, session, RDNS_REQUEST_A, dst)) {
			session->rbl_requests++;
			msg_debug ("send request to %s", dst);
		}
		cur = g_list_next (cur);
	}

	if (session->rbl_requests == 0) {
		/* Create proxy */
		if (!create_smtp_proxy_upstream_connection (session)) {
			rspamd_dispatcher_restore (session->dispatcher);
		}
	}
}

/* Resolving and delay handlers */
/*
 * Timer callback fired when the greeting delay expires.
 *
 * The timer is first removed from the async session. If the session is
 * still in the DELAY state, it continues with DNSBL checks (when any are
 * configured) or with the upstream connection; in any other state the
 * client is answered with a 521 pipelining error.
 */
static void
smtp_delay_handler (gint fd, short what, void *arg)
{
	struct smtp_proxy_session *session = arg;

	rspamd_session_remove_event (session->s, (event_finalizer_t) event_del,
		session->delay_timer);

	if (session->state != SMTP_PROXY_STATE_DELAY) {
		/* TODO: Write error here */
		session->state = SMTP_PROXY_STATE_REJECT;
		if (!rspamd_dispatcher_write (session->dispatcher,
			make_smtp_error (session->pool, 521,
			"%s Improper use of SMTP command pipelining", "5.2.1"),
			0, FALSE, TRUE)) {
			msg_err ("cannot write smtp error");
		}
		rspamd_dispatcher_restore (session->dispatcher);
		return;
	}

	/* TODO: Create upstream connection here */
	if (session->ctx->rbls) {
		make_rbl_requests (session);
		return;
	}

	if (!create_smtp_proxy_upstream_connection (session)) {
		rspamd_dispatcher_restore (session->dispatcher);
	}
}

/*
 * Make delay for a client
 *
 * Does nothing unless the session is in the DELAY state. With a configured
 * delay a one-shot timer (delay plus optional random jitter) is scheduled
 * on the session's event base and registered in the async session; without
 * a delay the session proceeds immediately to DNSBL checks or to the
 * upstream connection.
 */
static void
smtp_make_delay (struct smtp_proxy_session *session)
{
	struct event *tev;
	struct timeval *tv;
	gint32 jitter = 0;

	if (session->state != SMTP_PROXY_STATE_DELAY) {
		return;
	}

	if (session->ctx->smtp_delay == 0) {
		/* TODO: Create upstream connection here */
		if (session->ctx->rbls) {
			make_rbl_requests (session);
		}
		else if (!create_smtp_proxy_upstream_connection (session)) {
			rspamd_dispatcher_restore (session->dispatcher);
		}
		return;
	}

	tev = rspamd_mempool_alloc (session->pool, sizeof (struct event));
	tv = rspamd_mempool_alloc (session->pool, sizeof (struct timeval));
	if (session->ctx->delay_jitter != 0) {
		jitter = g_random_int_range (0, session->ctx->delay_jitter);
	}
	msec_to_tv (session->ctx->smtp_delay + jitter, tv);

	evtimer_set (tev, smtp_delay_handler, session);
	/*
	 * Bind the timer to the worker's event base explicitly, as it is done
	 * for every other event in this file, instead of relying on libevent's
	 * implicit current base
	 */
	event_base_set (session->ev_base, tev);
	evtimer_add (tev, tv);
	rspamd_session_add_event (session->s, (event_finalizer_t) event_del, tev,
		g_quark_from_static_string ("smtp proxy"));
	session->delay_timer = tev;
}

/*
 * Handle DNS replies
 */
static void
smtp_dns_cb (struct rdns_reply *reply, void *arg)
{
	struct smtp_proxy_session *session = arg;
	gint res = 0;
	struct rdns_reply_entry *elt;

	switch (session->state)
	{
	case SMTP_PROXY_STATE_RESOLVE_REVERSE:
		/* Parse reverse reply and start resolve of this ip */
		if (reply->code != RDNS_RC_NOERROR) {
			rspamd_conditional_debug (session->worker->srv->logger,
				NULL, "rdns", NULL, G_STRFUNC, "DNS error: %s",
				rdns_strerror (reply->code));

			if (reply->code == RDNS_RC_NXDOMAIN) {
				session->hostname = rspamd_mempool_strdup (session->pool,
						XCLIENT_HOST_UNAVAILABLE);
			}
			else {
				session->hostname = rspamd_mempool_strdup (session->pool,
						XCLIENT_HOST_TEMPFAIL);
			}
			session->state = SMTP_PROXY_STATE_DELAY;
			smtp_make_delay (session);
		}
		else {
			if (reply->entries) {
				elt = reply->entries;
				session->hostname = rspamd_mempool_strdup (session->pool,
						elt->content.ptr.name);
				session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL;
				make_dns_request (session->resolver, session->s, session->pool,
					smtp_dns_cb, session, RDNS_REQUEST_A, session->hostname);

			}
		}
		break;
	case SMTP_PROXY_STATE_RESOLVE_NORMAL:
		if (reply->code != RDNS_RC_NOERROR) {
			rspamd_conditional_debug (session->worker->srv->logger,
				NULL, "rdns", NULL, G_STRFUNC, "DNS error: %s",
				rdns_strerror (reply->code));

			if (reply->code == RDNS_RC_NXDOMAIN) {
				session->hostname = rspamd_mempool_strdup (session->pool,
						XCLIENT_HOST_UNAVAILABLE);
			}
			else {
				session->hostname = rspamd_mempool_strdup (session->pool,
						XCLIENT_HOST_TEMPFAIL);
			}
			session->state = SMTP_PROXY_STATE_DELAY;
			smtp_make_delay (session);
		}
		else {
			res = 0;
			LL_FOREACH (reply->entries, elt)
			{
				if (memcmp (&session->client_addr, &elt->content.a.addr,
					sizeof(struct in_addr)) == 0) {
					res = 1;
					session->resolved = TRUE;
					break;
				}
			}

			if (res == 0) {
				msg_info (
					"cannot find address for hostname: %s, ip: %s",
					session->hostname,
					inet_ntoa (session->client_addr));
				session->hostname = rspamd_mempool_strdup (session->pool,
						XCLIENT_HOST_UNAVAILABLE);
			}
			session->state = SMTP_PROXY_STATE_DELAY;
			smtp_make_delay (session);
		}
		break;
	default:
		/* TODO: write something about pipelining */
		break;
	}
}

static void
proxy_parse_smtp_input (rspamd_ftok_t *line, struct smtp_proxy_session *session)
{
	const gchar *p, *c, *end;
	gsize len;

	p = line->begin;
	end = line->begin + line->len;
	if (line->len >= sizeof("rcpt to: ") - 1 &&
		(*p == 'r' || *p == 'R') && session->rcpt == NULL) {
		if (g_ascii_strncasecmp (p, "rcpt to: ",
			sizeof ("rcpt to: ") - 1) == 0) {
			p += sizeof ("rcpt to: ") - 1;
			/* Skip spaces */
			while ((g_ascii_isspace (*p) || *p == '<') && p < end) {
				p++;
			}
			c = p;
			while (!(g_ascii_isspace (*p) || *p == '>') && p < end) {
				p++;
			}
			len = p - c;
			session->rcpt = rspamd_mempool_alloc (session->pool, len + 1);
			rspamd_strlcpy (session->rcpt, c, len + 1);
		}
	}
	else if (line->len >= sizeof("mail from: ") - 1 &&
		(*p == 'm' || *p == 'M') && session->from == NULL) {
		if (g_ascii_strncasecmp (p, "mail from: ", sizeof ("mail from: ") -
			1) == 0) {
			p += sizeof ("mail from: ") - 1;
			/* Skip spaces */
			while ((g_ascii_isspace (*p) || *p == '<') && p < end) {
				p++;
			}
			c = p;
			while (!(g_ascii_isspace (*p) || *p == '>') && p < end) {
				p++;
			}
			len = p - c;
			session->from = rspamd_mempool_alloc (session->pool, len + 1);
			rspamd_strlcpy (session->from, c, len + 1);
		}
	}
	else if (line->len >= sizeof ("quit") - 1 && (*p == 'q' || *p == 'Q')) {
		if (g_ascii_strncasecmp (p, "quit", sizeof ("quit") - 1) == 0) {
			session->state = SMTP_PROXY_STATE_REJECT;
		}
	}
}

/*
 * Dispatcher read callback for the client socket.
 *
 * Outside of the REJECT_EMULATE state any client input is treated as
 * improper pipelining: a 521 reply is written and the session is destroyed.
 * In the REJECT_EMULATE state a minimal SMTP dialogue is faked: lines
 * starting with 'h'/'e' get the ready reply, QUIT ends the session, and
 * once a recipient is known every further line is answered with the 521
 * "denied" reply until SMTP_MAXERRORS is exceeded.
 *
 * @return the result of the reply write, or FALSE when the session has
 *         been destroyed
 */
static gboolean
smtp_proxy_read_socket (rspamd_ftok_t * in, void *arg)
{
	struct smtp_proxy_session *session = arg;
	const gchar *cmd;

	if (session->state != SMTP_PROXY_STATE_REJECT_EMULATE) {
		/* This can be called only if client is using invalid pipelining */
		session->state = SMTP_PROXY_STATE_REJECT;
		if (!rspamd_dispatcher_write (session->dispatcher,
			make_smtp_error (session->pool, 521,
			"%s Improper use of SMTP command pipelining", "5.2.1"),
			0, FALSE, TRUE)) {
			msg_err ("cannot write smtp error");
		}
		rspamd_session_destroy (session->s);

		return FALSE;
	}

	/* Try to extract data */
	cmd = in->begin;
	if (in->len >= sizeof ("helo") - 1) {
		switch (*cmd) {
		case 'h':
		case 'H':
		case 'e':
		case 'E':
			return rspamd_dispatcher_write (session->dispatcher,
					   "220 smtp ready" CRLF,
					   0, FALSE, TRUE);
		default:
			break;
		}
	}

	if (in->len > 0) {
		proxy_parse_smtp_input (in, session);
	}

	if (session->state == SMTP_PROXY_STATE_REJECT) {
		/* Received QUIT command */
		if (!rspamd_dispatcher_write (session->dispatcher,
			"221 2.0.0 Bye" CRLF,
			0, FALSE, TRUE)) {
			msg_err ("cannot write smtp error");
		}
		rspamd_session_destroy (session->s);

		return FALSE;
	}

	if (session->rcpt == NULL) {
		/* No recipient yet: keep the client talking */
		return rspamd_dispatcher_write (session->dispatcher,
				   "250 smtp ready" CRLF,
				   0, FALSE, TRUE);
	}

	session->errors++;
	if (session->errors > SMTP_MAXERRORS) {
		if (!rspamd_dispatcher_write (session->dispatcher,
			"521 5.2.1 Maximum errors reached" CRLF,
			0, FALSE, TRUE)) {
			msg_err ("cannot write smtp error");
		}
		rspamd_session_destroy (session->s);

		return FALSE;
	}

	return rspamd_dispatcher_write (session->dispatcher,
			   make_smtp_error (session->pool,
			   521,
			   "%s Client denied by %s",
			   "5.2.1",
			   session->dnsbl_applied),
			   0, FALSE, TRUE);
}

/*
 * Dispatcher write-completion callback for the client socket.
 *
 * With instant_reject enabled the session is destroyed and FALSE is
 * returned; otherwise the session switches to the REJECT_EMULATE state
 * and TRUE is returned.
 */
static gboolean
smtp_proxy_write_socket (void *arg)
{
	struct smtp_proxy_session *session = arg;

	if (!session->ctx->instant_reject) {
		session->state = SMTP_PROXY_STATE_REJECT_EMULATE;
		return TRUE;
	}

	rspamd_session_destroy (session->s);

	return FALSE;
}

/*
 * Dispatcher error callback for the client socket.
 *
 * For a timeout (err->code == ETIMEDOUT) a 421 reply is written to the
 * client first. The error is then logged and freed, and the session is
 * destroyed in any case.
 *
 * @param err error reported by the dispatcher, may be NULL; freed here
 * @param arg the session (struct smtp_proxy_session *)
 */
static void
smtp_proxy_err_socket (GError * err, void *arg)
{
	struct smtp_proxy_session *session = arg;

	if (err != NULL) {
		/* Write smtp error */
		if (err->code == ETIMEDOUT &&
			!rspamd_dispatcher_write (session->dispatcher,
			"421 4.4.2 Error: timeout exceeded" CRLF,
			0, FALSE, TRUE)) {
			msg_err ("cannot write smtp error");
		}

		msg_info ("abnormally closing connection, error: %s", err->message);
		g_error_free (err);
	}

	/* Free buffers */
	rspamd_session_destroy (session->s);
}

/*
 * Accept new connection and construct session
 *
 * The session records the client's IPv4 address, starts a PTR lookup for
 * it and attaches an IO dispatcher to the client socket. If the lookup
 * cannot be started, the session is destroyed through the regular session
 * cleanup, which releases all resources and closes the socket.
 */
static void
accept_socket (gint fd, short what, void *arg)
{
	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
	struct smtp_proxy_ctx *ctx = worker->ctx;
	struct smtp_proxy_session *session;
	rspamd_inet_addr_t *addr;
	gint nfd;

	nfd = rspamd_accept_from_socket (fd, &addr, worker->accept_events);
	if (nfd == -1) {
		msg_warn ("accept failed: %s", strerror (errno));
		return;
	}
	/* Check for EAGAIN */
	if (nfd == 0) {
		return;
	}

	msg_info ("accepted connection from %s port %d",
		rspamd_inet_address_to_string (addr),
		rspamd_inet_address_get_port (addr));

	session = g_slice_alloc0 (sizeof (struct smtp_proxy_session));
	session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), NULL);

	session->sock = nfd;
	session->worker = worker;
	session->ctx = ctx;
	session->resolver = ctx->resolver;
	session->ev_base = ctx->ev_base;
	session->upstream_sock = -1;

	/*
	 * Remember the client's address: it is used for XCLIENT, DNSBL queries,
	 * forward-confirmation of the PTR name and logging. Only IPv4 can be
	 * stored in struct in_addr; for anything else the field stays zeroed.
	 */
	if (inet_pton (AF_INET, rspamd_inet_address_to_string (addr),
		&session->client_addr) != 1) {
		session->client_addr.s_addr = 0;
	}

	session->ptr_str = rdns_generate_ptr_from_str (rspamd_inet_address_to_string (
				addr));
	worker->srv->stat->connections_count++;

	/* Resolve client's addr */
	/* Set up async session */
	session->s = rspamd_session_create (session->pool,
			NULL,
			NULL,
			free_smtp_proxy_session,
			session);
	rspamd_inet_address_destroy (addr);
	session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE;

	if (session->ptr_str == NULL ||
		!make_dns_request (session->resolver, session->s, session->pool,
		smtp_dns_cb, session, RDNS_REQUEST_PTR, session->ptr_str)) {
		msg_err ("cannot resolve %s", inet_ntoa (session->client_addr));
		/*
		 * free_smtp_proxy_session () releases the pool, the PTR string and
		 * the session itself, and closes the client socket
		 */
		rspamd_session_destroy (session->s);
		return;
	}

	session->dispatcher = rspamd_create_dispatcher (session->ev_base,
			nfd,
			BUFFER_ANY,
			smtp_proxy_read_socket,
			smtp_proxy_write_socket,
			smtp_proxy_err_socket,
			&session->ctx->smtp_timeout,
			session);
	session->dispatcher->peer_addr = session->client_addr.s_addr;
}

gpointer
init_smtp_proxy (struct rspamd_config *cfg)
{
	struct smtp_proxy_ctx *ctx;
	GQuark type;

	type = g_quark_try_string ("smtp_proxy");

	ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
	ctx->magic = rspamd_smtp_proxy_magic;
	ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), NULL);

	/* Set default values */
	ctx->smtp_timeout_raw = 300000;
	ctx->smtp_delay = 0;
	ctx->instant_reject = TRUE;

	rspamd_rcl_register_worker_option (cfg,
			type,
			"upstreams",
			rspamd_rcl_parse_struct_string,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str),
			0,
			"List of upstream SMTP servers");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"timeout",
			rspamd_rcl_parse_struct_time,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx,
					smtp_timeout_raw),
			RSPAMD_CL_FLAG_TIME_UINT_32,
			"IO timeout");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"delay",
			rspamd_rcl_parse_struct_time,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx,
					smtp_delay),
			RSPAMD_CL_FLAG_TIME_UINT_32,
			"SMTP greeting delay");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"jitter",
			rspamd_rcl_parse_struct_time,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx,
					delay_jitter),
			RSPAMD_CL_FLAG_TIME_UINT_32,
			"Jitter atribute for SMTP delay");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"xclient",
			rspamd_rcl_parse_struct_boolean,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx, use_xclient),
			0,
			"Use XCLIENT protocol for upstream communication");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"instant_reject",
			rspamd_rcl_parse_struct_boolean,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx, instant_reject),
			0,
			"Reject invalid pipelining");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"proxy_buffer",
			rspamd_rcl_parse_struct_integer,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx,
					proxy_buf_len),
			RSPAMD_CL_FLAG_INT_32,
			"Adjust SMTP buffer size");

	rspamd_rcl_register_worker_option (cfg,
			type,
			"dnsbl",
			rspamd_rcl_parse_struct_string_list,
			ctx,
			G_STRUCT_OFFSET (struct smtp_proxy_ctx, rbls),
			0,
			"Use the following DNS lists as IP blacklists");

	return ctx;
}

/*
 * Make post-init configuration: convert the raw timeout into a timeval,
 * parse the "upstreams" option (default port 25) and apply the default
 * proxy buffer size when none is configured.
 *
 * NOTE(review): ctx->upstreams is not created anywhere in the visible code
 * before it is passed to rspamd_upstreams_parse_line () - confirm that the
 * list is allocated elsewhere.
 *
 * @return TRUE on success, FALSE if the upstreams are missing or invalid
 */
static gboolean
config_smtp_proxy_worker (struct rspamd_worker *worker)
{
	struct smtp_proxy_ctx *ctx = worker->ctx;
	gchar *upstreams_line;

	/* Init timeval */
	msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout);

	/* Init upstreams */
	upstreams_line = ctx->upstreams_str;
	if (upstreams_line == NULL) {
		msg_err ("no upstreams defined, don't know what to do");
		return FALSE;
	}

	if (!rspamd_upstreams_parse_line (ctx->upstreams, upstreams_line, 25,
		NULL)) {
		msg_err ("cannot parse any valid upstream");
		return FALSE;
	}

	if (ctx->proxy_buf_len == 0) {
		ctx->proxy_buf_len = DEFAULT_PROXY_BUF_LEN;
	}

	return TRUE;
}

/*
 * Start worker process
 *
 * Prepares the worker with accept_socket () as the accept handler, applies
 * the post-init configuration, creates the DNS resolver, configures the
 * upstreams library and then runs the event loop. The function does not
 * return: the process exits when the loop ends or when the configuration
 * is invalid.
 */
void
start_smtp_proxy (struct rspamd_worker *worker)
{
	struct smtp_proxy_ctx *ctx = worker->ctx;

	ctx->ev_base = rspamd_prepare_worker (worker, "smtp_proxy", accept_socket);

	/* Set smtp options */
	if ( !config_smtp_proxy_worker (worker)) {
		msg_err ("cannot configure smtp worker, exiting");
		/*
		 * NOTE(review): a configuration failure exits with EXIT_SUCCESS -
		 * confirm whether this is deliberate before changing it
		 */
		exit (EXIT_SUCCESS);
	}

	/* DNS resolver */
	ctx->resolver = dns_resolver_init (worker->srv->logger,
			ctx->ev_base,
			worker->srv->cfg);

	rspamd_upstreams_library_config (worker->srv->cfg, worker->srv->cfg->ups_ctx,
			ctx->ev_base, ctx->resolver->r);
	/* Set umask */
	umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);

	/* Runs until the event loop terminates */
	event_base_loop (ctx->ev_base, 0);
	rspamd_worker_block_signals ();

	rspamd_log_close (worker->srv->logger);
	exit (EXIT_SUCCESS);
}