]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Allow to enable mempool debugging from the protocol
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 Dec 2019 18:43:16 +0000 (18:43 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 Dec 2019 18:43:16 +0000 (18:43 +0000)
src/controller.c
src/libserver/task.c
src/libserver/task.h
src/libserver/worker_util.c
src/lua/lua_task.c
src/lua/lua_util.c
src/plugins/fuzzy_check.c
src/rspamadm/lua_repl.c
src/rspamd_proxy.c
src/worker.c

index e4ad7085d9d5819f2def4ac02bebece40f4f1aae..d36431702cd1ecb9745b6d2e4389839993918774 100644 (file)
@@ -1514,7 +1514,7 @@ rspamd_controller_handle_lua_history (lua_State *L,
 
                        if (lua_isfunction (L, -1)) {
                                task = rspamd_task_new (session->ctx->worker, session->cfg,
-                                               session->pool, ctx->lang_det, ctx->event_loop);
+                                               session->pool, ctx->lang_det, ctx->event_loop, FALSE);
 
                                task->resolver = ctx->resolver;
                                task->s = rspamd_session_create (session->pool,
@@ -1811,7 +1811,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent,
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->event_loop);
+                       ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -1996,7 +1996,7 @@ rspamd_controller_handle_learn_common (
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       session->ctx->lang_det, ctx->event_loop);
+                       session->ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -2095,7 +2095,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->event_loop);
+                       ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
@@ -2584,7 +2584,7 @@ rspamd_controller_handle_stat_common (
        ctx = session->ctx;
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->event_loop);
+                       ctx->lang_det, ctx->event_loop, FALSE);
        task->resolver = ctx->resolver;
        cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata));
        cbdata->conn_ent = conn_ent;
@@ -2986,7 +2986,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e
        }
 
        task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool,
-                       ctx->lang_det, ctx->event_loop);
+                       ctx->lang_det, ctx->event_loop, FALSE);
 
        task->resolver = ctx->resolver;
        task->s = rspamd_session_create (session->pool,
index 9eebe02a2186c90fed3cf0671b93cad18d70ac46..ccd959920beb70c9715b9ac47238f75a81f6be4c 100644 (file)
@@ -64,14 +64,16 @@ rspamd_task_new (struct rspamd_worker *worker,
                                 struct rspamd_config *cfg,
                                 rspamd_mempool_t *pool,
                                 struct rspamd_lang_detector *lang_det,
-                                struct ev_loop *event_loop)
+                                struct ev_loop *event_loop,
+                                gboolean debug_mem)
 {
        struct rspamd_task *new_task;
        rspamd_mempool_t *task_pool;
        guint flags = 0;
 
        if (pool == NULL) {
-               task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "task", 0);
+               task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
+                               "task", debug_mem ? RSPAMD_MEMPOOL_DEBUG : 0);
                flags |= RSPAMD_TASK_FLAG_OWN_POOL;
        }
        else {
index a96e2ac053b571a382adbbf38d894bf13d63a020..feac456dd9be8e62d9c0d886f021f6c4d8d73bbb 100644 (file)
@@ -225,7 +225,8 @@ struct rspamd_task *rspamd_task_new (struct rspamd_worker *worker,
                                                                         struct rspamd_config *cfg,
                                                                         rspamd_mempool_t *pool,
                                                                         struct rspamd_lang_detector *lang_det,
-                                                                        struct ev_loop *event_loop);
+                                                                        struct ev_loop *event_loop,
+                                                                        gboolean debug_mem);
 
 /**
  * Destroy task object and remove its IO dispatcher if it exists
index 362d64bc5741db9d1c7086bce05388f6f0d817f0..e519cb9851f2bd11e41cb51e2c464c3ed334ec77 100644 (file)
@@ -143,7 +143,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
        if (cfg->on_term_scripts) {
                ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
                /* Create a fake task object for async events */
-               task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+               task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE);
                task->resolver = ctx->resolver;
                task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
                task->s = rspamd_session_create (task->task_pool,
index cf57ebf131a8c743dd9cac861f716c657ba3a8c0..774bb01204ed2e0e9f9c49b735b22297eac2d56b 100644 (file)
@@ -1615,7 +1615,7 @@ lua_task_load_from_file (lua_State * L)
                                }
                        }
 
