]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Initial support of self-scan in Rspamd proxy
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 2 May 2017 16:27:51 +0000 (17:27 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 2 May 2017 16:27:51 +0000 (17:27 +0100)
src/controller.c
src/libserver/protocol.c
src/libserver/protocol.h
src/rspamd_proxy.c
src/worker.c
src/worker_private.h

index ad24e50342a91cc9027946d129a0a489052042fc..7214aa59f947cd50ce398d6dd0a8379062a7f79b 100644 (file)
@@ -1876,7 +1876,7 @@ rspamd_controller_scan_reply (struct rspamd_task *task)
        msg = rspamd_http_new_message (HTTP_RESPONSE);
        msg->date = time (NULL);
        msg->code = 200;
-       rspamd_protocol_http_reply (msg, task);
+       rspamd_protocol_http_reply (msg, task, NULL);
        rspamd_http_connection_reset (conn_ent->conn);
        rspamd_http_connection_write_message (conn_ent->conn, msg, NULL,
                        "application/json", conn_ent, conn_ent->conn->fd, conn_ent->rt->ptv,
index cbf91027c5c3bc54078ebd1e117f81c8d9df6e1e..52ba0518864161fdc07e4614af1ac17f379d4b4e 100644 (file)
@@ -1241,7 +1241,7 @@ rspamd_protocol_write_ucl (struct rspamd_task *task,
 
 void
 rspamd_protocol_http_reply (struct rspamd_http_message *msg,
-       struct rspamd_task *task)
+               struct rspamd_task *task, ucl_object_t **pobj)
 {
        struct rspamd_metric_result *metric_res;
        GHashTableIter hiter;
@@ -1265,6 +1265,10 @@ rspamd_protocol_http_reply (struct rspamd_http_message *msg,
 
        top = rspamd_protocol_write_ucl (task, flags);
 
+       if (pobj) {
+               *pobj = top;
+       }
+
        if (!(task->flags & RSPAMD_TASK_FLAG_NO_LOG)) {
                rspamd_roll_history_update (task->worker->srv->history, task);
        }
@@ -1400,7 +1404,7 @@ end:
        }
 }
 
-static void
+void
 rspamd_protocol_write_log_pipe (struct rspamd_task *task)
 {
        struct rspamd_worker_log_pipe *lp;
@@ -1645,7 +1649,6 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
 
        msg->date = time (NULL);
 
-
        debug_task ("writing reply to client");
        if (task->err != NULL) {
                ucl_object_t *top = NULL;
@@ -1675,7 +1678,7 @@ rspamd_protocol_write_reply (struct rspamd_task *task)
                case CMD_PROCESS:
                case CMD_SKIP:
                case CMD_CHECK_V2:
-                       rspamd_protocol_http_reply (msg, task);
+                       rspamd_protocol_http_reply (msg, task, NULL);
                        rspamd_protocol_write_log_pipe (task);
                        break;
                case CMD_PING:
index c0095fedca44950d96ded57191a4b68e76d3d8e0..3b7dabc731e9b631b4d85fa62dd3dd9c649d22d6 100644 (file)
@@ -66,7 +66,12 @@ gboolean rspamd_protocol_handle_request (struct rspamd_task *task,
  * @param task
  */
 void rspamd_protocol_http_reply (struct rspamd_http_message *msg,
-       struct rspamd_task *task);
+               struct rspamd_task *task, ucl_object_t **pobj);
+/**
+ * Write data to log pipes
+ * @param task
+ */
+void rspamd_protocol_write_log_pipe (struct rspamd_task *task);
 
 enum rspamd_protocol_flags {
        RSPAMD_PROTOCOL_BASIC = 1 << 0,
index 992a4f08d4ab9d377bc06648ce0444154ebb8d9c..e381a7c54be78a74c20901a7abcea9d9b0a9f8e1 100644 (file)
 #include "libmime/message.h"
 #include "rspamd.h"
 #include "libserver/worker_util.h"
+#include "worker_private.h"
 #include "lua/lua_common.h"
 #include "keypairs_cache.h"
+#include "libstat/stat_api.h"
 #include "ottery.h"
 #include "unix-std.h"
 
@@ -73,6 +75,7 @@ struct rspamd_http_upstream {
        gint parser_from_ref;
        gint parser_to_ref;
        gboolean local;
+       gboolean self_process;
 };
 
 struct rspamd_http_mirror {
@@ -118,6 +121,8 @@ struct rspamd_proxy_ctx {
        GArray *cmp_refs;
        /* Maximum count for retries */
        guint max_retries;
+       /* If we have self_scanning backends, we need to work as a normal worker */
+       gboolean has_self_scan;
 };
 
 enum rspamd_backend_flags {
@@ -142,9 +147,11 @@ struct rspamd_proxy_backend_connection {
        enum rspamd_backend_flags flags;
        gint parser_from_ref;
        gint parser_to_ref;
+       struct rspamd_task *task;
 };
 
 struct rspamd_proxy_session {
+       struct rspamd_worker *worker;
        rspamd_mempool_t *pool;
        struct rspamd_proxy_ctx *ctx;
        rspamd_inet_addr_t *client_addr;
@@ -318,25 +325,33 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
                                (rspamd_mempool_destruct_t)rspamd_pubkey_unref, up->key);
        }
 
+       elt = ucl_object_lookup (obj, "self_process");
+       if (elt && ucl_object_toboolean (elt)) {
+               up->self_process = TRUE;
+               ctx->has_self_scan = TRUE;
+       }
+
        elt = ucl_object_lookup (obj, "hosts");
 
-       if (elt == NULL) {
+       if (elt == NULL && !up->self_process) {
                g_set_error (err, rspamd_proxy_quark (), 100,
                                "upstream option must have some hosts definition");
 
                goto err;
        }
 
-       up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
-       if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
-               g_set_error (err, rspamd_proxy_quark (), 100,
-                               "upstream has bad hosts definition");
+       if (elt) {
+               up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+               if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+                       g_set_error (err, rspamd_proxy_quark (), 100,
+                                       "upstream has bad hosts definition");
 
-               goto err;
-       }
+                       goto err;
+               }
 
-       rspamd_mempool_add_destructor (pool,
-                       (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u);
+               rspamd_mempool_add_destructor (pool,
+                               (rspamd_mempool_destruct_t)rspamd_upstreams_destroy, up->u);
+       }
 
        elt = ucl_object_lookup (obj, "default");
        if (elt && ucl_object_toboolean (elt)) {
@@ -841,8 +856,14 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
                }
        }
 
-       if (session->master_conn && session->master_conn->results) {
-               ucl_object_unref (session->master_conn->results);
+       if (session->master_conn) {
+               if (session->master_conn->results) {
+                       ucl_object_unref (session->master_conn->results);
+               }
+
+               if (session->master_conn->task) {
+                       rspamd_session_destroy (session->master_conn->task->s);
+               }
        }
 
        g_ptr_array_free (session->mirror_conns, TRUE);
@@ -1194,6 +1215,144 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
        return 0;
 }
 
+static void
+rspamd_proxy_scan_self_reply (struct rspamd_task *task)
+{
+       struct rspamd_http_message *msg;
+       struct rspamd_proxy_session *session = task->fin_arg;
+       ucl_object_t *rep;
+       const char *ctype = "application/json";
+
+       msg = rspamd_http_new_message (HTTP_RESPONSE);
+       msg->date = time (NULL);
+       msg->code = 200;
+
+       switch (task->cmd) {
+       case CMD_REPORT_IFSPAM:
+       case CMD_REPORT:
+       case CMD_CHECK:
+       case CMD_SYMBOLS:
+       case CMD_PROCESS:
+       case CMD_SKIP:
+       case CMD_CHECK_V2:
+               rspamd_protocol_http_reply (msg, task, &rep);
+               rspamd_protocol_write_log_pipe (task);
+               break;
+       case CMD_PING:
+               rspamd_http_message_set_body (msg, "pong" CRLF, 6);
+               ctype = "text/plain";
+               break;
+       case CMD_OTHER:
+               msg_err_task ("BROKEN");
+               break;
+       }
+
+       rspamd_http_connection_reset (session->client_conn);
+       session->master_conn->flags |= RSPAMD_BACKEND_CLOSED;
+       session->master_conn->results = rep;
+       rspamd_http_connection_write_message (session->client_conn, msg, NULL,
+                       ctype,
+                       session,
+                       session->client_sock,
+                       NULL,
+                       session->ctx->ev_base);
+}
+
+static gboolean
+rspamd_proxy_task_fin (void *ud)
+{
+       struct rspamd_task *task = ud;
+
+       msg_debug_task ("finish task");
+
+       if (RSPAMD_TASK_IS_PROCESSED (task)) {
+               rspamd_proxy_scan_self_reply (task);
+               return TRUE;
+       }
+
+       if (!rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) {
+               rspamd_proxy_scan_self_reply (task);
+               return TRUE;
+       }
+
+       if (RSPAMD_TASK_IS_PROCESSED (task)) {
+               rspamd_proxy_scan_self_reply (task);
+               return TRUE;
+       }
+
+       /* One more iteration */
+       return FALSE;
+}
+
+static gboolean
+rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
+{
+       struct rspamd_task *task;
+       struct rspamd_http_message *msg;
+       struct event *guard_ev;
+       const gchar *data;
+       gsize len;
+
+       msg = session->client_message;
+       task = rspamd_task_new (session->worker, session->ctx->cfg);
+       task->flags |= RSPAMD_TASK_FLAG_MIME;
+       task->sock = session->client_sock;
+       task->client_addr = session->client_addr;
+       task->fin_arg = session;
+       task->resolver = session->ctx->resolver;
+       /* TODO: allow to disable autolearn in protocol */
+       task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+       task->ev_base = session->ctx->ev_base;
+       task->s = rspamd_session_create (task->task_pool, rspamd_proxy_task_fin,
+                       NULL, (event_finalizer_t )rspamd_task_free, task);
+       data = rspamd_http_message_get_body (msg, &len);
+
+       /* Process message */
+       if (!rspamd_protocol_handle_request (task, msg)) {
+               msg_err_task ("cannot handle request: %e", task->err);
+               task->flags |= RSPAMD_TASK_FLAG_SKIP;
+       }
+       else {
+               if (task->cmd == CMD_PING) {
+                       task->flags |= RSPAMD_TASK_FLAG_SKIP;
+               }
+               else {
+                       if (!rspamd_task_load_message (task, msg, data, len)) {
+                               msg_err_task ("cannot load message: %e", task->err);
+                               task->flags |= RSPAMD_TASK_FLAG_SKIP;
+                       }
+               }
+       }
+
+       /* Set global timeout for the task */
+       if (session->ctx->default_upstream->timeout > 0.0) {
+               struct timeval task_tv;
+
+               event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
+                               task);
+               event_base_set (session->ctx->ev_base, &task->timeout_ev);
+               double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
+               event_add (&task->timeout_ev, &task_tv);
+       }
+
+       /* Set socket guard */
+       guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
+#ifdef EV_CLOSED
+       event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
+                               rspamd_worker_guard_handler, task);
+#else
+       event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
+                       rspamd_worker_guard_handler, task);
+#endif
+       event_base_set (task->ev_base, guard_ev);
+       event_add (guard_ev, NULL);
+       task->guard_ev = guard_ev;
+
+       rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+
+       return TRUE;
+}
+
 static gboolean
 proxy_send_master_message (struct rspamd_proxy_session *session)
 {
@@ -1222,6 +1381,9 @@ proxy_send_master_message (struct rspamd_proxy_session *session)
                goto err;
        }
        else {
+               if (backend->self_process) {
+                       return rspamd_proxy_self_scan (session);
+               }
 retry:
                if (session->ctx->max_retries &&
                                session->retries > session->ctx->max_retries) {
@@ -1415,6 +1577,7 @@ proxy_accept_socket (gint fd, short what, void *arg)
                        ctx->keys_cache,
                        NULL);
        session->ctx = ctx;
+       session->worker = worker;
 
        if (ctx->key) {
                rspamd_http_connection_set_key (session->client_conn, ctx->key);
@@ -1450,8 +1613,7 @@ proxy_rotate_key (gint fd, short what, void *arg)
 }
 
 void
-start_rspamd_proxy (struct rspamd_worker *worker)
-{
+start_rspamd_proxy (struct rspamd_worker *worker) {
        struct rspamd_proxy_ctx *ctx = worker->ctx;
        struct timeval rot_tv;
 
@@ -1480,11 +1642,20 @@ start_rspamd_proxy (struct rspamd_worker *worker)
        event_base_set (ctx->ev_base, &ctx->rotate_ev);
        event_add (&ctx->rotate_ev, &rot_tv);
 
+       if (ctx->has_self_scan) {
+               /* Additional initialisation needed */
+               rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver);
+       }
+
        event_base_loop (ctx->ev_base, 0);
        rspamd_worker_block_signals ();
 
        rspamd_log_close (worker->srv->logger);
 
+       if (ctx->has_self_scan) {
+               rspamd_stat_close ();
+       }
+
        rspamd_keypair_cache_destroy (ctx->keys_cache);
        REF_RELEASE (ctx->cfg);
 
index 724b9ccd2cbdb9960e42d28c8b2d15a53d4ed061..5bb4b2d61553256329de2c84e37f1c5d9dd79253 100644 (file)
@@ -142,7 +142,7 @@ reduce_tasks_count (gpointer arg)
        }
 }
 
-static void
+void
 rspamd_task_timeout (gint fd, short what, gpointer ud)
 {
        struct rspamd_task *task = (struct rspamd_task *) ud;
@@ -156,7 +156,7 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
        }
 }
 
-static void
+void
 rspamd_worker_guard_handler (gint fd, short what, void *data)
 {
        struct rspamd_task *task = data;
index ac391fc8cd8105c3b24e5971c14d14e0ed69b584..b9a9e57d6fc537366da0ba59393df9a029a5e9a4 100644 (file)
@@ -60,4 +60,14 @@ void rspamd_worker_init_scanner (struct rspamd_worker *worker,
                struct event_base *ev_base,
                struct rspamd_dns_resolver *resolver);
 
+/*
+ * Called on forced timeout
+ */
+void rspamd_task_timeout (gint fd, short what, gpointer ud);
+
+/*
+ * Called on unexpected IO error (e.g. ECONNRESET)
+ */
+void rspamd_worker_guard_handler (gint fd, short what, void *data);
+
 #endif