From 6be697a2ad9deac22629528c9214871dc40969af Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 20 Oct 2018 17:23:45 +0100 Subject: [PATCH] [Project] Adopt lua tcp --- src/lua/lua_tcp.c | 78 ++++++++++++++--------------------------------- 1 file changed, 23 insertions(+), 55 deletions(-) diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index f9c1a477d..c6e96825b 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -336,11 +336,11 @@ struct lua_tcp_cbdata { guint port; guint flags; gchar tag[7]; - struct rspamd_async_watcher *w; struct event ev; struct lua_tcp_dtor *dtors; ref_entry_t ref; struct rspamd_task *task; + struct rspamd_symcache_item *item; struct thread_entry *thread; struct rspamd_config *cfg; gboolean eof; @@ -482,10 +482,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd) * Object is owned by lua and will be destroyed on __gc() */ - if (cbd->w) { - rspamd_session_watcher_pop (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_item_async_dec_check (cbd->task, cbd->item); + cbd->item = NULL; } - cbd->w = NULL; if (cbd->async_ev) { rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd); @@ -494,10 +494,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd) cbd->async_ev = NULL; } else { - if (cbd->w) { - rspamd_session_watcher_pop (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symcache_item_async_dec_check (cbd->task, cbd->item); + cbd->item = NULL; } - cbd->w = NULL; if (cbd->async_ev) { rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd); @@ -525,7 +525,6 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, struct lua_tcp_handler *hdl; gint cbref, top; struct lua_callback_state cbs; - struct rspamd_async_watcher *existing_watcher = NULL; lua_State *L; if (cbd->thread) { @@ -572,21 +571,14 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, rspamd_lua_setclass (L, "rspamd{tcp}", -1); TCP_RETAIN (cbd); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item); } if (lua_pcall (L, 3, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, existing_watcher); - } - lua_settop (L, top); TCP_RELEASE (cbd); @@ -617,7 +609,6 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) gint cbref, arg_cnt, top; struct lua_callback_state cbs; lua_State *L; - struct rspamd_async_watcher *existing_watcher = NULL; if (cbd->thread) { lua_tcp_resume_thread (cbd, str, len); @@ -663,20 +654,14 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) TCP_RETAIN (cbd); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item); } if (lua_pcall (L, arg_cnt, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, existing_watcher); - } - lua_settop (L, top); TCP_RELEASE (cbd); } @@ -720,7 +705,6 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) lua_State *L = cbd->thread->lua_state; struct lua_tcp_handler *hdl; - struct rspamd_async_watcher *existing_watcher = NULL; hdl = g_queue_peek_head (cbd->handlers); @@ -735,18 +719,12 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) lua_tcp_shift_handler (cbd); lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w); + if (cbd->item) { + rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item); } lua_thread_resume (cbd->thread, 2); - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, existing_watcher); - } - TCP_RELEASE (cbd); } @@ -1042,7 +1020,6 @@ lua_tcp_handler (int fd, short what, gpointer ud) if (cbd->connect_cb != -1) { struct lua_tcp_cbdata **pcbd; gint top; - struct rspamd_async_watcher *existing_watcher = NULL; lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs); L = cbs.L; @@ -1054,26 +1031,16 @@ lua_tcp_handler (int fd, short what, gpointer ud) TCP_RETAIN (cbd); rspamd_lua_setclass (L, "rspamd{tcp}", -1); - if (cbd->w) { - /* Replace watcher to deal with nested calls */ - existing_watcher = rspamd_session_replace_watcher ( - cbd->session, cbd->w); + if (cbd->item) { + rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item); } if (lua_pcall (L, 1, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - if (cbd->w) { - /* Restore existing watcher */ - rspamd_session_replace_watcher (cbd->session, - existing_watcher); - } - lua_settop (L, top); - TCP_RELEASE (cbd); - lua_thread_pool_restore_callback (&cbs); } } @@ -1194,7 +1161,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd) if (cbd->session) { event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin; - cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd, + cbd->async_ev = rspamd_session_add_event (cbd->session, fin, cbd, g_quark_from_static_string ("lua tcp")); if (!cbd->async_ev) { @@ -1208,12 +1175,8 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd) static void lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd) { - if (cbd->session) { - cbd->w = rspamd_session_get_watcher (cbd->session); - - if (cbd->w) { - rspamd_session_watcher_push (cbd->session); - } + if (cbd->item) { + rspamd_symcache_item_async_inc (cbd->task, cbd->item); } } @@ -1590,6 +1553,11 @@ lua_tcp_request (lua_State *L) } cbd->task = task; + + if (task) { + cbd->item = rspamd_symbols_cache_get_cur_item (task); + } + cbd->cfg = cfg; h = rspamd_random_uint64_fast (); rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h); -- 2.39.5