From 63f823eb9d6b4cfed6c3014ab350dfc61f33cb28 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 19 Jun 2019 17:46:28 +0100 Subject: [PATCH] [Project] Further workers refactoring --- src/CMakeLists.txt | 5 +-- src/rspamadm/CMakeLists.txt | 3 +- src/rspamadm/lua_repl.c | 2 +- src/rspamd.c | 84 +++++++++++++++++-------------------- 4 files changed, 43 insertions(+), 51 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index caa0b5d0d..a23c4e505 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -91,8 +91,7 @@ SET(RSPAMDSRC controller.c fuzzy_storage.c rspamd.c worker.c - rspamd_proxy.c - log_helper.c) + rspamd_proxy.c) SET(PLUGINSSRC plugins/surbl.c plugins/regexp.c @@ -103,7 +102,7 @@ SET(PLUGINSSRC plugins/surbl.c libserver/rspamd_control.c) SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim) -SET(WORKERS_LIST normal controller fuzzy lua rspamd_proxy log_helper) +SET(WORKERS_LIST normal controller fuzzy rspamd_proxy) IF (ENABLE_HYPERSCAN MATCHES "ON") LIST(APPEND WORKERS_LIST "hs_helper") LIST(APPEND RSPAMDSRC "hs_helper.c") diff --git a/src/rspamadm/CMakeLists.txt b/src/rspamadm/CMakeLists.txt index b98b99473..3d4f2f490 100644 --- a/src/rspamadm/CMakeLists.txt +++ b/src/rspamadm/CMakeLists.txt @@ -16,8 +16,7 @@ SET(RSPAMADMSRC rspamadm.c ${CMAKE_SOURCE_DIR}/src/controller.c ${CMAKE_SOURCE_DIR}/src/fuzzy_storage.c ${CMAKE_SOURCE_DIR}/src/worker.c - ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c - ${CMAKE_SOURCE_DIR}/src/log_helper.c) + ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c) INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR}) IF (ENABLE_HYPERSCAN MATCHES "ON") LIST(APPEND RSPAMADMSRC "${CMAKE_SOURCE_DIR}/src/hs_helper.c") diff --git a/src/rspamadm/lua_repl.c b/src/rspamadm/lua_repl.c index 33805d66b..a95521bdb 100644 --- a/src/rspamadm/lua_repl.c +++ b/src/rspamadm/lua_repl.c @@ -296,7 +296,7 @@ wait_session_events (void) { /* XXX: it's probably worth to add timeout here - not to wait forever */ while (rspamd_session_events_pending (rspamadm_session) > 0) { - event_base_loop (rspamd_main->event_loop, EVLOOP_ONCE); + ev_loop (rspamd_main->event_loop, EVLOOP_ONESHOT); } } diff --git a/src/rspamd.c b/src/rspamd.c index a4a659a53..813b7b7bb 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -358,21 +358,22 @@ reread_config (struct rspamd_main *rspamd_main) struct waiting_worker { struct rspamd_main *rspamd_main; - struct event wait_ev; + struct ev_timer wait_ev; struct rspamd_worker_conf *cf; guint oldindex; }; static void -rspamd_fork_delayed_cb (gint signo, short what, gpointer arg) +rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents) { - struct waiting_worker *w = arg; - - event_del (&w->wait_ev); - rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex, - w->rspamd_main->event_loop); - REF_RELEASE (w->cf); - g_free (w); + struct waiting_worker *waiting_worker = (struct waiting_worker *)w->data; + + ev_timer_stop (EV_A_ &waiting_worker->wait_ev); + rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf, + waiting_worker->oldindex, + waiting_worker->rspamd_main->event_loop); + REF_RELEASE (waiting_worker->cf); + g_free (waiting_worker); } static void @@ -390,9 +391,8 @@ rspamd_fork_delayed (struct rspamd_worker_conf *cf, tv.tv_sec = SOFT_FORK_TIME; tv.tv_usec = 0; REF_RETAIN (cf); - event_set (&nw->wait_ev, -1, EV_TIMEOUT, rspamd_fork_delayed_cb, nw); - event_base_set (rspamd_main->event_loop, &nw->wait_ev); - event_add (&nw->wait_ev, &tv); + ev_timer_init (&nw->wait_ev, rspamd_fork_delayed_cb, SOFT_FORK_TIME, 0.0); + ev_timer_start (rspamd_main->event_loop, &nw->wait_ev); } static GList * @@ -803,7 +803,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) if (w->tmp_data) { g_free (w->tmp_data); } - event_del (&w->srv_ev); + ev_io_stop (rspamd_main->event_loop, &w->srv_ev); } if (w->finish_actions) { @@ -1026,9 +1026,9 @@ rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents) } static void -rspamd_cld_handler (gint signo, short what, gpointer arg) +rspamd_cld_handler (EV_P_ ev_signal *w, int revents) { - struct rspamd_main *rspamd_main = arg; + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; guint i; gint res = 0; struct rspamd_worker *cur; @@ -1135,7 +1135,7 @@ rspamd_cld_handler (gint signo, short what, gpointer arg) if (cur->tmp_data) { g_free (cur->tmp_data); } - event_del (&cur->srv_ev); + ev_io_stop (rspamd_main->event_loop, &cur->srv_ev); } if (cur->control_pipe[0] != -1) { @@ -1166,29 +1166,29 @@ rspamd_cld_handler (gint signo, short what, gpointer arg) } static void -rspamd_final_term_handler (gint signo, short what, gpointer arg) +rspamd_final_term_handler (EV_P_ ev_timer *w, int revents) { - struct rspamd_main *rspamd_main = arg; + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; term_attempts--; g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL); if (g_hash_table_size (rspamd_main->workers) == 0) { - event_base_loopexit (rspamd_main->event_loop, NULL); + ev_break (rspamd_main->event_loop, EVBREAK_ALL); } } /* Control socket handler */ static void -rspamd_control_handler (gint fd, short what, gpointer arg) +rspamd_control_handler (EV_P_ ev_io *w, int revents) { - struct rspamd_main *rspamd_main = arg; + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; rspamd_inet_addr_t *addr; gint nfd; if ((nfd = - rspamd_accept_from_socket (fd, &addr, NULL)) == -1) { + rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) { msg_warn_main ("accept failed: %s", strerror (errno)); return; } @@ -1246,7 +1246,6 @@ main (gint argc, gchar **argv, gchar **env) struct ev_loop *event_loop; ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev; ev_io control_ev; - struct timeval term_tv; struct rspamd_main *rspamd_main; gboolean skip_pid = FALSE, valgrind_mode = FALSE; @@ -1524,10 +1523,9 @@ main (gint argc, gchar **argv, gchar **env) /* XXX: deal with children */ - evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, rspamd_main); - event_base_set (event_loop, &cld_ev); - event_add (&cld_ev, NULL); - + cld_ev.data = rspamd_main; + ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD); + ev_signal_start (event_loop, &cld_ev); rspamd_check_core_limits (rspamd_main); rspamd_mempool_lock_mutex (rspamd_main->start_mtx); @@ -1545,18 +1543,17 @@ main (gint argc, gchar **argv, gchar **env) ev_io_start (event_loop, &control_ev); } - event_base_loop (event_loop, 0); + ev_loop (event_loop, 0); /* We need to block signals unless children are waited for */ rspamd_worker_block_signals (); - - event_del (&term_ev); - event_del (&int_ev); - event_del (&hup_ev); - event_del (&cld_ev); - event_del (&usr1_ev); + ev_signal_stop (event_loop, &term_ev); + ev_signal_stop (event_loop, &int_ev); + ev_signal_stop (event_loop, &hup_ev); + ev_signal_stop (event_loop, &cld_ev); + ev_signal_stop (event_loop, &usr1_ev); if (control_fd != -1) { - event_del (&control_ev); + ev_io_stop (event_loop, &control_ev); close (control_fd); } @@ -1568,20 +1565,17 @@ main (gint argc, gchar **argv, gchar **env) term_attempts = TERMINATION_ATTEMPTS; } - /* Check each 200 ms */ - term_tv.tv_sec = 0; - term_tv.tv_usec = 200000; /* Wait for workers termination */ g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL); - event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST, - rspamd_final_term_handler, rspamd_main); - event_base_set (event_loop, &term_ev); - event_add (&term_ev, &term_tv); + static ev_timer ev_finale; + + ev_finale.data = rspamd_main; + ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2); + ev_timer_start (event_loop, &ev_finale); - event_base_loop (event_loop, 0); - event_del (&term_ev); + ev_loop (event_loop, 0); /* Maybe save roll history */ if (rspamd_main->cfg->history_file) { @@ -1602,7 +1596,7 @@ main (gint argc, gchar **argv, gchar **env) } g_free (rspamd_main); - event_base_free (event_loop); + ev_unref (event_loop); sqlite3_shutdown (); if (control_addr) { -- 2.39.5