]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Further rework
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 19 Jun 2019 10:42:58 +0000 (11:42 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
src/libserver/milter.c
src/libserver/worker_util.c
src/libserver/worker_util.h
src/libstat/learn_cache/redis_cache.c
src/rspamd.h

index bb27d2ff11a470fddcfb3894d048da0c4c034bac..897938df026ee940807d293b08439bf38349d6c2 100644 (file)
@@ -186,10 +186,7 @@ rspamd_milter_session_dtor (struct rspamd_milter_session *session)
                priv = session->priv;
                msg_debug_milter ("destroying milter session");
 
-               if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-                       event_del (&priv->ev);
-               }
-
+               rspamd_ev_watcher_stop (priv->event_loop, &priv->ev);
                rspamd_milter_session_reset (session, RSPAMD_MILTER_RESET_ALL);
 
                if (priv->parser.buf) {
@@ -267,14 +264,7 @@ static inline void
 rspamd_milter_plan_io (struct rspamd_milter_session *session,
                struct rspamd_milter_private *priv, gshort what)
 {
-       if (rspamd_event_pending (&priv->ev, EV_TIMEOUT|EV_WRITE|EV_READ)) {
-               event_del (&priv->ev);
-       }
-
-       event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler,
-                       session);
-       event_base_set (priv->event_loop, &priv->ev);
-       event_add (&priv->ev, priv->ptv);
+       rspamd_ev_watcher_reschedule (priv->event_loop, &priv->ev, what);
 }
 
 
index 0d4c4db17fb9846cc29294648e322bda2493fd8d..0aa0c9cf32dc95b1bbfec5ace09a6c6512183989 100644 (file)
@@ -57,7 +57,7 @@
 #include <sys/ucontext.h>
 #endif
 
-static void rspamd_worker_ignore_signal (int signo);
+static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
 /**
  * Return worker's control structure by its type
  * @param type
@@ -98,6 +98,16 @@ rspamd_worker_terminate_handlers (struct rspamd_worker *w)
 
        return ret;
 }
+
+static void
+rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents)
+{
+       ev_break (loop, EVBREAK_ALL);
+#ifdef WITH_GPERF_TOOLS
+       ProfilerStop ();
+#endif
+}
+
 /*
  * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
  */
@@ -108,7 +118,10 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
        struct timeval tv;
 
        if (!sigh->worker->wanna_die) {
-               rspamd_worker_ignore_signal (SIGUSR2);
+               static ev_timer shutdown_ev;
+
+               rspamd_worker_ignore_signal (sigh);
+
                tv.tv_sec = SOFT_SHUTDOWN_TIME;
                tv.tv_usec = 0;
                sigh->worker->wanna_die = TRUE;
@@ -119,7 +132,9 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
                                G_STRFUNC,
                                "worker's shutdown is pending in %d sec",
                                SOFT_SHUTDOWN_TIME);
-               event_base_loopexit (sigh->base, &tv);
+               ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+                               SOFT_SHUTDOWN_TIME, 0.0);
+               ev_timer_start (sigh->event_loop, &shutdown_ev);
                rspamd_worker_stop_accept (sigh->worker);
        }
 
@@ -142,9 +157,12 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 static gboolean
 rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
 {
-       struct timeval tv;
+       ev_tstamp delay;
 
        if (!sigh->worker->wanna_die) {
+               static ev_timer shutdown_ev;
+
+               rspamd_worker_ignore_signal (sigh);
                rspamd_default_log_function (G_LOG_LEVEL_INFO,
                                sigh->worker->srv->server_pool->tag.tagname,
                                sigh->worker->srv->server_pool->tag.uid,
@@ -152,19 +170,17 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
                                "terminating after receiving signal %s",
                                g_strsignal (sigh->signo));
 
-               tv.tv_usec = 0;
                if (rspamd_worker_terminate_handlers (sigh->worker)) {
-                       tv.tv_sec =  SOFT_SHUTDOWN_TIME;
+                       delay = SOFT_SHUTDOWN_TIME;
                }
                else {
-                       tv.tv_sec = 0;
+                       delay = 0;
                }
 
                sigh->worker->wanna_die = 1;
-               event_base_loopexit (sigh->base, &tv);
-#ifdef WITH_GPERF_TOOLS
-               ProfilerStop ();
-#endif
+               ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
+                               SOFT_SHUTDOWN_TIME, 0.0);
+               ev_timer_start (sigh->event_loop, &shutdown_ev);
                rspamd_worker_stop_accept (sigh->worker);
        }
 
@@ -173,10 +189,10 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 }
 
 static void
-rspamd_worker_signal_handle (int fd, short what, void *arg)
+rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
 {
        struct rspamd_worker_signal_handler *sigh =
-                       (struct rspamd_worker_signal_handler *) arg;
+                       (struct rspamd_worker_signal_handler *)w->data;
        struct rspamd_worker_signal_cb *cb, *cbtmp;
 
        /* Call all signal handlers registered */
@@ -188,15 +204,9 @@ rspamd_worker_signal_handle (int fd, short what, void *arg)
 }
 
 static void
