diff options
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r-- | src/lua/lua_tcp.c | 99 |
1 files changed, 41 insertions, 58 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index f9c1a477d..047bfe444 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -19,6 +19,8 @@ #include "unix-std.h" #include <math.h> +static const gchar *M = "rspamd lua tcp"; + /*** * @module rspamd_tcp * Rspamd TCP module represents generic TCP asynchronous client available from LUA code. @@ -336,11 +338,11 @@ struct lua_tcp_cbdata { guint port; guint flags; gchar tag[7]; - struct rspamd_async_watcher *w; struct event ev; struct lua_tcp_dtor *dtors; ref_entry_t ref; struct rspamd_task *task; + struct rspamd_symcache_item *item; struct thread_entry *thread; struct rspamd_config *cfg; gboolean eof; @@ -482,10 +484,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd) * Object is owned by lua and will be destroyed on __gc() */ - if (cbd->w) { - rspamd_session_watcher_pop (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M); + cbd->item = NULL; } - cbd->w = NULL; if (cbd->async_ev) { rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd); @@ -494,10 +496,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd) cbd->async_ev = NULL; } else { - if (cbd->w) { - rspamd_session_watcher_pop (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M); + cbd->item = NULL; } - cbd->w = NULL; if (cbd->async_ev) { rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd); @@ -525,7 +527,6 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, struct lua_tcp_handler *hdl; gint cbref, top; struct lua_callback_state cbs; - struct rspamd_async_watcher *existing_watcher = NULL; lua_State *L; if (cbd->thread) { @@ -572,21 +573,14 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, rspamd_lua_setclass (L, "rspamd{tcp}", -1); TCP_RETAIN (cbd); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_set_cur_item (cbd->task, cbd->item); } if (lua_pcall (L, 3, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, existing_watcher); - } - lua_settop (L, top); TCP_RELEASE (cbd); @@ -617,7 +611,6 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) gint cbref, arg_cnt, top; struct lua_callback_state cbs; lua_State *L; - struct rspamd_async_watcher *existing_watcher = NULL; if (cbd->thread) { lua_tcp_resume_thread (cbd, str, len); @@ -663,20 +656,14 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) TCP_RETAIN (cbd); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_set_cur_item (cbd->task, cbd->item); } if (lua_pcall (L, arg_cnt, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, existing_watcher); - } - lua_settop (L, top); TCP_RELEASE (cbd); } @@ -720,7 +707,6 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) lua_State *L = cbd->thread->lua_state; struct lua_tcp_handler *hdl; - struct rspamd_async_watcher *existing_watcher = NULL; hdl = g_queue_peek_head (cbd->handlers); @@ -735,18 +721,12 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) lua_tcp_shift_handler (cbd); lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_set_cur_item (cbd->task, cbd->item); } lua_thread_resume (cbd->thread, 2); - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, existing_watcher); - } - TCP_RELEASE (cbd); } @@ -915,7 +895,15 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd, else { /* Plan new read */ msg_debug_tcp ("NOT found TCP stop pattern"); - lua_tcp_plan_read (cbd); + + if (!cbd->eof) { + lua_tcp_plan_read (cbd); + } + else { + /* Got session finished but no stop pattern */ + lua_tcp_push_error (cbd, TRUE, + "IO read error: connection terminated"); + } } } } @@ -977,7 +965,7 @@ lua_tcp_process_read (struct lua_tcp_cbdata *cbd, lua_tcp_process_read_handler (cbd, rh, TRUE); } else { - lua_tcp_push_error (cbd, FALSE, "IO read error: connection terminated"); + lua_tcp_push_error (cbd, TRUE, "IO read error: connection terminated"); } lua_tcp_plan_handler_event (cbd, FALSE, TRUE); @@ -1042,7 +1030,6 @@ lua_tcp_handler (int fd, short what, gpointer ud) if (cbd->connect_cb != -1) { struct lua_tcp_cbdata **pcbd; gint top; - struct rspamd_async_watcher *existing_watcher = NULL; lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs); L = cbs.L; @@ -1054,26 +1041,16 @@ lua_tcp_handler (int fd, short what, gpointer ud) TCP_RETAIN (cbd); rspamd_lua_setclass (L, "rspamd{tcp}", -1); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher ( - cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_set_cur_item (cbd->task, cbd->item); } if (lua_pcall (L, 1, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, - existing_watcher); - } - lua_settop (L, top); - TCP_RELEASE (cbd); - lua_thread_pool_restore_callback (&cbs); } } @@ -1194,8 +1171,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd) if (cbd->session) { event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin; - cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd, - g_quark_from_static_string ("lua tcp")); + cbd->async_ev = rspamd_session_add_event (cbd->session, fin, cbd, M); if (!cbd->async_ev) { return FALSE; @@ -1208,12 +1184,8 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd) static void lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd) { - if (cbd->session) { - cbd->w = rspamd_session_get_watcher (cbd->session); - - if (cbd->w) { - rspamd_session_watcher_push (cbd->session); - } + if (cbd->item) { + rspamd_symcache_item_async_inc (cbd->task, cbd->item, M); } } @@ -1241,6 +1213,7 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) return FALSE; } +#if 0 if (!(cbd->flags & LUA_TCP_FLAG_RESOLVED)) { /* We come here without resolving, so we need to add a watcher */ lua_tcp_register_watcher (cbd); @@ -1248,6 +1221,7 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) else { cbd->flags |= LUA_TCP_FLAG_RESOLVED; } +#endif lua_tcp_register_event (cbd); @@ -1590,6 +1564,11 @@ lua_tcp_request (lua_State *L) } cbd->task = task; + + if (task) { + cbd->item = rspamd_symcache_get_cur_item (task); + } + cbd->cfg = cfg; h = rspamd_random_uint64_fast (); rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h); @@ -1668,6 +1647,8 @@ lua_tcp_request (lua_State *L) if (rspamd_parse_inet_address (&cbd->addr, host, 0)) { rspamd_inet_address_set_port (cbd->addr, port); /* Host is numeric IP, no need to resolve */ + lua_tcp_register_watcher (cbd); + if (!lua_tcp_make_connection (cbd)) { lua_pushboolean (L, FALSE); @@ -1860,6 +1841,8 @@ lua_tcp_connect_sync (lua_State *L) } } else { + cbd->item = rspamd_symcache_get_cur_item (task); + if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd, RDNS_REQUEST_A, host)) { lua_pushboolean (L, FALSE); |