aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libserver/milter.c57
-rw-r--r--src/rspamd_proxy.c92
2 files changed, 119 insertions, 30 deletions
diff --git a/src/libserver/milter.c b/src/libserver/milter.c
index 15835a801..576fb644c 100644
--- a/src/libserver/milter.c
+++ b/src/libserver/milter.c
@@ -67,11 +67,11 @@ rspamd_milter_obuf_free (struct rspamd_milter_outbuf *obuf)
}
#define RSPAMD_MILTER_RESET_COMMON (1 << 0)
-#define RSPAMD_MILTER_RESET_OUT (1 << 1)
+#define RSPAMD_MILTER_RESET_IO (1 << 1)
#define RSPAMD_MILTER_RESET_ADDR (1 << 2)
#define RSPAMD_MILTER_RESET_MACRO (1 << 3)
#define RSPAMD_MILTER_RESET_ALL (RSPAMD_MILTER_RESET_COMMON | \
- RSPAMD_MILTER_RESET_OUT | \
+ RSPAMD_MILTER_RESET_IO | \
RSPAMD_MILTER_RESET_ADDR | \
RSPAMD_MILTER_RESET_MACRO)
#define RSPAMD_MILTER_RESET_QUIT_NC (RSPAMD_MILTER_RESET_COMMON | \
@@ -88,19 +88,19 @@ rspamd_milter_session_reset (struct rspamd_milter_session *session,
struct rspamd_email_address *cur;
guint i;
- if (how & RSPAMD_MILTER_RESET_OUT) {
+ if (how & RSPAMD_MILTER_RESET_IO) {
DL_FOREACH_SAFE (priv->out_chain, obuf, obuf_tmp) {
rspamd_milter_obuf_free (obuf);
}
priv->out_chain = NULL;
- }
- if (how & RSPAMD_MILTER_RESET_COMMON) {
if (priv->parser.buf) {
priv->parser.buf->len = 0;
}
+ }
+ if (how & RSPAMD_MILTER_RESET_COMMON) {
if (session->message) {
session->message->len = 0;
}
@@ -173,10 +173,6 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session)
rspamd_fstring_free (session->hostname);
}
- if (priv->fd) {
- close (priv->fd);
- }
-
g_free (session);
}
}
@@ -585,6 +581,8 @@ rspamd_milter_process_command (struct rspamd_milter_session *session,
case RSPAMD_MILTER_CMD_QUIT:
msg_debug_milter ("quit command");
priv->state = RSPAMD_MILTER_WANNA_DIE;
+ REF_RETAIN (session);
+ priv->fin_cb (priv->fd, session, priv->ud);
REF_RELEASE (session);
break;
case RSPAMD_MILTER_CMD_RCPT:
@@ -631,7 +629,6 @@ rspamd_milter_process_command (struct rspamd_milter_session *session,
session->message = rspamd_fstring_sized_new (
RSPAMD_MILTER_MESSAGE_CHUNK);
}
-
msg_debug_milter ("got data command");
/* We do not need reply as specified */
break;
@@ -681,6 +678,7 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
end = priv->parser.buf->str + priv->parser.buf->len;
while (p < end) {
+ msg_debug_milter("offset: %d, state: %d", (gint)(p - (const guchar *)priv->parser.buf->str), priv->parser.state);
switch (priv->parser.state) {
case st_len_1:
/* The first length byte in big endian order */
@@ -746,10 +744,10 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
return FALSE;
}
if (priv->parser.buf->allocated < priv->parser.datalen) {
+ priv->parser.pos = p - (const guchar *)priv->parser.buf->str;
priv->parser.buf = rspamd_fstring_grow (priv->parser.buf,
- priv->parser.pos + priv->parser.datalen);
+ priv->parser.buf->len + priv->parser.datalen);
/* This can realloc buffer */
- p = priv->parser.buf->str + priv->parser.pos;
rspamd_milter_plan_io (session, priv, EV_READ);
goto end;
}
@@ -768,6 +766,7 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
}
else {
/* Need to read more */
+ priv->parser.pos = p - (const guchar *)priv->parser.buf->str;
rspamd_milter_plan_io (session, priv, EV_READ);
goto end;
}
@@ -775,9 +774,38 @@ rspamd_milter_consume_input (struct rspamd_milter_session *session,
break;
}
}
+
+ /* Leftover */
+ switch (priv->parser.state) {
+ case st_read_data:
+ if (p + priv->parser.datalen <= end) {
+ if (!rspamd_milter_process_command (session, priv)) {
+ return FALSE;
+ }
+
+ priv->parser.state = st_len_1;
+ priv->parser.cur_cmd = '\0';
+ priv->parser.cmd_start = 0;
+ }
+ break;
+ default:
+ /* No need to do anything */
+ break;
+ }
+
+ if (p == end) {
+ priv->parser.buf->len = 0;
+ priv->parser.pos = 0;
+ }
+
+ if (priv->out_chain) {
+ rspamd_milter_plan_io (session, priv, EV_READ|EV_WRITE);
+ }
+ else {
+ rspamd_milter_plan_io (session, priv, EV_READ);
+ }
end:
- priv->parser.pos = p - (const guchar *)priv->parser.buf->str;
return TRUE;
}
@@ -801,6 +829,9 @@ rspamd_milter_handle_session (struct rspamd_milter_session *session,
r = read (priv->fd, priv->parser.buf->str + priv->parser.buf->len,
priv->parser.buf->allocated - priv->parser.buf->len);
+ msg_debug_milter ("read %z bytes, %z remain, %z allocated",
+ r, priv->parser.buf->len, priv->parser.buf->allocated);
+
if (r == -1) {
if (errno == EAGAIN || errno == EINTR) {
rspamd_milter_plan_io (session, priv, EV_READ);
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 6eabcf109..5728419b9 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -859,6 +859,10 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
rspamd_http_connection_unref (session->client_conn);
}
+ if (session->client_milter_conn) {
+ rspamd_milter_session_unref (session->client_milter_conn);
+ }
+
for (i = 0; i < session->mirror_conns->len; i ++) {
conn = g_ptr_array_index (session->mirror_conns, i);
@@ -1142,12 +1146,18 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code,
{
struct rspamd_http_message *reply;
- reply = rspamd_http_new_message (HTTP_RESPONSE);
- reply->code = code;
- reply->status = rspamd_fstring_new_init (status, strlen (status));
- rspamd_http_connection_write_message (session->client_conn,
- reply, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ctx->ev_base);
+ if (session->client_milter_conn) {
+ rspamd_milter_send_action (session->client_milter_conn,
+ RSPAMD_MILTER_TEMPFAIL);
+ }
+ else {
+ reply = rspamd_http_new_message (HTTP_RESPONSE);
+ reply->code = code;
+ reply->status = rspamd_fstring_new_init (status, strlen (status));
+ rspamd_http_connection_write_message (session->client_conn,
+ reply, NULL, NULL, session, session->client_sock,
+ &session->ctx->io_tv, session->ctx->ev_base);
+ }
}
static void
@@ -1180,7 +1190,8 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
else {
msg_info_session ("retry connection to: %s"
" retries left: %d",
- rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
+ rspamd_inet_address_to_string (
+ rspamd_upstream_addr (session->master_conn->up)),
session->ctx->max_retries - session->retries);
}
}
@@ -1223,9 +1234,18 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
rspamd_upstream_ok (bk_conn->up);
- rspamd_http_connection_write_message (session->client_conn,
- msg, NULL, NULL, session, session->client_sock,
- bk_conn->io_tv, session->ctx->ev_base);
+ if (session->client_milter_conn) {
+ /*
+ * TODO: convert reply to milter reply
+ */
+ rspamd_milter_send_action (session->client_milter_conn,
+ RSPAMD_MILTER_ACCEPT);
+ }
+ else {
+ rspamd_http_connection_write_message (session->client_conn,
+ msg, NULL, NULL, session, session->client_sock,
+ bk_conn->io_tv, session->ctx->ev_base);
+ }
return 0;
}
@@ -1262,15 +1282,27 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task)
break;
}
- rspamd_http_connection_reset (session->client_conn);
session->master_conn->flags |= RSPAMD_BACKEND_CLOSED;
session->master_conn->results = rep;
- rspamd_http_connection_write_message (session->client_conn, msg, NULL,
- ctype,
- session,
- session->client_sock,
- NULL,
- session->ctx->ev_base);
+
+ if (session->client_milter_conn) {
+ /*
+ * TODO: convert reply to milter reply
+ */
+ rspamd_milter_send_action (session->client_milter_conn,
+ RSPAMD_MILTER_ACCEPT);
+ }
+ else {
+ rspamd_http_connection_reset (session->client_conn);
+ rspamd_http_connection_write_message (session->client_conn,
+ msg,
+ NULL,
+ ctype,
+ session,
+ session->client_sock,
+ NULL,
+ session->ctx->ev_base);
+ }
}
static gboolean
@@ -1548,6 +1580,32 @@ proxy_milter_finish_handler (gint fd,
void *ud)
{
struct rspamd_proxy_session *session = ud;
+ struct rspamd_http_message *msg;
+
+ if (!session->master_conn) {
+ session->client_milter_conn = rms;
+ msg = rspamd_milter_to_http (rms);
+ session->master_conn = rspamd_mempool_alloc0 (session->pool,
+ sizeof (*session->master_conn));
+ session->master_conn->s = session;
+ session->master_conn->name = "master";
+ session->client_message = msg;
+
+ if (msg->body_buf.len == 0) {
+ msg_info_session ("incomplete master connection");
+ proxy_backend_close_connection (session->master_conn);
+ REF_RELEASE (session);
+ }
+ else {
+ proxy_open_mirror_connections (session);
+ proxy_send_master_message (session);
+ }
+ }
+ else {
+ msg_info_session ("finished master connection");
+ proxy_backend_close_connection (session->master_conn);
+ REF_RELEASE (session);
+ }
}
static void