#include "libmime/message.h"
#include "rspamd.h"
#include "libserver/worker_util.h"
+#include "worker_private.h"
#include "lua/lua_common.h"
#include "keypairs_cache.h"
+#include "libstat/stat_api.h"
#include "ottery.h"
#include "unix-std.h"
gint parser_from_ref;
gint parser_to_ref;
gboolean local;
+ gboolean self_process;
};
struct rspamd_http_mirror {
GArray *cmp_refs;
/* Maximum count for retries */
guint max_retries;
+ /* If we have self_scanning backends, we need to work as a normal worker */
+ gboolean has_self_scan;
};
enum rspamd_backend_flags {
enum rspamd_backend_flags flags;
gint parser_from_ref;
gint parser_to_ref;
+ struct rspamd_task *task;
};
struct rspamd_proxy_session {
+ struct rspamd_worker *worker;
rspamd_mempool_t *pool;
struct rspamd_proxy_ctx *ctx;
rspamd_inet_addr_t *client_addr;
(rspamd_mempool_destruct_t)rspamd_pubkey_unref, up->key);
}
+ elt = ucl_object_lookup (obj, "self_process");
+ if (elt && ucl_object_toboolean (elt)) {
+ up->self_process = TRUE;
+ ctx->has_self_scan = TRUE;
+ }
+
elt = ucl_object_lookup (obj, "hosts");
- if (elt == NULL) {
+ if (elt == NULL && !up->self_process) {
g_set_error (err, rspamd_proxy_quark (), 100,
"upstream option must have some hosts definition");
goto err;
}
- up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
- if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
- g_set_error (err, rspamd_proxy_quark (), 100,
- "upstream has bad hosts definition");
+ if (elt) {
+ up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+ if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream has bad hosts definition");
- goto err;
- }
+ goto err;
+ }
- rspamd_mempool_add_destructor (pool,
- (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u);
+ rspamd_mempool_add_destructor (pool,
+ (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u);
+ }
elt = ucl_object_lookup (obj, "default");
if (elt && ucl_object_toboolean (elt)) {
}
}
- if (session->master_conn && session->master_conn->results) {
- ucl_object_unref (session->master_conn->results);
+ if (session->master_conn) {
+ if (session->master_conn->results) {
+ ucl_object_unref (session->master_conn->results);
+ }
+
+ if (session->master_conn->task) {
+ rspamd_session_destroy (session->master_conn->task->s);
+ }
}
g_ptr_array_free (session->mirror_conns, TRUE);
return 0;
}
+static void
+rspamd_proxy_scan_self_reply (struct rspamd_task *task)
+{
+ struct rspamd_http_message *msg;
+ struct rspamd_proxy_session *session = task->fin_arg;
+ ucl_object_t *rep;
+ const char *ctype = "application/json";
+
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
+ msg->date = time (NULL);
+ msg->code = 200;
+
+ switch (task->cmd) {
+ case CMD_REPORT_IFSPAM:
+ case CMD_REPORT:
+ case CMD_CHECK:
+ case CMD_SYMBOLS:
+ case CMD_PROCESS:
+ case CMD_SKIP:
+ case CMD_CHECK_V2:
+ rspamd_protocol_http_reply (msg, task, &rep);
+ rspamd_protocol_write_log_pipe (task);
+ break;
+ case CMD_PING:
+ rspamd_http_message_set_body (msg, "pong" CRLF, 6);
+ ctype = "text/plain";
+ break;
+ case CMD_OTHER:
+ msg_err_task ("BROKEN");
+ break;
+ }
+
+ rspamd_http_connection_reset (session->client_conn);
+ session->master_conn->flags |= RSPAMD_BACKEND_CLOSED;
+ session->master_conn->results = rep;
+ rspamd_http_connection_write_message (session->client_conn, msg, NULL,
+ ctype,
+ session,
+ session->client_sock,
+ NULL,
+ session->ctx->ev_base);
+}
+
+static gboolean
+rspamd_proxy_task_fin (void *ud)
+{
+ struct rspamd_task *task = ud;
+
+ msg_debug_task ("finish task");
+
+ if (RSPAMD_TASK_IS_PROCESSED (task)) {
+ rspamd_proxy_scan_self_reply (task);
+ return TRUE;
+ }
+
+ if (!rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) {
+ rspamd_proxy_scan_self_reply (task);
+ return TRUE;
+ }
+
+ if (RSPAMD_TASK_IS_PROCESSED (task)) {
+ rspamd_proxy_scan_self_reply (task);
+ return TRUE;
+ }
+
+ /* One more iteration */
+ return FALSE;
+}
+
+static gboolean
+rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
+{
+ struct rspamd_task *task;
+ struct rspamd_http_message *msg;
+ struct event *guard_ev;
+ const gchar *data;
+ gsize len;
+
+ msg = session->client_message;
+ task = rspamd_task_new (session->worker, session->ctx->cfg);
+ task->flags |= RSPAMD_TASK_FLAG_MIME;
+ task->sock = session->client_sock;
+ task->client_addr = session->client_addr;
+ task->fin_arg = session;
+ task->resolver = session->ctx->resolver;
+ /* TODO: allow to disable autolearn in protocol */
+ task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+ task->ev_base = session->ctx->ev_base;
+ task->s = rspamd_session_create (task->task_pool, rspamd_proxy_task_fin,
+ NULL, (event_finalizer_t )rspamd_task_free, task);
+ data = rspamd_http_message_get_body (msg, &len);
+
+ /* Process message */
+ if (!rspamd_protocol_handle_request (task, msg)) {
+ msg_err_task ("cannot handle request: %e", task->err);
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ }
+ else {
+ if (task->cmd == CMD_PING) {
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ }
+ else {
+ if (!rspamd_task_load_message (task, msg, data, len)) {
+ msg_err_task ("cannot load message: %e", task->err);
+ task->flags |= RSPAMD_TASK_FLAG_SKIP;
+ }
+ }
+ }
+
+ /* Set global timeout for the task */
+ if (session->ctx->default_upstream->timeout > 0.0) {
+ struct timeval task_tv;
+
+ event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
+ task);
+ event_base_set (session->ctx->ev_base, &task->timeout_ev);
+ double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
+ event_add (&task->timeout_ev, &task_tv);
+ }
+
+ /* Set socket guard */
+ guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
+#ifdef EV_CLOSED
+ event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
+ rspamd_worker_guard_handler, task);
+#else
+ event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
+ rspamd_worker_guard_handler, task);
+#endif
+ event_base_set (task->ev_base, guard_ev);
+ event_add (guard_ev, NULL);
+ task->guard_ev = guard_ev;
+
+ rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+
+ return TRUE;
+}
+
static gboolean
proxy_send_master_message (struct rspamd_proxy_session *session)
{
goto err;
}
else {
+ if (backend->self_process) {
+ return rspamd_proxy_self_scan (session);
+ }
retry:
if (session->ctx->max_retries &&
session->retries > session->ctx->max_retries) {
ctx->keys_cache,
NULL);
session->ctx = ctx;
+ session->worker = worker;
if (ctx->key) {
rspamd_http_connection_set_key (session->client_conn, ctx->key);
}
void
-start_rspamd_proxy (struct rspamd_worker *worker)
-{
+start_rspamd_proxy (struct rspamd_worker *worker) {
struct rspamd_proxy_ctx *ctx = worker->ctx;
struct timeval rot_tv;
event_base_set (ctx->ev_base, &ctx->rotate_ev);
event_add (&ctx->rotate_ev, &rot_tv);
+ if (ctx->has_self_scan) {
+ /* Additional initialisation needed */
+ rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver);
+ }
+
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
rspamd_log_close (worker->srv->logger);
+ if (ctx->has_self_scan) {
+ rspamd_stat_close ();
+ }
+
rspamd_keypair_cache_destroy (ctx->keys_cache);
REF_RELEASE (ctx->cfg);