diff options
Diffstat (limited to 'src/libserver/worker_util.c')
-rw-r--r-- | src/libserver/worker_util.c | 92 |
1 files changed, 52 insertions, 40 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 59698be32..0d4c4db17 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -299,12 +299,12 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base) struct ev_loop * rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, - void (*accept_handler)(int, short, void *)) + rspamd_accept_handler hdl) { - struct ev_loop *ev_base; - struct event *accept_events; + struct ev_loop *event_loop; GList *cur; struct rspamd_worker_listen_socket *ls; + struct rspamd_worker_accept_event *accept_ev; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -316,65 +316,58 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, rspamd_sigh_free); - ev_base = event_init (); + event_loop = ev_default_loop (EVFLAG_SIGNALFD); - rspamd_worker_init_signals (worker, ev_base); - rspamd_control_worker_add_default_handler (worker, ev_base); + rspamd_worker_init_signals (worker, event_loop); + rspamd_control_worker_add_default_handler (worker, event_loop); #ifdef WITH_HIREDIS rspamd_redis_pool_config (worker->srv->cfg->redis_pool, - worker->srv->cfg, ev_base); + worker->srv->cfg, event_loop); #endif /* Accept all sockets */ - if (accept_handler) { + if (hdl) { cur = worker->cf->listen_socks; while (cur) { ls = cur->data; if (ls->fd != -1) { - accept_events = g_malloc0 (sizeof (struct event) * 2); - event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST, - accept_handler, worker); - event_base_set (ev_base, &accept_events[0]); - event_add (&accept_events[0], NULL); - worker->accept_events = g_list_prepend (worker->accept_events, - accept_events); + accept_ev = g_malloc0 (sizeof (*accept_ev)); + accept_ev->event_loop = event_loop; + ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ); + ev_io_start (event_loop, &accept_ev->accept_ev); + + DL_APPEND (worker->accept_events, accept_ev); } cur = g_list_next (cur); } } - return ev_base; + return event_loop; } void rspamd_worker_stop_accept (struct rspamd_worker *worker) { - GList *cur; - struct event *events; + struct rspamd_worker_accept_event *cur, *tmp; /* Remove all events */ - cur = worker->accept_events; - while (cur) { - events = cur->data; + DL_FOREACH_SAFE (worker->accept_events, cur, tmp) { - if (rspamd_event_pending (&events[0], EV_TIMEOUT|EV_READ|EV_WRITE)) { - event_del (&events[0]); + if (ev_is_active (&cur->accept_ev) || ev_is_pending (&cur->accept_ev)) { + ev_io_stop (cur->event_loop, &cur->accept_ev); } - if (rspamd_event_pending (&events[1], EV_TIMEOUT|EV_READ|EV_WRITE)) { - event_del (&events[1]); + + if (ev_is_active (&cur->throttling_ev) || ev_is_pending (&cur->throttling_ev)) { + ev_timer_stop (cur->event_loop, &cur->throttling_ev); } - cur = g_list_next (cur); - g_free (events); + g_free (cur); } - if (worker->accept_events != NULL) { - g_list_free (worker->accept_events); - } /* XXX: we need to do it much later */ #if 0 g_hash_table_iter_init (&it, worker->signal_events); @@ -721,16 +714,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, rspamd_log_open (rspamd_main->logger); wrk->start_time = rspamd_get_calendar_ticks (); -#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) - # if (GLIB_MINOR_VERSION > 20) - /* Ugly hack for old glib */ - if (!g_thread_get_initialized ()) { - g_thread_init (NULL); - } -# else - g_thread_init (NULL); -# endif -#endif if (cf->bind_conf) { msg_info_main ("starting %s process %P (%d); listen on: %s", cf->worker->name, @@ -1129,4 +1112,33 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main) sigaction (SIGFPE, &sa, NULL); sigaction (SIGSYS, &sa, NULL); #endif +} + +static void +rspamd_enable_accept_event (gint fd, short what, gpointer d) +{ + struct event *events = d; + + event_del (&events[1]); + event_add (&events[0], NULL); +} + +void +rspamd_worker_throttle_accept_events (gint sock, void *data) +{ + struct rspamd_worker_accept_event *head, *cur; + const gdouble throttling = 0.5; + struct ev_loop *ev_base; + + head = (struct rspamd_worker_accept_event *)data; + + DL_FOREACH (head, cur) { + + ev_base = event_get_base (&events[0]); + event_del (&events[0]); + event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event, + events); + event_base_set (ev_base, &events[1]); + event_add (&events[1], &tv); + } }
\ No newline at end of file |