diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-06-19 11:42:58 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-06-22 10:57:29 +0100 |
commit | 0334b8e433a45513c0087dda20f22a26b2e16ad1 (patch) | |
tree | 767ec1825742c1ad4a4a41c64ee846c54be40eae /src | |
parent | a3a8b32851bd236333af1353dad0d663f30a555d (diff) | |
download | rspamd-0334b8e433a45513c0087dda20f22a26b2e16ad1.tar.gz rspamd-0334b8e433a45513c0087dda20f22a26b2e16ad1.zip |
[Project] Further rework
Diffstat (limited to 'src')
-rw-r--r-- | src/libserver/milter.c | 14 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 115 | ||||
-rw-r--r-- | src/libserver/worker_util.h | 2 | ||||
-rw-r--r-- | src/libstat/learn_cache/redis_cache.c | 27 | ||||
-rw-r--r-- | src/rspamd.h | 2 |
5 files changed, 77 insertions, 83 deletions
diff --git a/src/libserver/milter.c b/src/libserver/milter.c index bb27d2ff1..897938df0 100644 --- a/src/libserver/milter.c +++ b/src/libserver/milter.c @@ -186,10 +186,7 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session) priv = session->priv; msg_debug_milter ("destroying milter session"); - if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (&priv->ev); - } - + rspamd_ev_watcher_stop (priv->event_loop, &priv->ev); rspamd_milter_session_reset (session, RSPAMD_MILTER_RESET_ALL); if (priv->parser.buf) { @@ -267,14 +264,7 @@ static inline void rspamd_milter_plan_io (struct rspamd_milter_session *session, struct rspamd_milter_private *priv, gshort what) { - if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) { - event_del (&priv->ev); - } - - event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler, - session); - event_base_set (priv->event_loop, &priv->ev); - event_add (&priv->ev, priv->ptv); + rspamd_ev_watcher_reschedule (priv->event_loop, &priv->ev, what); } diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 0d4c4db17..0aa0c9cf3 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -57,7 +57,7 @@ #include <sys/ucontext.h> #endif -static void rspamd_worker_ignore_signal (int signo); +static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *); /** * Return worker's control structure by its type * @param type @@ -98,6 +98,16 @@ rspamd_worker_terminate_handlers (struct rspamd_worker *w) return ret; } + +static void +rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents) +{ + ev_break (loop, EVBREAK_ALL); +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif +} + /* * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them */ @@ -108,7 +118,10 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg struct timeval tv; if (!sigh->worker->wanna_die) { - rspamd_worker_ignore_signal (SIGUSR2); + static ev_timer shutdown_ev; + + rspamd_worker_ignore_signal (sigh); + tv.tv_sec = SOFT_SHUTDOWN_TIME; tv.tv_usec = 0; sigh->worker->wanna_die = TRUE; @@ -119,7 +132,9 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg G_STRFUNC, "worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); - event_base_loopexit (sigh->base, &tv); + ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, + SOFT_SHUTDOWN_TIME, 0.0); + ev_timer_start (sigh->event_loop, &shutdown_ev); rspamd_worker_stop_accept (sigh->worker); } @@ -142,9 +157,12 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg static gboolean rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg) { - struct timeval tv; + ev_tstamp delay; if (!sigh->worker->wanna_die) { + static ev_timer shutdown_ev; + + rspamd_worker_ignore_signal (sigh); rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, @@ -152,19 +170,17 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg "terminating after receiving signal %s", g_strsignal (sigh->signo)); - tv.tv_usec = 0; if (rspamd_worker_terminate_handlers (sigh->worker)) { - tv.tv_sec = SOFT_SHUTDOWN_TIME; + delay = SOFT_SHUTDOWN_TIME; } else { - tv.tv_sec = 0; + delay = 0; } sigh->worker->wanna_die = 1; - event_base_loopexit (sigh->base, &tv); -#ifdef WITH_GPERF_TOOLS - ProfilerStop (); -#endif + ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, + SOFT_SHUTDOWN_TIME, 0.0); + ev_timer_start (sigh->event_loop, &shutdown_ev); rspamd_worker_stop_accept (sigh->worker); } @@ -173,10 +189,10 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg } static void -rspamd_worker_signal_handle (int fd, short what, void *arg) +rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents) { struct rspamd_worker_signal_handler *sigh = - (struct rspamd_worker_signal_handler *) arg; + (struct rspamd_worker_signal_handler *)w->data; struct rspamd_worker_signal_cb *cb, *cbtmp; /* Call all signal handlers registered */ @@ -188,15 +204,9 @@ rspamd_worker_signal_handle (int fd, short what, void *arg) } static void -rspamd_worker_ignore_signal (int signo) +rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh) { - struct sigaction sig; - - sigemptyset (&sig.sa_mask); - sigaddset (&sig.sa_mask, signo); - sig.sa_handler = SIG_IGN; - sig.sa_flags = 0; - sigaction (signo, &sig, NULL); + ev_signal_stop (sigh->event_loop, &sigh->ev_sig); } static void @@ -222,14 +232,14 @@ rspamd_sigh_free (void *p) g_free (cb); } - event_del (&sigh->ev); + ev_signal_stop (sigh->event_loop, &sigh->ev_sig); rspamd_worker_default_signal (sigh->signo); g_free (sigh); } void rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker, - struct ev_loop *base, + struct ev_loop *event_loop, rspamd_worker_signal_handler handler, void *handler_data) { @@ -242,12 +252,12 @@ rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker, sigh = g_malloc0 (sizeof (*sigh)); sigh->signo = signo; sigh->worker = worker; - sigh->base = base; + sigh->event_loop = event_loop; sigh->enabled = TRUE; - signal_set (&sigh->ev, signo, rspamd_worker_signal_handle, sigh); - event_base_set (base, &sigh->ev); - signal_add (&sigh->ev, NULL); + sigh->ev_sig.data = sigh; + ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo); + ev_signal_start (event_loop, &sigh->ev_sig); g_hash_table_insert (worker->signal_events, GINT_TO_POINTER (signo), @@ -428,7 +438,7 @@ rspamd_controller_send_error (struct rspamd_http_connection_entry *entry, NULL, "application/json", entry, - entry->rt->ptv); + entry->rt->timeout); entry->is_reply = TRUE; } @@ -460,7 +470,7 @@ rspamd_controller_send_string (struct rspamd_http_connection_entry *entry, NULL, "application/json", entry, - entry->rt->ptv); + entry->rt->timeout); entry->is_reply = TRUE; } @@ -486,7 +496,7 @@ rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry, NULL, "application/json", entry, - entry->rt->ptv); + entry->rt->timeout); entry->is_reply = TRUE; } @@ -676,9 +686,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, #endif /* Remove the inherited event base */ - event_reinit (rspamd_main->event_loop); - event_base_free (rspamd_main->event_loop); - + ev_loop_destroy (EV_DEFAULT); + rspamd_main->event_loop = NULL; /* Drop privileges */ rspamd_worker_drop_priv (rspamd_main); /* Set limits */ @@ -853,8 +862,7 @@ struct rspamd_worker_session_cache { struct ev_loop *ev_base; GHashTable *cache; struct rspamd_config *cfg; - struct timeval tv; - struct event periodic; + struct ev_timer periodic; }; static gint @@ -868,9 +876,10 @@ rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb) } static void -rspamd_sessions_cache_periodic (gint fd, short what, gpointer p) +rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents) { - struct rspamd_worker_session_cache *c = p; + struct rspamd_worker_session_cache *c = + (struct rspamd_worker_session_cache *)w->data; GHashTableIter it; gchar timebuf[32]; gpointer k, v; @@ -902,6 +911,8 @@ rspamd_sessions_cache_periodic (gint fd, short what, gpointer p) timebuf); } } + + ev_timer_again (EV_A_ w); } void * @@ -916,11 +927,10 @@ rspamd_worker_session_cache_new (struct rspamd_worker *w, c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, g_free); c->cfg = w->srv->cfg; - double_to_tv (periodic_interval, &c->tv); - event_set (&c->periodic, -1, EV_TIMEOUT|EV_PERSIST, - rspamd_sessions_cache_periodic, c); - event_base_set (ev_base, &c->periodic); - event_add (&c->periodic, &c->tv); + c->periodic.data = c; + ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval, + periodic_interval); + ev_timer_start (ev_base, &c->periodic); return c; } @@ -1115,12 +1125,13 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main) } static void -rspamd_enable_accept_event (gint fd, short what, gpointer d) +rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents) { - struct event *events = d; + struct rspamd_worker_accept_event *ac_ev = + (struct rspamd_worker_accept_event *)w->data; - event_del (&events[1]); - event_add (&events[0], NULL); + ev_timer_stop (EV_A_ w); + ev_io_start (EV_A_ &ac_ev->accept_ev); } void @@ -1128,17 +1139,15 @@ rspamd_worker_throttle_accept_events (gint sock, void *data) { struct rspamd_worker_accept_event *head, *cur; const gdouble throttling = 0.5; - struct ev_loop *ev_base; head = (struct rspamd_worker_accept_event *)data; DL_FOREACH (head, cur) { - ev_base = event_get_base (&events[0]); - event_del (&events[0]); - event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event, - events); - event_base_set (ev_base, &events[1]); - event_add (&events[1], &tv); + ev_io_stop (cur->event_loop, &cur->accept_ev); + cur->throttling_ev.data = cur; + ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event, + throttling, 0.0); + ev_timer_start (cur->event_loop, &cur->throttling_ev); } }
\ No newline at end of file diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 67b54e5c9..9693aa6ad 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -56,7 +56,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, */ void rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker, - struct ev_loop *base, + struct ev_loop *event_loop, rspamd_worker_signal_handler handler, void *handler_data); diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c index 3ae30c440..2313db0b2 100644 --- a/src/libstat/learn_cache/redis_cache.c +++ b/src/libstat/learn_cache/redis_cache.c @@ -45,7 +45,7 @@ struct rspamd_redis_cache_runtime { struct rspamd_redis_cache_ctx *ctx; struct rspamd_task *task; struct upstream *selected; - struct event timeout_event; + ev_timer timer_ev; redisAsyncContext *redis; gboolean has_event; }; @@ -92,9 +92,7 @@ rspamd_redis_cache_fin (gpointer data) redisAsyncContext *redis; rt->has_event = FALSE; - if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) { - event_del (&rt->timeout_event); - } + ev_timer_stop (rt->task->event_loop, &rt->timer_ev); if (rt->redis) { redis = rt->redis; @@ -105,9 +103,10 @@ rspamd_redis_cache_fin (gpointer data) } static void -rspamd_redis_cache_timeout (gint fd, short what, gpointer d) +rspamd_redis_cache_timeout (EV_P_ ev_timer *w, int revents) { - struct rspamd_redis_cache_runtime *rt = d; + struct rspamd_redis_cache_runtime *rt = + (struct rspamd_redis_cache_runtime *)w->data; struct rspamd_task *task; task = rt->task; @@ -117,7 +116,7 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d) rspamd_upstream_fail (rt->selected, FALSE); if (rt->has_event) { - rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d); + rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt); } } @@ -401,8 +400,9 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task, redisLibevAttach (task->event_loop, rt->redis); /* Now check stats */ - event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt); - event_base_set (task->event_loop, &rt->timeout_event); + rt->timer_ev.data = rt; + ev_timer_init (&rt->timer_ev, rspamd_redis_cache_timeout, + rt->ctx->timeout, 0.0); rspamd_redis_cache_maybe_auth (ctx, rt->redis); if (!learn) { @@ -418,7 +418,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, gpointer runtime) { struct rspamd_redis_cache_runtime *rt = runtime; - struct timeval tv; gchar *h; if (rspamd_session_blocked (task->s)) { @@ -431,8 +430,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, return RSPAMD_LEARN_INGORE; } - double_to_tv (rt->ctx->timeout, &tv); - if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt, "HGET %s %s", rt->ctx->redis_object, h) == REDIS_OK) { @@ -440,7 +437,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, rspamd_redis_cache_fin, rt, M); - event_add (&rt->timeout_event, &tv); + ev_timer_start (rt->task->event_loop, &rt->timer_ev); rt->has_event = TRUE; } @@ -454,7 +451,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, gpointer runtime) { struct rspamd_redis_cache_runtime *rt = runtime; - struct timeval tv; gchar *h; gint flag; @@ -465,7 +461,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, h = rspamd_mempool_get_variable (task->task_pool, "words_hash"); g_assert (h != NULL); - double_to_tv (rt->ctx->timeout, &tv); flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1; if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt, @@ -473,7 +468,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, rt->ctx->redis_object, h, flag) == REDIS_OK) { rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt, M); - event_add (&rt->timeout_event, &tv); + ev_timer_start (rt->task->event_loop, &rt->timer_ev); rt->has_event = TRUE; } diff --git a/src/rspamd.h b/src/rspamd.h index 9048a26bd..e47271ca3 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -122,7 +122,7 @@ struct rspamd_worker_signal_handler { gint signo; gboolean enabled; ev_signal ev_sig; - struct ev_loop *base; + struct ev_loop *event_loop; struct rspamd_worker *worker; struct rspamd_worker_signal_cb *cb; }; |