]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Fix watchers in lua_tcp
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 13 Sep 2018 11:45:50 +0000 (12:45 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 13 Sep 2018 11:45:50 +0000 (12:45 +0100)
src/lua/lua_http.c
src/lua/lua_tcp.c

index 4471b01ce1f4b5e77163f480b14aed5eb3de90d1..96872108cf261b37b5b74db653517fadca5311d4 100644 (file)
@@ -897,6 +897,10 @@ lua_http_request (lua_State *L)
                        lua_http_maybe_free (cbd);
                        lua_pushboolean (L, FALSE);
 
+                       if (cbd->w) {
+                               rspamd_session_watcher_pop (cbd->session, cbd->w);
+                       }
+
                        return 1;
                }
        }
@@ -912,9 +916,14 @@ lua_http_request (lua_State *L)
                                lua_pushboolean (L, FALSE);
                                g_free (to_resolve);
 
+                               if (cbd->w) {
+                                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+                               }
+
                                return 1;
                        }
 
+
                        g_free (to_resolve);
                }
                else {
@@ -925,6 +934,10 @@ lua_http_request (lua_State *L)
                                lua_http_maybe_free (cbd);
                                lua_pushboolean (L, FALSE);
 
+                               if (cbd->w) {
+                                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+                               }
+
                                return 1;
                        }
                }
@@ -932,11 +945,14 @@ lua_http_request (lua_State *L)
 
        if (cbd->cbref == -1) {
                cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool);
+
                return lua_thread_yield (cbd->thread, 0);
-       } else {
+       }
+       else {
                lua_pushboolean (L, TRUE);
-               return 1;
        }
+
+       return 1;
 }
 
 static gint
index 099fc68961b8a59c7d4a8e3349576de1ddf4385a..ef4b4cb5ad21acf83393032be90fed2948a9d034 100644 (file)
@@ -305,6 +305,7 @@ struct lua_tcp_dtor {
 #define LUA_TCP_FLAG_CONNECTED (1 << 3)
 #define LUA_TCP_FLAG_FINISHED (1 << 4)
 #define LUA_TCP_FLAG_SYNC (1 << 5)
+#define LUA_TCP_FLAG_RESOLVED (1 << 6)
 
 #undef TCP_DEBUG_REFS
 #ifdef TCP_DEBUG_REFS
@@ -480,15 +481,23 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
                 * in this mode, we don't remove object, we only remove the event
                 * Object is owned by lua and will be destroyed on __gc()
                 */
-               if (cbd->async_ev) {
+
+               if (cbd->w) {
                        rspamd_session_watcher_pop (cbd->session, cbd->w);
+               }
+
+               if (cbd->async_ev) {
                        rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd);
                }
+
                cbd->async_ev = NULL;
        }
        else {
-               if (cbd->async_ev) {
+               if (cbd->w) {
                        rspamd_session_watcher_pop (cbd->session, cbd->w);
+               }
+
+               if (cbd->async_ev) {
                        rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
                }
                else {
@@ -514,6 +523,7 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
        struct lua_tcp_handler *hdl;
        gint cbref, top;
        struct lua_callback_state cbs;
+       struct rspamd_async_watcher *existing_watcher = NULL;
        lua_State *L;
 
        if (cbd->thread) {
@@ -560,10 +570,21 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
                        rspamd_lua_setclass (L, "rspamd{tcp}", -1);
                        TCP_RETAIN (cbd);
 
+                       if (cbd->w) {
+                               /* Replace watcher to deal with nested calls */
+                               existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+                       }
+
                        if (lua_pcall (L, 3, 0, 0) != 0) {
                                msg_info ("callback call failed: %s", lua_tostring (L, -1));
                        }
 
+
+                       if (cbd->w) {
+                               /* Restore existing watcher */
+                               rspamd_session_replace_watcher (cbd->session, existing_watcher);
+                       }
+
                        lua_settop (L, top);
 
                        TCP_RELEASE (cbd);
@@ -594,6 +615,7 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        gint cbref, arg_cnt, top;
        struct lua_callback_state cbs;
        lua_State *L;
+       struct rspamd_async_watcher *existing_watcher = NULL;
 
        if (cbd->thread) {
                lua_tcp_resume_thread (cbd, str, len);
@@ -639,10 +661,20 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 
                TCP_RETAIN (cbd);
 
+               if (cbd->w) {
+                       /* Replace watcher to deal with nested calls */
+                       existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+               }
+
                if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
                        msg_info ("callback call failed: %s", lua_tostring (L, -1));
                }
 
+               if (cbd->w) {
+                       /* Restore existing watcher */
+                       rspamd_session_replace_watcher (cbd->session, existing_watcher);
+               }
+
                lua_settop (L, top);
                TCP_RELEASE (cbd);
        }
@@ -669,23 +701,25 @@ lua_tcp_resume_thread_error_argp (struct lua_tcp_cbdata *cbd, const gchar *error
 static void
 lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 {
-/*
- * typical call returns:
- *
- * read:
- *  error:
- *    (nil, error message)
- *  got data:
- *    (true, data)
- * write/connect:
- *   error:
- *     (nil, error message)
- *   wrote
- *     (true)
- */
+       /*
       * typical call returns:
       *
       * read:
       *  error:
       *    (nil, error message)
       *  got data:
       *    (true, data)
       * write/connect:
       *   error:
       *     (nil, error message)
       *   wrote
       *     (true)
       */
 
        lua_State *L = cbd->thread->lua_state;
        struct lua_tcp_handler *hdl;
+       struct rspamd_async_watcher *existing_watcher = NULL;
+
        hdl = g_queue_peek_head (cbd->handlers);
 
        lua_pushboolean (L, TRUE);
@@ -695,10 +729,22 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        else {
                lua_pushnil (L);
        }
+
        lua_tcp_shift_handler (cbd);
        lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread);
+
+       if (cbd->w) {
+               /* Replace watcher to deal with nested calls */
+               existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+       }
+
        lua_thread_resume (cbd->thread, 2);
 
+       if (cbd->w) {
+               /* Restore existing watcher */
+               rspamd_session_replace_watcher (cbd->session, existing_watcher);
+       }
+
        TCP_RELEASE (cbd);
 }
 
@@ -992,11 +1038,12 @@ lua_tcp_handler (int fd, short what, gpointer ud)
                                cbd->flags |= LUA_TCP_FLAG_CONNECTED;
 
                                if (cbd->connect_cb != -1) {
-                                       lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
-                                       L = cbs.L;
-
                                        struct lua_tcp_cbdata **pcbd;
                                        gint top;
+                                       struct rspamd_async_watcher *existing_watcher = NULL;
+
+                                       lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
+                                       L = cbs.L;
 
                                        top = lua_gettop (L);
                                        lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
@@ -1005,10 +1052,22 @@ lua_tcp_handler (int fd, short what, gpointer ud)
                                        TCP_RETAIN (cbd);
                                        rspamd_lua_setclass (L, "rspamd{tcp}", -1);
 
+                                       if (cbd->w) {
+                                               /* Replace watcher to deal with nested calls */
+                                               existing_watcher = rspamd_session_replace_watcher (
+                                                               cbd->session, cbd->w);
+                                       }
+
                                        if (lua_pcall (L, 1, 0, 0) != 0) {
                                                msg_info ("callback call failed: %s", lua_tostring (L, -1));
                                        }
 
+                                       if (cbd->w) {
+                                               /* Restore existing watcher */
+                                               rspamd_session_replace_watcher (cbd->session,
+                                                               existing_watcher);
+                                       }
+
                                        lua_settop (L, top);
 
                                        TCP_RELEASE (cbd);
@@ -1138,14 +1197,23 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
                if (!cbd->async_ev) {
                        return FALSE;
                }
-
-               cbd->w = rspamd_session_get_watcher (cbd->session);
-               rspamd_session_watcher_push (cbd->session);
        }
 
        return TRUE;
 }
 
+static void
+lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd)
+{
+       if (cbd->session) {
+               cbd->w = rspamd_session_get_watcher (cbd->session);
+
+               if (cbd->w) {
+                       rspamd_session_watcher_push (cbd->session);
+               }
+       }
+}
+
 static gboolean
 lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
 {
@@ -1170,6 +1238,9 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
                return FALSE;
        }
 
+       cbd->flags |= LUA_TCP_FLAG_RESOLVED;
+       lua_tcp_register_event (cbd);
+
        cbd->fd = fd;
        lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
 
@@ -1576,6 +1647,8 @@ lua_tcp_request (lua_State *L)
 
                        return 1;
                }
