From a60c4fc67d4253f3af20f131e5af50eed82aa13e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 22 Jul 2017 20:36:04 +0100 Subject: [PATCH] [Feature] Allow to spawn asynchronous processes from Lua --- src/lua/lua_common.c | 372 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 372 insertions(+) diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index 38b6ee81b..22d1f2446 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -16,7 +16,13 @@ #include "lua_common.h" #include "lptree.h" #include "utlist.h" +#include "unix-std.h" +#include "worker_util.h" +#include "ottery.h" +#include "rspamd_control.h" #include +#include +#include /* Lua module init function */ #define MODULE_INIT_FUNC "module_init" @@ -31,12 +37,14 @@ LUA_FUNCTION_DEF (worker, get_name); LUA_FUNCTION_DEF (worker, get_stat); LUA_FUNCTION_DEF (worker, get_index); LUA_FUNCTION_DEF (worker, get_pid); +LUA_FUNCTION_DEF (worker, spawn_process); const luaL_reg worker_reg[] = { LUA_INTERFACE_DEF (worker, get_name), LUA_INTERFACE_DEF (worker, get_stat), LUA_INTERFACE_DEF (worker, get_index), LUA_INTERFACE_DEF (worker, get_pid), + LUA_INTERFACE_DEF (worker, spawn_process), {"__tostring", rspamd_lua_class_tostring}, {NULL, NULL} }; @@ -1331,6 +1339,370 @@ lua_worker_get_pid (lua_State *L) return 1; } +struct rspamd_lua_process_cbdata { + gint sp[2]; + gint func_cbref; + gint cb_cbref; + gboolean replied; + gboolean is_error; + pid_t cpid; + lua_State *L; + guint64 sz; + GString *io_buf; + struct rspamd_worker *wrk; + struct event_base *ev_base; + struct event ev; +}; + +static void +rspamd_lua_execute_subprocess (lua_State *L, + struct rspamd_lua_process_cbdata *cbdata) +{ + gint err_idx, r; + GString *tb; + guint64 wlen = 0; + const gchar *ret; + gsize retlen; + + lua_pushcfunction (L, &rspamd_lua_traceback); + err_idx = lua_gettop (L); + + lua_rawgeti (L, LUA_REGISTRYINDEX, cbdata->func_cbref); + + if (lua_pcall (L, 0, 1, err_idx) != 0) { + tb = lua_touserdata (L, -1); + msg_err ("call to subprocess failed: %v", tb); + /* Indicate error */ + wlen = (1ULL << 63) + tb->len; + g_string_free (tb, TRUE); + + r = write (cbdata->sp[1], &wlen, sizeof (wlen)); + if (r == -1) { + msg_err ("write failed: %s", strerror (errno)); + } + + r = write (cbdata->sp[1], tb->str, tb->len); + if (r == -1) { + msg_err ("write failed: %s", strerror (errno)); + } + + lua_pop (L, 1); + } + else { + ret = lua_tolstring (L, -1, &retlen); + wlen = retlen; + + r = write (cbdata->sp[1], &wlen, sizeof (wlen)); + if (r == -1) { + msg_err ("write failed: %s", strerror (errno)); + } + + r = write (cbdata->sp[1], ret, retlen); + if (r == -1) { + msg_err ("write failed: %s", strerror (errno)); + } + } + + lua_pop (L, 1); /* Error function */ +} + +static void +rspamd_lua_call_on_complete (lua_State *L, + struct rspamd_lua_process_cbdata *cbdata, + const gchar *err_msg, const gchar *data, gsize datalen) +{ + gint err_idx; + GString *tb; + + lua_pushcfunction (L, &rspamd_lua_traceback); + err_idx = lua_gettop (L); + + lua_rawgeti (L, LUA_REGISTRYINDEX, cbdata->cb_cbref); + + if (err_msg) { + lua_pushstring (L, err_msg); + } + else { + lua_pushnil (L); + } + + if (data) { + lua_pushlstring (L, data, datalen); + } + else { + lua_pushnil (L); + } + + if (lua_pcall (L, 2, 0, err_idx) != 0) { + tb = lua_touserdata (L, -1); + msg_err ("call to subprocess callback script failed: %v", tb); + lua_pop (L, 1); + } + + lua_pop (L, 1); /* Error function */ +} + +static gboolean +rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud) +{ + struct rspamd_lua_process_cbdata *cbdata = ud; + struct rspamd_srv_command srv_cmd; + lua_State *L; + pid_t died; + gint res = 0; + + /* Are we called by a correct children ? */ + died = waitpid (cbdata->cpid, &res, WNOHANG); + + if (died <= 0) { + /* Wait more */ + return TRUE; + } + + L = cbdata->L; + msg_info ("handled SIGCHLD from %p", cbdata->cpid); + + if (!cbdata->replied) { + /* We still need to call on_complete callback */ + rspamd_lua_call_on_complete (cbdata->L, cbdata, + "Worker has died without reply", NULL, 0); + } + + /* Free structures */ + event_del (&cbdata->ev); + close (cbdata->sp[0]); + close (cbdata->sp[1]); + luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref); + luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref); + g_string_free (cbdata->io_buf, TRUE); + g_free (cbdata); + + /* Notify main */ + srv_cmd.type = RSPAMD_SRV_ON_FORK; + srv_cmd.cmd.on_fork.state = child_dead; + srv_cmd.cmd.on_fork.cpid = cbdata->cpid; + srv_cmd.cmd.on_fork.ppid = getpid (); + rspamd_srv_send_command (cbdata->wrk, cbdata->ev_base, &srv_cmd, -1, + NULL, NULL); + + /* We are done with this SIGCHLD */ + return FALSE; +} + +static void +rspamd_lua_subprocess_io (gint fd, short what, gpointer ud) +{ + struct rspamd_lua_process_cbdata *cbdata = ud; + gssize r; + + if (cbdata->io_buf->len < sizeof (guint64)) { + guint64 sz; + + /* We read size of reply + flags first */ + r = read (cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len, + sizeof (guint64) - cbdata->io_buf->len); + + if (r == 0) { + rspamd_lua_call_on_complete (cbdata->L, cbdata, + "Unexpected EOF", NULL, 0); + kill (cbdata->cpid, SIGTERM); + + return; + } + else if (r == -1) { + if (errno == EAGAIN || errno == EINTR) { + return; + } + else { + rspamd_lua_call_on_complete (cbdata->L, cbdata, + strerror (errno), NULL, 0); + kill (cbdata->cpid, SIGTERM); + + return; + } + } + + cbdata->io_buf->len += r; + + if (cbdata->io_buf->len == sizeof (guint64)) { + memcpy ((guchar *)&sz, cbdata->io_buf->str, sizeof (sz)); + + if (sz & (1ULL << 63)) { + cbdata->is_error = TRUE; + sz &= ~(1ULL << 63); + } + + cbdata->io_buf->len = 0; + cbdata->sz = sz; + g_string_set_size (cbdata->io_buf, sz + 1); + cbdata->io_buf->len = 0; + } + } + else { + /* Read data */ + r = read (cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len, + cbdata->sz - cbdata->io_buf->len); + + if (r == 0) { + rspamd_lua_call_on_complete (cbdata->L, cbdata, + "Unexpected EOF", NULL, 0); + kill (cbdata->cpid, SIGTERM); + + return; + } + else if (r == -1) { + if (errno == EAGAIN || errno == EINTR) { + return; + } + else { + rspamd_lua_call_on_complete (cbdata->L, cbdata, + strerror (errno), NULL, 0); + kill (cbdata->cpid, SIGTERM); + + return; + } + } + + cbdata->io_buf->len += r; + + if (cbdata->io_buf->len == cbdata->sz) { + gchar rep[4]; + + /* Finished reading data */ + if (cbdata->is_error) { + cbdata->io_buf->str[cbdata->io_buf->len] = '\0'; + rspamd_lua_call_on_complete (cbdata->L, cbdata, + cbdata->io_buf->str, NULL, 0); + } + else { + rspamd_lua_call_on_complete (cbdata->L, cbdata, + NULL, cbdata->io_buf->str, cbdata->io_buf->len); + } + + /* Write reply to the child */ + rspamd_socket_blocking (cbdata->sp[0]); + memset (rep, 0, sizeof (rep)); + (void)write (cbdata->sp[0], rep, sizeof (rep)); + } + } +} + +static gint +lua_worker_spawn_process (lua_State *L) +{ + struct rspamd_worker *w = lua_check_worker (L, 1); + struct rspamd_lua_process_cbdata *cbdata; + struct rspamd_abstract_worker_ctx *actx; + struct rspamd_srv_command srv_cmd; + pid_t pid; + GError *err = NULL; + gint func_cbref, cb_cbref; + + if (!rspamd_lua_parse_table_arguments (L, 2, &err, + "*func=F;*on_complete=F", &func_cbref, &cb_cbref)) { + msg_err ("cannot get parameters list: %e", err); + + if (err) { + g_error_free (err); + } + + return 0; + } + + cbdata = g_malloc0 (sizeof (*cbdata)); + cbdata->cb_cbref = cb_cbref; + cbdata->func_cbref = func_cbref; + + if (socketpair (AF_UNIX, SOCK_STREAM, 0, cbdata->sp) == -1) { + msg_err ("cannot spawn socketpair: %s", strerror (errno)); + g_free (cbdata); + luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref); + luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref); + + return 0; + } + + actx = w->ctx; + cbdata->wrk = w; + cbdata->L = L; + cbdata->ev_base = actx->ev_base; + + pid = fork (); + + if (pid == -1) { + msg_err ("cannot spawn process: %s", strerror (errno)); + close (cbdata->sp[0]); + close (cbdata->sp[1]); + luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref); + luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref); + g_free (cbdata); + + return 0; + } + else if (pid == 0) { + /* Child */ + gint rc; + gchar inbuf[4]; + + rspamd_log_update_pid (w->cf->type, w->srv->logger); + rc = ottery_init (w->srv->cfg->libs_ctx->ottery_cfg); + + if (rc != OTTERY_ERR_NONE) { + msg_err ("cannot initialize PRNG: %d", rc); + abort (); + } + rspamd_random_seed_fast (); +#ifdef HAVE_EVUTIL_RNG_INIT + evutil_secure_rng_init (); +#endif + + close (cbdata->sp[0]); + /* Here we assume that we can block on writing results */ + rspamd_socket_blocking (cbdata->sp[1]); + event_reinit (cbdata->ev_base); + rspamd_lua_execute_subprocess (L, cbdata); + + /* Wait for parent to reply and exit */ + rc = read (cbdata->sp[1], inbuf, sizeof (inbuf)); + + if (memcmp (inbuf, "\0\0\0\0", 4) == 0) { + exit (EXIT_SUCCESS); + } + else { + msg_err ("got invalid reply from parent"); + + exit (EXIT_FAILURE); + } + + } + + cbdata->cpid = pid; + cbdata->io_buf = g_string_sized_new (8); + /* Notify main */ + srv_cmd.type = RSPAMD_SRV_ON_FORK; + srv_cmd.cmd.on_fork.state = child_create; + srv_cmd.cmd.on_fork.cpid = pid; + srv_cmd.cmd.on_fork.ppid = getpid (); + rspamd_srv_send_command (w, cbdata->ev_base, &srv_cmd, -1, NULL, NULL); + + close (cbdata->sp[1]); + rspamd_socket_nonblocking (cbdata->sp[0]); + /* Parent */ + rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->ev_base, + rspamd_lua_cld_handler, + cbdata); + + /* Add result pipe waiting */ + event_set (&cbdata->ev, cbdata->sp[0], EV_READ | EV_PERSIST, + rspamd_lua_subprocess_io, cbdata); + event_base_set (cbdata->ev_base, &cbdata->ev); + /* TODO: maybe add timeout? */ + event_add (&cbdata->ev, NULL); + + return 0; +} + + struct rspamd_lua_ref_cbdata { lua_State *L; gint cbref; -- 2.39.5