summaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-07-23 12:53:08 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-07-23 12:53:08 +0100
commitfe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b (patch)
treec84e6a5d4c5cd78a7a2cc3c7adbc7af5d0541682 /src/worker.c
parente0483657ff6cf1adc828ccce457814d61fe90a0d (diff)
downloadrspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.tar.gz
rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.zip
Revert "Unify code style."
This reverts commit e0483657ff6cf1adc828ccce457814d61fe90a0d.
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c175
1 files changed, 77 insertions, 98 deletions
diff --git a/src/worker.c b/src/worker.c
index 232ed6c4a..229ee1a4e 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -26,16 +26,16 @@
* Rspamd worker implementation
*/
-#include "cfg_file.h"
#include "config.h"
-#include "dns.h"
+#include "util.h"
#include "main.h"
-#include "map.h"
-#include "message.h"
#include "protocol.h"
#include "upstream.h"
+#include "cfg_file.h"
#include "url.h"
-#include "util.h"
+#include "message.h"
+#include "map.h"
+#include "dns.h"
#include "lua/lua_common.h"
@@ -50,42 +50,42 @@ gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
worker_t normal_worker = {
- "normal", /* Name */
- init_worker, /* Init function */
- start_worker, /* Start function */
- TRUE, /* Has socket */
- FALSE, /* Non unique */
- FALSE, /* Non threaded */
- TRUE, /* Killable */
- SOCK_STREAM /* TCP socket */
+ "normal", /* Name */
+ init_worker, /* Init function */
+ start_worker, /* Start function */
+ TRUE, /* Has socket */
+ FALSE, /* Non unique */
+ FALSE, /* Non threaded */
+ TRUE, /* Killable */
+ SOCK_STREAM /* TCP socket */
};
/*
* Worker's context
*/
struct rspamd_worker_ctx {
- guint32 timeout;
- struct timeval io_tv;
- /* Detect whether this worker is mime worker */
- gboolean is_mime;
+ guint32 timeout;
+ struct timeval io_tv;
+ /* Detect whether this worker is mime worker */
+ gboolean is_mime;
/* HTTP worker */
- gboolean is_http;
- /* JSON output */
- gboolean is_json;
+ gboolean is_http;
+ /* JSON output */
+ gboolean is_json;
/* Allow learning throught worker */
- gboolean allow_learn;
+ gboolean allow_learn;
/* DNS resolver */
- struct rspamd_dns_resolver *resolver;
+ struct rspamd_dns_resolver *resolver;
/* Current tasks */
- guint32 tasks;
+ guint32 tasks;
/* Limit of tasks */
- guint32 max_tasks;
+ guint32 max_tasks;
/* Classify threads */
- guint32 classify_threads;
+ guint32 classify_threads;
/* Classify threads */
- GThreadPool *classify_pool;
+ GThreadPool *classify_pool;
/* Events base */
- struct event_base *ev_base;
+ struct event_base *ev_base;
};
/*
@@ -94,18 +94,18 @@ struct rspamd_worker_ctx {
static void
reduce_tasks_count (gpointer arg)
{
- guint32 *tasks = arg;
+ guint32 *tasks = arg;
- (*tasks)--;
+ (*tasks) --;
}
static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg,
- const gchar *chunk, gsize len)
+ struct rspamd_http_message *msg,
+ const gchar *chunk, gsize len)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
- struct rspamd_worker_ctx *ctx;
+ struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_worker_ctx *ctx;
ctx = task->worker->ctx;
@@ -137,15 +137,14 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
static void
rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_task *task = (struct rspamd_task *) conn->ud;
msg_info ("abnormally closing connection from: %s, error: %s",
- rspamd_inet_address_to_string (&task->client_addr), err->message);
+ rspamd_inet_address_to_string (&task->client_addr), err->message);
if (task->state != CLOSING_CONNECTION) {
/* We still need to write a reply */
task->error_code = err->code;
- task->last_error =
- rspamd_mempool_strdup (task->task_pool, err->message);
+ task->last_error = rspamd_mempool_strdup (task->task_pool, err->message);
task->state = WRITE_REPLY;
rspamd_protocol_write_reply (task);
}
@@ -157,13 +156,13 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
static gint
rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg)
+ struct rspamd_http_message *msg)
{
- struct rspamd_task *task = (struct rspamd_task *) conn->ud;
+ struct rspamd_task *task = (struct rspamd_task *) conn->ud;
if (task->state == CLOSING_CONNECTION) {
msg_debug ("normally closing connection from: %s",
- rspamd_inet_address_to_string (&task->client_addr));
+ rspamd_inet_address_to_string (&task->client_addr));
destroy_session (task->s);
}
else {
@@ -184,23 +183,21 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
static void
accept_socket (gint fd, short what, void *arg)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
- struct rspamd_worker_ctx *ctx;
- struct rspamd_task *new_task;
- rspamd_inet_addr_t addr;
- gint nfd;
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker_ctx *ctx;
+ struct rspamd_task *new_task;
+ rspamd_inet_addr_t addr;
+ gint nfd;
ctx = worker->ctx;
if (ctx->max_tasks != 0 && ctx->tasks > ctx->max_tasks) {
- msg_info ("current tasks is now: %uD while maximum is: %uD",
- ctx->tasks,
- ctx->max_tasks);
+ msg_info ("current tasks is now: %uD while maximum is: %uD", ctx->tasks, ctx->max_tasks);
return;
}
if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
+ rspamd_accept_from_socket (fd, &addr)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
@@ -212,8 +209,8 @@ accept_socket (gint fd, short what, void *arg)
new_task = rspamd_task_new (worker);
msg_info ("accepted connection from %s port %d",
- rspamd_inet_address_to_string (&addr),
- rspamd_inet_address_get_port (&addr));
+ rspamd_inet_address_to_string (&addr),
+ rspamd_inet_address_get_port (&addr));
/* Copy some variables */
new_task->sock = nfd;
@@ -223,35 +220,26 @@ accept_socket (gint fd, short what, void *arg)
worker->srv->stat->connections_count++;
new_task->resolver = ctx->resolver;
- new_task->http_conn = rspamd_http_connection_new (
- rspamd_worker_body_handler,
- rspamd_worker_error_handler,
- rspamd_worker_finish_handler,
- 0,
- RSPAMD_HTTP_SERVER);
+ new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler,
+ rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER);
new_task->ev_base = ctx->ev_base;
- ctx->tasks++;
- rspamd_mempool_add_destructor (new_task->task_pool,
- (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks);
+ ctx->tasks ++;
+ rspamd_mempool_add_destructor (new_task->task_pool, (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks);
/* Set up async session */
new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin,
- rspamd_task_restore, rspamd_task_free_hard, new_task);
+ rspamd_task_restore, rspamd_task_free_hard, new_task);
new_task->classify_pool = ctx->classify_pool;
- rspamd_http_connection_read_message (new_task->http_conn,
- new_task,
- nfd,
- &ctx->io_tv,
- ctx->ev_base);
+ rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base);
}
gpointer
init_worker (struct rspamd_config *cfg)
{
- struct rspamd_worker_ctx *ctx;
- GQuark type;
+ struct rspamd_worker_ctx *ctx;
+ GQuark type;
type = g_quark_try_string ("normal");
@@ -262,35 +250,32 @@ init_worker (struct rspamd_config *cfg)
ctx->classify_threads = 1;
rspamd_rcl_register_worker_option (cfg, type, "mime",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0);
rspamd_rcl_register_worker_option (cfg, type, "http",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0);
rspamd_rcl_register_worker_option (cfg, type, "json",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0);
rspamd_rcl_register_worker_option (cfg, type, "allow_learn",
- rspamd_rcl_parse_struct_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0);
+ rspamd_rcl_parse_struct_boolean, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0);
rspamd_rcl_register_worker_option (cfg, type, "timeout",
- rspamd_rcl_parse_struct_time, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- timeout), RSPAMD_CL_FLAG_TIME_INTEGER);
+ rspamd_rcl_parse_struct_time, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout), RSPAMD_CL_FLAG_TIME_INTEGER);
rspamd_rcl_register_worker_option (cfg, type, "max_tasks",
- rspamd_rcl_parse_struct_integer, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- max_tasks), RSPAMD_CL_FLAG_INT_32);
+ rspamd_rcl_parse_struct_integer, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks), RSPAMD_CL_FLAG_INT_32);
rspamd_rcl_register_worker_option (cfg, type, "classify_threads",
- rspamd_rcl_parse_struct_integer, ctx,
- G_STRUCT_OFFSET (struct rspamd_worker_ctx,
- classify_threads), RSPAMD_CL_FLAG_INT_32);
+ rspamd_rcl_parse_struct_integer, ctx,
+ G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads), RSPAMD_CL_FLAG_INT_32);
return ctx;
}
@@ -301,9 +286,9 @@ init_worker (struct rspamd_config *cfg)
void
start_worker (struct rspamd_worker *worker)
{
- struct rspamd_worker_ctx *ctx = worker->ctx;
- GError *err = NULL;
- struct lua_locked_state *nL;
+ struct rspamd_worker_ctx *ctx = worker->ctx;
+ GError *err = NULL;
+ struct lua_locked_state *nL;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
msec_to_tv (ctx->timeout, &ctx->io_tv);
@@ -311,19 +296,13 @@ start_worker (struct rspamd_worker *worker)
start_map_watch (worker->srv->cfg, ctx->ev_base);
- ctx->resolver = dns_resolver_init (worker->srv->logger,
- ctx->ev_base,
- worker->srv->cfg);
+ ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, worker->srv->cfg);
/* Create classify pool */
ctx->classify_pool = NULL;
if (ctx->classify_threads > 1) {
nL = init_lua_locked (worker->srv->cfg);
- ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded,
- nL,
- ctx->classify_threads,
- TRUE,
- &err);
+ ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, nL, ctx->classify_threads, TRUE, &err);
if (err != NULL) {
msg_err ("pool create failed: %s", err->message);
ctx->classify_pool = NULL;
@@ -337,6 +316,6 @@ start_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
-/*
- * vi:ts=4
+/*
+ * vi:ts=4
*/