diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-12-23 18:43:16 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-12-23 18:43:16 +0000 |
commit | 23b99d31a9a620b3c62848da4126dbbf0642f6db (patch) | |
tree | beb2ae9a991c97e9c6eb3c0f8baa8a43076e0a11 | |
parent | db425a97b301299cc0d2cb732a5e266e83216299 (diff) | |
download | rspamd-23b99d31a9a620b3c62848da4126dbbf0642f6db.tar.gz rspamd-23b99d31a9a620b3c62848da4126dbbf0642f6db.zip |
[Project] Allow to enable mempool debugging from the protocol
-rw-r--r-- | src/controller.c | 12 | ||||
-rw-r--r-- | src/libserver/task.c | 6 | ||||
-rw-r--r-- | src/libserver/task.h | 3 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 2 | ||||
-rw-r--r-- | src/lua/lua_task.c | 8 | ||||
-rw-r--r-- | src/lua/lua_util.c | 2 | ||||
-rw-r--r-- | src/plugins/fuzzy_check.c | 2 | ||||
-rw-r--r-- | src/rspamadm/lua_repl.c | 2 | ||||
-rw-r--r-- | src/rspamd_proxy.c | 2 | ||||
-rw-r--r-- | src/worker.c | 257 |
10 files changed, 205 insertions, 91 deletions
diff --git a/src/controller.c b/src/controller.c index e4ad7085d..d36431702 100644 --- a/src/controller.c +++ b/src/controller.c @@ -1514,7 +1514,7 @@ rspamd_controller_handle_lua_history (lua_State *L, if (lua_isfunction (L, -1)) { task = rspamd_task_new (session->ctx->worker, session->cfg, - session->pool, ctx->lang_det, ctx->event_loop); + session->pool, ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -1811,7 +1811,7 @@ rspamd_controller_handle_lua (struct rspamd_http_connection_entry *conn_ent, } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->event_loop); + ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -1996,7 +1996,7 @@ rspamd_controller_handle_learn_common ( } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - session->ctx->lang_det, ctx->event_loop); + session->ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -2095,7 +2095,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent, } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->event_loop); + ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, @@ -2584,7 +2584,7 @@ rspamd_controller_handle_stat_common ( ctx = session->ctx; task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->event_loop); + ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; cbdata = rspamd_mempool_alloc0 (session->pool, sizeof (*cbdata)); cbdata->conn_ent = conn_ent; @@ -2986,7 +2986,7 @@ rspamd_controller_handle_lua_plugin (struct rspamd_http_connection_entry *conn_e } task = rspamd_task_new (session->ctx->worker, session->cfg, session->pool, - ctx->lang_det, ctx->event_loop); + ctx->lang_det, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->s = rspamd_session_create (session->pool, diff --git a/src/libserver/task.c b/src/libserver/task.c index 9eebe02a2..ccd959920 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -64,14 +64,16 @@ rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg, rspamd_mempool_t *pool, struct rspamd_lang_detector *lang_det, - struct ev_loop *event_loop) + struct ev_loop *event_loop, + gboolean debug_mem) { struct rspamd_task *new_task; rspamd_mempool_t *task_pool; guint flags = 0; if (pool == NULL) { - task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "task", 0); + task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), + "task", debug_mem ? RSPAMD_MEMPOOL_DEBUG : 0); flags |= RSPAMD_TASK_FLAG_OWN_POOL; } else { diff --git a/src/libserver/task.h b/src/libserver/task.h index a96e2ac05..feac456dd 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -225,7 +225,8 @@ struct rspamd_task *rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg, rspamd_mempool_t *pool, struct rspamd_lang_detector *lang_det, - struct ev_loop *event_loop); + struct ev_loop *event_loop, + gboolean debug_mem); /** * Destroy task object and remove its IO dispatcher if it exists diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 362d64bc5..e519cb985 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -143,7 +143,7 @@ rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) if (cfg->on_term_scripts) { ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; /* Create a fake task object for async events */ - task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop); + task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE); task->resolver = ctx->resolver; task->flags |= RSPAMD_TASK_FLAG_PROCESSING; task->s = rspamd_session_create (task->task_pool, diff --git a/src/lua/lua_task.c b/src/lua/lua_task.c index cf57ebf13..774bb0120 100644 --- a/src/lua/lua_task.c +++ b/src/lua/lua_task.c @@ -1615,7 +1615,7 @@ lua_task_load_from_file (lua_State * L) } } - task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL); + task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE); task->msg.begin = data->str; task->msg.len = data->len; rspamd_mempool_add_destructor (task->task_pool, @@ -1629,7 +1629,7 @@ lua_task_load_from_file (lua_State * L) if (!map) { err = strerror (errno); } else { - task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL); + task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE); task->msg.begin = map; task->msg.len = sz; rspamd_mempool_add_destructor (task->task_pool, @@ -1683,7 +1683,7 @@ lua_task_load_from_string (lua_State * L) } } - task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL); + task = rspamd_task_new (NULL, cfg, NULL, NULL, NULL, FALSE); task->msg.begin = g_malloc (message_len); memcpy ((gchar *)task->msg.begin, str_message, message_len); task->msg.len = message_len; @@ -1729,7 +1729,7 @@ lua_task_create (lua_State * L) } } - task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base); + task = rspamd_task_new (NULL, cfg, NULL, NULL, ev_base, FALSE); task->flags |= RSPAMD_TASK_FLAG_EMPTY; ptask = lua_newuserdata (L, sizeof (*ptask)); diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c index b71339402..3a3561e2f 100644 --- a/src/lua/lua_util.c +++ b/src/lua/lua_util.c @@ -873,7 +873,7 @@ lua_util_process_message (lua_State *L) if (cfg != NULL && message != NULL) { base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL); rspamd_init_filters (cfg, FALSE); - task = rspamd_task_new (NULL, cfg, NULL, NULL, base); + task = rspamd_task_new (NULL, cfg, NULL, NULL, base, FALSE); task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen); rspamd_strlcpy ((gpointer)task->msg.begin, message, mlen); task->msg.len = mlen; diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index 713edc41b..e8f02652d 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -2979,7 +2979,7 @@ fuzzy_process_handler (struct rspamd_http_connection_entry *conn_ent, /* Prepare task */ task = rspamd_task_new (session->wrk, session->cfg, NULL, - session->lang_det, conn_ent->rt->event_loop); + session->lang_det, conn_ent->rt->event_loop, FALSE); task->cfg = ctx->cfg; saved = rspamd_mempool_alloc0 (session->pool, sizeof (gint)); err = rspamd_mempool_alloc0 (session->pool, sizeof (GError *)); diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c index ee22c4868..bceed5855 100644 --- a/src/rspamadm/lua_repl.c +++ b/src/rspamadm/lua_repl.c @@ -431,7 +431,7 @@ rspamadm_lua_message_handler (lua_State *L, gint argc, gchar **argv) rspamd_printf ("cannot open %s: %s\n", argv[i], strerror (errno)); } else { - task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL); + task = rspamd_task_new (NULL, rspamd_main->cfg, NULL, NULL, NULL, FALSE); if (!rspamd_task_load_message (task, NULL, map, len)) { rspamd_printf ("cannot load %s\n", argv[i]); diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 6153ddd21..7e460c040 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -1754,7 +1754,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session) msg = session->client_message; task = rspamd_task_new (session->worker, session->ctx->cfg, session->pool, session->ctx->lang_det, - session->ctx->event_loop); + session->ctx->event_loop, FALSE); task->flags |= RSPAMD_TASK_FLAG_MIME; if (session->ctx->milter) { diff --git a/src/worker.c b/src/worker.c index b20800a60..8ba221c7a 100644 --- a/src/worker.c +++ b/src/worker.c @@ -66,6 +66,15 @@ worker_t normal_worker = { G_STRFUNC, \ __VA_ARGS__) +struct rspamd_worker_session { + gint64 magic; + struct rspamd_task *task; + gint fd; + rspamd_inet_addr_t *addr; + struct rspamd_worker_ctx *ctx; + struct rspamd_http_connection *http_conn; + struct rspamd_worker *worker; +}; /* * Reduce number of tasks proceeded */ @@ -216,10 +225,67 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg, const gchar *chunk, gsize len) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud; + struct rspamd_task *task; struct rspamd_worker_ctx *ctx; + const rspamd_ftok_t *hv_tok; + gboolean debug_mempool = FALSE; + + ctx = session->ctx; + + /* Check debug */ + if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) { + rspamd_ftok_t cmp; + + RSPAMD_FTOK_ASSIGN (&cmp, "debug"); + + if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) { + debug_mempool = TRUE; + } + } + + task = rspamd_task_new (session->worker, + session->ctx->cfg, NULL, session->ctx->lang_det, + session->ctx->event_loop, + debug_mempool); + session->task = task; + + msg_info_task ("accepted connection from %s port %d, task ptr: %p", + rspamd_inet_address_to_string (session->addr), + rspamd_inet_address_get_port (session->addr), + task); + + /* Copy some variables */ + if (ctx->is_mime) { + task->flags |= RSPAMD_TASK_FLAG_MIME; + } + else { + task->flags &= ~RSPAMD_TASK_FLAG_MIME; + } + + /* We actually transfer ownership from session to task here */ + task->sock = session->fd; + task->client_addr = session->addr; + task->worker = session->worker; + task->http_conn = session->http_conn; + + task->resolver = ctx->resolver; + /* TODO: allow to disable autolearn in protocol */ + task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO; + + session->worker->nconns++; + rspamd_mempool_add_destructor (task->task_pool, + (rspamd_mempool_destruct_t)reduce_tasks_count, + session->worker); + + /* Session memory is also now handled by task */ + rspamd_mempool_add_destructor (task->task_pool, + (rspamd_mempool_destruct_t)g_free, + session); - ctx = task->worker->ctx; + /* Set up async session */ + task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, + rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task); if (!rspamd_protocol_handle_request (task, msg)) { msg_err_task ("cannot handle request: %e", task->err); @@ -248,7 +314,9 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, /* Set socket guard */ task->guard_ev.data = task; - ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ); + ev_io_init (&task->guard_ev, + rspamd_worker_guard_handler, + task->sock, EV_READ); ev_io_start (task->event_loop, &task->guard_ev); rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); @@ -259,43 +327,84 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, static void rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud; + struct rspamd_task *task; struct rspamd_http_message *msg; rspamd_fstring_t *reply; - msg_info_task ("abnormally closing connection from: %s, error: %e", - rspamd_inet_address_to_string (task->client_addr), err); - if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { - /* Terminate session immediately */ - rspamd_session_destroy (task->s); + /* + * This function can be called with both struct rspamd_worker_session * + * and struct rspamd_task * + * + * The first case is when we read message and it is controlled by this code; + * the second case is when a reply is written and we do not control it normally, + * as it is managed by `rspamd_protocol_reply` in protocol.c + * + * Hence, we need to distinguish our arguments... + * + * The approach here is simple: + * - struct rspamd_worker_session starts with gint64 `magic` and we set it to + * MAX_INT64 + * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system) + * + * The idea is simple: no sane pointer would reach MAX_INT64, so if this field + * is MAX_INT64 then it is our session, and if it is not then it is a task. + */ + + if (session->magic == G_MAXINT64) { + task = session->task; } else { - task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED; - msg = rspamd_http_new_message (HTTP_RESPONSE); + task = (struct rspamd_task *)conn->ud; + } + + + if (task) { + msg_info_task ("abnormally closing connection from: %s, error: %e", + rspamd_inet_address_to_string_pretty (task->client_addr), err); - if (err) { - msg->status = rspamd_fstring_new_init (err->message, - strlen (err->message)); - msg->code = err->code; + if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { + /* Terminate session immediately */ + rspamd_session_destroy (task->s); } else { - msg->status = rspamd_fstring_new_init ("Internal error", - strlen ("Internal error")); - msg->code = 500; - } + task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED; + msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->date = time (NULL); - - reply = rspamd_fstring_sized_new (msg->status->len + 16); - rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status); - rspamd_http_message_set_body_from_fstring_steal (msg, reply); - rspamd_http_connection_reset (task->http_conn); - rspamd_http_connection_write_message (task->http_conn, - msg, - NULL, - "application/json", - task, - 1.0); + if (err) { + msg->status = rspamd_fstring_new_init (err->message, + strlen (err->message)); + msg->code = err->code; + } + else { + msg->status = rspamd_fstring_new_init ("Internal error", + strlen ("Internal error")); + msg->code = 500; + } + + msg->date = time (NULL); + + reply = rspamd_fstring_sized_new (msg->status->len + 16); + rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status); + rspamd_http_message_set_body_from_fstring_steal (msg, reply); + rspamd_http_connection_reset (task->http_conn); + rspamd_http_connection_write_message (task->http_conn, + msg, + NULL, + "application/json", + task, + 1.0); + } + } + else { + /* If there was no task, then session is unmanaged */ + msg_info ("no data received from: %s, error: %e", + rspamd_inet_address_to_string_pretty (session->addr), err); + rspamd_http_connection_reset (session->http_conn); + rspamd_http_connection_unref (session->http_conn); + rspamd_inet_address_free (session->addr); + close (session->fd); + g_free (session); } } @@ -303,16 +412,38 @@ static gint rspamd_worker_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud; + struct rspamd_task *task; - if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { - /* We are done here */ - msg_debug_task ("normally closing connection from: %s", - rspamd_inet_address_to_string (task->client_addr)); - rspamd_session_destroy (task->s); + /* Read the comment to rspamd_worker_error_handler */ + + if (session->magic == G_MAXINT64) { + task = session->task; } - else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) { - rspamd_session_pending (task->s); + else { + task = (struct rspamd_task *)conn->ud; + } + + if (task) { + if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { + /* We are done here */ + msg_debug_task ("normally closing connection from: %s", + rspamd_inet_address_to_string (task->client_addr)); + rspamd_session_destroy (task->s); + } + else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) { + rspamd_session_pending (task->s); + } + } + else { + /* If there was no task, then session is unmanaged */ + msg_info ("no data received from: %s, closing connection", + rspamd_inet_address_to_string_pretty (session->addr)); + rspamd_inet_address_free (session->addr); + rspamd_http_connection_reset (session->http_conn); + rspamd_http_connection_unref (session->http_conn); + close (session->fd); + g_free (session); } return 0; @@ -326,7 +457,7 @@ accept_socket (EV_P_ ev_io *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_worker_ctx *ctx; - struct rspamd_task *task; + struct rspamd_worker_session *session; rspamd_inet_addr_t *addr; gint nfd, http_opts = 0; @@ -350,55 +481,35 @@ accept_socket (EV_P_ ev_io *w, int revents) return; } - task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop); - - msg_info_task ("accepted connection from %s port %d, task ptr: %p", - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr), - task); - - /* Copy some variables */ - if (ctx->is_mime) { - task->flags |= RSPAMD_TASK_FLAG_MIME; - } - else { - task->flags &= ~RSPAMD_TASK_FLAG_MIME; - } - - task->sock = nfd; - task->client_addr = addr; - - worker->srv->stat->connections_count++; - task->resolver = ctx->resolver; - /* TODO: allow to disable autolearn in protocol */ - task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO; + session = g_malloc (sizeof (*session)); + session->magic = G_MAXINT64; + session->addr = addr; + session->fd = nfd; + session->ctx = ctx; + session->worker = worker; if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) { http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION; } - task->http_conn = rspamd_http_connection_new_server ( + session->http_conn = rspamd_http_connection_new_server ( ctx->http_ctx, nfd, rspamd_worker_body_handler, rspamd_worker_error_handler, rspamd_worker_finish_handler, http_opts); - rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message); - worker->nconns++; - rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)reduce_tasks_count, worker); - /* Set up async session */ - task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, - rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task); + worker->srv->stat->connections_count++; + rspamd_http_connection_set_max_size (session->http_conn, + ctx->cfg->max_message); if (ctx->key) { - rspamd_http_connection_set_key (task->http_conn, ctx->key); + rspamd_http_connection_set_key (session->http_conn, ctx->key); } - rspamd_http_connection_read_message (task->http_conn, - task, + rspamd_http_connection_read_message (session->http_conn, + session, ctx->timeout); } |