aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lua/lua_tcp.c78
1 files changed, 23 insertions, 55 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index f9c1a477d..c6e96825b 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -336,11 +336,11 @@ struct lua_tcp_cbdata {
guint port;
guint flags;
gchar tag[7];
- struct rspamd_async_watcher *w;
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
struct rspamd_task *task;
+ struct rspamd_symcache_item *item;
struct thread_entry *thread;
struct rspamd_config *cfg;
gboolean eof;
@@ -482,10 +482,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
* Object is owned by lua and will be destroyed on __gc()
*/
- if (cbd->w) {
- rspamd_session_watcher_pop (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_item_async_dec_check (cbd->task, cbd->item);
+ cbd->item = NULL;
}
- cbd->w = NULL;
if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd);
@@ -494,10 +494,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
cbd->async_ev = NULL;
}
else {
- if (cbd->w) {
- rspamd_session_watcher_pop (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_item_async_dec_check (cbd->task, cbd->item);
+ cbd->item = NULL;
}
- cbd->w = NULL;
if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
@@ -525,7 +525,6 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
struct lua_tcp_handler *hdl;
gint cbref, top;
struct lua_callback_state cbs;
- struct rspamd_async_watcher *existing_watcher = NULL;
lua_State *L;
if (cbd->thread) {
@@ -572,21 +571,14 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
TCP_RETAIN (cbd);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
}
if (lua_pcall (L, 3, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
-
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session, existing_watcher);
- }
-
lua_settop (L, top);
TCP_RELEASE (cbd);
@@ -617,7 +609,6 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
gint cbref, arg_cnt, top;
struct lua_callback_state cbs;
lua_State *L;
- struct rspamd_async_watcher *existing_watcher = NULL;
if (cbd->thread) {
lua_tcp_resume_thread (cbd, str, len);
@@ -663,20 +654,14 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
TCP_RETAIN (cbd);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
}
if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session, existing_watcher);
- }
-
lua_settop (L, top);
TCP_RELEASE (cbd);
}
@@ -720,7 +705,6 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
lua_State *L = cbd->thread->lua_state;
struct lua_tcp_handler *hdl;
- struct rspamd_async_watcher *existing_watcher = NULL;
hdl = g_queue_peek_head (cbd->handlers);
@@ -735,18 +719,12 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
lua_tcp_shift_handler (cbd);
lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
}
lua_thread_resume (cbd->thread, 2);
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session, existing_watcher);
- }
-
TCP_RELEASE (cbd);
}
@@ -1042,7 +1020,6 @@ lua_tcp_handler (int fd, short what, gpointer ud)
if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
gint top;
- struct rspamd_async_watcher *existing_watcher = NULL;
lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
L = cbs.L;
@@ -1054,26 +1031,16 @@ lua_tcp_handler (int fd, short what, gpointer ud)
TCP_RETAIN (cbd);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (
- cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symbols_cache_set_cur_item (cbd->task, cbd->item);
}
if (lua_pcall (L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session,
- existing_watcher);
- }
-
lua_settop (L, top);
-
TCP_RELEASE (cbd);
-
lua_thread_pool_restore_callback (&cbs);
}
}
@@ -1194,7 +1161,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
if (cbd->session) {
event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin;
- cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd,
+ cbd->async_ev = rspamd_session_add_event (cbd->session, fin, cbd,
g_quark_from_static_string ("lua tcp"));
if (!cbd->async_ev) {
@@ -1208,12 +1175,8 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
static void
lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd)
{
- if (cbd->session) {
- cbd->w = rspamd_session_get_watcher (cbd->session);
-
- if (cbd->w) {
- rspamd_session_watcher_push (cbd->session);
- }
+ if (cbd->item) {
+ rspamd_symcache_item_async_inc (cbd->task, cbd->item);
}
}
@@ -1590,6 +1553,11 @@ lua_tcp_request (lua_State *L)
}
cbd->task = task;
+
+ if (task) {
+ cbd->item = rspamd_symbols_cache_get_cur_item (task);
+ }
+
cbd->cfg = cfg;
h = rspamd_random_uint64_fast ();
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);