summaryrefslogtreecommitdiffstats
path: root/src/lua/lua_tcp.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-12-29 17:58:47 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-12-29 18:58:16 +0000
commit7bdea713e206959efbf8ffeee544cadd3cfc1a6f (patch)
tree789fae62a20993ccc96ab61e1219b71ac2e15c1f /src/lua/lua_tcp.c
parentf53f197c71b6d10e8eb1383cf4de3009a40b5939 (diff)
downloadrspamd-7bdea713e206959efbf8ffeee544cadd3cfc1a6f.tar.gz
rspamd-7bdea713e206959efbf8ffeee544cadd3cfc1a6f.zip
[Fix] Miltiple fixes to new lua_tcp, add debugging
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r--src/lua/lua_tcp.c66
1 files changed, 44 insertions, 22 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index d7c2a7d23..2b2a7ce4c 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -152,12 +152,18 @@ struct lua_tcp_cbdata {
gint connect_cb;
guint port;
guint flags;
+ gchar tag[7];
struct rspamd_async_watcher *w;
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
};
+#define msg_debug_tcp(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
+ "lua_tcp", cbd->tag, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
static void lua_tcp_handler (int fd, short what, gpointer ud);
static void lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd,
gboolean can_read, gboolean can_write);
@@ -218,6 +224,8 @@ lua_tcp_fin (gpointer arg)
struct lua_tcp_cbdata *cbd = (struct lua_tcp_cbdata *)arg;
struct lua_tcp_dtor *dtor, *dttmp;
+ msg_debug_tcp ("finishing TCP connection");
+
if (cbd->connect_cb) {
luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
}
@@ -232,6 +240,7 @@ lua_tcp_fin (gpointer arg)
}
while (lua_tcp_shift_handler (cbd)) {}
+ g_queue_free (cbd->handlers);
LL_FOREACH_SAFE (cbd->dtors, dtor, dttmp) {
dtor->dtor (dtor->data);
@@ -301,9 +310,9 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err, ...)
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
lua_pop (cbd->L, 1);
}
- }
- REF_RELEASE (cbd);
+ REF_RELEASE (cbd);
+ }
}
static void
@@ -312,7 +321,7 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
struct rspamd_lua_text *t;
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
- gint cbref;
+ gint cbref, arg_cnt;
hdl = g_queue_peek_head (cbd->handlers);
@@ -337,6 +346,10 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
t->start = (const gchar *)str;
t->len = len;
t->flags = 0;
+ arg_cnt = 3;
+ }
+ else {
+ arg_cnt = 2;
}
/* Connection */
pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
@@ -345,13 +358,13 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
+ if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
lua_pop (cbd->L, 1);
}
- }
- REF_RELEASE (cbd);
+ REF_RELEASE (cbd);
+ }
}
static void
@@ -359,10 +372,10 @@ lua_tcp_plan_read (struct lua_tcp_cbdata *cbd)
{
event_del (&cbd->ev);
#ifdef EV_CLOSED
- event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED,
+ event_set (&cbd->ev, cbd->fd, EV_READ|EV_CLOSED,
lua_tcp_handler, cbd);
#else
- event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd);
+ event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
#endif
event_base_set (cbd->ev_base, &cbd->ev);
event_add (&cbd->ev, &cbd->tv);
@@ -445,6 +458,8 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
call_finish_handler:
+ msg_debug_tcp ("finishing TCP write");
+
if ((cbd->flags & LUA_TCP_FLAG_SHUTDOWN)) {
/* Half close the connection */
shutdown (cbd->fd, SHUT_WR);
@@ -469,6 +484,7 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
if (cbd->in->len >= slen) {
if ((pos = rspamd_substring_search (cbd->in->data, cbd->in->len,
rh->stop_pattern, slen)) != -1) {
+ msg_debug_tcp ("found TCP stop pattern");
lua_tcp_push_data (cbd, cbd->in->data, pos);
if (pos + slen < cbd->in->len) {
@@ -485,11 +501,13 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
}
else {
/* Plan new read */
+ msg_debug_tcp ("NOT found TCP stop pattern");
lua_tcp_plan_read (cbd);
}
}
}
else {
+ msg_debug_tcp ("read TCP partial data");
lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
lua_tcp_shift_handler (cbd);
@@ -570,6 +588,8 @@ lua_tcp_handler (int fd, short what, gpointer ud)
REF_RETAIN (cbd);
+ msg_debug_tcp ("processed TCP event: %d", what);
+
if (what == EV_READ) {
r = read (cbd->fd, inbuf, sizeof (inbuf));
lua_tcp_process_read (cbd, inbuf, r);
@@ -637,12 +657,15 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
if (hdl == NULL) {
/* We are finished with a connection */
+ msg_debug_tcp ("no handlers left, finish session");
REF_RELEASE (cbd);
}
else {
if (hdl->type == LUA_WANT_READ) {
+
/* We need to check if we have some leftover in the buffer */
if (cbd->in->len > 0) {
+ msg_debug_tcp ("process read buffer leftover");
if (lua_tcp_process_read_handler (cbd, &hdl->h.r)) {
/* We can go to the next handler */
lua_tcp_shift_handler (cbd);
@@ -650,6 +673,7 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
}
}
else {
+ msg_debug_tcp ("plan new read");
if (can_read) {
/* We need to plan a new event */
event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
@@ -669,15 +693,17 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
* We need to plan write event if there is something in the
* write request
*/
+
if (hdl->h.w.pos < hdl->h.w.total) {
+ msg_debug_tcp ("plan new write");
if (can_write) {
event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
event_base_set (cbd->ev_base, &cbd->ev);
event_add (&cbd->ev, &cbd->tv);
}
else {
- /* Cannot read more */
- lua_tcp_push_error (cbd, "EOF, cannot read more data");
+ /* Cannot write more */
+ lua_tcp_push_error (cbd, "EOF, cannot write more data");
lua_tcp_shift_handler (cbd);
lua_tcp_plan_handler_event (cbd, can_read, can_write);
}
@@ -826,6 +852,7 @@ lua_tcp_request (lua_State *L)
struct rspamd_task *task = NULL;
struct iovec *iov = NULL;
guint niov = 0, total_out;
+ guint64 h;
gdouble timeout = default_tcp_timeout;
gboolean partial = FALSE, do_shutdown = FALSE, do_read = TRUE;
@@ -1007,6 +1034,9 @@ lua_tcp_request (lua_State *L)
}
cbd->L = L;
+ h = rspamd_random_uint64_fast ();
+ rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
+ cbd->handlers = g_queue_new ();
if (total_out > 0) {
struct lua_tcp_handler *wh;
@@ -1162,8 +1192,9 @@ lua_tcp_add_read (lua_State *L)
rh->type = LUA_WANT_READ;
rh->h.r.cbref = cbref;
rh->h.r.stop_pattern = stop_pattern;
+ msg_debug_tcp ("added read event, cbref: %d", cbref);
+
g_queue_push_tail (cbd->handlers, rh);
- lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
return 0;
}
@@ -1242,19 +1273,10 @@ lua_tcp_add_write (lua_State *L)
wh->h.w.total = total_out;
wh->h.w.pos = 0;
/* Cannot set write handler here */
- wh->h.w.cbref = -1;
-
- if (cbref != -1) {
- /* We have write only callback */
- wh->h.w.cbref = cbref;
- }
- else {
- /* We have simple client callback */
- wh->h.w.cbref = -1;
- }
+ wh->h.w.cbref = cbref;
+ msg_debug_tcp ("added write event, cbref: %d", cbref);
g_queue_push_tail (cbd->handlers, wh);
- lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
lua_pushboolean (L, TRUE);
return 1;