aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-07-23 12:57:31 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-07-23 12:57:31 +0100
commit379055dbbb4af997b4d3ffb161d447872d7ca357 (patch)
tree3774553d470f93e12ddeb454aad9b3b607cf8918 /src/worker.c
parent602ae7a0b7e215ba2677131b8fdc70abc156b3ca (diff)
downloadrspamd-379055dbbb4af997b4d3ffb161d447872d7ca357.tar.gz
rspamd-379055dbbb4af997b4d3ffb161d447872d7ca357.zip
Unify style without sorting headers.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c165
1 files changed, 93 insertions, 72 deletions
diff --git a/src/worker.c b/src/worker.c
index 229ee1a4e..77cb9fcd1 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -50,42 +50,42 @@ gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
worker_t normal_worker = {
- "normal", /* Name */
- init_worker, /* Init function */
- start_worker, /* Start function */
- TRUE, /* Has socket */
- FALSE, /* Non unique */
- FALSE, /* Non threaded */
- TRUE, /* Killable */
- SOCK_STREAM /* TCP socket */
+ "normal", /* Name */
+ init_worker, /* Init function */
+ start_worker, /* Start function */
+ TRUE, /* Has socket */
+ FALSE, /* Non unique */
+ FALSE, /* Non threaded */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
/*
* Worker's context
*/
struct rspamd_worker_ctx {
- guint32 timeout;
- struct timeval io_tv;
- /* Detect whether this worker is mime worker */
- gboolean is_mime;
+ guint32 timeout;
+ struct timeval io_tv;
+ /* Detect whether this worker is mime worker */
+ gboolean is_mime;
/* HTTP worker */
- gboolean is_http;
- /* JSON output */
- gboolean is_json;
+ gboolean is_http;
+ /* JSON output */
+ gboolean is_json;
/* Allow learning throught worker */
- gboolean allow_learn;
+ gboolean allow_learn;
/* DNS resolver */
- struct rspamd_dns_resolver *resolver;
+ struct rspamd_dns_resolver *resolver;
/* Current tasks */
- guint32 tasks;
+ guint32 tasks;
/* Limit of tasks */
- guint32 max_tasks;
+ guint32 max_tasks;
/* Classify threads */
- guint32 classify_threads;
+ guint32 classify_threads;
/* Classify threads */
- GThreadPool *classify_pool;
+ GThreadPool *classify_pool;
/* Events base */
- struct event_base *ev_base;
+ struct event_base *ev_base;
};
/*
@@ -94,18 +94,18 @@ struct rspamd_worker_ctx {
static void
reduce_tasks_count (gpointer arg)
{
- guint32 *tasks = arg;
+ guint32 *tasks = arg;
- (*tasks) --;
+ (*tasks)--;
}
static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg,
- const gchar *chunk, gsize len)
+ struct rspamd_http_message *msg,
+ const gchar *chunk, gsize len)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
- struct rspamd_worker_ctx *ctx;
+ struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_ctx *ctx;
ctx = task->worker->ctx;
@@ -137,14 +137,15 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
static void
rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_task *task = (struct rspamd_task *) conn->ud;
msg_info ("abnormally closing connection from: %s, error: %s",
- rspamd_inet_address_to_string (&task->client_addr), err->message);
+ rspamd_inet_address_to_string (&task->client_addr), err->message);
if (task->state != CLOSING_CONNECTION) {
/* We still need to write a reply */
task->error_code = err->code;
- task->last_error = rspamd_mempool_strdup (task->task_pool, err->message);
+ task->last_error =
+ rspamd_mempool_strdup (task->task_pool, err->message);
task->state = WRITE_REPLY;
rspamd_protocol_write_reply (task);
}
@@ -156,13 +157,13 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
static gint
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg)
+ struct rspamd_http_message *msg)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_task *task = (struct rspamd_task *) conn->ud;
if (task->state == CLOSING_CONNECTION) {
msg_debug ("normally closing connection from: %s",
- rspamd_inet_address_to_string (&task->client_addr));
+ rspamd_inet_address_to_string (&task->client_addr));
destroy_session (task->s);
}
else {
@@ -183,21 +184,23 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
static void
accept_socket (gint fd, short what, void *arg)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
- struct rspamd_worker_ctx *ctx;
- struct rspamd_task *new_task;
- rspamd_inet_addr_t addr;
- gint nfd;
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_task *new_task;
+ rspamd_inet_addr_t addr;
+ gint nfd;
ctx = worker->ctx;
if (ctx->max_tasks != 0 && ctx->tasks > ctx->max_tasks) {
- msg_info ("current tasks is now: %uD while maximum is: %uD", ctx->tasks, ctx->max_tasks);
+ msg_info ("current tasks is now: %uD while maximum is: %uD",
+ ctx->tasks,
+ ctx->max_tasks);
return;
}
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
@@ -209,8 +212,8 @@ accept_socket (gint fd, short what, void *arg)
new_task = rspamd_task_new (worker);
msg_info ("accepted connection from %s port %d",
- rspamd_inet_address_to_string (&addr),
- rspamd_inet_address_get_port (&addr));
+ rspamd_inet_address_to_string (&addr),
+ rspamd_inet_address_get_port (&addr));
/* Copy some variables */
new_task->sock = nfd;
@@ -220,26 +223,35 @@ accept_socket (gint fd, short what, void *arg)
worker->srv->stat->connections_count++;
new_task->resolver = ctx->resolver;
- new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler,
- rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER);
+ new_task->http_conn = rspamd_http_connection_new (
+ rspamd_worker_body_handler,
+ rspamd_worker_error_handler,
+ rspamd_worker_finish_handler,
+ 0,
+ RSPAMD_HTTP_SERVER);
new_task->ev_base = ctx->ev_base;
- ctx->tasks ++;
- rspamd_mempool_add_destructor (new_task->task_pool, (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks);
+ ctx->tasks++;
+ rspamd_mempool_add_destructor (new_task->task_pool,
+ (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks);
/* Set up async session */
new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin,
- rspamd_task_restore, rspamd_task_free_hard, new_task);
+ rspamd_task_restore, rspamd_task_free_hard, new_task);
new_task->classify_pool = ctx->classify_pool;
- rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
+ rspamd_http_connection_read_message (new_task->http_conn,
+ new_task,
+ nfd,
+ &ctx->io_tv,
+ ctx->ev_base);
}
gpointer
init_worker (struct rspamd_config *cfg)
{
- struct rspamd_worker_ctx *ctx;
- GQuark type;
+ struct rspamd_worker_ctx *ctx;
+ GQuark type;
type = g_quark_try_string ("normal");
@@ -250,32 +262,35 @@ init_worker (struct rspamd_config *cfg)
ctx->classify_threads = 1;
rspamd_rcl_register_worker_option (cfg, type, "mime",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0);
rspamd_rcl_register_worker_option (cfg, type, "http",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0);
rspamd_rcl_register_worker_option (cfg, type, "json",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0);
rspamd_rcl_register_worker_option (cfg, type, "allow_learn",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0);
rspamd_rcl_register_worker_option (cfg, type, "timeout",
- rspamd_rcl_parse_struct_time, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout), RSPAMD_CL_FLAG_TIME_INTEGER);
+ rspamd_rcl_parse_struct_time, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ timeout), RSPAMD_CL_FLAG_TIME_INTEGER);
rspamd_rcl_register_worker_option (cfg, type, "max_tasks",
- rspamd_rcl_parse_struct_integer, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks), RSPAMD_CL_FLAG_INT_32);
+ rspamd_rcl_parse_struct_integer, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ max_tasks), RSPAMD_CL_FLAG_INT_32);
rspamd_rcl_register_worker_option (cfg, type, "classify_threads",
- rspamd_rcl_parse_struct_integer, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads), RSPAMD_CL_FLAG_INT_32);
+ rspamd_rcl_parse_struct_integer, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx,
+ classify_threads), RSPAMD_CL_FLAG_INT_32);
return ctx;
}
@@ -286,9 +301,9 @@ init_worker (struct rspamd_config *cfg)
void
start_worker (struct rspamd_worker *worker)
{
- struct rspamd_worker_ctx *ctx = worker->ctx;
- GError *err = NULL;
- struct lua_locked_state *nL;
+ struct rspamd_worker_ctx *ctx = worker->ctx;
+ GError *err = NULL;
+ struct lua_locked_state *nL;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -296,13 +311,19 @@ start_worker (struct rspamd_worker *worker)
start_map_watch (worker->srv->cfg, ctx->ev_base);
- ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, worker->srv->cfg);
+ ctx->resolver = dns_resolver_init (worker->srv->logger,
+ ctx->ev_base,
+ worker->srv->cfg);
/* Create classify pool */
ctx->classify_pool = NULL;
if (ctx->classify_threads > 1) {
nL = init_lua_locked (worker->srv->cfg);
- ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, nL, ctx->classify_threads, TRUE, &err);
+ ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded,
+ nL,
+ ctx->classify_threads,
+ TRUE,
+ &err);
if (err != NULL) {
msg_err ("pool create failed: %s", err->message);
ctx->classify_pool = NULL;
@@ -316,6 +337,6 @@ start_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
-/*
- * vi:ts=4
+/*
+ * vi:ts=4
*/