diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-12-29 17:58:47 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-12-29 18:58:16 +0000 |
commit | 7bdea713e206959efbf8ffeee544cadd3cfc1a6f (patch) | |
tree | 789fae62a20993ccc96ab61e1219b71ac2e15c1f /src/lua/lua_tcp.c | |
parent | f53f197c71b6d10e8eb1383cf4de3009a40b5939 (diff) | |
download | rspamd-7bdea713e206959efbf8ffeee544cadd3cfc1a6f.tar.gz rspamd-7bdea713e206959efbf8ffeee544cadd3cfc1a6f.zip |
[Fix] Miltiple fixes to new lua_tcp, add debugging
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r-- | src/lua/lua_tcp.c | 66 |
1 files changed, 44 insertions, 22 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index d7c2a7d23..2b2a7ce4c 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -152,12 +152,18 @@ struct lua_tcp_cbdata { gint connect_cb; guint port; guint flags; + gchar tag[7]; struct rspamd_async_watcher *w; struct event ev; struct lua_tcp_dtor *dtors; ref_entry_t ref; }; +#define msg_debug_tcp(...) rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \ + "lua_tcp", cbd->tag, \ + G_STRFUNC, \ + __VA_ARGS__) + static void lua_tcp_handler (int fd, short what, gpointer ud); static void lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, gboolean can_write); @@ -218,6 +224,8 @@ lua_tcp_fin (gpointer arg) struct lua_tcp_cbdata *cbd = (struct lua_tcp_cbdata *)arg; struct lua_tcp_dtor *dtor, *dttmp; + msg_debug_tcp ("finishing TCP connection"); + if (cbd->connect_cb) { luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb); } @@ -232,6 +240,7 @@ lua_tcp_fin (gpointer arg) } while (lua_tcp_shift_handler (cbd)) {} + g_queue_free (cbd->handlers); LL_FOREACH_SAFE (cbd->dtors, dtor, dttmp) { dtor->dtor (dtor->data); @@ -301,9 +310,9 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err, ...) msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); lua_pop (cbd->L, 1); } - } - REF_RELEASE (cbd); + REF_RELEASE (cbd); + } } static void @@ -312,7 +321,7 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) struct rspamd_lua_text *t; struct lua_tcp_cbdata **pcbd; struct lua_tcp_handler *hdl; - gint cbref; + gint cbref, arg_cnt; hdl = g_queue_peek_head (cbd->handlers); @@ -337,6 +346,10 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) t->start = (const gchar *)str; t->len = len; t->flags = 0; + arg_cnt = 3; + } + else { + arg_cnt = 2; } /* Connection */ pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); @@ -345,13 +358,13 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) REF_RETAIN (cbd); - if (lua_pcall (cbd->L, 3, 0, 0) != 0) { + if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) { msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); lua_pop (cbd->L, 1); } - } - REF_RELEASE (cbd); + REF_RELEASE (cbd); + } } static void @@ -359,10 +372,10 @@ lua_tcp_plan_read (struct lua_tcp_cbdata *cbd) { event_del (&cbd->ev); #ifdef EV_CLOSED - event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED, + event_set (&cbd->ev, cbd->fd, EV_READ|EV_CLOSED, lua_tcp_handler, cbd); #else - event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd); + event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd); #endif event_base_set (cbd->ev_base, &cbd->ev); event_add (&cbd->ev, &cbd->tv); @@ -445,6 +458,8 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) call_finish_handler: + msg_debug_tcp ("finishing TCP write"); + if ((cbd->flags & LUA_TCP_FLAG_SHUTDOWN)) { /* Half close the connection */ shutdown (cbd->fd, SHUT_WR); @@ -469,6 +484,7 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd, if (cbd->in->len >= slen) { if ((pos = rspamd_substring_search (cbd->in->data, cbd->in->len, rh->stop_pattern, slen)) != -1) { + msg_debug_tcp ("found TCP stop pattern"); lua_tcp_push_data (cbd, cbd->in->data, pos); if (pos + slen < cbd->in->len) { @@ -485,11 +501,13 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd, } else { /* Plan new read */ + msg_debug_tcp ("NOT found TCP stop pattern"); lua_tcp_plan_read (cbd); } } } else { + msg_debug_tcp ("read TCP partial data"); lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len); lua_tcp_shift_handler (cbd); @@ -570,6 +588,8 @@ lua_tcp_handler (int fd, short what, gpointer ud) REF_RETAIN (cbd); + msg_debug_tcp ("processed TCP event: %d", what); + if (what == EV_READ) { r = read (cbd->fd, inbuf, sizeof (inbuf)); lua_tcp_process_read (cbd, inbuf, r); @@ -637,12 +657,15 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, if (hdl == NULL) { /* We are finished with a connection */ + msg_debug_tcp ("no handlers left, finish session"); REF_RELEASE (cbd); } else { if (hdl->type == LUA_WANT_READ) { + /* We need to check if we have some leftover in the buffer */ if (cbd->in->len > 0) { + msg_debug_tcp ("process read buffer leftover"); if (lua_tcp_process_read_handler (cbd, &hdl->h.r)) { /* We can go to the next handler */ lua_tcp_shift_handler (cbd); @@ -650,6 +673,7 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, } } else { + msg_debug_tcp ("plan new read"); if (can_read) { /* We need to plan a new event */ event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd); @@ -669,15 +693,17 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, * We need to plan write event if there is something in the * write request */ + if (hdl->h.w.pos < hdl->h.w.total) { + msg_debug_tcp ("plan new write"); if (can_write) { event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd); event_base_set (cbd->ev_base, &cbd->ev); event_add (&cbd->ev, &cbd->tv); } else { - /* Cannot read more */ - lua_tcp_push_error (cbd, "EOF, cannot read more data"); + /* Cannot write more */ + lua_tcp_push_error (cbd, "EOF, cannot write more data"); lua_tcp_shift_handler (cbd); lua_tcp_plan_handler_event (cbd, can_read, can_write); } @@ -826,6 +852,7 @@ lua_tcp_request (lua_State *L) struct rspamd_task *task = NULL; struct iovec *iov = NULL; guint niov = 0, total_out; + guint64 h; gdouble timeout = default_tcp_timeout; gboolean partial = FALSE, do_shutdown = FALSE, do_read = TRUE; @@ -1007,6 +1034,9 @@ lua_tcp_request (lua_State *L) } cbd->L = L; + h = rspamd_random_uint64_fast (); + rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h); + cbd->handlers = g_queue_new (); if (total_out > 0) { struct lua_tcp_handler *wh; @@ -1162,8 +1192,9 @@ lua_tcp_add_read (lua_State *L) rh->type = LUA_WANT_READ; rh->h.r.cbref = cbref; rh->h.r.stop_pattern = stop_pattern; + msg_debug_tcp ("added read event, cbref: %d", cbref); + g_queue_push_tail (cbd->handlers, rh); - lua_tcp_plan_handler_event (cbd, TRUE, TRUE); return 0; } @@ -1242,19 +1273,10 @@ lua_tcp_add_write (lua_State *L) wh->h.w.total = total_out; wh->h.w.pos = 0; /* Cannot set write handler here */ - wh->h.w.cbref = -1; - - if (cbref != -1) { - /* We have write only callback */ - wh->h.w.cbref = cbref; - } - else { - /* We have simple client callback */ - wh->h.w.cbref = -1; - } + wh->h.w.cbref = cbref; + msg_debug_tcp ("added write event, cbref: %d", cbref); g_queue_push_tail (cbd->handlers, wh); - lua_tcp_plan_handler_event (cbd, TRUE, TRUE); lua_pushboolean (L, TRUE); return 1; |