aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r--src/lua/lua_tcp.c99
1 files changed, 41 insertions, 58 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index f9c1a477d..047bfe444 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -19,6 +19,8 @@
#include "unix-std.h"
#include <math.h>
+static const gchar *M = "rspamd lua tcp";
+
/***
* @module rspamd_tcp
* Rspamd TCP module represents generic TCP asynchronous client available from LUA code.
@@ -336,11 +338,11 @@ struct lua_tcp_cbdata {
guint port;
guint flags;
gchar tag[7];
- struct rspamd_async_watcher *w;
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
struct rspamd_task *task;
+ struct rspamd_symcache_item *item;
struct thread_entry *thread;
struct rspamd_config *cfg;
gboolean eof;
@@ -482,10 +484,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
* Object is owned by lua and will be destroyed on __gc()
*/
- if (cbd->w) {
- rspamd_session_watcher_pop (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
+ cbd->item = NULL;
}
- cbd->w = NULL;
if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_void_finalyser, cbd);
@@ -494,10 +496,10 @@ lua_tcp_maybe_free (struct lua_tcp_cbdata *cbd)
cbd->async_ev = NULL;
}
else {
- if (cbd->w) {
- rspamd_session_watcher_pop (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
+ cbd->item = NULL;
}
- cbd->w = NULL;
if (cbd->async_ev) {
rspamd_session_remove_event (cbd->session, lua_tcp_fin, cbd);
@@ -525,7 +527,6 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
struct lua_tcp_handler *hdl;
gint cbref, top;
struct lua_callback_state cbs;
- struct rspamd_async_watcher *existing_watcher = NULL;
lua_State *L;
if (cbd->thread) {
@@ -572,21 +573,14 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
TCP_RETAIN (cbd);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item (cbd->task, cbd->item);
}
if (lua_pcall (L, 3, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
-
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session, existing_watcher);
- }
-
lua_settop (L, top);
TCP_RELEASE (cbd);
@@ -617,7 +611,6 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
gint cbref, arg_cnt, top;
struct lua_callback_state cbs;
lua_State *L;
- struct rspamd_async_watcher *existing_watcher = NULL;
if (cbd->thread) {
lua_tcp_resume_thread (cbd, str, len);
@@ -663,20 +656,14 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
TCP_RETAIN (cbd);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item (cbd->task, cbd->item);
}
if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session, existing_watcher);
- }
-
lua_settop (L, top);
TCP_RELEASE (cbd);
}
@@ -720,7 +707,6 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
lua_State *L = cbd->thread->lua_state;
struct lua_tcp_handler *hdl;
- struct rspamd_async_watcher *existing_watcher = NULL;
hdl = g_queue_peek_head (cbd->handlers);
@@ -735,18 +721,12 @@ lua_tcp_resume_thread (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
lua_tcp_shift_handler (cbd);
lua_thread_pool_set_running_entry (cbd->cfg->lua_thread_pool, cbd->thread);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item (cbd->task, cbd->item);
}
lua_thread_resume (cbd->thread, 2);
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session, existing_watcher);
- }
-
TCP_RELEASE (cbd);
}
@@ -915,7 +895,15 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
else {
/* Plan new read */
msg_debug_tcp ("NOT found TCP stop pattern");
- lua_tcp_plan_read (cbd);
+
+ if (!cbd->eof) {
+ lua_tcp_plan_read (cbd);
+ }
+ else {
+ /* Got session finished but no stop pattern */
+ lua_tcp_push_error (cbd, TRUE,
+ "IO read error: connection terminated");
+ }
}
}
}
@@ -977,7 +965,7 @@ lua_tcp_process_read (struct lua_tcp_cbdata *cbd,
lua_tcp_process_read_handler (cbd, rh, TRUE);
}
else {
- lua_tcp_push_error (cbd, FALSE, "IO read error: connection terminated");
+ lua_tcp_push_error (cbd, TRUE, "IO read error: connection terminated");
}
lua_tcp_plan_handler_event (cbd, FALSE, TRUE);
@@ -1042,7 +1030,6 @@ lua_tcp_handler (int fd, short what, gpointer ud)
if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
gint top;
- struct rspamd_async_watcher *existing_watcher = NULL;
lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &cbs);
L = cbs.L;
@@ -1054,26 +1041,16 @@ lua_tcp_handler (int fd, short what, gpointer ud)
TCP_RETAIN (cbd);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
- if (cbd->w) {
- /* Replace watcher to deal with nested calls */
- existing_watcher = rspamd_session_replace_watcher (
- cbd->session, cbd->w);
+ if (cbd->item) {
+ rspamd_symcache_set_cur_item (cbd->task, cbd->item);
}
if (lua_pcall (L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- if (cbd->w) {
- /* Restore existing watcher */
- rspamd_session_replace_watcher (cbd->session,
- existing_watcher);
- }
-
lua_settop (L, top);
-
TCP_RELEASE (cbd);
-
lua_thread_pool_restore_callback (&cbs);
}
}
@@ -1194,8 +1171,7 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
if (cbd->session) {
event_finalizer_t fin = IS_SYNC (cbd) ? lua_tcp_void_finalyser : lua_tcp_fin;
- cbd->async_ev = rspamd_session_add_event (cbd->session, NULL, fin, cbd,
- g_quark_from_static_string ("lua tcp"));
+ cbd->async_ev = rspamd_session_add_event (cbd->session, fin, cbd, M);
if (!cbd->async_ev) {
return FALSE;
@@ -1208,12 +1184,8 @@ lua_tcp_register_event (struct lua_tcp_cbdata *cbd)
static void
lua_tcp_register_watcher (struct lua_tcp_cbdata *cbd)
{
- if (cbd->session) {
- cbd->w = rspamd_session_get_watcher (cbd->session);
-
- if (cbd->w) {
- rspamd_session_watcher_push (cbd->session);
- }
+ if (cbd->item) {
+ rspamd_symcache_item_async_inc (cbd->task, cbd->item, M);
}
}
@@ -1241,6 +1213,7 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
return FALSE;
}
+#if 0
if (!(cbd->flags & LUA_TCP_FLAG_RESOLVED)) {
/* We come here without resolving, so we need to add a watcher */
lua_tcp_register_watcher (cbd);
@@ -1248,6 +1221,7 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
else {
cbd->flags |= LUA_TCP_FLAG_RESOLVED;
}
+#endif
lua_tcp_register_event (cbd);
@@ -1590,6 +1564,11 @@ lua_tcp_request (lua_State *L)
}
cbd->task = task;
+
+ if (task) {
+ cbd->item = rspamd_symcache_get_cur_item (task);
+ }
+
cbd->cfg = cfg;
h = rspamd_random_uint64_fast ();
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
@@ -1668,6 +1647,8 @@ lua_tcp_request (lua_State *L)
if (rspamd_parse_inet_address (&cbd->addr, host, 0)) {
rspamd_inet_address_set_port (cbd->addr, port);
/* Host is numeric IP, no need to resolve */
+ lua_tcp_register_watcher (cbd);
+
if (!lua_tcp_make_connection (cbd)) {
lua_pushboolean (L, FALSE);
@@ -1860,6 +1841,8 @@ lua_tcp_connect_sync (lua_State *L)
}
}
else {
+ cbd->item = rspamd_symcache_get_cur_item (task);
+
if (!make_dns_request_task (task, lua_tcp_dns_handler, cbd,
RDNS_REQUEST_A, host)) {
lua_pushboolean (L, FALSE);