+
+               lua_tcp_register_watcher (cbd);
        }
 
        if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
@@ -1585,11 +1658,12 @@ lua_tcp_request (lua_State *L)
                        TCP_RELEASE (cbd);
                        lua_pushboolean (L, FALSE);
 
+                       if (cbd->w) {
+                               rspamd_session_watcher_pop (cbd->session, cbd->w);
+                       }
+
                        return 1;
                }
-               else {
-                       lua_tcp_register_event (cbd);
-               }
        }
        else {
                if (task == NULL) {
@@ -1598,11 +1672,13 @@ lua_tcp_request (lua_State *L)
                                lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host);
                                TCP_RELEASE (cbd);
                                lua_pushboolean (L, FALSE);
+
+                               if (cbd->w) {
+                                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+                               }
+
                                return 1;
                        }
-                       else {
-                               lua_tcp_register_event (cbd);
-                       }
                }
                else {
                        if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
@@ -1610,11 +1686,13 @@ lua_tcp_request (lua_State *L)
                                lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host);
                                TCP_RELEASE (cbd);
                                lua_pushboolean (L, FALSE);
+
+                               if (cbd->w) {
+                                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+                               }
+
                                return 1;
                        }
-                       else {
-                               lua_tcp_register_event (cbd);
-                       }
                }
        }
 
@@ -1741,6 +1819,8 @@ lua_tcp_connect_sync (lua_State *L)
 
                        return 2;
                }
+
+               lua_tcp_register_watcher (cbd);
        }
 
        if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
@@ -1751,11 +1831,12 @@ lua_tcp_connect_sync (lua_State *L)
                        lua_pushboolean (L, FALSE);
                        lua_pushliteral (L, "Failed to initiate connection");
 
+                       if (cbd->w) {
+                               rspamd_session_watcher_pop (cbd->session, cbd->w);
+                       }
+
                        return 2;
                }
-               else {
-                       lua_tcp_register_event (cbd);
-               }
        }
        else {
                if (task == NULL) {
@@ -1765,11 +1846,12 @@ lua_tcp_connect_sync (lua_State *L)
                                lua_pushboolean (L, FALSE);
                                lua_pushliteral (L, "Failed to initiate dns request");
 
+                               if (cbd->w) {
+                                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+                               }
+
                                return 2;
                        }
-                       else {
-                               lua_tcp_register_event (cbd);
-                       }
                }
                else {
                        if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
@@ -1778,11 +1860,12 @@ lua_tcp_connect_sync (lua_State *L)
                                lua_pushboolean (L, FALSE);
                                lua_pushliteral (L, "Failed to initiate dns request");
 
+                               if (cbd->w) {
+                                       rspamd_session_watcher_pop (cbd->session, cbd->w);
+                               }
+
                                return 2;
                        }
-                       else {
-                               lua_tcp_register_event (cbd);
-                       }
                }
        }