]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Add fail-safety for destroying sessions
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 14 Aug 2018 11:51:54 +0000 (12:51 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 14 Aug 2018 11:51:54 +0000 (12:51 +0100)
src/libserver/dns.c
src/libserver/events.c
src/libserver/events.h
src/libstat/backends/redis_backend.c
src/libstat/learn_cache/redis_cache.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/lua/lua_tcp.c
src/plugins/fuzzy_check.c
src/plugins/surbl.c

index 6d488ccf3a17d5fec676ec4c1518f93c2ab14c25..fbf37363ae0299cd52abeb3a7950f90fa3d4fffc 100644 (file)
@@ -122,6 +122,10 @@ make_dns_request (struct rspamd_dns_resolver *resolver,
                return FALSE;
        }
 
+       if (session && rspamd_session_is_destroying (session)) {
+               return FALSE;
+       }
+
        if (pool != NULL) {
                reqdata =
                        rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_dns_request_ud));
index 9b0d049d43f526a9d5a46781e19676b3ea215480..f62005b962ac771862ac98e8f49292ea73929898 100644 (file)
@@ -172,6 +172,13 @@ rspamd_session_add_event (struct rspamd_async_session *session,
                g_assert_not_reached ();
        }
 
+       if (RSPAMD_SESSION_IS_DESTROYING (session)) {
+               msg_debug_session ("skip adding event subsystem: %s: session is destroying",
+                               g_quark_to_string (subsystem));
+
+               return NULL;
+       }
+
        new_event = rspamd_mempool_alloc (session->pool,
                        sizeof (struct rspamd_async_event));
        new_event->fin = fin;
@@ -502,4 +509,12 @@ rspamd_session_mempool (struct rspamd_async_session *session)
        g_assert (session != NULL);
 
        return session->pool;
+}
+
+gboolean
+rspamd_session_is_destroying (struct rspamd_async_session *session)
+{
+       g_assert (session != NULL);
+
+       return RSPAMD_SESSION_IS_DESTROYING (session);
 }