-                       task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+                       task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
                        task->msg.begin = data->str;
                        task->msg.len = data->len;
                        rspamd_mempool_add_destructor (task->task_pool,
@@ -1629,7 +1629,7 @@ lua_task_load_from_file (lua_State * L)
                        if (!map) {
                                err = strerror (errno);
                        } else {
-                               task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+                               task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
                                task->msg.begin = map;
                                task->msg.len = sz;
                                rspamd_mempool_add_destructor (task->task_pool,
@@ -1683,7 +1683,7 @@ lua_task_load_from_string (lua_State * L)
                        }
                }
 
-               task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL);
+               task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE);
                task->msg.begin = g_malloc (message_len);
                memcpy ((gchar *)task->msg.begin, str_message, message_len);
                task->msg.len  = message_len;
@@ -1729,7 +1729,7 @@ lua_task_create (lua_State * L)
                }
        }
 
-       task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base);
+       task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base, FALSE);
        task->flags |= RSPAMD_TASK_FLAG_EMPTY;
 
        ptask = lua_newuserdata (L, sizeof (*ptask));
index b713394029f30818b11dd7cd9ab69105c2fb4c53..3a3561e2f3d5d5e4ec57ab33f68611c794094ba0 100644 (file)
@@ -873,7 +873,7 @@ lua_util_process_message (lua_State *L)
        if (cfg != NULL && message != NULL) {
                base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
                rspamd_init_filters (cfg, FALSE);
-               task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
+               task = rspamd_task_new (NULL, cfg, NULL, NULL, base, FALSE);
                task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
                rspamd_strlcpy ((gpointer)task->msg.begin, message, mlen);
                task->msg.len = mlen;
index 713edc41b29864924363bdcf105da7571806560b..e8f02652df272d538da3a7d387996c2876d1ab5e 100644 (file)
@@ -2979,7 +2979,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent,
 
        /* Prepare task */
        task = rspamd_task_new (session->wrk, session->cfg, NULL,
-                       session->lang_det, conn_ent->rt->event_loop);
+                       session->lang_det, conn_ent->rt->event_loop, FALSE);
        task->cfg = ctx->cfg;
        saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint));
        err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *));
index ee22c4868d7e503af198b79410a6f26fc14a89f0..bceed58552b8651db39e8351359855131e4152e8 100644 (file)
@@ -431,7 +431,7 @@ rspamadm_lua_message_handler (lua_State *L, gint argc, gchar **argv)
                        rspamd_printf ("cannot open %s: %s\n", argv[i], strerror (errno));
                }
                else {
-                       task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL);
+                       task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL, FALSE);
 
                        if (!rspamd_task_load_message (task, NULL, map, len)) {
                                rspamd_printf ("cannot load %s\n", argv[i]);
index 6153ddd21d61f448449f542d27ea26b16788e190..7e460c040ba63abca0320dcaa165e895114746b1 100644 (file)
@@ -1754,7 +1754,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
        msg = session->client_message;
        task = rspamd_task_new (session->worker, session->ctx->cfg,
                        session->pool, session->ctx->lang_det,
-                       session->ctx->event_loop);
+                       session->ctx->event_loop, FALSE);
        task->flags |= RSPAMD_TASK_FLAG_MIME;
 
        if (session->ctx->milter) {
index b20800a601b94b34fff7f3a2a10d2b7f84f70aff..8ba221c7a095da83b140897f1781f56caa3a70da 100644 (file)
@@ -66,6 +66,15 @@ worker_t normal_worker = {
         G_STRFUNC, \
         __VA_ARGS__)
 
+struct rspamd_worker_session {
+       gint64 magic;
+       struct rspamd_task *task;
+       gint fd;
+       rspamd_inet_addr_t *addr;
+       struct rspamd_worker_ctx *ctx;
+       struct rspamd_http_connection *http_conn;
+       struct rspamd_worker *worker;
+};
 /*
  * Reduce number of tasks proceeded
  */
@@ -216,10 +225,67 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
        struct rspamd_http_message *msg,
        const gchar *chunk, gsize len)
 {
-       struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+       struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+       struct rspamd_task *task;
        struct rspamd_worker_ctx *ctx;
+       const rspamd_ftok_t *hv_tok;
+       gboolean debug_mempool = FALSE;
+
+       ctx = session->ctx;
+
+       /* Check debug */
+       if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) {
+               rspamd_ftok_t cmp;
+
+               RSPAMD_FTOK_ASSIGN (&cmp, "debug");
+
+               if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) {
+                       debug_mempool = TRUE;
+               }
+       }
+
+       task = rspamd_task_new (session->worker,
+                       session->ctx->cfg, NULL, session->ctx->lang_det,
+                       session->ctx->event_loop,
+                       debug_mempool);
+       session->task = task;
+
+       msg_info_task ("accepted connection from %s port %d, task ptr: %p",
+                       rspamd_inet_address_to_string (session->addr),
+                       rspamd_inet_address_get_port (session->addr),
+                       task);
+
+       /* Copy some variables */
+       if (ctx->is_mime) {
+               task->flags |= RSPAMD_TASK_FLAG_MIME;
+       }
+       else {
+               task->flags &= ~RSPAMD_TASK_FLAG_MIME;
+       }
+
+       /* We actually transfer ownership from session to task here  */
+       task->sock = session->fd;
+       task->client_addr = session->addr;
+       task->worker = session->worker;
+       task->http_conn = session->http_conn;
+
+       task->resolver = ctx->resolver;
+       /* TODO: allow to disable autolearn in protocol */
+       task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+
+       session->worker->nconns++;
+       rspamd_mempool_add_destructor (task->task_pool,
+                       (rspamd_mempool_destruct_t)reduce_tasks_count,
+                       session->worker);
+
+       /* Session memory is also now handled by task */
+       rspamd_mempool_add_destructor (task->task_pool,
+                       (rspamd_mempool_destruct_t)g_free,
+                       session);
 
