]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Allow to spawn asynchronous processes from Lua
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jul 2017 19:36:04 +0000 (20:36 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jul 2017 19:36:04 +0000 (20:36 +0100)
src/lua/lua_common.c

index 38b6ee81b4c582a6ec98126a27005d52a68bcada..22d1f24466eb0c8f38bae612925a4fba76524ba9 100644 (file)
 #include "lua_common.h"
 #include "lptree.h"
 #include "utlist.h"
+#include "unix-std.h"
+#include "worker_util.h"
+#include "ottery.h"
+#include "rspamd_control.h"
 #include <math.h>
+#include <sys/wait.h>
+#include <src/libserver/rspamd_control.h>
 
 /* Lua module init function */
 #define MODULE_INIT_FUNC "module_init"
@@ -31,12 +37,14 @@ LUA_FUNCTION_DEF (worker, get_name);
 LUA_FUNCTION_DEF (worker, get_stat);
 LUA_FUNCTION_DEF (worker, get_index);
 LUA_FUNCTION_DEF (worker, get_pid);
+LUA_FUNCTION_DEF (worker, spawn_process);
 
 const luaL_reg worker_reg[] = {
        LUA_INTERFACE_DEF (worker, get_name),
        LUA_INTERFACE_DEF (worker, get_stat),
        LUA_INTERFACE_DEF (worker, get_index),
        LUA_INTERFACE_DEF (worker, get_pid),
+       LUA_INTERFACE_DEF (worker, spawn_process),
        {"__tostring", rspamd_lua_class_tostring},
        {NULL, NULL}
 };
@@ -1331,6 +1339,370 @@ lua_worker_get_pid (lua_State *L)
        return 1;
 }
 
