diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-06-19 17:07:56 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-06-22 10:57:29 +0100 |
commit | 675b33dd2025cc1f8e732efa9ffc72d55e5a35d9 (patch) | |
tree | 1492d1527bd84c5b032ada128d88a1fb12449102 /src/worker.c | |
parent | eeb0beb73d7769341d1b6aa8fac4f27f7dc76b2e (diff) | |
download | rspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.tar.gz rspamd-675b33dd2025cc1f8e732efa9ffc72d55e5a35d9.zip |
[Project] Adopt normal worker and contorller
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 96 |
1 files changed, 32 insertions, 64 deletions
diff --git a/src/worker.c b/src/worker.c index 5a4adb325..ad0782b17 100644 --- a/src/worker.c +++ b/src/worker.c @@ -42,7 +42,7 @@ #include "lua/lua_common.h" /* 60 seconds for worker's IO */ -#define DEFAULT_WORKER_IO_TIMEOUT 60000 +#define DEFAULT_WORKER_IO_TIMEOUT 60.0 gpointer init_worker (struct rspamd_config *cfg); void start_worker (struct rspamd_worker *worker); @@ -73,11 +73,10 @@ static gboolean rspamd_worker_finalize (gpointer user_data) { struct rspamd_task *task = user_data; - struct timeval tv = {.tv_sec = 0, .tv_usec = 0}; if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) { msg_info_task ("finishing actions has been processed, terminating"); - event_base_loopexit (task->event_loop, &tv); + ev_break (task->event_loop, EVBREAK_ALL); rspamd_session_destroy (task->s); return TRUE; @@ -137,9 +136,9 @@ reduce_tasks_count (gpointer arg) } void -rspamd_task_timeout (gint fd, short what, gpointer ud) +rspamd_task_timeout (EV_P_ ev_timer *w, int revents) { - struct rspamd_task *task = (struct rspamd_task *) ud; + struct rspamd_task *task = (struct rspamd_task *)w->data; if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) { msg_info_task ("processing of task timed out, forced processing"); @@ -176,32 +175,13 @@ rspamd_task_timeout (gint fd, short what, gpointer ud) } void -rspamd_worker_guard_handler (gint fd, short what, void *data) +rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents) { - struct rspamd_task *task = data; + struct rspamd_task *task = (struct rspamd_task *)w->data; gchar fake_buf[1024]; gssize r; -#ifdef EV_CLOSED - if (what == EV_CLOSED) { - if (!(task->flags & RSPAMD_TASK_FLAG_JSON) && - task->cfg->enable_shutdown_workaround) { - msg_info_task ("workaround for shutdown enabled, please update " - "your client, this support might be removed in future"); - shutdown (fd, SHUT_RD); - event_del (task->guard_ev); - task->guard_ev = NULL; - } - else { - msg_err_task ("the peer has closed connection unexpectedly"); - rspamd_session_destroy (task->s); - } - - return; - } -#endif - - r = read (fd, fake_buf, sizeof (fake_buf)); + r = read (w->fd, fake_buf, sizeof (fake_buf)); if (r > 0) { msg_warn_task ("received extra data after task is loaded, ignoring"); @@ -218,9 +198,8 @@ rspamd_worker_guard_handler (gint fd, short what, void *data) task->cfg->enable_shutdown_workaround) { msg_info_task ("workaround for shutdown enabled, please update " "your client, this support might be removed in future"); - shutdown (fd, SHUT_RD); - event_del (task->guard_ev); - task->guard_ev = NULL; + shutdown (w->fd, SHUT_RD); + ev_io_stop (task->event_loop, &task->guard_ev); } else { msg_err_task ("the peer has closed connection unexpectedly"); @@ -245,8 +224,6 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, { struct rspamd_task *task = (struct rspamd_task *) conn->ud; struct rspamd_worker_ctx *ctx; - struct timeval task_tv; - struct event *guard_ev; ctx = task->worker->ctx; @@ -268,25 +245,16 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, /* Set global timeout for the task */ if (ctx->task_timeout > 0.0) { - event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout, - task); - event_base_set (ctx->ev_base, &task->timeout_ev); - double_to_tv (ctx->task_timeout, &task_tv); - event_add (&task->timeout_ev, &task_tv); + task->timeout_ev.data = task; + ev_timer_init (&task->timeout_ev, rspamd_task_timeout, + ctx->task_timeout, 0.0); + ev_timer_start (task->event_loop, &task->timeout_ev); } /* Set socket guard */ - guard_ev = rspamd_mempool_alloc (task->task_pool, sizeof (*guard_ev)); -#ifdef EV_CLOSED - event_set (guard_ev, task->sock, EV_READ|EV_PERSIST|EV_CLOSED, - rspamd_worker_guard_handler, task); -#else - event_set (guard_ev, task->sock, EV_READ|EV_PERSIST, - rspamd_worker_guard_handler, task); -#endif - event_base_set (task->event_loop, guard_ev); - event_add (guard_ev, NULL); - task->guard_ev = guard_ev; + task->guard_ev.data = task; + ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ); + ev_io_start (task->event_loop, &task->guard_ev); rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL); @@ -359,9 +327,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, * Accept new connection and construct task */ static void -accept_socket (gint fd, short what, void *arg) +accept_socket (EV_P_ ev_io *w, int revents) { - struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_worker_ctx *ctx; struct rspamd_task *task; rspamd_inet_addr_t *addr; @@ -377,7 +345,8 @@ accept_socket (gint fd, short what, void *arg) } if ((nfd = - rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { + rspamd_accept_from_socket (w->fd, &addr, + rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) { msg_warn_ctx ("accept failed: %s", strerror (errno)); return; } @@ -386,7 +355,7 @@ accept_socket (gint fd, short what, void *arg) return; } - task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->ev_base); + task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop); msg_info_task ("accepted connection from %s port %d, task ptr: %p", rspamd_inet_address_to_string (addr), @@ -435,7 +404,7 @@ accept_socket (gint fd, short what, void *arg) rspamd_http_connection_read_message (task->http_conn, task, - &ctx->io_tv); + ctx->timeout); } #ifdef WITH_HYPERSCAN @@ -587,7 +556,7 @@ init_worker (struct rspamd_config *cfg) ctx, G_STRUCT_OFFSET (struct rspamd_worker_ctx, timeout), - RSPAMD_CL_FLAG_TIME_INTEGER, + RSPAMD_CL_FLAG_TIME_FLOAT, "Protocol IO timeout"); rspamd_rcl_register_worker_option (cfg, @@ -672,9 +641,8 @@ start_worker (struct rspamd_worker *worker) struct rspamd_worker_ctx *ctx = worker->ctx; ctx->cfg = worker->srv->cfg; - ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket); - msec_to_tv (ctx->timeout, &ctx->io_tv); - rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->ev_base, + ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket); + rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop, worker); if (isnan (ctx->task_timeout)) { @@ -687,20 +655,20 @@ start_worker (struct rspamd_worker *worker) } ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger, - ctx->ev_base, + ctx->event_loop, worker->srv->cfg); - rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0); + rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0); rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx, - ctx->ev_base, ctx->resolver->r); + ctx->event_loop, ctx->resolver->r); - ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base, + ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop, ctx->cfg->ups_ctx); - rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver, + rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver, &ctx->lang_det); - rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base, + rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker); - event_base_loop (ctx->ev_base, 0); + ev_loop (ctx->event_loop, 0); rspamd_worker_block_signals (); rspamd_stat_close (); |