|
|
@@ -44,6 +44,8 @@ |
|
|
|
|
|
|
|
/* 60 seconds for worker's IO */ |
|
|
|
#define DEFAULT_WORKER_IO_TIMEOUT 60000 |
|
|
|
/* Timeout for task processing */ |
|
|
|
#define DEFAULT_TASK_TIMEOUT 8.0 |
|
|
|
|
|
|
|
gpointer init_worker (struct rspamd_config *cfg); |
|
|
|
void start_worker (struct rspamd_worker *worker); |
|
|
@@ -94,6 +96,8 @@ struct rspamd_worker_ctx { |
|
|
|
struct rspamd_dns_resolver *resolver; |
|
|
|
/* Limit of tasks */ |
|
|
|
guint32 max_tasks; |
|
|
|
/* Maximum time for task processing */ |
|
|
|
gdouble task_timeout; |
|
|
|
/* Events base */ |
|
|
|
struct event_base *ev_base; |
|
|
|
/* Encryption key */ |
|
|
@@ -115,6 +119,18 @@ reduce_tasks_count (gpointer arg) |
|
|
|
(*nconns)--; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_task_timeout (gint fd, short what, gpointer ud) |
|
|
|
{ |
|
|
|
struct rspamd_task *task = (struct rspamd_task *) ud; |
|
|
|
|
|
|
|
if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) { |
|
|
|
msg_info_task ("processing of task timed out, forced processing"); |
|
|
|
task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS; |
|
|
|
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static gint |
|
|
|
rspamd_worker_body_handler (struct rspamd_http_connection *conn, |
|
|
|
struct rspamd_http_message *msg, |
|
|
@@ -122,6 +138,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, |
|
|
|
{ |
|
|
|
struct rspamd_task *task = (struct rspamd_task *) conn->ud; |
|
|
|
struct rspamd_worker_ctx *ctx; |
|
|
|
struct timeval task_tv; |
|
|
|
|
|
|
|
ctx = task->worker->ctx; |
|
|
|
|
|
|
@@ -141,6 +158,15 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* Set global timeout for the task */ |
|
|
|
if (ctx->task_timeout > 0.0) { |
|
|
|
event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, |
|
|
|
task); |
|
|
|
event_base_set (ctx->ev_base, &task->timeout_ev); |
|
|
|
double_to_tv (ctx->task_timeout, &task_tv); |
|
|
|
event_add (&task->timeout_ev, &task_tv); |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); |
|
|
|
|
|
|
|
return 0; |
|
|
@@ -267,37 +293,43 @@ init_worker (struct rspamd_config *cfg) |
|
|
|
ctx->is_mime = TRUE; |
|
|
|
ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT; |
|
|
|
ctx->cfg = cfg; |
|
|
|
ctx->task_timeout = DEFAULT_TASK_TIMEOUT; |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "mime", |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0); |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "http", |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0); |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "json", |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0); |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "allow_learn", |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0); |
|
|
|
rspamd_rcl_parse_struct_boolean, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "timeout", |
|
|
|
rspamd_rcl_parse_struct_time, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
timeout), RSPAMD_CL_FLAG_TIME_INTEGER); |
|
|
|
rspamd_rcl_parse_struct_time, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
timeout), RSPAMD_CL_FLAG_TIME_INTEGER); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "task_timeout", |
|
|
|
rspamd_rcl_parse_struct_time, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
task_timeout), RSPAMD_CL_FLAG_TIME_FLOAT); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "max_tasks", |
|
|
|
rspamd_rcl_parse_struct_integer, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
max_tasks), RSPAMD_CL_FLAG_INT_32); |
|
|
|
rspamd_rcl_parse_struct_integer, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
max_tasks), RSPAMD_CL_FLAG_INT_32); |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option (cfg, type, "keypair", |
|
|
|
rspamd_rcl_parse_struct_keypair, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
key), 0); |
|
|
|
rspamd_rcl_parse_struct_keypair, ctx, |
|
|
|
G_STRUCT_OFFSET (struct rspamd_worker_ctx, |
|
|
|
key), 0); |
|
|
|
|
|
|
|
return ctx; |
|
|
|
} |