aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-19 17:07:56 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commit675b33dd2025cc1f8e732efa9ffc72d55e5a35d9 (patch)
tree1492d1527bd84c5b032ada128d88a1fb12449102 /src/worker.c
parenteeb0beb73d7769341d1b6aa8fac4f27f7dc76b2e (diff)
downloadrspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.tar.gz
rspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.zip
[Project] Adopt normal worker and contorller
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c96
1 files changed, 32 insertions, 64 deletions
diff --git a/src/worker.c b/src/worker.c
index 5a4adb325..ad0782b17 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -42,7 +42,7 @@
#include "lua/lua_common.h"
/* 60 seconds for worker's IO */
-#define DEFAULT_WORKER_IO_TIMEOUT 60000
+#define DEFAULT_WORKER_IO_TIMEOUT 60.0
gpointer init_worker (struct rspamd_config *cfg);
void start_worker (struct rspamd_worker *worker);
@@ -73,11 +73,10 @@ static gboolean
rspamd_worker_finalize (gpointer user_data)
{
struct rspamd_task *task = user_data;
- struct timeval tv = {.tv_sec = 0, .tv_usec = 0};
if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
msg_info_task ("finishing actions has been processed, terminating");
- event_base_loopexit (task->event_loop, &tv);
+ ev_break (task->event_loop, EVBREAK_ALL);
rspamd_session_destroy (task->s);
return TRUE;
@@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg)
}
void
-rspamd_task_timeout (gint fd, short what, gpointer ud)
+rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_task *task = (struct rspamd_task *) ud;
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
msg_info_task ("processing of task timed out, forced processing");
@@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud)
}
void
-rspamd_worker_guard_handler (gint fd, short what, void *data)
+rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
{
- struct rspamd_task *task = data;
+ struct rspamd_task *task = (struct rspamd_task *)w->data;
gchar fake_buf[1024];
gssize r;
-#ifdef EV_CLOSED
- if (what == EV_CLOSED) {
- if (!(task->flags & RSPAMD_TASK_FLAG_JSON) &&
- task->cfg->enable_shutdown_workaround) {
- msg_info_task ("workaround for shutdown enabled, please update "
- "your client, this support might be removed in future");
- shutdown (fd, SHUT_RD);
- event_del (task->guard_ev);
- task->guard_ev = NULL;
- }
- else {
- msg_err_task ("the peer has closed connection unexpectedly");
- rspamd_session_destroy (task->s);
- }
-
- return;
- }
-#endif
-
- r = read (fd, fake_buf, sizeof (fake_buf));
+ r = read (w->fd, fake_buf, sizeof (fake_buf));
if (r > 0) {
msg_warn_task ("received extra data after task is loaded, ignoring");
@@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data)
task->cfg->enable_shutdown_workaround) {
msg_info_task ("workaround for shutdown enabled, please update "
"your client, this support might be removed in future");
- shutdown (fd, SHUT_RD);
- event_del (task->guard_ev);
- task->guard_ev = NULL;
+ shutdown (w->fd, SHUT_RD);
+ ev_io_stop (task->event_loop, &task->guard_ev);
}
else {
msg_err_task ("the peer has closed connection unexpectedly");
@@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
{
struct rspamd_task *task = (struct rspamd_task *) conn->ud;
struct rspamd_worker_ctx *ctx;
- struct timeval task_tv;
- struct event *guard_ev;
ctx = task->worker->ctx;
@@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
/* Set global timeout for the task */
if (ctx->task_timeout > 0.0) {
- event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
- task);
- event_base_set (ctx->ev_base, &task->timeout_ev);
- double_to_tv (ctx->task_timeout, &task_tv);
- event_add (&task->timeout_ev, &task_tv);
+ task->timeout_ev.data = task;
+ ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
+ ctx->task_timeout, 0.0);
+ ev_timer_start (task->event_loop, &task->timeout_ev);
}
/* Set socket guard */
- guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev));
-#ifdef EV_CLOSED
- event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED,
- rspamd_worker_guard_handler, task);
-#else
- event_set (guard_ev, task->sock, EV_READ|EV_PERSIST,
- rspamd_worker_guard_handler, task);
-#endif
- event_base_set (task->event_loop, guard_ev);
- event_add (guard_ev, NULL);
- task->guard_ev = guard_ev;
+ task->guard_ev.data = task;
+ ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
+ ev_io_start (task->event_loop, &task->guard_ev);
rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
@@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
* Accept new connection and construct task
*/
static void
-accept_socket (gint fd, short what, void *arg)
+accept_socket (EV_P_ ev_io *w, int revents)
{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
struct rspamd_worker_ctx *ctx;
struct rspamd_task *task;
rspamd_inet_addr_t *addr;
@@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg)
}
if ((nfd =
- rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
+ rspamd_accept_from_socket (w->fd, &addr,
+ rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn_ctx ("accept failed: %s", strerror (errno));
return;
}
@@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg)
return;
}
- task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base);
+ task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
msg_info_task ("accepted connection from %s port %d, task ptr: %p",
rspamd_inet_address_to_string (addr),
@@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg)
rspamd_http_connection_read_message (task->http_conn,
task,
- &ctx->io_tv);
+ ctx->timeout);
}
#ifdef WITH_HYPERSCAN
@@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg)
ctx,
G_STRUCT_OFFSET (struct rspamd_worker_ctx,
timeout),
- RSPAMD_CL_FLAG_TIME_INTEGER,
+ RSPAMD_CL_FLAG_TIME_FLOAT,
"Protocol IO timeout");
rspamd_rcl_register_worker_option (cfg,
@@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker)
struct rspamd_worker_ctx *ctx = worker->ctx;
ctx->cfg = worker->srv->cfg;
- ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket);
- msec_to_tv (ctx->timeout, &ctx->io_tv);
- rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+ ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
+ rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
worker);
if (isnan (ctx->task_timeout)) {
@@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker)
}
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
- ctx->ev_base,
+ ctx->event_loop,
worker->srv->cfg);
- rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
+ rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
- ctx->ev_base, ctx->resolver->r);
+ ctx->event_loop, ctx->resolver->r);
- ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
+ ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
- rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
+ rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
- rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
+ rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
- event_base_loop (ctx->ev_base, 0);
+ ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_stat_close ();