From 51edc2517af40c42381f983f2aa7175853c1208e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 18 May 2015 16:54:55 +0100 Subject: [PATCH] Implement IO in lua tcp. --- src/lua/lua_tcp.c | 161 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 20330299e..a5fb6a882 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -27,6 +27,8 @@ #include "dns.h" #include "utlist.h" +static void lua_tcp_handler (int fd, short what, gpointer ud); + LUA_FUNCTION_DEF (tcp, request); static const struct luaL_reg tcp_libf[] = { @@ -43,6 +45,8 @@ struct lua_tcp_cbdata { rspamd_inet_addr_t *addr; rspamd_mempool_t *pool; struct iovec *iov; + GString *in; + struct event ev; gint fd; gint cbref; guint iovlen; @@ -74,6 +78,7 @@ lua_tcp_fin (gpointer arg) luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); if (cbd->fd != -1) { + event_del (&cbd->ev); close (cbd->fd); } @@ -106,6 +111,158 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err) } } +static void +lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const gchar *str, gsize len) +{ + struct rspamd_lua_text *t; + + lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + /* Error */ + lua_pushnil (cbd->L); + /* Body */ + t = lua_newuserdata (cbd->L, sizeof (*t)); + rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); + t->start = str; + t->len = len; + + if (lua_pcall (cbd->L, 2, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + } + + if (!cbd->partial) { + lua_tcp_maybe_free (cbd); + } +} + +static void +lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) +{ + struct iovec *start; + guint niov, i; + gint flags = 0; + gsize remain; + gssize r; + struct iovec *cur_iov; + struct msghdr msg; + + if (cbd->pos == cbd->total) { + goto call_finish_handler; + } + + start = &cbd->iov[0]; + niov = cbd->iovlen; + remain = cbd->pos; + /* We know that niov is small enough for that */ + cur_iov = alloca (niov * sizeof (struct iovec)); + memcpy (cur_iov, cbd->iov, niov * sizeof (struct iovec)); + for (i = 0; i < cbd->iovlen && remain > 0; i++) { + /* Find out the first iov required */ + start = &cur_iov[i]; + if (start->iov_len <= remain) { + remain -= start->iov_len; + start = &cur_iov[i + 1]; + niov--; + } + else { + start->iov_base = (void *)((char *)start->iov_base + remain); + start->iov_len -= remain; + remain = 0; + } + } + + memset (&msg, 0, sizeof (msg)); + msg.msg_iov = start; + msg.msg_iovlen = MIN (IOV_MAX, niov); + g_assert (niov > 0); +#ifdef MSG_NOSIGNAL + flags = MSG_NOSIGNAL; +#endif + r = sendmsg (cbd->fd, &msg, flags); + + if (r == -1) { + lua_tcp_push_error (cbd, "IO write error"); + lua_tcp_maybe_free (cbd); + return; + } + else { + cbd->pos += r; + } + + if (cbd->pos >= cbd->total) { + goto call_finish_handler; + } + else { + /* Want to write more */ + event_add (&cbd->ev, &cbd->tv); + } + + return; + +call_finish_handler: + + if (!cbd->partial) { + cbd->in = g_string_sized_new (BUFSIZ); + rspamd_mempool_add_destructor (cbd->pool, rspamd_gstring_free_hard, + cbd->in); + } + + event_del (&cbd->ev); + event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, lua_tcp_handler, cbd); + event_base_set (cbd->ev_base, &cbd->ev); + event_add (&cbd->ev, &cbd->tv); +} + +static void +lua_tcp_handler (int fd, short what, gpointer ud) +{ + struct lua_tcp_cbdata *cbd = ud; + gchar inbuf[BUFSIZ]; + gssize r; + + if (what == EV_READ) { + g_assert (cbd->partial || cbd->in != NULL); + + r = read (cbd->fd, inbuf, sizeof (inbuf)); + + if (r <= 0) { + /* + * We actually can have connection reset here, so we just check if + * the cumulative buffer is not empty + */ + if (cbd->partial) { + if (r < 0) { + lua_tcp_push_error (cbd, strerror (errno)); + } + } + else { + if (cbd->in->len > 0) { + lua_tcp_push_data (cbd, cbd->in->str, cbd->in->len); + } + else { + lua_tcp_push_error (cbd, "IO read error"); + } + } + + lua_tcp_maybe_free (cbd); + } + else { + if (cbd->partial) { + lua_tcp_push_data (cbd, inbuf, r); + } + else { + g_string_append_len (cbd->in, inbuf, r); + } + } + } + else if (what == EV_WRITE) { + lua_tcp_write_helper (cbd); + } + else { + lua_tcp_push_error (cbd, "IO timeout"); + lua_tcp_maybe_free (cbd); + } +} + static gboolean lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) { @@ -120,6 +277,10 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) } cbd->fd = fd; + event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd); + event_base_set (cbd->ev_base, &cbd->ev); + event_add (&cbd->ev, &cbd->tv); + return TRUE; } -- 2.39.5