-rspamd_worker_ignore_signal (int signo)
+rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
 {
-       struct sigaction sig;
-
-       sigemptyset (&sig.sa_mask);
-       sigaddset (&sig.sa_mask, signo);
-       sig.sa_handler = SIG_IGN;
-       sig.sa_flags = 0;
-       sigaction (signo, &sig, NULL);
+       ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
 }
 
 static void
@@ -222,14 +232,14 @@ rspamd_sigh_free (void *p)
                g_free (cb);
        }
 
-       event_del (&sigh->ev);
+       ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
        rspamd_worker_default_signal (sigh->signo);
        g_free (sigh);
 }
 
 void
 rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
-               struct ev_loop *base,
+               struct ev_loop *event_loop,
                rspamd_worker_signal_handler handler,
                void *handler_data)
 {
@@ -242,12 +252,12 @@ rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
                sigh = g_malloc0 (sizeof (*sigh));
                sigh->signo = signo;
                sigh->worker = worker;
-               sigh->base = base;
+               sigh->event_loop = event_loop;
                sigh->enabled = TRUE;
 
-               signal_set (&sigh->ev, signo, rspamd_worker_signal_handle, sigh);
-               event_base_set (base, &sigh->ev);
-               signal_add (&sigh->ev, NULL);
+               sigh->ev_sig.data = sigh;
+               ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo);
+               ev_signal_start (event_loop, &sigh->ev_sig);
 
                g_hash_table_insert (worker->signal_events,
                                GINT_TO_POINTER (signo),
@@ -428,7 +438,7 @@ rspamd_controller_send_error (struct rspamd_http_connection_entry *entry,
                NULL,
                "application/json",
                entry,
-               entry->rt->ptv);
+               entry->rt->timeout);
        entry->is_reply = TRUE;
 }
 
@@ -460,7 +470,7 @@ rspamd_controller_send_string (struct rspamd_http_connection_entry *entry,
                NULL,
                "application/json",
                entry,
-               entry->rt->ptv);
+               entry->rt->timeout);
        entry->is_reply = TRUE;
 }
 
@@ -486,7 +496,7 @@ rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry,
                NULL,
                "application/json",
                entry,
-               entry->rt->ptv);
+               entry->rt->timeout);
        entry->is_reply = TRUE;
 }
 
@@ -676,9 +686,8 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
 #endif
 
                /* Remove the inherited event base */
-               event_reinit (rspamd_main->event_loop);
-               event_base_free (rspamd_main->event_loop);
-
+               ev_loop_destroy (EV_DEFAULT);
+               rspamd_main->event_loop = NULL;
                /* Drop privileges */
                rspamd_worker_drop_priv (rspamd_main);
                /* Set limits */
@@ -853,8 +862,7 @@ struct rspamd_worker_session_cache {
        struct ev_loop *ev_base;
        GHashTable *cache;
        struct rspamd_config *cfg;
-       struct timeval tv;
-       struct event periodic;
+       struct ev_timer periodic;
 };
 
 static gint
@@ -868,9 +876,10 @@ rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb)
 }
 
 static void
-rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
+rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents)
 {
-       struct rspamd_worker_session_cache *c = p;
+       struct rspamd_worker_session_cache *c =
+                       (struct rspamd_worker_session_cache *)w->data;
        GHashTableIter it;
        gchar timebuf[32];
        gpointer k, v;
@@ -902,6 +911,8 @@ rspamd_sessions_cache_periodic (gint fd, short what, gpointer p)
                                        timebuf);
                }
        }
+
+       ev_timer_again (EV_A_ w);
 }
 
 void *
@@ -916,11 +927,10 @@ rspamd_worker_session_cache_new (struct rspamd_worker *w,
        c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal,
                        NULL, g_free);
        c->cfg = w->srv->cfg;
-       double_to_tv (periodic_interval, &c->tv);
-       event_set (&c->periodic, -1, EV_TIMEOUT|EV_PERSIST,
-                       rspamd_sessions_cache_periodic, c);
-       event_base_set (ev_base, &c->periodic);
-       event_add (&c->periodic, &c->tv);
+       c->periodic.data = c;
+       ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval,
+                       periodic_interval);
+       ev_timer_start (ev_base, &c->periodic);
 
        return c;
 }
@@ -1115,12 +1125,13 @@ rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
 }
 
 static void
-rspamd_enable_accept_event (gint fd, short what, gpointer d)
+rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents)
 {
-       struct event *events = d;
+       struct rspamd_worker_accept_event *ac_ev =
+                       (struct rspamd_worker_accept_event *)w->data;
 
-       event_del (&events[1]);
-       event_add (&events[0], NULL);
+       ev_timer_stop (EV_A_ w);
+       ev_io_start (EV_A_ &ac_ev->accept_ev);
 }
 
 void
