From: Vsevolod Stakhov Date: Thu, 13 Sep 2018 11:45:50 +0000 (+0100) Subject: [Fix] Fix watchers in lua_tcp X-Git-Tag: 1.8.0~119 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=2a415e2c1d2ed0ecc790593e430dba1cb1c865df;p=rspamd.git [Fix] Fix watchers in lua_tcp --- diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 4471b01ce..96872108c 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -897,6 +897,10 @@ lua_http_request (lua_State *L) lua_http_maybe_free (cbd); lua_pushboolean (L, FALSE); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 1; } } @@ -912,9 +916,14 @@ lua_http_request (lua_State *L) lua_pushboolean (L, FALSE); g_free (to_resolve); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 1; } + g_free (to_resolve); } else { @@ -925,6 +934,10 @@ lua_http_request (lua_State *L) lua_http_maybe_free (cbd); lua_pushboolean (L, FALSE); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 1; } } @@ -932,11 +945,14 @@ lua_http_request (lua_State *L) if (cbd->cbref == -1) { cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool); + return lua_thread_yield (cbd->thread, 0); - } else { + } + else { lua_pushboolean (L, TRUE); - return 1; } + + return 1; } static gint diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 099fc6896..ef4b4cb5a 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -305,6 +305,7 @@ struct lua_tcp_dtor { #define LUA_TCP_FLAG_CONNECTED (1 << 3) #define LUA_TCP_FLAG_FINISHED (1 << 4) #define LUA_TCP_FLAG_SYNC (1 << 5) +#define LUA_TCP_FLAG_RESOLVED (1 << 6) #undef TCP_DEBUG_REFS #ifdef TCP_DEBUG_REFS @@ -480,15 +481,23 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd) * in this mode, we don't remove object, we only remove the event * Object is owned by lua and will be destroyed on __gc() */ - if (cbd->async_ev) { + + if (cbd->w) { rspamd_session_watcher_pop (cbd->session, cbd->w); + } + + if (cbd->async_ev) { rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd); } + cbd->async_ev = NULL; } else { - if (cbd->async_ev) { + if (cbd->w) { rspamd_session_watcher_pop (cbd->session, cbd->w); + } + + if (cbd->async_ev) { rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd); } else { @@ -514,6 +523,7 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, struct lua_tcp_handler *hdl; gint cbref, top; struct lua_callback_state cbs; + struct rspamd_async_watcher *existing_watcher = NULL; lua_State *L; if (cbd->thread) { @@ -560,10 +570,21 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, rspamd_lua_setclass (L, "rspamd{tcp}", -1); TCP_RETAIN (cbd); + if (cbd->w) { + /* Replace watcher to deal with nested calls */ + existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + } + if (lua_pcall (L, 3, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } + + if (cbd->w) { + /* Restore existing watcher */ + rspamd_session_replace_watcher (cbd->session, existing_watcher); + } + lua_settop (L, top); TCP_RELEASE (cbd); @@ -594,6 +615,7 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) gint cbref, arg_cnt, top; struct lua_callback_state cbs; lua_State *L; + struct rspamd_async_watcher *existing_watcher = NULL; if (cbd->thread) { lua_tcp_resume_thread (cbd, str, len); @@ -639,10 +661,20 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) TCP_RETAIN (cbd); + if (cbd->w) { + /* Replace watcher to deal with nested calls */ + existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + } + if (lua_pcall (L, arg_cnt, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } + if (cbd->w) { + /* Restore existing watcher */ + rspamd_session_replace_watcher (cbd->session, existing_watcher); + } + lua_settop (L, top); TCP_RELEASE (cbd); } @@ -669,23 +701,25 @@ lua_tcp_resume_thread_error_argp (struct lua_tcp_cbdata *cbd, const gchar *error static void lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) { -/* - * typical call returns: - * - * read: - * error: - * (nil, error message) - * got data: - * (true, data) - * write/connect: - * error: - * (nil, error message) - * wrote - * (true) - */ + /* + * typical call returns: + * + * read: + * error: + * (nil, error message) + * got data: + * (true, data) + * write/connect: + * error: + * (nil, error message) + * wrote + * (true) + */ lua_State *L = cbd->thread->lua_state; struct lua_tcp_handler *hdl; + struct rspamd_async_watcher *existing_watcher = NULL; + hdl = g_queue_peek_head (cbd->handlers); lua_pushboolean (L, TRUE); @@ -695,10 +729,22 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) else { lua_pushnil (L); } + lua_tcp_shift_handler (cbd); lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread); + + if (cbd->w) { + /* Replace watcher to deal with nested calls */ + existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + } + lua_thread_resume (cbd->thread, 2); + if (cbd->w) { + /* Restore existing watcher */ + rspamd_session_replace_watcher (cbd->session, existing_watcher); + } + TCP_RELEASE (cbd); } @@ -992,11 +1038,12 @@ lua_tcp_handler (int fd, short what, gpointer ud) cbd->flags |= LUA_TCP_FLAG_CONNECTED; if (cbd->connect_cb != -1) { - lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs); - L = cbs.L; - struct lua_tcp_cbdata **pcbd; gint top; + struct rspamd_async_watcher *existing_watcher = NULL; + + lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs); + L = cbs.L; top = lua_gettop (L); lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb); @@ -1005,10 +1052,22 @@ lua_tcp_handler (int fd, short what, gpointer ud) TCP_RETAIN (cbd); rspamd_lua_setclass (L, "rspamd{tcp}", -1); + if (cbd->w) { + /* Replace watcher to deal with nested calls */ + existing_watcher = rspamd_session_replace_watcher ( + cbd->session, cbd->w); + } + if (lua_pcall (L, 1, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } + if (cbd->w) { + /* Restore existing watcher */ + rspamd_session_replace_watcher (cbd->session, + existing_watcher); + } + lua_settop (L, top); TCP_RELEASE (cbd); @@ -1138,14 +1197,23 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd) if (!cbd->async_ev) { return FALSE; } - - cbd->w = rspamd_session_get_watcher (cbd->session); - rspamd_session_watcher_push (cbd->session); } return TRUE; } +static void +lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd) +{ + if (cbd->session) { + cbd->w = rspamd_session_get_watcher (cbd->session); + + if (cbd->w) { + rspamd_session_watcher_push (cbd->session); + } + } +} + static gboolean lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) { @@ -1170,6 +1238,9 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) return FALSE; } + cbd->flags |= LUA_TCP_FLAG_RESOLVED; + lua_tcp_register_event (cbd); + cbd->fd = fd; lua_tcp_plan_handler_event (cbd, TRUE, TRUE); @@ -1576,6 +1647,8 @@ lua_tcp_request (lua_State *L) return 1; } + + lua_tcp_register_watcher (cbd); } if (rspamd_parse_inet_address (&cbd->addr, host, 0)) { @@ -1585,11 +1658,12 @@ lua_tcp_request (lua_State *L) TCP_RELEASE (cbd); lua_pushboolean (L, FALSE); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 1; } - else { - lua_tcp_register_event (cbd); - } } else { if (task == NULL) { @@ -1598,11 +1672,13 @@ lua_tcp_request (lua_State *L) lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host); TCP_RELEASE (cbd); lua_pushboolean (L, FALSE); + + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 1; } - else { - lua_tcp_register_event (cbd); - } } else { if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd, @@ -1610,11 +1686,13 @@ lua_tcp_request (lua_State *L) lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host); TCP_RELEASE (cbd); lua_pushboolean (L, FALSE); + + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 1; } - else { - lua_tcp_register_event (cbd); - } } } @@ -1741,6 +1819,8 @@ lua_tcp_connect_sync (lua_State *L) return 2; } + + lua_tcp_register_watcher (cbd); } if (rspamd_parse_inet_address (&cbd->addr, host, 0)) { @@ -1751,11 +1831,12 @@ lua_tcp_connect_sync (lua_State *L) lua_pushboolean (L, FALSE); lua_pushliteral (L, "Failed to initiate connection"); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 2; } - else { - lua_tcp_register_event (cbd); - } } else { if (task == NULL) { @@ -1765,11 +1846,12 @@ lua_tcp_connect_sync (lua_State *L) lua_pushboolean (L, FALSE); lua_pushliteral (L, "Failed to initiate dns request"); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 2; } - else { - lua_tcp_register_event (cbd); - } } else { if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd, @@ -1778,11 +1860,12 @@ lua_tcp_connect_sync (lua_State *L) lua_pushboolean (L, FALSE); lua_pushliteral (L, "Failed to initiate dns request"); + if (cbd->w) { + rspamd_session_watcher_pop (cbd->session, cbd->w); + } + return 2; } - else { - lua_tcp_register_event (cbd); - } } }