struct rspamd_controller_session { | struct rspamd_controller_session { | ||||
struct rspamd_controller_worker_ctx *ctx; | struct rspamd_controller_worker_ctx *ctx; | ||||
struct rspamd_worker *wrk; | |||||
rspamd_mempool_t *pool; | rspamd_mempool_t *pool; | ||||
struct rspamd_task *task; | struct rspamd_task *task; | ||||
struct rspamd_classifier_config *cl; | struct rspamd_classifier_config *cl; | ||||
rspamd_mempool_delete (session->pool); | rspamd_mempool_delete (session->pool); | ||||
} | } | ||||
session->wrk->nconns --; | |||||
rspamd_inet_address_destroy (session->from_addr); | rspamd_inet_address_destroy (session->from_addr); | ||||
msg_debug_session ("destroy session %p", session); | msg_debug_session ("destroy session %p", session); | ||||
g_slice_free1 (sizeof (struct rspamd_controller_session), session); | g_slice_free1 (sizeof (struct rspamd_controller_session), session); | ||||
session->ctx = ctx; | session->ctx = ctx; | ||||
session->from_addr = addr; | session->from_addr = addr; | ||||
session->wrk = worker; | |||||
worker->nconns ++; | |||||
rspamd_http_router_handle_socket (ctx->http, nfd, session); | rspamd_http_router_handle_socket (ctx->http, nfd, session); | ||||
} | } |
/* Got some data */ | /* Got some data */ | ||||
if (what == EV_READ) { | if (what == EV_READ) { | ||||
worker->nconns++; | |||||
while ((r = rspamd_inet_address_recvfrom (fd, buf, sizeof (buf), 0, | while ((r = rspamd_inet_address_recvfrom (fd, buf, sizeof (buf), 0, | ||||
&session.addr)) == -1) { | &session.addr)) == -1) { | ||||
if (errno == EINTR) { | if (errno == EINTR) { | ||||
} | } | ||||
rspamd_inet_address_destroy (session.addr); | rspamd_inet_address_destroy (session.addr); | ||||
worker->nconns --; | |||||
} | } | ||||
rspamd_explicit_memzero (session.nm, sizeof (session.nm)); | rspamd_explicit_memzero (session.nm, sizeof (session.nm)); |
gboolean allow_learn; | gboolean allow_learn; | ||||
/* DNS resolver */ | /* DNS resolver */ | ||||
struct rspamd_dns_resolver *resolver; | struct rspamd_dns_resolver *resolver; | ||||
/* Current tasks */ | |||||
guint32 tasks; | |||||
/* Limit of tasks */ | /* Limit of tasks */ | ||||
guint32 max_tasks; | guint32 max_tasks; | ||||
/* Events base */ | /* Events base */ | ||||
static void | static void | ||||
reduce_tasks_count (gpointer arg) | reduce_tasks_count (gpointer arg) | ||||
{ | { | ||||
guint32 *tasks = arg; | |||||
guint *nconns = arg; | |||||
(*tasks)--; | |||||
(*nconns)--; | |||||
} | } | ||||
static gint | static gint | ||||
ctx = worker->ctx; | ctx = worker->ctx; | ||||
if (ctx->max_tasks != 0 && ctx->tasks > ctx->max_tasks) { | |||||
if (ctx->max_tasks != 0 && worker->nconns > ctx->max_tasks) { | |||||
msg_info_ctx ("current tasks is now: %uD while maximum is: %uD", | msg_info_ctx ("current tasks is now: %uD while maximum is: %uD", | ||||
ctx->tasks, | |||||
worker->nconns, | |||||
ctx->max_tasks); | ctx->max_tasks); | ||||
return; | return; | ||||
} | } | ||||
RSPAMD_HTTP_SERVER, | RSPAMD_HTTP_SERVER, | ||||
ctx->keys_cache); | ctx->keys_cache); | ||||
task->ev_base = ctx->ev_base; | task->ev_base = ctx->ev_base; | ||||
ctx->tasks++; | |||||
worker->nconns++; | |||||
rspamd_mempool_add_destructor (task->task_pool, | rspamd_mempool_add_destructor (task->task_pool, | ||||
(rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks); | |||||
(rspamd_mempool_destruct_t)reduce_tasks_count, &worker->nconns); | |||||
/* Set up async session */ | /* Set up async session */ | ||||
task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, | task->s = rspamd_session_create (task->task_pool, rspamd_task_fin, |