aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/controller.c2
-rw-r--r--src/libserver/protocol.c11
-rw-r--r--src/libserver/protocol.h7
-rw-r--r--src/rspamd_proxy.c197
-rw-r--r--src/worker.c4
-rw-r--r--src/worker_private.h10
6 files changed, 210 insertions, 21 deletions
diff --git a/src/controller.c b/src/controller.c
index ad24e5034..7214aa59f 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -1876,7 +1876,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task)
msg = rspamd_http_new_message (HTTP_RESPONSE);
msg->date = time (NULL);
msg->code = 200;
- rspamd_protocol_http_reply (msg, task);
+ rspamd_protocol_http_reply (msg, task, NULL);
rspamd_http_connection_reset (conn_ent->conn);
rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
"application/json", conn_ent, conn_ent->conn->fd, conn_ent->rt->ptv,
diff --git a/src/libserver/protocol.c b/src/libserver/protocol.c
index cbf91027c..52ba05188 100644
--- a/src/libserver/protocol.c
+++ b/src/libserver/protocol.c
@@ -1241,7 +1241,7 @@ rspamd_protocol_write_ucl (struct rspamd_task *task,
void
rspamd_protocol_http_reply (struct rspamd_http_message *msg,
- struct rspamd_task *task)
+ struct rspamd_task *task, ucl_object_t **pobj)
{
struct rspamd_metric_result *metric_res;
GHashTableIter hiter;
@@ -1265,6 +1265,10 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg,
top = rspamd_protocol_write_ucl (task, flags);
+ if (pobj) {
+ *pobj = top;
+ }
+
if (!(task->flags & RSPAMD_TASK_FLAG_NO_LOG)) {
rspamd_roll_history_update (task->worker->srv->history, task);
}
@@ -1400,7 +1404,7 @@ end:
}
}
-static void
+void
rspamd_protocol_write_log_pipe (struct rspamd_task *task)
{
struct rspamd_worker_log_pipe *lp;
@@ -1645,7 +1649,6 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
msg->date = time (NULL);
-
debug_task ("writing reply to client");
if (task->err != NULL) {
ucl_object_t *top = NULL;
@@ -1675,7 +1678,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
case CMD_PROCESS:
case CMD_SKIP:
case CMD_CHECK_V2:
- rspamd_protocol_http_reply (msg, task);
+ rspamd_protocol_http_reply (msg, task, NULL);
rspamd_protocol_write_log_pipe (task);
break;
case CMD_PING:
diff --git a/src/libserver/protocol.h b/src/libserver/protocol.h
index c0095fedc..3b7dabc73 100644
--- a/src/libserver/protocol.h
+++ b/src/libserver/protocol.h
@@ -66,7 +66,12 @@ gboolean rspamd_protocol_handle_request (struct rspamd_task *task,
* @param task
*/
void rspamd_protocol_http_reply (struct rspamd_http_message *msg,
- struct rspamd_task *task);
+ struct rspamd_task *task, ucl_object_t **pobj);
+/**
+ * Write data to log pipes
+ * @param task
+ */
+void rspamd_protocol_write_log_pipe (struct rspamd_task *task);
enum rspamd_protocol_flags {
RSPAMD_PROTOCOL_BASIC = 1 << 0,
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 992a4f08d..e381a7c54 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -26,8 +26,10 @@
#include "libmime/message.h"
#include "rspamd.h"
#include "libserver/worker_util.h"
+#include "worker_private.h"
#include "lua/lua_common.h"
#include "keypairs_cache.h"
+#include "libstat/stat_api.h"
#include "ottery.h"
#include "unix-std.h"
@@ -73,6 +75,7 @@ struct rspamd_http_upstream {
gint parser_from_ref;
gint parser_to_ref;
gboolean local;
+ gboolean self_process;
};
struct rspamd_http_mirror {
@@ -118,6 +121,8 @@ struct rspamd_proxy_ctx {
GArray *cmp_refs;
/* Maximum count for retries */
guint max_retries;
+ /* If we have self_scanning backends, we need to work as a normal worker */
+ gboolean has_self_scan;
};
enum rspamd_backend_flags {
@@ -142,9 +147,11 @@ struct rspamd_proxy_backend_connection {
enum rspamd_backend_flags flags;
gint parser_from_ref;
gint parser_to_ref;
+ struct rspamd_task *task;
};
struct rspamd_proxy_session {
+ struct rspamd_worker *worker;
rspamd_mempool_t *pool;
struct rspamd_proxy_ctx *ctx;
rspamd_inet_addr_t *client_addr;
@@ -318,25 +325,33 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
(rspamd_mempool_destruct_t)rspamd_pubkey_unref, up->key);
}
+ elt = ucl_object_lookup (obj, "self_process");
+ if (elt && ucl_object_toboolean (elt)) {
+ up->self_process = TRUE;
+ ctx->has_self_scan = TRUE;
+ }
+
elt = ucl_object_lookup (obj, "hosts");
- if (elt == NULL) {
+ if (elt == NULL && !up->self_process) {
g_set_error (err, rspamd_proxy_quark (), 100,
"upstream option must have some hosts definition");
goto err;
}
- up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
- if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
- g_set_error (err, rspamd_proxy_quark (), 100,
- "upstream has bad hosts definition");
+ if (elt) {
+ up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+ if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream has bad hosts definition");
- goto err;
- }
+ goto err;
+ }
- rspamd_mempool_add_destructor (pool,
- (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u);
+ rspamd_mempool_add_destructor (pool,
+ (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u);
+ }
elt = ucl_object_lookup (obj, "default");
if (elt && ucl_object_toboolean (elt)) {
@@ -841,8 +856,14 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
}
}
- if (session->master_conn && session->master_conn->results) {
- ucl_object_unref (session->master_conn->results);
+ if (session->master_conn) {
+ if (session->master_conn->results) {
+ ucl_object_unref (session->master_conn->results);
+ }
+
+ if (session->master_conn->task) {
+ rspamd_session_destroy (session->master_conn->task->s);
+ }
}
g_ptr_array_free (session->mirror_conns, TRUE);
@@ -1194,6 +1215,144 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
return 0;
}
+static void
+rspamd_proxy_scan_self_reply (struct rspamd_task *task)
+{
+ struct rspamd_http_message *msg;
+ struct rspamd_proxy_session *session = task->fin_arg;
+ ucl_object_t *rep;
+ const char *ctype = "application/json";
+
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
+ msg->date = time (NULL);
+ msg->code = 200;
+
+ switch (task->cmd) {
+ case CMD_REPORT_IFSPAM:
+ case CMD_REPORT:
+ case CMD_CHECK:
+ case CMD_SYMBOLS:
+ case CMD_PROCESS:
+ case CMD_SKIP:
+ case CMD_CHECK_V2:
+ rspamd_protocol_http_reply (msg, task, &rep);
+ rspamd_protocol_write_log_pipe (task);
+ break;
+ case CMD_PING:
+ rspamd_http_message_set_body (msg, "pong" CRLF, 6);
+ ctype = "text/plain";
+ break;
+ case CMD_OTHER:
+ msg_err_task ("BROKEN");
+ break;
+ }
+
+ rspamd_http_connection_reset (session->client_conn);
+ session->master_conn->flags |= RSPAMD_BACKEND_CLOSED;
+ session->master_conn->results = rep;
+ rspamd_http_connection_write_message (session->client_conn, msg, NULL,
+ ctype,
+ session,
+ session->client_sock,
+ NULL,
+ session->ctx->ev_base);
+}
+
+static gboolean
+rspamd_proxy_task_fin (void *ud)
+{
+ struct rspamd_task *task = ud;
+
+ msg_debug_task ("finish task");
+
+ if (RSPAMD_TASK_IS_PROCESSED (task)) {
+ rspamd_proxy_scan_self_reply (task);
+ return TRUE;
+ }
+
+ if (!rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) {
+ rspamd_proxy_scan_self_reply (task);
+ return TRUE;
+ }
+
+ if (RSPAMD_TASK_IS_PROCESSED (task)) {
+ rspamd_proxy_scan_self_reply (task);
+ return TRUE;
+ }
+
+ /* One more iteration */
+ return FALSE;
+}
+
+static gboolean
+rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
+{
+ struct rspamd_task *task;
+ struct rspamd_http_message *msg;
+ struct event *guard_ev;
+ const gchar *data;
+ gsize len;
+
+ msg = session->client_message;
+ task = rspamd_task_new (session->worker, session->ctx->cfg);
+ task->flags |= RSPAMD_TASK_FLAG_MIME;
+ task->sock = session->client_sock;
+ task->client_addr = session->client_addr;
+ task->fin_arg = session;
+ task->resolver = session->ctx->resolver;
+ /* TODO: allow to disable autolearn in protocol */
+ task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+ task->ev_base = session->ctx->ev_base;
+ task->s = rspamd_session_create (task->task_pool, rspamd_proxy_task_fin,
+ NULL, (event_finalizer_t )rspamd_task_free, task);
+ data = rspamd_http_message_get_body (msg, &len);
+
+ /* Process message */
+ if (!rspamd_protocol_handle_request (task, msg)) {
+ msg_err_task ("cannot handle request: %e", task->err);
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ }
+ else {
+ if (task->cmd == CMD_PING) {
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ }
+ else {
+ if (!rspamd_task_load_message (task, msg, data, len)) {
+ msg_err_task ("cannot load message: %e", task->err);
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ }
+ }
+ }
+
+ /* Set global timeout for the task */
+ if (session->ctx->default_upstream->timeout > 0.0) {
+ struct timeval task_tv;
+
+ event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
+ task);
+ event_base_set (session->ctx->ev_base, &task->timeout_ev);
+ double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
+ event_add (&task->timeout_ev, &task_tv);
+ }
+
+ /* Set socket guard */
+ guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
+#ifdef EV_CLOSED
+ event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
+ rspamd_worker_guard_handler, task);
+#else
+ event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
+ rspamd_worker_guard_handler, task);
+#endif
+ event_base_set (task->ev_base, guard_ev);
+ event_add (guard_ev, NULL);
+ task->guard_ev = guard_ev;
+
+ rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+
+ return TRUE;
+}
+
static gboolean
proxy_send_master_message (struct rspamd_proxy_session *session)
{
@@ -1222,6 +1381,9 @@ proxy_send_master_message (struct rspamd_proxy_session *session)
goto err;
}
else {
+ if (backend->self_process) {
+ return rspamd_proxy_self_scan (session);
+ }
retry:
if (session->ctx->max_retries &&
session->retries > session->ctx->max_retries) {
@@ -1415,6 +1577,7 @@ proxy_accept_socket (gint fd, short what, void *arg)
ctx->keys_cache,
NULL);
session->ctx = ctx;
+ session->worker = worker;
if (ctx->key) {
rspamd_http_connection_set_key (session->client_conn, ctx->key);
@@ -1450,8 +1613,7 @@ proxy_rotate_key (gint fd, short what, void *arg)
}
void
-start_rspamd_proxy (struct rspamd_worker *worker)
-{
+start_rspamd_proxy (struct rspamd_worker *worker) {
struct rspamd_proxy_ctx *ctx = worker->ctx;
struct timeval rot_tv;
@@ -1480,11 +1642,20 @@ start_rspamd_proxy (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &ctx->rotate_ev);
event_add (&ctx->rotate_ev, &rot_tv);
+ if (ctx->has_self_scan) {
+ /* Additional initialisation needed */
+ rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver);
+ }
+
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
rspamd_log_close (worker->srv->logger);
+ if (ctx->has_self_scan) {
+ rspamd_stat_close ();
+ }
+
rspamd_keypair_cache_destroy (ctx->keys_cache);
REF_RELEASE (ctx->cfg);
diff --git a/src/worker.c b/src/worker.c
index 724b9ccd2..5bb4b2d61 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -142,7 +142,7 @@ reduce_tasks_count (gpointer arg)
}
}
-static void
+void
rspamd_task_timeout (gint fd, short what, gpointer ud)
{
struct rspamd_task *task = (struct rspamd_task *) ud;
@@ -156,7 +156,7 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
}
}
-static void
+void
rspamd_worker_guard_handler (gint fd, short what, void *data)
{
struct rspamd_task *task = data;
diff --git a/src/worker_private.h b/src/worker_private.h
index ac391fc8c..b9a9e57d6 100644
--- a/src/worker_private.h
+++ b/src/worker_private.h
@@ -60,4 +60,14 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct event_base *ev_base,
struct rspamd_dns_resolver *resolver);
+/*
+ * Called on forced timeout
+ */
+void rspamd_task_timeout (gint fd, short what, gpointer ud);
+
+/*
+ * Called on unexpected IO error (e.g. ECONNRESET)
+ */
+void rspamd_worker_guard_handler (gint fd, short what, void *data);
+
#endif