]> source.dussan.org Git - rspamd.git/commitdiff
Implement IO in lua tcp.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 18 May 2015 15:54:55 +0000 (16:54 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 18 May 2015 15:54:55 +0000 (16:54 +0100)
src/lua/lua_tcp.c

index 20330299e041d9eb32ccb8edbb5d1956cebee840..a5fb6a88237d014321f91ab0398c799f8073e354 100644 (file)
@@ -27,6 +27,8 @@
 #include "dns.h"
 #include "utlist.h"
 
+static void lua_tcp_handler (int fd, short what, gpointer ud);
+
 LUA_FUNCTION_DEF (tcp, request);
 
 static const struct luaL_reg tcp_libf[] = {
@@ -43,6 +45,8 @@ struct lua_tcp_cbdata {
        rspamd_inet_addr_t *addr;
        rspamd_mempool_t *pool;
        struct iovec *iov;
+       GString *in;
+       struct event ev;
        gint fd;
        gint cbref;
        guint iovlen;
@@ -74,6 +78,7 @@ lua_tcp_fin (gpointer arg)
        luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
 
        if (cbd->fd != -1) {
+               event_del (&cbd->ev);
                close (cbd->fd);
        }
 
@@ -106,6 +111,158 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err)
        }
 }
 
+static void
+lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const gchar *str, gsize len)
+{
+       struct rspamd_lua_text *t;
+
+       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+       /* Error */
+       lua_pushnil (cbd->L);
+       /* Body */
+       t = lua_newuserdata (cbd->L, sizeof (*t));
+       rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+       t->start = str;
+       t->len = len;
+
+       if (lua_pcall (cbd->L, 2, 0, 0) != 0) {
+               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+       }
+
+       if (!cbd->partial) {
+               lua_tcp_maybe_free (cbd);
+       }
+}
+
+static void
+lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
+{
+       struct iovec *start;
+       guint niov, i;
+       gint flags = 0;
+       gsize remain;
+       gssize r;
+       struct iovec *cur_iov;
+       struct msghdr msg;
+
+       if (cbd->pos == cbd->total) {
+               goto call_finish_handler;
+       }
+
+       start = &cbd->iov[0];
+       niov = cbd->iovlen;
+       remain = cbd->pos;
+       /* We know that niov is small enough for that */
+       cur_iov = alloca (niov * sizeof (struct iovec));
+       memcpy (cur_iov, cbd->iov, niov * sizeof (struct iovec));
+       for (i = 0; i < cbd->iovlen && remain > 0; i++) {
+               /* Find out the first iov required */
+               start = &cur_iov[i];
+               if (start->iov_len <= remain) {
+                       remain -= start->iov_len;
+                       start = &cur_iov[i + 1];
+                       niov--;
+               }
+               else {
+                       start->iov_base = (void *)((char *)start->iov_base + remain);
+                       start->iov_len -= remain;
+                       remain = 0;
+               }
+       }
+
+       memset (&msg, 0, sizeof (msg));
+       msg.msg_iov = start;
+       msg.msg_iovlen = MIN (IOV_MAX, niov);
+       g_assert (niov > 0);
+#ifdef MSG_NOSIGNAL
+       flags = MSG_NOSIGNAL;
+#endif
+       r = sendmsg (cbd->fd, &msg, flags);
+
+       if (r == -1) {
+               lua_tcp_push_error (cbd, "IO write error");
+               lua_tcp_maybe_free (cbd);
+               return;
+       }
+       else {
+               cbd->pos += r;
+       }
+
+       if (cbd->pos >= cbd->total) {
+               goto call_finish_handler;
+       }
+       else {
+               /* Want to write more */
+               event_add (&cbd->ev, &cbd->tv);
+       }
+
+       return;
+
+call_finish_handler:
+
+       if (!cbd->partial) {
+               cbd->in = g_string_sized_new (BUFSIZ);
+               rspamd_mempool_add_destructor (cbd->pool, rspamd_gstring_free_hard,
+                               cbd->in);
+       }
+
+       event_del (&cbd->ev);
+       event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, lua_tcp_handler, cbd);
+       event_base_set (cbd->ev_base, &cbd->ev);
+       event_add (&cbd->ev, &cbd->tv);
+}
+
+static void
+lua_tcp_handler (int fd, short what, gpointer ud)
+{
+       struct lua_tcp_cbdata *cbd = ud;
+       gchar inbuf[BUFSIZ];
+       gssize r;
+
+       if (what == EV_READ) {
+               g_assert (cbd->partial || cbd->in != NULL);
+
+               r = read (cbd->fd, inbuf, sizeof (inbuf));
+
+               if (r <= 0) {
+                       /*
+                        * We actually can have connection reset here, so we just check if
+                        * the cumulative buffer is not empty
+                        */
+                       if (cbd->partial) {
+                               if (r < 0) {
+                                       lua_tcp_push_error (cbd, strerror (errno));
+                               }
+                       }
+                       else {
+                               if (cbd->in->len > 0) {
+                                       lua_tcp_push_data (cbd, cbd->in->str, cbd->in->len);
+                               }
+                               else {
+                                       lua_tcp_push_error (cbd, "IO read error");
+                               }
+                       }
+
+                       lua_tcp_maybe_free (cbd);
+               }
+               else {
+                       if (cbd->partial) {
+                               lua_tcp_push_data (cbd, inbuf, r);
+                       }
+                       else {
+                               g_string_append_len (cbd->in, inbuf, r);
+                       }
+               }
+       }
+       else if (what == EV_WRITE) {
+               lua_tcp_write_helper (cbd);
+       }
+       else {
+               lua_tcp_push_error (cbd, "IO timeout");
+               lua_tcp_maybe_free (cbd);
+       }
+}
+
 static gboolean
 lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
 {
@@ -120,6 +277,10 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
        }
        cbd->fd = fd;
 
+       event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
+       event_base_set (cbd->ev_base, &cbd->ev);
+       event_add (&cbd->ev, &cbd->tv);
+
        return TRUE;
 }