aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_tcp.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-09-13 12:45:50 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-09-13 12:45:50 +0100
commit2a415e2c1d2ed0ecc790593e430dba1cb1c865df (patch)
tree47354d4952c0891405e2ed5d3b07ca2126408ee5 /src/lua/lua_tcp.c
parentd8837eff359da457a5573edc9fdad5d04a41ad12 (diff)
downloadrspamd-2a415e2c1d2ed0ecc790593e430dba1cb1c865df.tar.gz
rspamd-2a415e2c1d2ed0ecc790593e430dba1cb1c865df.zip
[Fix] Fix watchers in lua_tcp
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r--src/lua/lua_tcp.c163
1 files changed, 123 insertions, 40 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 099fc6896..ef4b4cb5a 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -305,6 +305,7 @@ struct lua_tcp_dtor {
#define LUA_TCP_FLAG_CONNECTED (1 << 3)
#define LUA_TCP_FLAG_FINISHED (1 << 4)
#define LUA_TCP_FLAG_SYNC (1 << 5)
+#define LUA_TCP_FLAG_RESOLVED (1 << 6)
#undef TCP_DEBUG_REFS
#ifdef TCP_DEBUG_REFS
@@ -480,15 +481,23 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
* in this mode, we don't remove object, we only remove the event
* Object is owned by lua and will be destroyed on __gc()
*/
- if (cbd->async_ev) {
+
+ if (cbd->w) {
rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
+ if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd);
}
+
cbd->async_ev = NULL;
}
else {
- if (cbd->async_ev) {
+ if (cbd->w) {
rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
+ if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
}
else {
@@ -514,6 +523,7 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
struct lua_tcp_handler *hdl;
gint cbref, top;
struct lua_callback_state cbs;
+ struct rspamd_async_watcher *existing_watcher = NULL;
lua_State *L;
if (cbd->thread) {
@@ -560,10 +570,21 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
TCP_RETAIN (cbd);
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, 3, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
+
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
lua_settop (L, top);
TCP_RELEASE (cbd);
@@ -594,6 +615,7 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
gint cbref, arg_cnt, top;
struct lua_callback_state cbs;
lua_State *L;
+ struct rspamd_async_watcher *existing_watcher = NULL;
if (cbd->thread) {
lua_tcp_resume_thread (cbd, str, len);
@@ -639,10 +661,20 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
TCP_RETAIN (cbd);
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
lua_settop (L, top);
TCP_RELEASE (cbd);
}
@@ -669,23 +701,25 @@ lua_tcp_resume_thread_error_argp (struct lua_tcp_cbdata *cbd, const gchar *error
static void
lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
{
-/*
- * typical call returns:
- *
- * read:
- * error:
- * (nil, error message)
- * got data:
- * (true, data)
- * write/connect:
- * error:
- * (nil, error message)
- * wrote
- * (true)
- */
+ /*
+ * typical call returns:
+ *
+ * read:
+ * error:
+ * (nil, error message)
+ * got data:
+ * (true, data)
+ * write/connect:
+ * error:
+ * (nil, error message)
+ * wrote
+ * (true)
+ */
lua_State *L = cbd->thread->lua_state;
struct lua_tcp_handler *hdl;
+ struct rspamd_async_watcher *existing_watcher = NULL;
+
hdl = g_queue_peek_head (cbd->handlers);
lua_pushboolean (L, TRUE);
@@ -695,10 +729,22 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
else {
lua_pushnil (L);
}
+
lua_tcp_shift_handler (cbd);
lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread);
+
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
lua_thread_resume (cbd->thread, 2);
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
TCP_RELEASE (cbd);
}
@@ -992,11 +1038,12 @@ lua_tcp_handler (int fd, short what, gpointer ud)
cbd->flags |= LUA_TCP_FLAG_CONNECTED;
if (cbd->connect_cb != -1) {
- lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
- L = cbs.L;
-
struct lua_tcp_cbdata **pcbd;
gint top;
+ struct rspamd_async_watcher *existing_watcher = NULL;
+
+ lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
@@ -1005,10 +1052,22 @@ lua_tcp_handler (int fd, short what, gpointer ud)
TCP_RETAIN (cbd);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (
+ cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session,
+ existing_watcher);
+ }
+
lua_settop (L, top);
TCP_RELEASE (cbd);
@@ -1138,14 +1197,23 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
if (!cbd->async_ev) {
return FALSE;
}
-
- cbd->w = rspamd_session_get_watcher (cbd->session);
- rspamd_session_watcher_push (cbd->session);
}
return TRUE;
}
+static void
+lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd)
+{
+ if (cbd->session) {
+ cbd->w = rspamd_session_get_watcher (cbd->session);
+
+ if (cbd->w) {
+ rspamd_session_watcher_push (cbd->session);
+ }
+ }
+}
+
static gboolean
lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
{
@@ -1170,6 +1238,9 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
return FALSE;
}
+ cbd->flags |= LUA_TCP_FLAG_RESOLVED;
+ lua_tcp_register_event (cbd);
+
cbd->fd = fd;
lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
@@ -1576,6 +1647,8 @@ lua_tcp_request (lua_State *L)
return 1;
}
+
+ lua_tcp_register_watcher (cbd);
}
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
@@ -1585,11 +1658,12 @@ lua_tcp_request (lua_State *L)
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 1;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (task == NULL) {
@@ -1598,11 +1672,13 @@ lua_tcp_request (lua_State *L)
lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host);
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
+
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 1;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
@@ -1610,11 +1686,13 @@ lua_tcp_request (lua_State *L)
lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host);
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
+
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 1;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
}
@@ -1741,6 +1819,8 @@ lua_tcp_connect_sync (lua_State *L)
return 2;
}
+
+ lua_tcp_register_watcher (cbd);
}
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
@@ -1751,11 +1831,12 @@ lua_tcp_connect_sync (lua_State *L)
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Failed to initiate connection");
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 2;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (task == NULL) {
@@ -1765,11 +1846,12 @@ lua_tcp_connect_sync (lua_State *L)
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Failed to initiate dns request");
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 2;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
@@ -1778,11 +1860,12 @@ lua_tcp_connect_sync (lua_State *L)
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Failed to initiate dns request");
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 2;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
}