aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/worker_util.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/worker_util.c')
-rw-r--r--src/libserver/worker_util.c92
1 files changed, 52 insertions, 40 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 59698be32..0d4c4db17 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -299,12 +299,12 @@ rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
struct ev_loop *
rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
- void (*accept_handler)(int, short, void *))
+ rspamd_accept_handler hdl)
{
- struct ev_loop *ev_base;
- struct event *accept_events;
+ struct ev_loop *event_loop;
GList *cur;
struct rspamd_worker_listen_socket *ls;
+ struct rspamd_worker_accept_event *accept_ev;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -316,65 +316,58 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, rspamd_sigh_free);
- ev_base = event_init ();
+ event_loop = ev_default_loop (EVFLAG_SIGNALFD);
- rspamd_worker_init_signals (worker, ev_base);
- rspamd_control_worker_add_default_handler (worker, ev_base);
+ rspamd_worker_init_signals (worker, event_loop);
+ rspamd_control_worker_add_default_handler (worker, event_loop);
#ifdef WITH_HIREDIS
rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
- worker->srv->cfg, ev_base);
+ worker->srv->cfg, event_loop);
#endif
/* Accept all sockets */
- if (accept_handler) {
+ if (hdl) {
cur = worker->cf->listen_socks;
while (cur) {
ls = cur->data;
if (ls->fd != -1) {
- accept_events = g_malloc0 (sizeof (struct event) * 2);
- event_set (&accept_events[0], ls->fd, EV_READ | EV_PERSIST,
- accept_handler, worker);
- event_base_set (ev_base, &accept_events[0]);
- event_add (&accept_events[0], NULL);
- worker->accept_events = g_list_prepend (worker->accept_events,
- accept_events);
+ accept_ev = g_malloc0 (sizeof (*accept_ev));
+ accept_ev->event_loop = event_loop;
+ ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
+ ev_io_start (event_loop, &accept_ev->accept_ev);
+
+ DL_APPEND (worker->accept_events, accept_ev);
}
cur = g_list_next (cur);
}
}
- return ev_base;
+ return event_loop;
}
void
rspamd_worker_stop_accept (struct rspamd_worker *worker)
{
- GList *cur;
- struct event *events;
+ struct rspamd_worker_accept_event *cur, *tmp;
/* Remove all events */
- cur = worker->accept_events;
- while (cur) {
- events = cur->data;
+ DL_FOREACH_SAFE (worker->accept_events, cur, tmp) {
- if (rspamd_event_pending (&events[0], EV_TIMEOUT|EV_READ|EV_WRITE)) {
- event_del (&events[0]);
+ if (ev_is_active (&cur->accept_ev) || ev_is_pending (&cur->accept_ev)) {
+ ev_io_stop (cur->event_loop, &cur->accept_ev);
}
- if (rspamd_event_pending (&events[1], EV_TIMEOUT|EV_READ|EV_WRITE)) {
- event_del (&events[1]);
+
+ if (ev_is_active (&cur->throttling_ev) || ev_is_pending (&cur->throttling_ev)) {
+ ev_timer_stop (cur->event_loop, &cur->throttling_ev);
}
- cur = g_list_next (cur);
- g_free (events);
+ g_free (cur);
}
- if (worker->accept_events != NULL) {
- g_list_free (worker->accept_events);
- }
/* XXX: we need to do it much later */
#if 0
g_hash_table_iter_init (&it, worker->signal_events);
@@ -721,16 +714,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
rspamd_log_open (rspamd_main->logger);
wrk->start_time = rspamd_get_calendar_ticks ();
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
- # if (GLIB_MINOR_VERSION > 20)
- /* Ugly hack for old glib */
- if (!g_thread_get_initialized ()) {
- g_thread_init (NULL);
- }
-# else
- g_thread_init (NULL);
-# endif
-#endif
if (cf->bind_conf) {
msg_info_main ("starting %s process %P (%d); listen on: %s",
cf->worker->name,
@@ -1129,4 +1112,33 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
sigaction (SIGFPE, &sa, NULL);
sigaction (SIGSYS, &sa, NULL);
#endif
+}
+
+static void
+rspamd_enable_accept_event (gint fd, short what, gpointer d)
+{
+ struct event *events = d;
+
+ event_del (&events[1]);
+ event_add (&events[0], NULL);
+}
+
+void
+rspamd_worker_throttle_accept_events (gint sock, void *data)
+{
+ struct rspamd_worker_accept_event *head, *cur;
+ const gdouble throttling = 0.5;
+ struct ev_loop *ev_base;
+
+ head = (struct rspamd_worker_accept_event *)data;
+
+ DL_FOREACH (head, cur) {
+
+ ev_base = event_get_base (&events[0]);
+ event_del (&events[0]);
+ event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
+ events);
+ event_base_set (ev_base, &events[1]);
+ event_add (&events[1], &tv);
+ }
} \ No newline at end of file