-       ctx = task->worker->ctx;
+       /* Set up async session */
+       task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
+                       rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
 
        if (!rspamd_protocol_handle_request (task, msg)) {
                msg_err_task ("cannot handle request: %e", task->err);
@@ -248,7 +314,9 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 
        /* Set socket guard */
        task->guard_ev.data = task;
-       ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+       ev_io_init (&task->guard_ev,
+                       rspamd_worker_guard_handler,
+                       task->sock, EV_READ);
        ev_io_start (task->event_loop, &task->guard_ev);
 
        rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
@@ -259,43 +327,84 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
 static void
 rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
 {
-       struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+       struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+       struct rspamd_task *task;
        struct rspamd_http_message *msg;
        rspamd_fstring_t *reply;
 
-       msg_info_task ("abnormally closing connection from: %s, error: %e",
-               rspamd_inet_address_to_string (task->client_addr), err);
-       if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
-               /* Terminate session immediately */
-               rspamd_session_destroy (task->s);
+       /*
+        * This function can be called with both struct rspamd_worker_session *
+        * and struct rspamd_task *
+        *
+        * The first case is when we read message and it is controlled by this code;
+        * the second case is when a reply is written and we do not control it normally,
+        * as it is managed by `rspamd_protocol_reply` in protocol.c
+        *
+        * Hence, we need to distinguish our arguments...
+        *
+        * The approach here is simple:
+        * - struct rspamd_worker_session starts with gint64 `magic` and we set it to
+        * MAX_INT64
+        * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system)
+        *
+        * The idea is simple: no sane pointer would reach MAX_INT64, so if this field
+        * is MAX_INT64 then it is our session, and if it is not then it is a task.
+        */
+
+       if (session->magic == G_MAXINT64) {
+               task = session->task;
        }
        else {
-               task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
-               msg = rspamd_http_new_message (HTTP_RESPONSE);
+               task = (struct rspamd_task *)conn->ud;
+       }
+
+
+       if (task) {
+               msg_info_task ("abnormally closing connection from: %s, error: %e",
+                               rspamd_inet_address_to_string_pretty (task->client_addr), err);
 
-               if (err) {
-                       msg->status = rspamd_fstring_new_init (err->message,
-                                       strlen (err->message));
-                       msg->code = err->code;
+               if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+                       /* Terminate session immediately */
+                       rspamd_session_destroy (task->s);
                }
                else {
-                       msg->status = rspamd_fstring_new_init ("Internal error",
-                                       strlen ("Internal error"));
-                       msg->code = 500;
-               }
+                       task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
+                       msg = rspamd_http_new_message (HTTP_RESPONSE);
 
-               msg->date = time (NULL);
-
-               reply = rspamd_fstring_sized_new (msg->status->len + 16);
-               rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
-               rspamd_http_message_set_body_from_fstring_steal (msg, reply);
-               rspamd_http_connection_reset (task->http_conn);
-               rspamd_http_connection_write_message (task->http_conn,
-                               msg,
-                               NULL,
-                               "application/json",
-                               task,
-                               1.0);
+                       if (err) {
+                               msg->status = rspamd_fstring_new_init (err->message,
+                                               strlen (err->message));
+                               msg->code = err->code;
+                       }
+                       else {
+                               msg->status = rspamd_fstring_new_init ("Internal error",
+                                               strlen ("Internal error"));
+                               msg->code = 500;
+                       }
+
+                       msg->date = time (NULL);
+
+                       reply = rspamd_fstring_sized_new (msg->status->len + 16);
+                       rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
+                       rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+                       rspamd_http_connection_reset (task->http_conn);
+                       rspamd_http_connection_write_message (task->http_conn,
+                                       msg,
+                                       NULL,
+                                       "application/json",
+                                       task,
+                                       1.0);
+               }
+       }
+       else {
+               /* If there was no task, then session is unmanaged */
+               msg_info ("no data received from: %s, error: %e",
+                               rspamd_inet_address_to_string_pretty (session->addr), err);
+               rspamd_http_connection_reset (session->http_conn);
+               rspamd_http_connection_unref (session->http_conn);
+               rspamd_inet_address_free (session->addr);
+               close (session->fd);
+               g_free (session);
        }
 }
 