+struct rspamd_lua_process_cbdata {
+       gint sp[2];
+       gint func_cbref;
+       gint cb_cbref;
+       gboolean replied;
+       gboolean is_error;
+       pid_t cpid;
+       lua_State *L;
+       guint64 sz;
+       GString *io_buf;
+       struct rspamd_worker *wrk;
+       struct event_base *ev_base;
+       struct event ev;
+};
+
+static void
+rspamd_lua_execute_subprocess (lua_State *L,
+               struct rspamd_lua_process_cbdata *cbdata)
+{
+       gint err_idx, r;
+       GString *tb;
+       guint64 wlen = 0;
+       const gchar *ret;
+       gsize retlen;
+
+       lua_pushcfunction (L, &rspamd_lua_traceback);
+       err_idx = lua_gettop (L);
+
+       lua_rawgeti (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+
+       if (lua_pcall (L, 0, 1, err_idx) != 0) {
+               tb = lua_touserdata (L, -1);
+               msg_err ("call to subprocess failed: %v", tb);
+               /* Indicate error */
+               wlen = (1ULL << 63) + tb->len;
+               g_string_free (tb, TRUE);
+
+               r = write (cbdata->sp[1], &wlen, sizeof (wlen));
+               if (r == -1) {
+                       msg_err ("write failed: %s", strerror (errno));
+               }
+
+               r = write (cbdata->sp[1], tb->str, tb->len);
+               if (r == -1) {
+                       msg_err ("write failed: %s", strerror (errno));
+               }
+
+               lua_pop (L, 1);
+       }
+       else {
+               ret = lua_tolstring (L, -1, &retlen);
+               wlen = retlen;
+
+               r = write (cbdata->sp[1], &wlen, sizeof (wlen));
+               if (r == -1) {
+                       msg_err ("write failed: %s", strerror (errno));
+               }
+
+               r = write (cbdata->sp[1], ret, retlen);
+               if (r == -1) {
+                       msg_err ("write failed: %s", strerror (errno));
+               }
+       }
+
+       lua_pop (L, 1); /* Error function */
+}
+
+static void
+rspamd_lua_call_on_complete (lua_State *L,
+               struct rspamd_lua_process_cbdata *cbdata,
+               const gchar *err_msg, const gchar *data, gsize datalen)
+{
+       gint err_idx;
+       GString *tb;
+
+       lua_pushcfunction (L, &rspamd_lua_traceback);
+       err_idx = lua_gettop (L);
+
+       lua_rawgeti (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+
+       if (err_msg) {
+               lua_pushstring (L, err_msg);
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       if (data) {
+               lua_pushlstring (L, data, datalen);
+       }
+       else {
+               lua_pushnil (L);
+       }
+
+       if (lua_pcall (L, 2, 0, err_idx) != 0) {
+               tb = lua_touserdata (L, -1);
+               msg_err ("call to subprocess callback script failed: %v", tb);
+               lua_pop (L, 1);
+       }
+
+       lua_pop (L, 1); /* Error function */
+}
+
+static gboolean
+rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud)
+{
+       struct rspamd_lua_process_cbdata *cbdata = ud;
+       struct rspamd_srv_command srv_cmd;
+       lua_State *L;
+       pid_t died;
+       gint res = 0;
+
+       /* Are we called by a correct children ? */
+       died = waitpid (cbdata->cpid, &res, WNOHANG);
+
+       if (died <= 0) {
+               /* Wait more */
+               return TRUE;
+       }
+
+       L = cbdata->L;
+       msg_info ("handled SIGCHLD from %p", cbdata->cpid);
+
+       if (!cbdata->replied) {
+               /* We still need to call on_complete callback */
+               rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                               "Worker has died without reply", NULL, 0);
+       }
+
+       /* Free structures */
+       event_del (&cbdata->ev);
+       close (cbdata->sp[0]);
+       close (cbdata->sp[1]);
+       luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+       luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+       g_string_free (cbdata->io_buf, TRUE);
+       g_free (cbdata);
+
+       /* Notify main */
+       srv_cmd.type = RSPAMD_SRV_ON_FORK;
+       srv_cmd.cmd.on_fork.state = child_dead;
+       srv_cmd.cmd.on_fork.cpid = cbdata->cpid;
+       srv_cmd.cmd.on_fork.ppid = getpid ();
+       rspamd_srv_send_command (cbdata->wrk, cbdata->ev_base, &srv_cmd, -1,
+                       NULL, NULL);
+
+       /* We are done with this SIGCHLD */
+       return FALSE;
+}
+
+static void
+rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
+{
+       struct rspamd_lua_process_cbdata *cbdata = ud;
+       gssize r;
+
+       if (cbdata->io_buf->len < sizeof (guint64)) {
+               guint64 sz;
+
+               /* We read size of reply + flags first */
+               r = read (cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len,
+                               sizeof (guint64) - cbdata->io_buf->len);
+
+               if (r == 0) {
+                       rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                                       "Unexpected EOF", NULL, 0);
+                       kill (cbdata->cpid, SIGTERM);
+
+                       return;
+               }
+               else if (r == -1) {
+                       if (errno == EAGAIN || errno == EINTR) {
+                               return;
+                       }
+                       else {
+                               rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                                               strerror (errno), NULL, 0);
+                               kill (cbdata->cpid, SIGTERM);
+
+                               return;
+                       }
+               }
+
+               cbdata->io_buf->len += r;
+
+               if (cbdata->io_buf->len == sizeof (guint64)) {
+                       memcpy ((guchar *)&sz, cbdata->io_buf->str, sizeof (sz));
+
+                       if (sz & (1ULL << 63)) {
+                               cbdata->is_error = TRUE;
+                               sz &= ~(1ULL << 63);
+                       }
+
+                       cbdata->io_buf->len = 0;
+                       cbdata->sz = sz;
+                       g_string_set_size (cbdata->io_buf, sz + 1);
+                       cbdata->io_buf->len = 0;
+               }
+       }
+       else {
+               /* Read data */
+               r = read (cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len,
+                               cbdata->sz - cbdata->io_buf->len);
+
+               if (r == 0) {
+                       rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                                       "Unexpected EOF", NULL, 0);
+                       kill (cbdata->cpid, SIGTERM);
+
+                       return;
+               }
+               else if (r == -1) {
+                       if (errno == EAGAIN || errno == EINTR) {
+                               return;
+                       }
+                       else {
+                               rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                                               strerror (errno), NULL, 0);
+                               kill (cbdata->cpid, SIGTERM);
+
+                               return;
+                       }
+               }
+
+               cbdata->io_buf->len += r;
+
+               if (cbdata->io_buf->len == cbdata->sz) {
+                       gchar rep[4];
+
+                       /* Finished reading data */
+                       if (cbdata->is_error) {
+                               cbdata->io_buf->str[cbdata->io_buf->len] = '\0';
+                               rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                                               cbdata->io_buf->str, NULL, 0);
+                       }
+                       else {
+                               rspamd_lua_call_on_complete (cbdata->L, cbdata,
+                                               NULL, cbdata->io_buf->str, cbdata->io_buf->len);
+                       }
+
+                       /* Write reply to the child */
+                       rspamd_socket_blocking (cbdata->sp[0]);
+                       memset (rep, 0, sizeof (rep));
+                       (void)write (cbdata->sp[0], rep, sizeof (rep));
+               }
+       }
+}
+
+static gint
+lua_worker_spawn_process (lua_State *L)
+{
+       struct rspamd_worker *w = lua_check_worker (L, 1);
+       struct rspamd_lua_process_cbdata *cbdata;
+       struct rspamd_abstract_worker_ctx *actx;
+       struct rspamd_srv_command srv_cmd;
+       pid_t pid;
+       GError *err = NULL;
+       gint func_cbref, cb_cbref;
+
+       if (!rspamd_lua_parse_table_arguments (L, 2, &err,
+               "*func=F;*on_complete=F", &func_cbref, &cb_cbref)) {
+               msg_err ("cannot get parameters list: %e", err);
+
+               if (err) {
+                       g_error_free (err);
+               }
+
+               return 0;
+       }
+
+       cbdata = g_malloc0 (sizeof (*cbdata));
+       cbdata->cb_cbref = cb_cbref;
+       cbdata->func_cbref = func_cbref;
+
+       if (socketpair (AF_UNIX, SOCK_STREAM, 0, cbdata->sp) == -1) {
+               msg_err ("cannot spawn socketpair: %s", strerror (errno));
+               g_free (cbdata);
+               luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+               luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+
+               return 0;
+       }
+
+       actx = w->ctx;
+       cbdata->wrk = w;
+       cbdata->L = L;
+       cbdata->ev_base = actx->ev_base;
+
+       pid = fork ();
+
+       if (pid == -1) {
+               msg_err ("cannot spawn process: %s", strerror (errno));
+               close (cbdata->sp[0]);
+               close (cbdata->sp[1]);
+               luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+               luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+               g_free (cbdata);
+
+               return 0;
+       }
+       else if (pid == 0) {
+               /* Child */
+               gint rc;
+               gchar inbuf[4];
+
+               rspamd_log_update_pid (w->cf->type, w->srv->logger);
+               rc = ottery_init (w->srv->cfg->libs_ctx->ottery_cfg);
+
+               if (rc != OTTERY_ERR_NONE) {
+                       msg_err ("cannot initialize PRNG: %d", rc);
+                       abort ();
+               }
+               rspamd_random_seed_fast ();
+#ifdef HAVE_EVUTIL_RNG_INIT
+               evutil_secure_rng_init ();
+#endif
+
+               close (cbdata->sp[0]);
+               /* Here we assume that we can block on writing results */
+               rspamd_socket_blocking (cbdata->sp[1]);
+               event_reinit (cbdata->ev_base);
+               rspamd_lua_execute_subprocess (L, cbdata);
+
+               /* Wait for parent to reply and exit */
+               rc = read (cbdata->sp[1], inbuf, sizeof (inbuf));
+
+               if (memcmp (inbuf, "\0\0\0\0", 4) == 0) {
+                       exit (EXIT_SUCCESS);
+               }
+               else {
+                       msg_err ("got invalid reply from parent");
+
+                       exit (EXIT_FAILURE);
+               }
+
+       }
+
+       cbdata->cpid = pid;
+       cbdata->io_buf = g_string_sized_new (8);
+       /* Notify main */
+       srv_cmd.type = RSPAMD_SRV_ON_FORK;
+       srv_cmd.cmd.on_fork.state = child_create;
+       srv_cmd.cmd.on_fork.cpid = pid;
+       srv_cmd.cmd.on_fork.ppid = getpid ();
+       rspamd_srv_send_command (w, cbdata->ev_base, &srv_cmd, -1, NULL, NULL);
+
+       close (cbdata->sp[1]);
+       rspamd_socket_nonblocking (cbdata->sp[0]);
+       /* Parent */
+       rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->ev_base,
+                       rspamd_lua_cld_handler,
+                       cbdata);
+
+       /* Add result pipe waiting */
+       event_set (&cbdata->ev, cbdata->sp[0], EV_READ | EV_PERSIST,
+                       rspamd_lua_subprocess_io, cbdata);
+       event_base_set (cbdata->ev_base, &cbdata->ev);
+       /* TODO: maybe add timeout? */
+       event_add (&cbdata->ev, NULL);
+
+       return 0;
+}
+
+
 struct rspamd_lua_ref_cbdata {
        lua_State *L;
        gint cbref;