diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:53:08 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:53:08 +0100 |
commit | fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b (patch) | |
tree | c84e6a5d4c5cd78a7a2cc3c7adbc7af5d0541682 /src/worker.c | |
parent | e0483657ff6cf1adc828ccce457814d61fe90a0d (diff) | |
download | rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.tar.gz rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.zip |
Revert "Unify code style."
This reverts commit e0483657ff6cf1adc828ccce457814d61fe90a0d.
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 175 |
1 files changed, 77 insertions, 98 deletions
diff --git a/src/worker.c b/src/worker.c index 232ed6c4a..229ee1a4e 100644 --- a/src/worker.c +++ b/src/worker.c @@ -26,16 +26,16 @@ * Rspamd worker implementation */ -#include "cfg_file.h" #include "config.h" -#include "dns.h" +#include "util.h" #include "main.h" -#include "map.h" -#include "message.h" #include "protocol.h" #include "upstream.h" +#include "cfg_file.h" #include "url.h" -#include "util.h" +#include "message.h" +#include "map.h" +#include "dns.h" #include "lua/lua_common.h" @@ -50,42 +50,42 @@ gpointer init_worker (struct rspamd_config *cfg); void start_worker (struct rspamd_worker *worker); worker_t normal_worker = { - "normal", /* Name */ - init_worker, /* Init function */ - start_worker, /* Start function */ - TRUE, /* Has socket */ - FALSE, /* Non unique */ - FALSE, /* Non threaded */ - TRUE, /* Killable */ - SOCK_STREAM /* TCP socket */ + "normal", /* Name */ + init_worker, /* Init function */ + start_worker, /* Start function */ + TRUE, /* Has socket */ + FALSE, /* Non unique */ + FALSE, /* Non threaded */ + TRUE, /* Killable */ + SOCK_STREAM /* TCP socket */ }; /* * Worker's context */ struct rspamd_worker_ctx { - guint32 timeout; - struct timeval io_tv; - /* Detect whether this worker is mime worker */ - gboolean is_mime; + guint32 timeout; + struct timeval io_tv; + /* Detect whether this worker is mime worker */ + gboolean is_mime; /* HTTP worker */ - gboolean is_http; - /* JSON output */ - gboolean is_json; + gboolean is_http; + /* JSON output */ + gboolean is_json; /* Allow learning throught worker */ - gboolean allow_learn; + gboolean allow_learn; /* DNS resolver */ - struct rspamd_dns_resolver *resolver; + struct rspamd_dns_resolver *resolver; /* Current tasks */ - guint32 tasks; + guint32 tasks; /* Limit of tasks */ - guint32 max_tasks; + guint32 max_tasks; /* Classify threads */ - guint32 classify_threads; + guint32 classify_threads; /* Classify threads */ - GThreadPool *classify_pool; + GThreadPool *classify_pool; /* Events base */ - struct event_base *ev_base; + struct event_base *ev_base; }; /* @@ -94,18 +94,18 @@ struct rspamd_worker_ctx { static void reduce_tasks_count (gpointer arg) { - guint32 *tasks = arg; + guint32 *tasks = arg; - (*tasks)--; + (*tasks) --; } static gint rspamd_worker_body_handler (struct rspamd_http_connection *conn, - struct rspamd_http_message *msg, - const gchar *chunk, gsize len) + struct rspamd_http_message *msg, + const gchar *chunk, gsize len) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; - struct rspamd_worker_ctx *ctx; + struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_worker_ctx *ctx; ctx = task->worker->ctx; @@ -137,15 +137,14 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, static void rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_task *task = (struct rspamd_task *) conn->ud; msg_info ("abnormally closing connection from: %s, error: %s", - rspamd_inet_address_to_string (&task->client_addr), err->message); + rspamd_inet_address_to_string (&task->client_addr), err->message); if (task->state != CLOSING_CONNECTION) { /* We still need to write a reply */ task->error_code = err->code; - task->last_error = - rspamd_mempool_strdup (task->task_pool, err->message); + task->last_error = rspamd_mempool_strdup (task->task_pool, err->message); task->state = WRITE_REPLY; rspamd_protocol_write_reply (task); } @@ -157,13 +156,13 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) static gint rspamd_worker_finish_handler (struct rspamd_http_connection *conn, - struct rspamd_http_message *msg) + struct rspamd_http_message *msg) { - struct rspamd_task *task = (struct rspamd_task *) conn->ud; + struct rspamd_task *task = (struct rspamd_task *) conn->ud; if (task->state == CLOSING_CONNECTION) { msg_debug ("normally closing connection from: %s", - rspamd_inet_address_to_string (&task->client_addr)); + rspamd_inet_address_to_string (&task->client_addr)); destroy_session (task->s); } else { @@ -184,23 +183,21 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, static void accept_socket (gint fd, short what, void *arg) { - struct rspamd_worker *worker = (struct rspamd_worker *) arg; - struct rspamd_worker_ctx *ctx; - struct rspamd_task *new_task; - rspamd_inet_addr_t addr; - gint nfd; + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_worker_ctx *ctx; + struct rspamd_task *new_task; + rspamd_inet_addr_t addr; + gint nfd; ctx = worker->ctx; if (ctx->max_tasks != 0 && ctx->tasks > ctx->max_tasks) { - msg_info ("current tasks is now: %uD while maximum is: %uD", - ctx->tasks, - ctx->max_tasks); + msg_info ("current tasks is now: %uD while maximum is: %uD", ctx->tasks, ctx->max_tasks); return; } if ((nfd = - rspamd_accept_from_socket (fd, &addr)) == -1) { + rspamd_accept_from_socket (fd, &addr)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } @@ -212,8 +209,8 @@ accept_socket (gint fd, short what, void *arg) new_task = rspamd_task_new (worker); msg_info ("accepted connection from %s port %d", - rspamd_inet_address_to_string (&addr), - rspamd_inet_address_get_port (&addr)); + rspamd_inet_address_to_string (&addr), + rspamd_inet_address_get_port (&addr)); /* Copy some variables */ new_task->sock = nfd; @@ -223,35 +220,26 @@ accept_socket (gint fd, short what, void *arg) worker->srv->stat->connections_count++; new_task->resolver = ctx->resolver; - new_task->http_conn = rspamd_http_connection_new ( - rspamd_worker_body_handler, - rspamd_worker_error_handler, - rspamd_worker_finish_handler, - 0, - RSPAMD_HTTP_SERVER); + new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler, + rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER); new_task->ev_base = ctx->ev_base; - ctx->tasks++; - rspamd_mempool_add_destructor (new_task->task_pool, - (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks); + ctx->tasks ++; + rspamd_mempool_add_destructor (new_task->task_pool, (rspamd_mempool_destruct_t)reduce_tasks_count, &ctx->tasks); /* Set up async session */ new_task->s = new_async_session (new_task->task_pool, rspamd_task_fin, - rspamd_task_restore, rspamd_task_free_hard, new_task); + rspamd_task_restore, rspamd_task_free_hard, new_task); new_task->classify_pool = ctx->classify_pool; - rspamd_http_connection_read_message (new_task->http_conn, - new_task, - nfd, - &ctx->io_tv, - ctx->ev_base); + rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base); } gpointer init_worker (struct rspamd_config *cfg) { - struct rspamd_worker_ctx *ctx; - GQuark type; + struct rspamd_worker_ctx *ctx; + GQuark type; type = g_quark_try_string ("normal"); @@ -262,35 +250,32 @@ init_worker (struct rspamd_config *cfg) ctx->classify_threads = 1; rspamd_rcl_register_worker_option (cfg, type, "mime", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime), 0); rspamd_rcl_register_worker_option (cfg, type, "http", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_http), 0); rspamd_rcl_register_worker_option (cfg, type, "json", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_json), 0); rspamd_rcl_register_worker_option (cfg, type, "allow_learn", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0); + rspamd_rcl_parse_struct_boolean, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, allow_learn), 0); rspamd_rcl_register_worker_option (cfg, type, "timeout", - rspamd_rcl_parse_struct_time, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - timeout), RSPAMD_CL_FLAG_TIME_INTEGER); + rspamd_rcl_parse_struct_time, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout), RSPAMD_CL_FLAG_TIME_INTEGER); rspamd_rcl_register_worker_option (cfg, type, "max_tasks", - rspamd_rcl_parse_struct_integer, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - max_tasks), RSPAMD_CL_FLAG_INT_32); + rspamd_rcl_parse_struct_integer, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, max_tasks), RSPAMD_CL_FLAG_INT_32); rspamd_rcl_register_worker_option (cfg, type, "classify_threads", - rspamd_rcl_parse_struct_integer, ctx, - G_STRUCT_OFFSET (struct rspamd_worker_ctx, - classify_threads), RSPAMD_CL_FLAG_INT_32); + rspamd_rcl_parse_struct_integer, ctx, + G_STRUCT_OFFSET (struct rspamd_worker_ctx, classify_threads), RSPAMD_CL_FLAG_INT_32); return ctx; } @@ -301,9 +286,9 @@ init_worker (struct rspamd_config *cfg) void start_worker (struct rspamd_worker *worker) { - struct rspamd_worker_ctx *ctx = worker->ctx; - GError *err = NULL; - struct lua_locked_state *nL; + struct rspamd_worker_ctx *ctx = worker->ctx; + GError *err = NULL; + struct lua_locked_state *nL; ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket); msec_to_tv (ctx->timeout, &ctx->io_tv); @@ -311,19 +296,13 @@ start_worker (struct rspamd_worker *worker) start_map_watch (worker->srv->cfg, ctx->ev_base); - ctx->resolver = dns_resolver_init (worker->srv->logger, - ctx->ev_base, - worker->srv->cfg); + ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, worker->srv->cfg); /* Create classify pool */ ctx->classify_pool = NULL; if (ctx->classify_threads > 1) { nL = init_lua_locked (worker->srv->cfg); - ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, - nL, - ctx->classify_threads, - TRUE, - &err); + ctx->classify_pool = g_thread_pool_new (process_statfiles_threaded, nL, ctx->classify_threads, TRUE, &err); if (err != NULL) { msg_err ("pool create failed: %s", err->message); ctx->classify_pool = NULL; @@ -337,6 +316,6 @@ start_worker (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } -/* - * vi:ts=4 +/* + * vi:ts=4 */ |