aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-12-23 18:43:16 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-12-23 18:43:16 +0000
commit23b99d31a9a620b3c62848da4126dbbf0642f6db (patch)
treebeb2ae9a991c97e9c6eb3c0f8baa8a43076e0a11 /src/worker.c
parentdb425a97b301299cc0d2cb732a5e266e83216299 (diff)
downloadrspamd-23b99d31a9a620b3c62848da4126dbbf0642f6db.tar.gz
rspamd-23b99d31a9a620b3c62848da4126dbbf0642f6db.zip
[Project] Allow to enable mempool debugging from the protocol
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c257
1 files changed, 184 insertions, 73 deletions
diff --git a/src/worker.c b/src/worker.c
index b20800a60..8ba221c7a 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -66,6 +66,15 @@ worker_t normal_worker = {
G_STRFUNC, \
__VA_ARGS__)
+struct rspamd_worker_session {
+ gint64 magic;
+ struct rspamd_task *task;
+ gint fd;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_http_connection *http_conn;
+ struct rspamd_worker *worker;
+};
/*
* Reduce number of tasks proceeded
*/
@@ -216,10 +225,67 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
const gchar *chunk, gsize len)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+ struct rspamd_task *task;
struct rspamd_worker_ctx *ctx;
+ const rspamd_ftok_t *hv_tok;
+ gboolean debug_mempool = FALSE;
+
+ ctx = session->ctx;
+
+ /* Check debug */
+ if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) {
+ rspamd_ftok_t cmp;
+
+ RSPAMD_FTOK_ASSIGN (&cmp, "debug");
+
+ if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) {
+ debug_mempool = TRUE;
+ }
+ }
+
+ task = rspamd_task_new (session->worker,
+ session->ctx->cfg, NULL, session->ctx->lang_det,
+ session->ctx->event_loop,
+ debug_mempool);
+ session->task = task;
+
+ msg_info_task ("accepted connection from %s port %d, task ptr: %p",
+ rspamd_inet_address_to_string (session->addr),
+ rspamd_inet_address_get_port (session->addr),
+ task);
+
+ /* Copy some variables */
+ if (ctx->is_mime) {
+ task->flags |= RSPAMD_TASK_FLAG_MIME;
+ }
+ else {
+ task->flags &= ~RSPAMD_TASK_FLAG_MIME;
+ }
+
+ /* We actually transfer ownership from session to task here */
+ task->sock = session->fd;
+ task->client_addr = session->addr;
+ task->worker = session->worker;
+ task->http_conn = session->http_conn;
+
+ task->resolver = ctx->resolver;
+ /* TODO: allow to disable autolearn in protocol */
+ task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+
+ session->worker->nconns++;
+ rspamd_mempool_add_destructor (task->task_pool,
+ (rspamd_mempool_destruct_t)reduce_tasks_count,
+ session->worker);
+
+ /* Session memory is also now handled by task */
+ rspamd_mempool_add_destructor (task->task_pool,
+ (rspamd_mempool_destruct_t)g_free,
+ session);
- ctx = task->worker->ctx;
+ /* Set up async session */
+ task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
+ rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
if (!rspamd_protocol_handle_request (task, msg)) {
msg_err_task ("cannot handle request: %e", task->err);
@@ -248,7 +314,9 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
/* Set socket guard */
task->guard_ev.data = task;
- ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+ ev_io_init (&task->guard_ev,
+ rspamd_worker_guard_handler,
+ task->sock, EV_READ);
ev_io_start (task->event_loop, &task->guard_ev);
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
@@ -259,43 +327,84 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
static void
rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+ struct rspamd_task *task;
struct rspamd_http_message *msg;
rspamd_fstring_t *reply;
- msg_info_task ("abnormally closing connection from: %s, error: %e",
- rspamd_inet_address_to_string (task->client_addr), err);
- if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
- /* Terminate session immediately */
- rspamd_session_destroy (task->s);
+ /*
+ * This function can be called with both struct rspamd_worker_session *
+ * and struct rspamd_task *
+ *
+ * The first case is when we read message and it is controlled by this code;
+ * the second case is when a reply is written and we do not control it normally,
+ * as it is managed by `rspamd_protocol_reply` in protocol.c
+ *
+ * Hence, we need to distinguish our arguments...
+ *
+ * The approach here is simple:
+ * - struct rspamd_worker_session starts with gint64 `magic` and we set it to
+ * MAX_INT64
+ * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system)
+ *
+ * The idea is simple: no sane pointer would reach MAX_INT64, so if this field
+ * is MAX_INT64 then it is our session, and if it is not then it is a task.
+ */
+
+ if (session->magic == G_MAXINT64) {
+ task = session->task;
}
else {
- task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
- msg = rspamd_http_new_message (HTTP_RESPONSE);
+ task = (struct rspamd_task *)conn->ud;
+ }
+
+
+ if (task) {
+ msg_info_task ("abnormally closing connection from: %s, error: %e",
+ rspamd_inet_address_to_string_pretty (task->client_addr), err);
- if (err) {
- msg->status = rspamd_fstring_new_init (err->message,
- strlen (err->message));
- msg->code = err->code;
+ if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+ /* Terminate session immediately */
+ rspamd_session_destroy (task->s);
}
else {
- msg->status = rspamd_fstring_new_init ("Internal error",
- strlen ("Internal error"));
- msg->code = 500;
- }
+ task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
+ msg = rspamd_http_new_message (HTTP_RESPONSE);
- msg->date = time (NULL);
-
- reply = rspamd_fstring_sized_new (msg->status->len + 16);
- rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
- rspamd_http_message_set_body_from_fstring_steal (msg, reply);
- rspamd_http_connection_reset (task->http_conn);
- rspamd_http_connection_write_message (task->http_conn,
- msg,
- NULL,
- "application/json",
- task,
- 1.0);
+ if (err) {
+ msg->status = rspamd_fstring_new_init (err->message,
+ strlen (err->message));
+ msg->code = err->code;
+ }
+ else {
+ msg->status = rspamd_fstring_new_init ("Internal error",
+ strlen ("Internal error"));
+ msg->code = 500;
+ }
+
+ msg->date = time (NULL);
+
+ reply = rspamd_fstring_sized_new (msg->status->len + 16);
+ rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
+ rspamd_http_message_set_body_from_fstring_steal (msg, reply);
+ rspamd_http_connection_reset (task->http_conn);
+ rspamd_http_connection_write_message (task->http_conn,
+ msg,
+ NULL,
+ "application/json",
+ task,
+ 1.0);
+ }
+ }
+ else {
+ /* If there was no task, then session is unmanaged */
+ msg_info ("no data received from: %s, error: %e",
+ rspamd_inet_address_to_string_pretty (session->addr), err);
+ rspamd_http_connection_reset (session->http_conn);
+ rspamd_http_connection_unref (session->http_conn);
+ rspamd_inet_address_free (session->addr);
+ close (session->fd);
+ g_free (session);
}
}
@@ -303,16 +412,38 @@ static gint
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
+ struct rspamd_task *task;
- if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
- /* We are done here */
- msg_debug_task ("normally closing connection from: %s",
- rspamd_inet_address_to_string (task->client_addr));
- rspamd_session_destroy (task->s);
+ /* Read the comment to rspamd_worker_error_handler */
+
+ if (session->magic == G_MAXINT64) {
+ task = session->task;
}
- else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
- rspamd_session_pending (task->s);
+ else {
+ task = (struct rspamd_task *)conn->ud;
+ }
+
+ if (task) {
+ if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
+ /* We are done here */
+ msg_debug_task ("normally closing connection from: %s",
+ rspamd_inet_address_to_string (task->client_addr));
+ rspamd_session_destroy (task->s);
+ }
+ else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
+ rspamd_session_pending (task->s);
+ }
+ }
+ else {
+ /* If there was no task, then session is unmanaged */
+ msg_info ("no data received from: %s, closing connection",
+ rspamd_inet_address_to_string_pretty (session->addr));
+ rspamd_inet_address_free (session->addr);
+ rspamd_http_connection_reset (session->http_conn);
+ rspamd_http_connection_unref (session->http_conn);
+ close (session->fd);
+ g_free (session);
}
return 0;
@@ -326,7 +457,7 @@ accept_socket (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_worker_ctx *ctx;
- struct rspamd_task *task;
+ struct rspamd_worker_session *session;
rspamd_inet_addr_t *addr;
gint nfd, http_opts = 0;
@@ -350,55 +481,35 @@ accept_socket (EV_P_ ev_io *w, int revents)
return;
}
- task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
-
- msg_info_task ("accepted connection from %s port %d, task ptr: %p",
- rspamd_inet_address_to_string (addr),
- rspamd_inet_address_get_port (addr),
- task);
-
- /* Copy some variables */
- if (ctx->is_mime) {
- task->flags |= RSPAMD_TASK_FLAG_MIME;
- }
- else {
- task->flags &= ~RSPAMD_TASK_FLAG_MIME;
- }
-
- task->sock = nfd;
- task->client_addr = addr;
-
- worker->srv->stat->connections_count++;
- task->resolver = ctx->resolver;
- /* TODO: allow to disable autolearn in protocol */
- task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
+ session = g_malloc (sizeof (*session));
+ session->magic = G_MAXINT64;
+ session->addr = addr;
+ session->fd = nfd;
+ session->ctx = ctx;
+ session->worker = worker;
if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) {
http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
}
- task->http_conn = rspamd_http_connection_new_server (
+ session->http_conn = rspamd_http_connection_new_server (
ctx->http_ctx,
nfd,
rspamd_worker_body_handler,
rspamd_worker_error_handler,
rspamd_worker_finish_handler,
http_opts);
- rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message);
- worker->nconns++;
- rspamd_mempool_add_destructor (task->task_pool,
- (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
- /* Set up async session */
- task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
- rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
+ worker->srv->stat->connections_count++;
+ rspamd_http_connection_set_max_size (session->http_conn,
+ ctx->cfg->max_message);
if (ctx->key) {
- rspamd_http_connection_set_key (task->http_conn, ctx->key);
+ rspamd_http_connection_set_key (session->http_conn, ctx->key);
}
- rspamd_http_connection_read_message (task->http_conn,
- task,
+ rspamd_http_connection_read_message (session->http_conn,
+ session,
ctx->timeout);
}