From 7055f521c14f35249fb786f1a497c546ad1ff716 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 12 Oct 2015 18:42:32 +0100 Subject: [PATCH] Add global timeout for the overall task processing (8 seconds by default) --- src/libserver/task.c | 2 ++ src/libserver/task.h | 1 + src/worker.c | 66 ++++++++++++++++++++++++++++++++------------ 3 files changed, 52 insertions(+), 17 deletions(-) diff --git a/src/libserver/task.c b/src/libserver/task.c index eaf845949..6d1edaebd 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -224,6 +224,8 @@ rspamd_task_free (struct rspamd_task *task, gboolean is_soft) g_error_free (task->err); } + event_del (&task->timeout_ev); + rspamd_mempool_delete (task->task_pool); g_slice_free1 (sizeof (struct rspamd_task), task); } diff --git a/src/libserver/task.h b/src/libserver/task.h index ee57c0f93..528b3e50e 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -165,6 +165,7 @@ struct rspamd_task { struct rspamd_dns_resolver *resolver; /**< DNS resolver */ struct event_base *ev_base; /**< Event base */ + struct event timeout_ev; /**< Global task timeout */ gpointer checkpoint; /**< Opaque checkpoint data */ diff --git a/src/worker.c b/src/worker.c index a1fc09e97..cde69c2a1 100644 --- a/src/worker.c +++ b/src/worker.c @@ -44,6 +44,8 @@ /* 60 seconds for worker's IO */ #define DEFAULT_WORKER_IO_TIMEOUT 60000 +/* Timeout for task processing */ +#define DEFAULT_TASK_TIMEOUT 8.0 gpointer init_worker (struct rspamd_config *cfg); void start_worker (struct rspamd_worker *worker); @@ -94,6 +96,8 @@ struct rspamd_worker_ctx { struct rspamd_dns_resolver *resolver; /* Limit of tasks */ guint32 max_tasks; + /* Maximum time for task processing */ + gdouble task_timeout; /* Events base */ struct event_base *ev_base; /* Encryption key */ @@ -115,6 +119,18 @@ reduce_tasks_count (gpointer arg) (*nconns)--; } +static void +rspamd_task_timeout (gint fd, short what, gpointer ud) +{ + struct rspamd_task *task = (struct rspamd_task *) ud; + + if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) { + msg_info_task ("processing of task timed out, forced processing"); + task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS; + rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); + } +} + static gint rspamd_worker_body_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg, @@ -122,6 +138,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, { struct rspamd_task *task = (struct rspamd_task *) conn->ud; struct rspamd_worker_ctx *ctx; + struct timeval task_tv; ctx = task->worker->ctx; @@ -141,6 +158,15 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, } } + /* Set global timeout for the task */ + if (ctx->task_timeout > 0.0) { + event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, + task); + event_base_set (ctx->ev_base, &task->timeout_ev); + double_to_tv (ctx->task_timeout, &task_tv); + event_add (&task->timeout_ev, &task_tv); + } + rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); return 0; @@ -267,37 +293,43 @@ init_worker (struct rspamd_config *cfg) ctx->is_mime = TRUE; ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT; ctx->cfg = cfg; + ctx->task_timeout = DEFAULT_TASK_TIMEOUT; rspamd_rcl_register_worker_option (cfg, type, "mime", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0); rspamd_rcl_register_worker_option (cfg, type, "http", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0); rspamd_rcl_register_worker_option (cfg, type, "json", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0); rspamd_rcl_register_worker_option (cfg, type, "allow_learn", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0); rspamd_rcl_register_worker_option (cfg, type, "timeout", - rspamd_rcl_parse_struct_time, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - timeout), RSPAMD_CL_FLAG_TIME_INTEGER); + rspamd_rcl_parse_struct_time, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, + timeout), RSPAMD_CL_FLAG_TIME_INTEGER); + + rspamd_rcl_register_worker_option (cfg, type, "task_timeout", + rspamd_rcl_parse_struct_time, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, + task_timeout), RSPAMD_CL_FLAG_TIME_FLOAT); rspamd_rcl_register_worker_option (cfg, type, "max_tasks", - rspamd_rcl_parse_struct_integer, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - max_tasks), RSPAMD_CL_FLAG_INT_32); + rspamd_rcl_parse_struct_integer, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, + max_tasks), RSPAMD_CL_FLAG_INT_32); rspamd_rcl_register_worker_option (cfg, type, "keypair", - rspamd_rcl_parse_struct_keypair, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - key), 0); + rspamd_rcl_parse_struct_keypair, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, + key), 0); return ctx; } -- 2.39.5