\ No newline at end of file
index 760bb000c8e1180e4ebb9f7e9ad94db5fe278ce0..10ccb8d1d82f12888680f9c27e3e780295e9c0ab 100644 (file)
@@ -148,4 +148,11 @@ void rspamd_session_watcher_pop (struct rspamd_async_session *s,
 struct rspamd_async_watcher* rspamd_session_get_watcher (
                struct rspamd_async_session *s);
 
+/**
+ * Returns TRUE if an async session is currently destroying
+ * @param s
+ * @return
+ */
+gboolean rspamd_session_is_destroying (struct rspamd_async_session *s);
+
 #endif /* RSPAMD_EVENTS_H */
index 69c14e1675f0801455227d44e403f24c80fac101..5510cff05940c7cac8e15adf386d39d002b41a8d 100644 (file)
@@ -1572,6 +1572,9 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
        gint ret;
        const gchar *learned_key = "learns";
 
+       if (rspamd_session_is_destroying (task->s)) {
+               return FALSE;
+       }
 
        if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) {
                return FALSE;
@@ -1664,6 +1667,10 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
        goffset off;
        const gchar *learned_key = "learns";
 
+       if (rspamd_session_is_destroying (task->s)) {
+               return FALSE;
+       }
+
        up = rspamd_upstream_get (rt->ctx->write_servers,
                        RSPAMD_UPSTREAM_MASTER_SLAVE,
                        NULL,
index d43ec366501526ce3d5637e416af29e952d0355b..e17f20d27b46f8de0ff937c9b7c24ef3cb70c5ef 100644 (file)
@@ -438,6 +438,10 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
        struct timeval tv;
        gchar *h;
 
+       if (rspamd_session_is_destroying (task->s)) {
+               return RSPAMD_LEARN_INGORE;
+       }
+
        h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
 
        if (h == NULL) {
@@ -469,6 +473,10 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
        gchar *h;
        gint flag;
 
+       if (rspamd_session_is_destroying (task->s)) {
+               return RSPAMD_LEARN_INGORE;
+       }
+
        h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
        g_assert (h != NULL);
 
index b9a2d7b59f961130aa4ad15f4deab6906dc840e4..cb202642182ea74a35e1123c0d6cc7c809095408 100644 (file)
@@ -377,7 +377,7 @@ lua_http_request (lua_State *L)
        struct rspamd_http_message *msg;
        struct lua_http_cbdata *cbd;
        struct rspamd_dns_resolver *resolver;
-       struct rspamd_async_session *session;
+       struct rspamd_async_session *session = NULL;
        struct rspamd_lua_text *t;
        struct rspamd_task *task = NULL;
        struct rspamd_config *cfg = NULL;
@@ -700,6 +700,12 @@ lua_http_request (lua_State *L)
                return 1;
        }
 
+       if (session && rspamd_session_is_destroying (session)) {
+               lua_pushboolean (L, FALSE);
+
+               return 1;
+       }
+
        cbd = g_malloc0 (sizeof (*cbd));
        cbd->L = L;
        cbd->cbref = cbref;
index cf416a12a7fb9d356a242a6ffc50ed66cda1c67e..82ef0c53bd2557e2993955c5bf53c6e754e425f5 100644 (file)
@@ -675,6 +675,10 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
 
                lua_pop (L, 1); /* table */
 
+               if (session && rspamd_session_is_destroying (session)) {
+                       ret = FALSE;
+               }
+
 
                if (ret && addr != NULL) {
                        ctx = g_malloc0 (sizeof (struct lua_redis_ctx));
@@ -1200,6 +1204,13 @@ lua_redis_add_cmd (lua_State *L)
 
                        LL_PREPEND (sp_ud->c->specific, sp_ud);
 
+                       if (ud->s && rspamd_session_is_destroying (ud->s)) {
+                               lua_pushboolean (L, 0);
+                               lua_pushstring (L, "session is terminating");
+
+                               return 2;
+                       }
+
                        ret = redisAsyncCommandArgv (sp_ud->c->ctx,
                                        lua_redis_callback,
                                        sp_ud,
index 6c3f6ec044135c8165d09b3adb6bf1413e18b12a..797bdcc4eb919e32bfdb42bd0ae99e25a0a1b1a4 100644 (file)
@@ -804,7 +804,8 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
        }
 }
 
-static void
+
+static gboolean
 lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
 {
        if (cbd->session) {
@@ -812,9 +813,16 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
                                (event_finalizer_t) lua_tcp_fin,
                                cbd,
                                g_quark_from_static_string ("lua tcp"));
+
+               if (!cbd->async_ev) {
+                       return FALSE;
+               }
+
                cbd->w = rspamd_session_get_watcher (cbd->session);
                rspamd_session_watcher_push (cbd->session);
        }
+
+       return TRUE;
 }
 
 static gboolean
@@ -1232,6 +1240,13 @@ lua_tcp_request (lua_State *L)
 
        if (session) {
                cbd->session = session;
+
+               if (rspamd_session_is_destroying (session)) {
+                       REF_RELEASE (cbd);
+                       lua_pushboolean (L, FALSE);
+
+                       return 1;
+               }
        }
 
        if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
index b5c3903289ca66ba34cd920691514e1820229c5f..c0fd8aa4c6531c462adba2dc48320cbd2df9ffb0 100644 (file)
@@ -2830,49 +2830,50 @@ register_fuzzy_client_call (struct rspamd_task *task,
        rspamd_inet_addr_t *addr;
        gint sock;
 
-       /* Get upstream */
-       selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
-                       NULL, 0);
-       if (selected) {
-               addr = rspamd_upstream_addr (selected);
-               if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
-                       msg_warn_task ("cannot connect to %s(%s), %d, %s",
-                               rspamd_upstream_name (selected),
-                                       rspamd_inet_address_to_string_pretty (addr),
-                               errno,
-                               strerror (errno));
-                       rspamd_upstream_fail (selected, FALSE);
-                       g_ptr_array_free (commands, TRUE);
-               }
-               else {
-                       /* Create session for a socket */
-                       session =
-                               rspamd_mempool_alloc0 (task->task_pool,
-                                       sizeof (struct fuzzy_client_session));
-                       msec_to_tv (rule->ctx->io_timeout, &session->tv);
-                       session->state = 0;
-                       session->commands = commands;
-                       session->task = task;
-                       session->fd = sock;
-                       session->server = selected;
-                       session->rule = rule;
-                       session->addr = addr;
-                       session->results = g_ptr_array_sized_new (32);
-
-                       event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
-                                       session);
-                       event_base_set (session->task->ev_base, &session->ev);
-                       event_add (&session->ev, NULL);
+       if (!rspamd_session_is_destroying (task->s)) {
+               /* Get upstream */
+               selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
+                               NULL, 0);
+               if (selected) {
+                       addr = rspamd_upstream_addr (selected);
+                       if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
+                               msg_warn_task ("cannot connect to %s(%s), %d, %s",
+                                               rspamd_upstream_name (selected),
+                                               rspamd_inet_address_to_string_pretty (addr),
+                                               errno,
+                                               strerror (errno));
+                               rspamd_upstream_fail (selected, FALSE);
+                               g_ptr_array_free (commands, TRUE);
+                       } else {
+                               /* Create session for a socket */
+                               session =
+                                               rspamd_mempool_alloc0 (task->task_pool,
+                                                               sizeof (struct fuzzy_client_session));
+                               msec_to_tv (rule->ctx->io_timeout, &session->tv);
+                               session->state = 0;
+                               session->commands = commands;
+                               session->task = task;
+                               session->fd = sock;
+                               session->server = selected;
+                               session->rule = rule;
+                               session->addr = addr;
+                               session->results = g_ptr_array_sized_new (32);
+
+                               event_set (&session->ev, sock, EV_WRITE, fuzzy_check_io_callback,
+                                               session);
+                               event_base_set (session->task->ev_base, &session->ev);
+                               event_add (&session->ev, NULL);
 
-                       evtimer_set (&session->timev, fuzzy_check_timer_callback,
-                                       session);
-                       event_base_set (session->task->ev_base, &session->timev);
-                       event_add (&session->timev, &session->tv);
+                               evtimer_set (&session->timev, fuzzy_check_timer_callback,
+                                               session);
+                               event_base_set (session->task->ev_base, &session->timev);
+                               event_add (&session->timev, &session->tv);
 
-                       rspamd_session_add_event (task->s,
-                               fuzzy_io_fin,
-                               session,
-                               g_quark_from_static_string ("fuzzy check"));
+                               rspamd_session_add_event (task->s,
+                                               fuzzy_io_fin,
+                                               session,
+                                               g_quark_from_static_string ("fuzzy check"));
+                       }
                }
        }
 }
