#include <sys/ucontext.h>
#endif
-static void rspamd_worker_ignore_signal (int signo);
+static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
/**
* Return worker's control structure by its type
* @param type
return ret;
}
+
+static void
+rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents)
+{
+ ev_break (loop, EVBREAK_ALL);
+#ifdef WITH_GPERF_TOOLS
+ ProfilerStop ();
+#endif
+}
+
/*
* Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
*/
struct timeval tv;
if (!sigh->worker->wanna_die) {
- rspamd_worker_ignore_signal (SIGUSR2);
+ static ev_timer shutdown_ev;
+
+ rspamd_worker_ignore_signal (sigh);
+
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
sigh->worker->wanna_die = TRUE;
G_STRFUNC,
"worker's shutdown is pending in %d sec",
SOFT_SHUTDOWN_TIME);
- event_base_loopexit (sigh->base, &tv);
+ ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+ SOFT_SHUTDOWN_TIME, 0.0);
+ ev_timer_start (sigh->event_loop, &shutdown_ev);
rspamd_worker_stop_accept (sigh->worker);
}
static gboolean
rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
{
- struct timeval tv;
+ ev_tstamp delay;
if (!sigh->worker->wanna_die) {
+ static ev_timer shutdown_ev;
+
+ rspamd_worker_ignore_signal (sigh);
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
"terminating after receiving signal %s",
g_strsignal (sigh->signo));
- tv.tv_usec = 0;
if (rspamd_worker_terminate_handlers (sigh->worker)) {
- tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ delay = SOFT_SHUTDOWN_TIME;
}
else {
- tv.tv_sec = 0;
+ delay = 0;
}
sigh->worker->wanna_die = 1;
- event_base_loopexit (sigh->base, &tv);
-#ifdef WITH_GPERF_TOOLS
- ProfilerStop ();
-#endif
+ ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+ SOFT_SHUTDOWN_TIME, 0.0);
+ ev_timer_start (sigh->event_loop, &shutdown_ev);
rspamd_worker_stop_accept (sigh->worker);
}
}
static void
-rspamd_worker_signal_handle (int fd, short what, void *arg)
+rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
{
struct rspamd_worker_signal_handler *sigh =
- (struct rspamd_worker_signal_handler *) arg;
+ (struct rspamd_worker_signal_handler *)w->data;
struct rspamd_worker_signal_cb *cb, *cbtmp;
/* Call all signal handlers registered */
}
static void
-rspamd_worker_ignore_signal (int signo)
+rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
{
- struct sigaction sig;
-
- sigemptyset (&sig.sa_mask);
- sigaddset (&sig.sa_mask, signo);
- sig.sa_handler = SIG_IGN;
- sig.sa_flags = 0;
- sigaction (signo, &sig, NULL);
+ ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
}
static void
g_free (cb);
}
- event_del (&sigh->ev);
+ ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
rspamd_worker_default_signal (sigh->signo);
g_free (sigh);
}
void
rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
- struct ev_loop *base,
+ struct ev_loop *event_loop,
rspamd_worker_signal_handler handler,
void *handler_data)
{
sigh = g_malloc0 (sizeof (*sigh));
sigh->signo = signo;
sigh->worker = worker;
- sigh->base = base;
+ sigh->event_loop = event_loop;
sigh->enabled = TRUE;
- signal_set (&sigh->ev, signo, rspamd_worker_signal_handle, sigh);
- event_base_set (base, &sigh->ev);
- signal_add (&sigh->ev, NULL);
+ sigh->ev_sig.data = sigh;
+ ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo);
+ ev_signal_start (event_loop, &sigh->ev_sig);
g_hash_table_insert (worker->signal_events,
GINT_TO_POINTER (signo),
NULL,
"application/json",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
NULL,
"application/json",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
NULL,
"application/json",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
#endif
/* Remove the inherited event base */
- event_reinit (rspamd_main->event_loop);
- event_base_free (rspamd_main->event_loop);
-
+ ev_loop_destroy (EV_DEFAULT);
+ rspamd_main->event_loop = NULL;
/* Drop privileges */
rspamd_worker_drop_priv (rspamd_main);
/* Set limits */
struct ev_loop *ev_base;
GHashTable *cache;
struct rspamd_config *cfg;
- struct timeval tv;
- struct event periodic;
+ struct ev_timer periodic;
};
static gint
}
static void
-rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
+rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_worker_session_cache *c = p;
+ struct rspamd_worker_session_cache *c =
+ (struct rspamd_worker_session_cache *)w->data;
GHashTableIter it;
gchar timebuf[32];
gpointer k, v;
timebuf);
}
}
+
+ ev_timer_again (EV_A_ w);
}
void *
c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, g_free);
c->cfg = w->srv->cfg;
- double_to_tv (periodic_interval, &c->tv);
- event_set (&c->periodic, -1, EV_TIMEOUT|EV_PERSIST,
- rspamd_sessions_cache_periodic, c);
- event_base_set (ev_base, &c->periodic);
- event_add (&c->periodic, &c->tv);
+ c->periodic.data = c;
+ ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval,
+ periodic_interval);
+ ev_timer_start (ev_base, &c->periodic);
return c;
}
}
static void
-rspamd_enable_accept_event (gint fd, short what, gpointer d)
+rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents)
{
- struct event *events = d;
+ struct rspamd_worker_accept_event *ac_ev =
+ (struct rspamd_worker_accept_event *)w->data;
- event_del (&events[1]);
- event_add (&events[0], NULL);
+ ev_timer_stop (EV_A_ w);
+ ev_io_start (EV_A_ &ac_ev->accept_ev);
}
void
{
struct rspamd_worker_accept_event *head, *cur;
const gdouble throttling = 0.5;
- struct ev_loop *ev_base;
head = (struct rspamd_worker_accept_event *)data;
DL_FOREACH (head, cur) {
- ev_base = event_get_base (&events[0]);
- event_del (&events[0]);
- event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
- events);
- event_base_set (ev_base, &events[1]);
- event_add (&events[1], &tv);
+ ev_io_stop (cur->event_loop, &cur->accept_ev);
+ cur->throttling_ev.data = cur;
+ ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event,
+ throttling, 0.0);
+ ev_timer_start (cur->event_loop, &cur->throttling_ev);
}
}
\ No newline at end of file
struct rspamd_redis_cache_ctx *ctx;
struct rspamd_task *task;
struct upstream *selected;
- struct event timeout_event;
+ ev_timer timer_ev;
redisAsyncContext *redis;
gboolean has_event;
};
redisAsyncContext *redis;
rt->has_event = FALSE;
- if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
- event_del (&rt->timeout_event);
- }
+ ev_timer_stop (rt->task->event_loop, &rt->timer_ev);
if (rt->redis) {
redis = rt->redis;
}
static void
-rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
+rspamd_redis_cache_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_redis_cache_runtime *rt = d;
+ struct rspamd_redis_cache_runtime *rt =
+ (struct rspamd_redis_cache_runtime *)w->data;
struct rspamd_task *task;
task = rt->task;
rspamd_upstream_fail (rt->selected, FALSE);
if (rt->has_event) {
- rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
+ rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
}
}
redisLibevAttach (task->event_loop, rt->redis);
/* Now check stats */
- event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
- event_base_set (task->event_loop, &rt->timeout_event);
+ rt->timer_ev.data = rt;
+ ev_timer_init (&rt->timer_ev, rspamd_redis_cache_timeout,
+ rt->ctx->timeout, 0.0);
rspamd_redis_cache_maybe_auth (ctx, rt->redis);
if (!learn) {
gpointer runtime)
{
struct rspamd_redis_cache_runtime *rt = runtime;
- struct timeval tv;
gchar *h;
if (rspamd_session_blocked (task->s)) {
return RSPAMD_LEARN_INGORE;
}
- double_to_tv (rt->ctx->timeout, &tv);
-
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
"HGET %s %s",
rt->ctx->redis_object, h) == REDIS_OK) {
rspamd_redis_cache_fin,
rt,
M);
- event_add (&rt->timeout_event, &tv);
+ ev_timer_start (rt->task->event_loop, &rt->timer_ev);
rt->has_event = TRUE;
}
gpointer runtime)
{
struct rspamd_redis_cache_runtime *rt = runtime;
- struct timeval tv;
gchar *h;
gint flag;
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
g_assert (h != NULL);
- double_to_tv (rt->ctx->timeout, &tv);
flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
rt->ctx->redis_object, h, flag) == REDIS_OK) {
rspamd_session_add_event (task->s,
rspamd_redis_cache_fin, rt, M);
- event_add (&rt->timeout_event, &tv);
+ ev_timer_start (rt->task->event_loop, &rt->timer_ev);
rt->has_event = TRUE;
}