#define LUA_TCP_FLAG_CONNECTED (1 << 3)
#define LUA_TCP_FLAG_FINISHED (1 << 4)
#define LUA_TCP_FLAG_SYNC (1 << 5)
+#define LUA_TCP_FLAG_RESOLVED (1 << 6)
#undef TCP_DEBUG_REFS
#ifdef TCP_DEBUG_REFS
* in this mode, we don't remove object, we only remove the event
* Object is owned by lua and will be destroyed on __gc()
*/
- if (cbd->async_ev) {
+
+ if (cbd->w) {
rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
+ if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd);
}
+
cbd->async_ev = NULL;
}
else {
- if (cbd->async_ev) {
+ if (cbd->w) {
rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
+ if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
}
else {
struct lua_tcp_handler *hdl;
gint cbref, top;
struct lua_callback_state cbs;
+ struct rspamd_async_watcher *existing_watcher = NULL;
lua_State *L;
if (cbd->thread) {
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
TCP_RETAIN (cbd);
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, 3, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
+
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
lua_settop (L, top);
TCP_RELEASE (cbd);
gint cbref, arg_cnt, top;
struct lua_callback_state cbs;
lua_State *L;
+ struct rspamd_async_watcher *existing_watcher = NULL;
if (cbd->thread) {
lua_tcp_resume_thread (cbd, str, len);
TCP_RETAIN (cbd);
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
lua_settop (L, top);
TCP_RELEASE (cbd);
}
static void
lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
{
-/*
- * typical call returns:
- *
- * read:
- * error:
- * (nil, error message)
- * got data:
- * (true, data)
- * write/connect:
- * error:
- * (nil, error message)
- * wrote
- * (true)
- */
+ /*
+ * typical call returns:
+ *
+ * read:
+ * error:
+ * (nil, error message)
+ * got data:
+ * (true, data)
+ * write/connect:
+ * error:
+ * (nil, error message)
+ * wrote
+ * (true)
+ */
lua_State *L = cbd->thread->lua_state;
struct lua_tcp_handler *hdl;
+ struct rspamd_async_watcher *existing_watcher = NULL;
+
hdl = g_queue_peek_head (cbd->handlers);
lua_pushboolean (L, TRUE);
else {
lua_pushnil (L);
}
+
lua_tcp_shift_handler (cbd);
lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread);
+
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ }
+
lua_thread_resume (cbd->thread, 2);
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session, existing_watcher);
+ }
+
TCP_RELEASE (cbd);
}
cbd->flags |= LUA_TCP_FLAG_CONNECTED;
if (cbd->connect_cb != -1) {
- lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
- L = cbs.L;
-
struct lua_tcp_cbdata **pcbd;
gint top;
+ struct rspamd_async_watcher *existing_watcher = NULL;
+
+ lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
TCP_RETAIN (cbd);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
+ if (cbd->w) {
+ /* Replace watcher to deal with nested calls */
+ existing_watcher = rspamd_session_replace_watcher (
+ cbd->session, cbd->w);
+ }
+
if (lua_pcall (L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
+ if (cbd->w) {
+ /* Restore existing watcher */
+ rspamd_session_replace_watcher (cbd->session,
+ existing_watcher);
+ }
+
lua_settop (L, top);
TCP_RELEASE (cbd);
if (!cbd->async_ev) {
return FALSE;
}
-
- cbd->w = rspamd_session_get_watcher (cbd->session);
- rspamd_session_watcher_push (cbd->session);
}
return TRUE;
}
+static void
+lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd)
+{
+ if (cbd->session) {
+ cbd->w = rspamd_session_get_watcher (cbd->session);
+
+ if (cbd->w) {
+ rspamd_session_watcher_push (cbd->session);
+ }
+ }
+}
+
static gboolean
lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
{
return FALSE;
}
+ cbd->flags |= LUA_TCP_FLAG_RESOLVED;
+ lua_tcp_register_event (cbd);
+
cbd->fd = fd;
lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
return 1;
}
+
+ lua_tcp_register_watcher (cbd);
}
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 1;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (task == NULL) {
lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host);
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
+
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 1;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
lua_tcp_push_error (cbd, TRUE, "cannot resolve host: %s", host);
TCP_RELEASE (cbd);
lua_pushboolean (L, FALSE);
+
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 1;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
}
return 2;
}
+
+ lua_tcp_register_watcher (cbd);
}
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Failed to initiate connection");
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 2;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (task == NULL) {
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Failed to initiate dns request");
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 2;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
else {
if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
lua_pushboolean (L, FALSE);
lua_pushliteral (L, "Failed to initiate dns request");
+ if (cbd->w) {
+ rspamd_session_watcher_pop (cbd->session, cbd->w);
+ }
+
return 2;
}
- else {
- lua_tcp_register_event (cbd);
- }
}
}