@@ -1128,17 +1139,15 @@ rspamd_worker_throttle_accept_events (gint sock, void *data)
 {
        struct rspamd_worker_accept_event *head, *cur;
        const gdouble throttling = 0.5;
-       struct ev_loop *ev_base;
 
        head = (struct rspamd_worker_accept_event *)data;
 
        DL_FOREACH (head, cur) {
 
-               ev_base = event_get_base (&events[0]);
-               event_del (&events[0]);
-               event_set (&events[1], sock, EV_TIMEOUT, rspamd_enable_accept_event,
-                               events);
-               event_base_set (ev_base, &events[1]);
-               event_add (&events[1], &tv);
+               ev_io_stop (cur->event_loop, &cur->accept_ev);
+               cur->throttling_ev.data = cur;
+               ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event,
+                               throttling, 0.0);
+               ev_timer_start (cur->event_loop, &cur->throttling_ev);
        }
 }
\ No newline at end of file
index 67b54e5c9f4e0c34da15d785eaedffa7cf9b9f08..9693aa6ad60b63e8da85162610baddcfc39ec4e6 100644 (file)
@@ -56,7 +56,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
  */
 void rspamd_worker_set_signal_handler (int signo,
                struct rspamd_worker *worker,
-               struct ev_loop *base,
+               struct ev_loop *event_loop,
                rspamd_worker_signal_handler handler,
                void *handler_data);
 
index 3ae30c440161c5490bb1c0d6d43064a61dcf6d86..2313db0b2fe04df834e15bfb9cfb63a1d907730a 100644 (file)
@@ -45,7 +45,7 @@ struct rspamd_redis_cache_runtime {
        struct rspamd_redis_cache_ctx *ctx;
        struct rspamd_task *task;
        struct upstream *selected;
-       struct event timeout_event;
+       ev_timer timer_ev;
        redisAsyncContext *redis;
        gboolean has_event;
 };
@@ -92,9 +92,7 @@ rspamd_redis_cache_fin (gpointer data)
        redisAsyncContext *redis;
 
        rt->has_event = FALSE;
-       if (rspamd_event_pending (&rt->timeout_event, EV_TIMEOUT)) {
-               event_del (&rt->timeout_event);
-       }
+       ev_timer_stop (rt->task->event_loop, &rt->timer_ev);
 
        if (rt->redis) {
                redis = rt->redis;
@@ -105,9 +103,10 @@ rspamd_redis_cache_fin (gpointer data)
 }
 
 static void
-rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
+rspamd_redis_cache_timeout (EV_P_ ev_timer *w, int revents)
 {
-       struct rspamd_redis_cache_runtime *rt = d;
+       struct rspamd_redis_cache_runtime *rt =
+                       (struct rspamd_redis_cache_runtime *)w->data;
        struct rspamd_task *task;
 
        task = rt->task;
@@ -117,7 +116,7 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
        rspamd_upstream_fail (rt->selected, FALSE);
 
        if (rt->has_event) {
-               rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
+               rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
        }
 }
 
@@ -401,8 +400,9 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
        redisLibevAttach (task->event_loop, rt->redis);
 
        /* Now check stats */
-       event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
-       event_base_set (task->event_loop, &rt->timeout_event);
+       rt->timer_ev.data = rt;
+       ev_timer_init (&rt->timer_ev, rspamd_redis_cache_timeout,
+                       rt->ctx->timeout, 0.0);
        rspamd_redis_cache_maybe_auth (ctx, rt->redis);
 
        if (!learn) {
@@ -418,7 +418,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
                gpointer runtime)
 {
        struct rspamd_redis_cache_runtime *rt = runtime;
-       struct timeval tv;
        gchar *h;
 
        if (rspamd_session_blocked (task->s)) {
@@ -431,8 +430,6 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
                return RSPAMD_LEARN_INGORE;
        }
 
-       double_to_tv (rt->ctx->timeout, &tv);
-
        if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
                        "HGET %s %s",
                        rt->ctx->redis_object, h) == REDIS_OK) {
@@ -440,7 +437,7 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
                                rspamd_redis_cache_fin,
                                rt,
                                M);
-               event_add (&rt->timeout_event, &tv);
+               ev_timer_start (rt->task->event_loop, &rt->timer_ev);
                rt->has_event = TRUE;
        }
 
@@ -454,7 +451,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
                gpointer runtime)
 {
        struct rspamd_redis_cache_runtime *rt = runtime;
-       struct timeval tv;
        gchar *h;
        gint flag;
 
@@ -465,7 +461,6 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
        h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
        g_assert (h != NULL);
 
-       double_to_tv (rt->ctx->timeout, &tv);
        flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
 
        if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
@@ -473,7 +468,7 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
                        rt->ctx->redis_object, h, flag) == REDIS_OK) {
                rspamd_session_add_event (task->s,
                                rspamd_redis_cache_fin, rt, M);
-               event_add (&rt->timeout_event, &tv);
+               ev_timer_start (rt->task->event_loop, &rt->timer_ev);
                rt->has_event = TRUE;
        }
 
index 9048a26bd8618fc09838f6b4ee2c771e755ec341..e47271ca35398a4b46c770376a6f81dd5bfe901b 100644 (file)
@@ -122,7 +122,7 @@ struct rspamd_worker_signal_handler {
        gint signo;
        gboolean enabled;
        ev_signal ev_sig;
-       struct ev_loop *base;
+       struct ev_loop *event_loop;
        struct rspamd_worker *worker;
        struct rspamd_worker_signal_cb *cb;
 };