@@ -303,16 +412,38 @@ static gint
 rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
        struct rspamd_http_message *msg)
 {
-       struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+       struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+       struct rspamd_task *task;
 
-       if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
-               /* We are done here */
-               msg_debug_task ("normally closing connection from: %s",
-                       rspamd_inet_address_to_string (task->client_addr));
-               rspamd_session_destroy (task->s);
+       /* Read the comment to rspamd_worker_error_handler */
+
+       if (session->magic == G_MAXINT64) {
+               task = session->task;
        }
-       else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
-               rspamd_session_pending (task->s);
+       else {
+               task = (struct rspamd_task *)conn->ud;
+       }
+
+       if (task) {
+               if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+                       /* We are done here */
+                       msg_debug_task ("normally closing connection from: %s",
+                                       rspamd_inet_address_to_string (task->client_addr));
+                       rspamd_session_destroy (task->s);
+               }
+               else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
+                       rspamd_session_pending (task->s);
+               }
+       }
+       else {
+               /* If there was no task, then session is unmanaged */
+               msg_info ("no data received from: %s, closing connection",
+                               rspamd_inet_address_to_string_pretty (session->addr));
+               rspamd_inet_address_free (session->addr);
+               rspamd_http_connection_reset (session->http_conn);
+               rspamd_http_connection_unref (session->http_conn);
+               close (session->fd);
+               g_free (session);
        }
 
        return 0;
@@ -326,7 +457,7 @@ accept_socket (EV_P_ ev_io *w, int revents)
 {
        struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
        struct rspamd_worker_ctx *ctx;
-       struct rspamd_task *task;
+       struct rspamd_worker_session *session;
        rspamd_inet_addr_t *addr;
        gint nfd, http_opts = 0;
 
@@ -350,55 +481,35 @@ accept_socket (EV_P_ ev_io *w, int revents)
                return;
        }
 
-       task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
-
-       msg_info_task ("accepted connection from %s port %d, task ptr: %p",
-               rspamd_inet_address_to_string (addr),
-               rspamd_inet_address_get_port (addr),
-               task);
-
-       /* Copy some variables */
-       if (ctx->is_mime) {
-               task->flags |= RSPAMD_TASK_FLAG_MIME;
-       }
-       else {
-               task->flags &= ~RSPAMD_TASK_FLAG_MIME;
-       }
-
-       task->sock = nfd;
-       task->client_addr = addr;
-
-       worker->srv->stat->connections_count++;
-       task->resolver = ctx->resolver;
-       /* TODO: allow to disable autolearn in protocol */
-       task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+       session = g_malloc (sizeof (*session));
+       session->magic = G_MAXINT64;
+       session->addr = addr;
+       session->fd = nfd;
+       session->ctx = ctx;
+       session->worker = worker;
 
        if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) {
                http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
        }
 
-       task->http_conn = rspamd_http_connection_new_server (
+       session->http_conn = rspamd_http_connection_new_server (
                        ctx->http_ctx,
                        nfd,
                        rspamd_worker_body_handler,
                        rspamd_worker_error_handler,
                        rspamd_worker_finish_handler,
                        http_opts);
-       rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message);
-       worker->nconns++;
-       rspamd_mempool_add_destructor (task->task_pool,
-               (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
 
-       /* Set up async session */
-       task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
-                       rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
+       worker->srv->stat->connections_count++;
+       rspamd_http_connection_set_max_size (session->http_conn,
+                       ctx->cfg->max_message);
 
        if (ctx->key) {
-               rspamd_http_connection_set_key (task->http_conn, ctx->key);
+               rspamd_http_connection_set_key (session->http_conn, ctx->key);
        }
 
-       rspamd_http_connection_read_message (task->http_conn,
-                       task,
+       rspamd_http_connection_read_message (session->http_conn,
+                       session,
                        ctx->timeout);
 }