aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-19 11:42:58 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commit0334b8e433a45513c0087dda20f22a26b2e16ad1 (patch)
tree767ec1825742c1ad4a4a41c64ee846c54be40eae /src
parenta3a8b32851bd236333af1353dad0d663f30a555d (diff)
downloadrspamd-0334b8e433a45513c0087dda20f22a26b2e16ad1.tar.gz
rspamd-0334b8e433a45513c0087dda20f22a26b2e16ad1.zip
[Project] Further rework
Diffstat (limited to 'src')
-rw-r--r--src/libserver/milter.c14
-rw-r--r--src/libserver/worker_util.c115
-rw-r--r--src/libserver/worker_util.h2
-rw-r--r--src/libstat/learn_cache/redis_cache.c27
-rw-r--r--src/rspamd.h2
5 files changed, 77 insertions, 83 deletions
diff --git a/src/libserver/milter.c b/src/libserver/milter.c
index bb27d2ff1..897938df0 100644
--- a/src/libserver/milter.c
+++ b/src/libserver/milter.c
@@ -186,10 +186,7 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session)
priv = session->priv;
msg_debug_milter ("destroying milter session");
- if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (&priv->ev);
- }
-
+ rspamd_ev_watcher_stop (priv->event_loop, &priv->ev);
rspamd_milter_session_reset (session, RSPAMD_MILTER_RESET_ALL);
if (priv->parser.buf) {
@@ -267,14 +264,7 @@ static inline void
rspamd_milter_plan_io (struct rspamd_milter_session *session,
struct rspamd_milter_private *priv, gshort what)
{
- if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
- event_del (&priv->ev);
- }
-
- event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler,
- session);
- event_base_set (priv->event_loop, &priv->ev);
- event_add (&priv->ev, priv->ptv);
+ rspamd_ev_watcher_reschedule (priv->event_loop, &priv->ev, what);
}
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 0d4c4db17..0aa0c9cf3 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -57,7 +57,7 @@
#include <sys/ucontext.h>
#endif
-static void rspamd_worker_ignore_signal (int signo);
+static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
/**
* Return worker's control structure by its type
* @param type
@@ -98,6 +98,16 @@ rspamd_worker_terminate_handlers (struct rspamd_worker *w)
return ret;
}
+
+static void
+rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents)
+{
+ ev_break (loop, EVBREAK_ALL);
+#ifdef WITH_GPERF_TOOLS
+ ProfilerStop ();
+#endif
+}
+
/*
* Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
*/
@@ -108,7 +118,10 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
struct timeval tv;
if (!sigh->worker->wanna_die) {
- rspamd_worker_ignore_signal (SIGUSR2);
+ static ev_timer shutdown_ev;
+
+ rspamd_worker_ignore_signal (sigh);
+
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
sigh->worker->wanna_die = TRUE;
@@ -119,7 +132,9 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
G_STRFUNC,
"worker's shutdown is pending in %d sec",
SOFT_SHUTDOWN_TIME);
- event_base_loopexit (sigh->base, &tv);
+ ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+ SOFT_SHUTDOWN_TIME, 0.0);
+ ev_timer_start (sigh->event_loop, &shutdown_ev);
rspamd_worker_stop_accept (sigh->worker);
}
@@ -142,9 +157,12 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg
static gboolean
rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
{
- struct timeval tv;
+ ev_tstamp delay;
if (!sigh->worker->wanna_die) {
+ static ev_timer shutdown_ev;
+
+ rspamd_worker_ignore_signal (sigh);
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
@@ -152,19 +170,17 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
"terminating after receiving signal %s",
g_strsignal (sigh->signo));
- tv.tv_usec = 0;
if (rspamd_worker_terminate_handlers (sigh->worker)) {
- tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ delay = SOFT_SHUTDOWN_TIME;
}
else {
- tv.tv_sec = 0;
+ delay = 0;
}
sigh->worker->wanna_die = 1;
- event_base_loopexit (sigh->base, &tv);
-#ifdef WITH_GPERF_TOOLS
- ProfilerStop ();
-#endif
+ ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+ SOFT_SHUTDOWN_TIME, 0.0);
+ ev_timer_start (sigh->event_loop, &shutdown_ev);
rspamd_worker_stop_accept (sigh->worker);
}
@@ -173,10 +189,10 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
}
static void
-rspamd_worker_signal_handle (int fd, short what, void *arg)
+rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
{
struct rspamd_worker_signal_handler *sigh =
- (struct rspamd_worker_signal_handler *) arg;
+ (struct rspamd_worker_signal_handler *)w->data;
struct rspamd_worker_signal_cb *cb, *cbtmp;
/* Call all signal handlers registered */
@@ -188,15 +204,9 @@ rspamd_worker_signal_handle (int fd, short what, void *arg)
}
static void
-rspamd_worker_ignore_signal (int signo)
+rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
{
- struct sigaction sig;
-
- sigemptyset (&sig.sa_mask);
- sigaddset (&sig.sa_mask, signo);
- sig.sa_handler = SIG_IGN;
- sig.sa_flags = 0;
- sigaction (signo, &sig, NULL);
+ ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
}
static void
@@ -222,14 +232,14 @@ rspamd_sigh_free (void *p)
g_free (cb);
}
- event_del (&sigh->ev);
+ ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
rspamd_worker_default_signal (sigh->signo);
g_free (sigh);
}
void
rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
- struct ev_loop *base,
+ struct ev_loop *event_loop,
rspamd_worker_signal_handler handler,
void *handler_data)
{
@@ -242,12 +252,12 @@ rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
sigh = g_malloc0 (sizeof (*sigh));
sigh->signo = signo;
sigh->worker = worker;
- sigh->base = base;
+ sigh->event_loop = event_loop;
sigh->enabled = TRUE;
- signal_set (&sigh->ev, signo, rspamd_worker_signal_handle, sigh);
- event_base_set (base, &sigh->ev);
- signal_add (&sigh->ev, NULL);
+ sigh->ev_sig.data = sigh;
+ ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo);
+ ev_signal_start (event_loop, &sigh->ev_sig);
g_hash_table_insert (worker->signal_events,
GINT_TO_POINTER (signo),
@@ -428,7 +438,7 @@ rspamd_controller_send_error (struct rspamd_http_connection_entry *entry,
NULL,
"application/json",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
@@ -460,7 +470,7 @@ rspamd_controller_send_string (struct rspamd_http_connection_entry *entry,
NULL,
"application/json",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
@@ -486,7 +496,7 @@ rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry,
NULL,
"application/json",
entry,
- entry->rt->ptv);
+ entry->rt->timeout);
entry->is_reply = TRUE;
}
@@ -676,9 +686,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
#endif
/* Remove the inherited event base */
- event_reinit (rspamd_main->event_loop);
- event_base_free (rspamd_main->event_loop);
-
+ ev_loop_destroy (EV_DEFAULT);
+ rspamd_main->event_loop = NULL;
/* Drop privileges */
rspamd_worker_drop_priv (rspamd_main);
/* Set limits */
@@ -853,8 +862,7 @@ struct rspamd_worker_session_cache {
struct ev_loop *ev_base;
GHashTable *cache;
struct rspamd_config *cfg;
- struct timeval tv;
- struct event periodic;
+ struct ev_timer periodic;
};
static gint
@@ -868,9 +876,10 @@ rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb)
}
static void
-rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
+rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_worker_session_cache *c = p;
+ struct rspamd_worker_session_cache *c =
+ (struct rspamd_worker_session_cache *)w->data;
GHashTableIter it;
gchar timebuf[32];
gpointer k, v;
@@ -902,6 +911,8 @@ rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
timebuf);
}
}
+
+ ev_timer_again (EV_A_ w);
}
void *
@@ -916,11 +927,10 @@ rspamd_worker_session_cache_new (struct rspamd_worker *w,
c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, g_free);
c->cfg = w->srv->cfg;
- double_to_tv (periodic_interval, &c->tv);
- event_set (&c->periodic, -1, EV_TIMEOUT|EV_PERSIST,
- rspamd_sessions_cache_periodic, c);
- event_base_set (ev_base, &c->periodic);
- event_add (&c->periodic, &c->tv);
+ c->periodic.data = c;
+ ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval,
+ periodic_interval);
+ ev_timer_start (ev_base, &c->periodic);
return c;
}
@@ -1115,12 +1125,13 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
}
static void
-rspamd_enable_accept_event (gint fd, short what, gpointer d)
+rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents)
{
- struct event *events = d;
+ struct rspamd_worker_accept_event *ac_ev =
+ (struct rspamd_worker_accept_event *)w->data;
- event_del (&events[1]);
- event_add (&events[0], NULL);
+ ev_timer_stop (EV_A_ w);
+ ev_io_start (EV_A_ &ac_ev->accept_ev);
}
void
@@ -1128,17 +1139,15 @@ rspamd_worker_throttle_accept_events (gint sock, void *data)
{
struct rspamd_worker_accept_event *head, *cur;
const gdouble throttling = 0.5;
- struct ev_loop *ev_base;
head = (struct rspamd_worker_accept_event *)data;
DL_FOREACH (head, cur) {
- ev_base = event_get_base (&events[0]);
- event_del (&events[0]);
- event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
- events);
- event_base_set (ev_base, &events[1]);
- event_add (&events[1], &tv);
+ ev_io_stop (cur->event_loop, &cur->accept_ev);
+ cur->throttling_ev.data = cur;
+ ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event,
+ throttling, 0.0);
+ ev_timer_start (cur->event_loop, &cur->throttling_ev);
}
} \ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 67b54e5c9..9693aa6ad 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -56,7 +56,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
*/
void rspamd_worker_set_signal_handler (int signo,
struct rspamd_worker *worker,
- struct ev_loop *base,
+ struct ev_loop *event_loop,
rspamd_worker_signal_handler handler,
void *handler_data);
diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c
index 3ae30c440..2313db0b2 100644
--- a/src/libstat/learn_cache/redis_cache.c
+++ b/src/libstat/learn_cache/redis_cache.c
@@ -45,7 +45,7 @@ struct rspamd_redis_cache_runtime {
struct rspamd_redis_cache_ctx *ctx;
struct rspamd_task *task;
struct upstream *selected;
- struct event timeout_event;
+ ev_timer timer_ev;
redisAsyncContext *redis;
gboolean has_event;
};
@@ -92,9 +92,7 @@ rspamd_redis_cache_fin (gpointer data)
redisAsyncContext *redis;
rt->has_event = FALSE;
- if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
- event_del (&rt->timeout_event);
- }
+ ev_timer_stop (rt->task->event_loop, &rt->timer_ev);
if (rt->redis) {
redis = rt->redis;
@@ -105,9 +103,10 @@ rspamd_redis_cache_fin (gpointer data)
}
static void
-rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
+rspamd_redis_cache_timeout (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_redis_cache_runtime *rt = d;
+ struct rspamd_redis_cache_runtime *rt =
+ (struct rspamd_redis_cache_runtime *)w->data;
struct rspamd_task *task;
task = rt->task;
@@ -117,7 +116,7 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
rspamd_upstream_fail (rt->selected, FALSE);
if (rt->has_event) {
- rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
+ rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
}
}
@@ -401,8 +400,9 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
redisLibevAttach (task->event_loop, rt->redis);
/* Now check stats */
- event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
- event_base_set (task->event_loop, &rt->timeout_event);
+ rt->timer_ev.data = rt;
+ ev_timer_init (&rt->timer_ev, rspamd_redis_cache_timeout,
+ rt->ctx->timeout, 0.0);
rspamd_redis_cache_maybe_auth (ctx, rt->redis);
if (!learn) {
@@ -418,7 +418,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
gpointer runtime)
{
struct rspamd_redis_cache_runtime *rt = runtime;
- struct timeval tv;
gchar *h;
if (rspamd_session_blocked (task->s)) {
@@ -431,8 +430,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
return RSPAMD_LEARN_INGORE;
}
- double_to_tv (rt->ctx->timeout, &tv);
-
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
"HGET %s %s",
rt->ctx->redis_object, h) == REDIS_OK) {
@@ -440,7 +437,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
rspamd_redis_cache_fin,
rt,
M);
- event_add (&rt->timeout_event, &tv);
+ ev_timer_start (rt->task->event_loop, &rt->timer_ev);
rt->has_event = TRUE;
}
@@ -454,7 +451,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
gpointer runtime)
{
struct rspamd_redis_cache_runtime *rt = runtime;
- struct timeval tv;
gchar *h;
gint flag;
@@ -465,7 +461,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
g_assert (h != NULL);
- double_to_tv (rt->ctx->timeout, &tv);
flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
@@ -473,7 +468,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
rt->ctx->redis_object, h, flag) == REDIS_OK) {
rspamd_session_add_event (task->s,
rspamd_redis_cache_fin, rt, M);
- event_add (&rt->timeout_event, &tv);
+ ev_timer_start (rt->task->event_loop, &rt->timer_ev);
rt->has_event = TRUE;
}
diff --git a/src/rspamd.h b/src/rspamd.h
index 9048a26bd..e47271ca3 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -122,7 +122,7 @@ struct rspamd_worker_signal_handler {
gint signo;
gboolean enabled;
ev_signal ev_sig;
- struct ev_loop *base;
+ struct ev_loop *event_loop;
struct rspamd_worker *worker;
struct rspamd_worker_signal_cb *cb;
};