@@ -3310,48 +3311,48 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
        gint ret = -1;
 
        /* Get upstream */
-
-       while ((selected = rspamd_upstream_get (rule->servers,
-                       RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
-               /* Create UDP socket */
-               addr = rspamd_upstream_addr (selected);
-
-               if ((sock = rspamd_inet_address_connect (addr,
-                               SOCK_DGRAM, TRUE)) == -1) {
-                       rspamd_upstream_fail (selected, TRUE);
-               }
-               else {
-                       s =
-                               rspamd_mempool_alloc0 (task->task_pool,
-                                       sizeof (struct fuzzy_learn_session));
-
-                       msec_to_tv (rule->ctx->io_timeout, &s->tv);
-                       s->task = task;
-                       s->addr = addr;
-                       s->commands = commands;
-                       s->http_entry = NULL;
-                       s->server = selected;
-                       s->saved = saved;
-                       s->fd = sock;
-                       s->err = err;
-                       s->rule = rule;
-                       s->session = task->s;
-
-                       event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
-                       event_base_set (task->ev_base, &s->ev);
-                       event_add (&s->ev, NULL);
-
-                       evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
-                       event_base_set (s->task->ev_base, &s->timev);
-                       event_add (&s->timev, &s->tv);
-
-                       rspamd_session_add_event (task->s,
-                                       fuzzy_lua_fin,
-                                       s,
-                                       g_quark_from_static_string ("fuzzy check"));
-
-                       (*saved)++;
-                       ret = 1;
+       if (!rspamd_session_is_destroying (task->s)) {
+               while ((selected = rspamd_upstream_get (rule->servers,
+                               RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
+                       /* Create UDP socket */
+                       addr = rspamd_upstream_addr (selected);
+
+                       if ((sock = rspamd_inet_address_connect (addr,
+                                       SOCK_DGRAM, TRUE)) == -1) {
+                               rspamd_upstream_fail (selected, TRUE);
+                       } else {
+                               s =
+                                               rspamd_mempool_alloc0 (task->task_pool,
+                                                               sizeof (struct fuzzy_learn_session));
+
+                               msec_to_tv (rule->ctx->io_timeout, &s->tv);
+                               s->task = task;
+                               s->addr = addr;
+                               s->commands = commands;
+                               s->http_entry = NULL;
+                               s->server = selected;
+                               s->saved = saved;
+                               s->fd = sock;
+                               s->err = err;
+                               s->rule = rule;
+                               s->session = task->s;
+
+                               event_set (&s->ev, sock, EV_WRITE, fuzzy_controller_io_callback, s);
+                               event_base_set (task->ev_base, &s->ev);
+                               event_add (&s->ev, NULL);
+
+                               evtimer_set (&s->timev, fuzzy_controller_timer_callback, s);
+                               event_base_set (s->task->ev_base, &s->timev);
+                               event_add (&s->timev, &s->tv);
+
+                               rspamd_session_add_event (task->s,
+                                               fuzzy_lua_fin,
+                                               s,
+                                               g_quark_from_static_string ("fuzzy check"));
+
+                               (*saved)++;
+                               ret = 1;
+                       }
                }
        }
 
index 81496f0a9a759cbe0bfd9919a26275dfd482b3fa..5b2375888e6653c9780a5bbdecb091130072a744 100644 (file)
@@ -1638,55 +1638,58 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
        struct rspamd_http_message *msg;
        struct surbl_ctx *surbl_module_ctx = surbl_get_context (task->cfg);
 
-       selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
-                       RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
+       if (!rspamd_session_is_destroying (task->s)) {
 
-       if (selected) {
-               s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
-                               SOCK_STREAM, TRUE);
-       }
+               selected = rspamd_upstream_get (surbl_module_ctx->redirectors,
+                               RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);
 
-       if (s == -1) {
-               msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
-                       task->message_id,
-                       strerror (errno));
-               return;
-       }
+               if (selected) {
+                       s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
+                                       SOCK_STREAM, TRUE);
+               }
 
-       param =
-               rspamd_mempool_alloc (task->task_pool,
-                       sizeof (struct redirector_param));
-       param->url = url;
-       param->task = task;
-       param->conn = rspamd_http_connection_new (NULL,
-                       surbl_redirector_error,
-                       surbl_redirector_finish,
-                       RSPAMD_HTTP_CLIENT_SIMPLE,
-                       RSPAMD_HTTP_CLIENT,
-                       NULL,
-                       NULL);
-       param->ctx = surbl_module_ctx;
-       msg = rspamd_http_new_message (HTTP_REQUEST);
-       msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
-       param->sock = s;
-       param->redirector = selected;
-       timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
-       double_to_tv (surbl_module_ctx->read_timeout, timeout);
-
-       rspamd_session_add_event (task->s,
-               free_redirector_session,
-               param,
-               g_quark_from_static_string ("surbl"));
-
-       rspamd_http_connection_write_message (param->conn, msg, NULL,
-                       NULL, param, s, timeout, task->ev_base);
-
-       msg_info_surbl (
-               "<%s> registered redirector call for %*s to %s, according to rule: %s",
-               task->message_id,
-               url->urllen, url->string,
-               rspamd_upstream_name (param->redirector),
-               rule);
+               if (s == -1) {
+                       msg_info_surbl ("<%s> cannot create tcp socket failed: %s",
+                                       task->message_id,
+                                       strerror (errno));
+                       return;
+               }
+
+               param =
+                               rspamd_mempool_alloc (task->task_pool,
+                                               sizeof (struct redirector_param));
+               param->url = url;
+               param->task = task;
+               param->conn = rspamd_http_connection_new (NULL,
+                               surbl_redirector_error,
+                               surbl_redirector_finish,
+                               RSPAMD_HTTP_CLIENT_SIMPLE,
+                               RSPAMD_HTTP_CLIENT,
+                               NULL,
+                               NULL);
+               param->ctx = surbl_module_ctx;
+               msg = rspamd_http_new_message (HTTP_REQUEST);
+               msg->url = rspamd_fstring_assign (msg->url, url->string, url->urllen);
+               param->sock = s;
+               param->redirector = selected;
+               timeout = rspamd_mempool_alloc (task->task_pool, sizeof (struct timeval));
+               double_to_tv (surbl_module_ctx->read_timeout, timeout);
+
+               rspamd_session_add_event (task->s,
+                               free_redirector_session,
+                               param,
+                               g_quark_from_static_string ("surbl"));
+
+               rspamd_http_connection_write_message (param->conn, msg, NULL,
+                               NULL, param, s, timeout, task->ev_base);
+
+               msg_info_surbl (
+                               "<%s> registered redirector call for %*s to %s, according to rule: %s",
+                               task->message_id,
+                               url->urllen, url->string,
+                               rspamd_upstream_name (param->redirector),
+                               rule);
+       }
 }
 
 static gboolean