diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libserver/milter.c | 57 | ||||
-rw-r--r-- | src/rspamd_proxy.c | 92 |
2 files changed, 119 insertions, 30 deletions
diff --git a/src/libserver/milter.c b/src/libserver/milter.c index 15835a801..576fb644c 100644 --- a/src/libserver/milter.c +++ b/src/libserver/milter.c @@ -67,11 +67,11 @@ rspamd_milter_obuf_free (struct rspamd_milter_outbuf *obuf) } #define RSPAMD_MILTER_RESET_COMMON (1 << 0) -#define RSPAMD_MILTER_RESET_OUT (1 << 1) +#define RSPAMD_MILTER_RESET_IO (1 << 1) #define RSPAMD_MILTER_RESET_ADDR (1 << 2) #define RSPAMD_MILTER_RESET_MACRO (1 << 3) #define RSPAMD_MILTER_RESET_ALL (RSPAMD_MILTER_RESET_COMMON | \ - RSPAMD_MILTER_RESET_OUT | \ + RSPAMD_MILTER_RESET_IO | \ RSPAMD_MILTER_RESET_ADDR | \ RSPAMD_MILTER_RESET_MACRO) #define RSPAMD_MILTER_RESET_QUIT_NC (RSPAMD_MILTER_RESET_COMMON | \ @@ -88,19 +88,19 @@ rspamd_milter_session_reset (struct rspamd_milter_session *session, struct rspamd_email_address *cur; guint i; - if (how & RSPAMD_MILTER_RESET_OUT) { + if (how & RSPAMD_MILTER_RESET_IO) { DL_FOREACH_SAFE (priv->out_chain, obuf, obuf_tmp) { rspamd_milter_obuf_free (obuf); } priv->out_chain = NULL; - } - if (how & RSPAMD_MILTER_RESET_COMMON) { if (priv->parser.buf) { priv->parser.buf->len = 0; } + } + if (how & RSPAMD_MILTER_RESET_COMMON) { if (session->message) { session->message->len = 0; } @@ -173,10 +173,6 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session) rspamd_fstring_free (session->hostname); } - if (priv->fd) { - close (priv->fd); - } - g_free (session); } } @@ -585,6 +581,8 @@ rspamd_milter_process_command (struct rspamd_milter_session *session, case RSPAMD_MILTER_CMD_QUIT: msg_debug_milter ("quit command"); priv->state = RSPAMD_MILTER_WANNA_DIE; + REF_RETAIN (session); + priv->fin_cb (priv->fd, session, priv->ud); REF_RELEASE (session); break; case RSPAMD_MILTER_CMD_RCPT: @@ -631,7 +629,6 @@ rspamd_milter_process_command (struct rspamd_milter_session *session, session->message = rspamd_fstring_sized_new ( RSPAMD_MILTER_MESSAGE_CHUNK); } - msg_debug_milter ("got data command"); /* We do not need reply as specified */ break; @@ -681,6 +678,7 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session, end = priv->parser.buf->str + priv->parser.buf->len; while (p < end) { + msg_debug_milter("offset: %d, state: %d", (gint)(p - (const guchar *)priv->parser.buf->str), priv->parser.state); switch (priv->parser.state) { case st_len_1: /* The first length byte in big endian order */ @@ -746,10 +744,10 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session, return FALSE; } if (priv->parser.buf->allocated < priv->parser.datalen) { + priv->parser.pos = p - (const guchar *)priv->parser.buf->str; priv->parser.buf = rspamd_fstring_grow (priv->parser.buf, - priv->parser.pos + priv->parser.datalen); + priv->parser.buf->len + priv->parser.datalen); /* This can realloc buffer */ - p = priv->parser.buf->str + priv->parser.pos; rspamd_milter_plan_io (session, priv, EV_READ); goto end; } @@ -768,6 +766,7 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session, } else { /* Need to read more */ + priv->parser.pos = p - (const guchar *)priv->parser.buf->str; rspamd_milter_plan_io (session, priv, EV_READ); goto end; } @@ -775,9 +774,38 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session, break; } } + + /* Leftover */ + switch (priv->parser.state) { + case st_read_data: + if (p + priv->parser.datalen <= end) { + if (!rspamd_milter_process_command (session, priv)) { + return FALSE; + } + + priv->parser.state = st_len_1; + priv->parser.cur_cmd = '\0'; + priv->parser.cmd_start = 0; + } + break; + default: + /* No need to do anything */ + break; + } + + if (p == end) { + priv->parser.buf->len = 0; + priv->parser.pos = 0; + } + + if (priv->out_chain) { + rspamd_milter_plan_io (session, priv, EV_READ|EV_WRITE); + } + else { + rspamd_milter_plan_io (session, priv, EV_READ); + } end: - priv->parser.pos = p - (const guchar *)priv->parser.buf->str; return TRUE; } @@ -801,6 +829,9 @@ rspamd_milter_handle_session (struct rspamd_milter_session *session, r = read (priv->fd, priv->parser.buf->str + priv->parser.buf->len, priv->parser.buf->allocated - priv->parser.buf->len); + msg_debug_milter ("read %z bytes, %z remain, %z allocated", + r, priv->parser.buf->len, priv->parser.buf->allocated); + if (r == -1) { if (errno == EAGAIN || errno == EINTR) { rspamd_milter_plan_io (session, priv, EV_READ); diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 6eabcf109..5728419b9 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -859,6 +859,10 @@ proxy_session_dtor (struct rspamd_proxy_session *session) rspamd_http_connection_unref (session->client_conn); } + if (session->client_milter_conn) { + rspamd_milter_session_unref (session->client_milter_conn); + } + for (i = 0; i < session->mirror_conns->len; i ++) { conn = g_ptr_array_index (session->mirror_conns, i); @@ -1142,12 +1146,18 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code, { struct rspamd_http_message *reply; - reply = rspamd_http_new_message (HTTP_RESPONSE); - reply->code = code; - reply->status = rspamd_fstring_new_init (status, strlen (status)); - rspamd_http_connection_write_message (session->client_conn, - reply, NULL, NULL, session, session->client_sock, - &session->ctx->io_tv, session->ctx->ev_base); + if (session->client_milter_conn) { + rspamd_milter_send_action (session->client_milter_conn, + RSPAMD_MILTER_TEMPFAIL); + } + else { + reply = rspamd_http_new_message (HTTP_RESPONSE); + reply->code = code; + reply->status = rspamd_fstring_new_init (status, strlen (status)); + rspamd_http_connection_write_message (session->client_conn, + reply, NULL, NULL, session, session->client_sock, + &session->ctx->io_tv, session->ctx->ev_base); + } } static void @@ -1180,7 +1190,8 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError else { msg_info_session ("retry connection to: %s" " retries left: %d", - rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)), + rspamd_inet_address_to_string ( + rspamd_upstream_addr (session->master_conn->up)), session->ctx->max_retries - session->retries); } } @@ -1223,9 +1234,18 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn, rspamd_upstream_ok (bk_conn->up); - rspamd_http_connection_write_message (session->client_conn, - msg, NULL, NULL, session, session->client_sock, - bk_conn->io_tv, session->ctx->ev_base); + if (session->client_milter_conn) { + /* + * TODO: convert reply to milter reply + */ + rspamd_milter_send_action (session->client_milter_conn, + RSPAMD_MILTER_ACCEPT); + } + else { + rspamd_http_connection_write_message (session->client_conn, + msg, NULL, NULL, session, session->client_sock, + bk_conn->io_tv, session->ctx->ev_base); + } return 0; } @@ -1262,15 +1282,27 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task) break; } - rspamd_http_connection_reset (session->client_conn); session->master_conn->flags |= RSPAMD_BACKEND_CLOSED; session->master_conn->results = rep; - rspamd_http_connection_write_message (session->client_conn, msg, NULL, - ctype, - session, - session->client_sock, - NULL, - session->ctx->ev_base); + + if (session->client_milter_conn) { + /* + * TODO: convert reply to milter reply + */ + rspamd_milter_send_action (session->client_milter_conn, + RSPAMD_MILTER_ACCEPT); + } + else { + rspamd_http_connection_reset (session->client_conn); + rspamd_http_connection_write_message (session->client_conn, + msg, + NULL, + ctype, + session, + session->client_sock, + NULL, + session->ctx->ev_base); + } } static gboolean @@ -1548,6 +1580,32 @@ proxy_milter_finish_handler (gint fd, void *ud) { struct rspamd_proxy_session *session = ud; + struct rspamd_http_message *msg; + + if (!session->master_conn) { + session->client_milter_conn = rms; + msg = rspamd_milter_to_http (rms); + session->master_conn = rspamd_mempool_alloc0 (session->pool, + sizeof (*session->master_conn)); + session->master_conn->s = session; + session->master_conn->name = "master"; + session->client_message = msg; + + if (msg->body_buf.len == 0) { + msg_info_session ("incomplete master connection"); + proxy_backend_close_connection (session->master_conn); + REF_RELEASE (session); + } + else { + proxy_open_mirror_connections (session); + proxy_send_master_message (session); + } + } + else { + msg_info_session ("finished master connection"); + proxy_backend_close_connection (session->master_conn); + REF_RELEASE (session); + } } static void |