diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-12-23 18:43:16 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-12-23 18:43:16 +0000 |
commit | 23b99d31a9a620b3c62848da4126dbbf0642f6db (patch) | |
tree | beb2ae9a991c97e9c6eb3c0f8baa8a43076e0a11 /src/worker.c | |
parent | db425a97b301299cc0d2cb732a5e266e83216299 (diff) | |
download | rspamd-23b99d31a9a620b3c62848da4126dbbf0642f6db.tar.gz rspamd-23b99d31a9a620b3c62848da4126dbbf0642f6db.zip |
[Project] Allow to enable mempool debugging from the protocol
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 257 |
1 files changed, 184 insertions, 73 deletions
diff --git a/src/worker.c b/src/worker.c index b20800a60..8ba221c7a 100644 --- a/src/worker.c +++ b/src/worker.c @@ -66,6 +66,15 @@ worker_t normal_worker = { G_STRFUNC, \ __VA_ARGS__) +struct rspamd_worker_session { + gint64 magic; + struct rspamd_task *task; + gint fd; + rspamd_inet_addr_t *addr; + struct rspamd_worker_ctx *ctx; + struct rspamd_http_connection *http_conn; + struct rspamd_worker *worker; +}; /* * Reduce number of tasks proceeded */ @@ -216,10 +225,67 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg, const gchar *chunk, gsize len) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud; + struct rspamd_task *task; struct rspamd_worker_ctx *ctx; + const rspamd_ftok_t *hv_tok; + gboolean debug_mempool = FALSE; + + ctx = session->ctx; + + /* Check debug */ + if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) { + rspamd_ftok_t cmp; + + RSPAMD_FTOK_ASSIGN (&cmp, "debug"); + + if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) { + debug_mempool = TRUE; + } + } + + task = rspamd_task_new (session->worker, + session->ctx->cfg, NULL, session->ctx->lang_det, + session->ctx->event_loop, + debug_mempool); + session->task = task; + + msg_info_task ("accepted connection from %s port %d, task ptr: %p", + rspamd_inet_address_to_string (session->addr), + rspamd_inet_address_get_port (session->addr), + task); + + /* Copy some variables */ + if (ctx->is_mime) { + task->flags |= RSPAMD_TASK_FLAG_MIME; + } + else { + task->flags &= ~RSPAMD_TASK_FLAG_MIME; + } + + /* We actually transfer ownership from session to task here */ + task->sock = session->fd; + task->client_addr = session->addr; + task->worker = session->worker; + task->http_conn = session->http_conn; + + task->resolver = ctx->resolver; + /* TODO: allow to disable autolearn in protocol */ + task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO; + + session->worker->nconns++; + rspamd_mempool_add_destructor (task->task_pool, + (rspamd_mempool_destruct_t)reduce_tasks_count, + session->worker); + + /* Session memory is also now handled by task */ + rspamd_mempool_add_destructor (task->task_pool, + (rspamd_mempool_destruct_t)g_free, + session); - ctx = task->worker->ctx; + /* Set up async session */ + task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, + rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task); if (!rspamd_protocol_handle_request (task, msg)) { msg_err_task ("cannot handle request: %e", task->err); @@ -248,7 +314,9 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, /* Set socket guard */ task->guard_ev.data = task; - ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ); + ev_io_init (&task->guard_ev, + rspamd_worker_guard_handler, + task->sock, EV_READ); ev_io_start (task->event_loop, &task->guard_ev); rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); @@ -259,43 +327,84 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, static void rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud; + struct rspamd_task *task; struct rspamd_http_message *msg; rspamd_fstring_t *reply; - msg_info_task ("abnormally closing connection from: %s, error: %e", - rspamd_inet_address_to_string (task->client_addr), err); - if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { - /* Terminate session immediately */ - rspamd_session_destroy (task->s); + /* + * This function can be called with both struct rspamd_worker_session * + * and struct rspamd_task * + * + * The first case is when we read message and it is controlled by this code; + * the second case is when a reply is written and we do not control it normally, + * as it is managed by `rspamd_protocol_reply` in protocol.c + * + * Hence, we need to distinguish our arguments... + * + * The approach here is simple: + * - struct rspamd_worker_session starts with gint64 `magic` and we set it to + * MAX_INT64 + * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system) + * + * The idea is simple: no sane pointer would reach MAX_INT64, so if this field + * is MAX_INT64 then it is our session, and if it is not then it is a task. + */ + + if (session->magic == G_MAXINT64) { + task = session->task; } else { - task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED; - msg = rspamd_http_new_message (HTTP_RESPONSE); + task = (struct rspamd_task *)conn->ud; + } + + + if (task) { + msg_info_task ("abnormally closing connection from: %s, error: %e", + rspamd_inet_address_to_string_pretty (task->client_addr), err); - if (err) { - msg->status = rspamd_fstring_new_init (err->message, - strlen (err->message)); - msg->code = err->code; + if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { + /* Terminate session immediately */ + rspamd_session_destroy (task->s); } else { - msg->status = rspamd_fstring_new_init ("Internal error", - strlen ("Internal error")); - msg->code = 500; - } + task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED; + msg = rspamd_http_new_message (HTTP_RESPONSE); - msg->date = time (NULL); - - reply = rspamd_fstring_sized_new (msg->status->len + 16); - rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status); - rspamd_http_message_set_body_from_fstring_steal (msg, reply); - rspamd_http_connection_reset (task->http_conn); - rspamd_http_connection_write_message (task->http_conn, - msg, - NULL, - "application/json", - task, - 1.0); + if (err) { + msg->status = rspamd_fstring_new_init (err->message, + strlen (err->message)); + msg->code = err->code; + } + else { + msg->status = rspamd_fstring_new_init ("Internal error", + strlen ("Internal error")); + msg->code = 500; + } + + msg->date = time (NULL); + + reply = rspamd_fstring_sized_new (msg->status->len + 16); + rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status); + rspamd_http_message_set_body_from_fstring_steal (msg, reply); + rspamd_http_connection_reset (task->http_conn); + rspamd_http_connection_write_message (task->http_conn, + msg, + NULL, + "application/json", + task, + 1.0); + } + } + else { + /* If there was no task, then session is unmanaged */ + msg_info ("no data received from: %s, error: %e", + rspamd_inet_address_to_string_pretty (session->addr), err); + rspamd_http_connection_reset (session->http_conn); + rspamd_http_connection_unref (session->http_conn); + rspamd_inet_address_free (session->addr); + close (session->fd); + g_free (session); } } @@ -303,16 +412,38 @@ static gint rspamd_worker_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud; + struct rspamd_task *task; - if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { - /* We are done here */ - msg_debug_task ("normally closing connection from: %s", - rspamd_inet_address_to_string (task->client_addr)); - rspamd_session_destroy (task->s); + /* Read the comment to rspamd_worker_error_handler */ + + if (session->magic == G_MAXINT64) { + task = session->task; } - else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) { - rspamd_session_pending (task->s); + else { + task = (struct rspamd_task *)conn->ud; + } + + if (task) { + if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) { + /* We are done here */ + msg_debug_task ("normally closing connection from: %s", + rspamd_inet_address_to_string (task->client_addr)); + rspamd_session_destroy (task->s); + } + else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) { + rspamd_session_pending (task->s); + } + } + else { + /* If there was no task, then session is unmanaged */ + msg_info ("no data received from: %s, closing connection", + rspamd_inet_address_to_string_pretty (session->addr)); + rspamd_inet_address_free (session->addr); + rspamd_http_connection_reset (session->http_conn); + rspamd_http_connection_unref (session->http_conn); + close (session->fd); + g_free (session); } return 0; @@ -326,7 +457,7 @@ accept_socket (EV_P_ ev_io *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_worker_ctx *ctx; - struct rspamd_task *task; + struct rspamd_worker_session *session; rspamd_inet_addr_t *addr; gint nfd, http_opts = 0; @@ -350,55 +481,35 @@ accept_socket (EV_P_ ev_io *w, int revents) return; } - task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop); - - msg_info_task ("accepted connection from %s port %d, task ptr: %p", - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr), - task); - - /* Copy some variables */ - if (ctx->is_mime) { - task->flags |= RSPAMD_TASK_FLAG_MIME; - } - else { - task->flags &= ~RSPAMD_TASK_FLAG_MIME; - } - - task->sock = nfd; - task->client_addr = addr; - - worker->srv->stat->connections_count++; - task->resolver = ctx->resolver; - /* TODO: allow to disable autolearn in protocol */ - task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO; + session = g_malloc (sizeof (*session)); + session->magic = G_MAXINT64; + session->addr = addr; + session->fd = nfd; + session->ctx = ctx; + session->worker = worker; if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) { http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION; } - task->http_conn = rspamd_http_connection_new_server ( + session->http_conn = rspamd_http_connection_new_server ( ctx->http_ctx, nfd, rspamd_worker_body_handler, rspamd_worker_error_handler, rspamd_worker_finish_handler, http_opts); - rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message); - worker->nconns++; - rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)reduce_tasks_count, worker); - /* Set up async session */ - task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, - rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task); + worker->srv->stat->connections_count++; + rspamd_http_connection_set_max_size (session->http_conn, + ctx->cfg->max_message); if (ctx->key) { - rspamd_http_connection_set_key (task->http_conn, ctx->key); + rspamd_http_connection_set_key (session->http_conn, ctx->key); } - rspamd_http_connection_read_message (task->http_conn, - task, + rspamd_http_connection_read_message (session->http_conn, + session, ctx->timeout); } |