aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-04-23 21:40:01 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-04-23 21:40:01 +0400
commitfe5e1614f36236654623667df4f317ae5e5e1806 (patch)
treebccbb4f2b89bcac52cd8db7640f94048e3bacab8
parent9a8f854b5bc2279a8d2ee6ff3bac362dac635372 (diff)
downloadrspamd-fe5e1614f36236654623667df4f317ae5e5e1806.tar.gz
rspamd-fe5e1614f36236654623667df4f317ae5e5e1806.zip
* Add support of XCLIENT to the smtp proxy.
-rw-r--r--src/proxy.c17
-rw-r--r--src/proxy.h3
-rw-r--r--src/smtp_proxy.c240
3 files changed, 239 insertions, 21 deletions
diff --git a/src/proxy.c b/src/proxy.c
index bd8503c71..b6f1917ae 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -34,14 +34,17 @@ proxy_error_quark (void)
return g_quark_from_static_string ("proxy-error");
}
-static void
+void
rspamd_proxy_close (rspamd_proxy_t *proxy)
{
- close (proxy->cfd);
- close (proxy->bfd);
+ if (!proxy->closed) {
+ close (proxy->cfd);
+ close (proxy->bfd);
- event_del (&proxy->client_ev);
- event_del (&proxy->backend_ev);
+ event_del (&proxy->client_ev);
+ event_del (&proxy->backend_ev);
+ proxy->closed = TRUE;
+ }
}
static void
@@ -215,8 +218,8 @@ rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base
new = memory_pool_alloc0 (pool, sizeof (rspamd_proxy_t));
- new->cfd = cfd;
- new->bfd = bfd;
+ new->cfd = dup (cfd);
+ new->bfd = dup (bfd);
new->pool = pool;
new->base = base;
new->bufsize = bufsize;
diff --git a/src/proxy.h b/src/proxy.h
index 6ebb08d56..970957e56 100644
--- a/src/proxy.h
+++ b/src/proxy.h
@@ -48,6 +48,7 @@ typedef struct rspamd_proxy_s {
gint buf_offset; /**< offset to write */
gpointer user_data; /**< user's data for callbacks */
struct timeval *tv; /**< timeout for communications */
+ gboolean closed; /**< whether descriptors are closed */
} rspamd_proxy_t;
/**
@@ -63,4 +64,6 @@ rspamd_proxy_t* rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool,
struct event_base *base, gsize bufsize, struct timeval *tv,
dispatcher_err_callback_t err_cb, gpointer ud);
+void rspamd_proxy_close (rspamd_proxy_t *proxy);
+
#endif /* PROXY_H_ */
diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c
index 1ada5ff1e..97e56a956 100644
--- a/src/smtp_proxy.c
+++ b/src/smtp_proxy.c
@@ -85,7 +85,10 @@ struct smtp_proxy_ctx {
enum rspamd_smtp_proxy_state {
SMTP_PROXY_STATE_RESOLVE_REVERSE = 0,
SMTP_PROXY_STATE_RESOLVE_NORMAL,
- SMTP_PROXY_STATE_DELAY
+ SMTP_PROXY_STATE_DELAY,
+ SMTP_PROXY_STATE_GREETING,
+ SMTP_PROXY_STATE_XCLIENT,
+ SMTP_PROXY_STATE_PROXY
};
struct smtp_proxy_session {
@@ -109,10 +112,13 @@ struct smtp_proxy_session {
struct smtp_upstream *upstream;
struct event *delay_timer;
+ struct event upstream_ev;
gboolean resolved;
struct rspamd_dns_resolver *resolver;
struct event_base *ev_base;
+
+ GString *upstream_greeting;
};
#ifndef HAVE_SA_SIGINFO
@@ -180,13 +186,31 @@ static void
free_smtp_proxy_session (gpointer arg)
{
struct smtp_proxy_session *session = arg;
+ static const char fatal_smtp_error[] = "521 5.2.1 Internal error" CRLF;
if (session) {
if (session->dispatcher) {
rspamd_remove_dispatcher (session->dispatcher);
}
+ if (session->upstream_greeting) {
+ g_string_free (session->upstream_greeting, TRUE);
+ }
+
+ if (session->state != SMTP_PROXY_STATE_PROXY) {
+ /* Send 521 fatal error */
+ write (session->sock, fatal_smtp_error, sizeof (fatal_smtp_error));
+ }
+
close (session->sock);
+
+ if (session->proxy) {
+ rspamd_proxy_close (session->proxy);
+ }
+ if (session->upstream_sock != -1) {
+ event_del (&session->upstream_ev);
+ close (session->upstream_sock);
+ }
memory_pool_delete (session->pool);
g_slice_free1 (sizeof (struct smtp_proxy_session), session);
}
@@ -205,11 +229,193 @@ smtp_proxy_err_proxy (GError * err, void *arg)
destroy_session (session->s);
}
+/**
+ * Check whether SMTP greeting is valid
+ * @param s
+ * @return
+ */
+static gint
+check_valid_smtp_greeting (GString *s)
+{
+ gchar *p;
+
+ p = s->str + s->len - 1;
+ if (s->len < 6 || (*p != '\n' || *(p - 1) != '\r')) {
+ return 1;
+ }
+ p -= 5;
+
+ while (p >= s->str) {
+ /* It is fast to use memcmp here as we compare only 4 bytes */
+ if (memcmp (p, "220 ", 4) == 0) {
+ /* Check position */
+ if (p == s->str || *(p - 1) == '\n') {
+ return 1;
+ }
+ return 0;
+ }
+ else if ((*p == '5' || *p == '4' || *p == '3') &&
+ g_ascii_isdigit (p[1]) && g_ascii_isdigit (p[2]) && p[3] == ' ') {
+ return -1;
+ }
+ p --;
+ }
+
+ return 1;
+}
+
+/*
+ * Handle upstream greeting
+ */
+
+static void
+smtp_proxy_greeting_handler (gint fd, short what, void *arg)
+{
+ struct smtp_proxy_session *session = arg;
+ gint r;
+ gchar read_buf[BUFSIZ];
+
+ if (what == EV_READ) {
+ if (session->state == SMTP_PROXY_STATE_GREETING) {
+ /* Fill greeting buffer with new portion of data */
+ r = read (fd, read_buf, sizeof (read_buf) - 1);
+ if (r > 0) {
+ g_string_append_len (session->upstream_greeting, read_buf, r);
+ /* Now search line with 220 */
+ r = check_valid_smtp_greeting (session->upstream_greeting);
+ if (r == 1) {
+ /* Send xclient */
+ if (session->ctx->use_xclient) {
+ r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF,
+ session->hostname, inet_ntoa (session->client_addr));
+ r = write (session->upstream_sock, read_buf, r);
+
+ if (r < 0 && errno == EAGAIN) {
+ /* Add write event */
+ event_del (&session->upstream_ev);
+ event_set (&session->upstream_ev, session->upstream_sock,
+ EV_WRITE, smtp_proxy_greeting_handler, session);
+ event_base_set (session->ev_base, &session->upstream_ev);
+ event_add (&session->upstream_ev, NULL);
+ }
+ else if (r > 0) {
+ session->upstream_greeting->len = 0;
+ session->state = SMTP_PROXY_STATE_XCLIENT;
+ }
+ else {
+ msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno));
+ destroy_session (session->s);
+ }
+ }
+ else {
+ event_del (&session->upstream_ev);
+ /* Start direct proxy */
+ r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len);
+ /* TODO: handle client's error here */
+ if (r > 0) {
+ session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
+ session->ev_base, session->ctx->proxy_buf_len,
+ &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+ session->state = SMTP_PROXY_STATE_PROXY;
+ }
+ else {
+ msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno));
+ destroy_session (session->s);
+ }
+ }
+ }
+ else if (r == -1) {
+ /* Proxy sent 500 error */
+ msg_info ("connection with %s got smtp error for greeting", session->upstream->name);
+ destroy_session (session->s);
+ }
+ }
+ else {
+ msg_info ("connection with %s got read error: %s", session->upstream->name, strerror (errno));
+ destroy_session (session->s);
+ }
+ }
+ else if (session->state == SMTP_PROXY_STATE_XCLIENT) {
+ /* Fill greeting buffer with new portion of data */
+ r = read (fd, read_buf, sizeof (read_buf) - 1);
+ if (r > 0) {
+ g_string_append_len (session->upstream_greeting, read_buf, r);
+ /* Now search line with 220 */
+ r = check_valid_smtp_greeting (session->upstream_greeting);
+ if (r == 1) {
+ event_del (&session->upstream_ev);
+ /* Start direct proxy */
+ r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len);
+ /* TODO: handle client's error here */
+ if (r > 0) {
+ session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
+ session->ev_base, session->ctx->proxy_buf_len,
+ &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+ session->state = SMTP_PROXY_STATE_PROXY;
+ }
+ else {
+ msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno));
+ destroy_session (session->s);
+ }
+ }
+ else if (r == -1) {
+ /* Proxy sent 500 error */
+ msg_info ("connection with %s got smtp error for xclient", session->upstream->name);
+ destroy_session (session->s);
+ }
+ }
+ }
+ else {
+ msg_info ("connection with %s got read event at improper state: %d", session->upstream->name, session->state);
+ destroy_session (session->s);
+ }
+ }
+ else if (what == EV_WRITE) {
+ if (session->state == SMTP_PROXY_STATE_GREETING) {
+ /* Send xclient again */
+ r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF,
+ session->hostname, inet_ntoa (session->client_addr));
+ r = write (session->upstream_sock, read_buf, r);
+
+ if (r < 0 && errno == EAGAIN) {
+ /* Add write event */
+ event_del (&session->upstream_ev);
+ event_set (&session->upstream_ev, session->upstream_sock,
+ EV_WRITE, smtp_proxy_greeting_handler, session);
+ event_base_set (session->ev_base, &session->upstream_ev);
+ event_add (&session->upstream_ev, NULL);
+ }
+ else if (r > 0) {
+ session->upstream_greeting->len = 0;
+ session->state = SMTP_PROXY_STATE_XCLIENT;
+ event_del (&session->upstream_ev);
+ event_set (&session->upstream_ev, session->upstream_sock,
+ EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session);
+ event_base_set (session->ev_base, &session->upstream_ev);
+ event_add (&session->upstream_ev, NULL);
+ }
+ else {
+ msg_info ("connection with %s got write error: %s", session->upstream->name, strerror (errno));
+ destroy_session (session->s);
+ }
+ }
+ else {
+ msg_info ("connection with %s got write event at improper state: %d", session->upstream->name, session->state);
+ destroy_session (session->s);
+ }
+ }
+ else {
+ /* Timeout */
+ msg_info ("connection with %s timed out", session->upstream->name);
+ destroy_session (session->s);
+ }
+}
+
static gboolean
create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
{
- struct smtp_upstream *selected;
- struct sockaddr_un *un;
+ struct smtp_upstream *selected;
+ struct sockaddr_un *un;
/* Try to select upstream */
selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams,
@@ -237,9 +443,14 @@ create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
}
/* Create a proxy for upstream connection */
rspamd_dispatcher_pause (session->dispatcher);
- session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
- session->ev_base, session->ctx->proxy_buf_len,
- &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+ /* First of all get upstream's greeting */
+ session->state = SMTP_PROXY_STATE_GREETING;
+
+ event_set (&session->upstream_ev, session->upstream_sock, EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session);
+ event_base_set (session->ev_base, &session->upstream_ev);
+ event_add (&session->upstream_ev, &session->ctx->smtp_timeout);
+
+ session->upstream_greeting = g_string_sized_new (BUFSIZ);
return TRUE;
}
@@ -251,7 +462,7 @@ create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
static void
smtp_delay_handler (gint fd, short what, void *arg)
{
- struct smtp_proxy_session *session = arg;
+ struct smtp_proxy_session *session = arg;
remove_normal_event (session->s, (event_finalizer_t) event_del,
session->delay_timer);
@@ -275,9 +486,9 @@ smtp_delay_handler (gint fd, short what, void *arg)
static void
smtp_make_delay (struct smtp_proxy_session *session)
{
- struct event *tev;
- struct timeval *tv;
- gint32 jitter;
+ struct event *tev;
+ struct timeval *tv;
+ gint32 jitter;
if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) {
tev = memory_pool_alloc (session->pool, sizeof(struct event));
@@ -308,10 +519,10 @@ smtp_make_delay (struct smtp_proxy_session *session)
static void
smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
{
- struct smtp_proxy_session *session = arg;
- gint res = 0;
- union rspamd_reply_element *elt;
- GList *cur;
+ struct smtp_proxy_session *session = arg;
+ gint res = 0;
+ union rspamd_reply_element *elt;
+ GList *cur;
switch (session->state)
{
@@ -470,6 +681,7 @@ accept_socket (gint fd, short what, void *arg)
session->ctx = ctx;
session->resolver = ctx->resolver;
session->ev_base = ctx->ev_base;
+ session->upstream_sock = -1;
worker->srv->stat->connections_count++;
/* Resolve client's addr */