/* 60 seconds for worker's IO */
#define DEFAULT_WORKER_IO_TIMEOUT 60000
+/* Timeout for task processing */
+#define DEFAULT_TASK_TIMEOUT 8.0
gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
struct rspamd_dns_resolver *resolver;
/* Limit of tasks */
guint32 max_tasks;
+ /* Maximum time for task processing */
+ gdouble task_timeout;
/* Events base */
struct event_base *ev_base;
/* Encryption key */
(*nconns)--;
}
+static void
+rspamd_task_timeout (gint fd, short what, gpointer ud)
+{
+ struct rspamd_task *task = (struct rspamd_task *) ud;
+
+ if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
+ msg_info_task ("processing of task timed out, forced processing");
+ task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
+ rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
+ }
+}
+
static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
{
struct rspamd_task *task = (struct rspamd_task *) conn->ud;
struct rspamd_worker_ctx *ctx;
+ struct timeval task_tv;
ctx = task->worker->ctx;
}
}
+ /* Set global timeout for the task */
+ if (ctx->task_timeout > 0.0) {
+ event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
+ task);
+ event_base_set (ctx->ev_base, &task->timeout_ev);
+ double_to_tv (ctx->task_timeout, &task_tv);
+ event_add (&task->timeout_ev, &task_tv);
+ }
+
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
return 0;
ctx->is_mime = TRUE;
ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
ctx->cfg = cfg;
+ ctx->task_timeout = DEFAULT_TASK_TIMEOUT;
rspamd_rcl_register_worker_option (cfg, type, "mime",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0);
rspamd_rcl_register_worker_option (cfg, type, "http",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0);
rspamd_rcl_register_worker_option (cfg, type, "json",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0);
rspamd_rcl_register_worker_option (cfg, type, "allow_learn",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0);
rspamd_rcl_register_worker_option (cfg, type, "timeout",
- rspamd_rcl_parse_struct_time, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- timeout), RSPAMD_CL_FLAG_TIME_INTEGER);
+ rspamd_rcl_parse_struct_time, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ timeout), RSPAMD_CL_FLAG_TIME_INTEGER);
+
+ rspamd_rcl_register_worker_option (cfg, type, "task_timeout",
+ rspamd_rcl_parse_struct_time, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ task_timeout), RSPAMD_CL_FLAG_TIME_FLOAT);
rspamd_rcl_register_worker_option (cfg, type, "max_tasks",
- rspamd_rcl_parse_struct_integer, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- max_tasks), RSPAMD_CL_FLAG_INT_32);
+ rspamd_rcl_parse_struct_integer, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ max_tasks), RSPAMD_CL_FLAG_INT_32);
rspamd_rcl_register_worker_option (cfg, type, "keypair",
- rspamd_rcl_parse_struct_keypair, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- key), 0);
+ rspamd_rcl_parse_struct_keypair, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ key), 0);
return ctx;
}