aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-07-22 20:36:04 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-07-22 20:36:04 +0100
commita60c4fc67d4253f3af20f131e5af50eed82aa13e (patch)
tree62fda3cb96996831a48163dd6323847ceb5105da /src
parent02f3d42c14e272b6afaf791e59523dfbc58b0221 (diff)
downloadrspamd-a60c4fc67d4253f3af20f131e5af50eed82aa13e.tar.gz
rspamd-a60c4fc67d4253f3af20f131e5af50eed82aa13e.zip
[Feature] Allow to spawn asynchronous processes from Lua
Diffstat (limited to 'src')
-rw-r--r--src/lua/lua_common.c372
1 files changed, 372 insertions, 0 deletions
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index 38b6ee81b..22d1f2446 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -16,7 +16,13 @@
#include "lua_common.h"
#include "lptree.h"
#include "utlist.h"
+#include "unix-std.h"
+#include "worker_util.h"
+#include "ottery.h"
+#include "rspamd_control.h"
#include <math.h>
+#include <sys/wait.h>
+#include <src/libserver/rspamd_control.h>
/* Lua module init function */
#define MODULE_INIT_FUNC "module_init"
@@ -31,12 +37,14 @@ LUA_FUNCTION_DEF (worker, get_name);
LUA_FUNCTION_DEF (worker, get_stat);
LUA_FUNCTION_DEF (worker, get_index);
LUA_FUNCTION_DEF (worker, get_pid);
+LUA_FUNCTION_DEF (worker, spawn_process);
const luaL_reg worker_reg[] = {
LUA_INTERFACE_DEF (worker, get_name),
LUA_INTERFACE_DEF (worker, get_stat),
LUA_INTERFACE_DEF (worker, get_index),
LUA_INTERFACE_DEF (worker, get_pid),
+ LUA_INTERFACE_DEF (worker, spawn_process),
{"__tostring", rspamd_lua_class_tostring},
{NULL, NULL}
};
@@ -1331,6 +1339,370 @@ lua_worker_get_pid (lua_State *L)
return 1;
}
+struct rspamd_lua_process_cbdata {
+ gint sp[2];
+ gint func_cbref;
+ gint cb_cbref;
+ gboolean replied;
+ gboolean is_error;
+ pid_t cpid;
+ lua_State *L;
+ guint64 sz;
+ GString *io_buf;
+ struct rspamd_worker *wrk;
+ struct event_base *ev_base;
+ struct event ev;
+};
+
+static void
+rspamd_lua_execute_subprocess (lua_State *L,
+ struct rspamd_lua_process_cbdata *cbdata)
+{
+ gint err_idx, r;
+ GString *tb;
+ guint64 wlen = 0;
+ const gchar *ret;
+ gsize retlen;
+
+ lua_pushcfunction (L, &rspamd_lua_traceback);
+ err_idx = lua_gettop (L);
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+
+ if (lua_pcall (L, 0, 1, err_idx) != 0) {
+ tb = lua_touserdata (L, -1);
+ msg_err ("call to subprocess failed: %v", tb);
+ /* Indicate error */
+ wlen = (1ULL << 63) + tb->len;
+ g_string_free (tb, TRUE);
+
+ r = write (cbdata->sp[1], &wlen, sizeof (wlen));
+ if (r == -1) {
+ msg_err ("write failed: %s", strerror (errno));
+ }
+
+ r = write (cbdata->sp[1], tb->str, tb->len);
+ if (r == -1) {
+ msg_err ("write failed: %s", strerror (errno));
+ }
+
+ lua_pop (L, 1);
+ }
+ else {
+ ret = lua_tolstring (L, -1, &retlen);
+ wlen = retlen;
+
+ r = write (cbdata->sp[1], &wlen, sizeof (wlen));
+ if (r == -1) {
+ msg_err ("write failed: %s", strerror (errno));
+ }
+
+ r = write (cbdata->sp[1], ret, retlen);
+ if (r == -1) {
+ msg_err ("write failed: %s", strerror (errno));
+ }
+ }
+
+ lua_pop (L, 1); /* Error function */
+}
+
+static void
+rspamd_lua_call_on_complete (lua_State *L,
+ struct rspamd_lua_process_cbdata *cbdata,
+ const gchar *err_msg, const gchar *data, gsize datalen)
+{
+ gint err_idx;
+ GString *tb;
+
+ lua_pushcfunction (L, &rspamd_lua_traceback);
+ err_idx = lua_gettop (L);
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+
+ if (err_msg) {
+ lua_pushstring (L, err_msg);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ if (data) {
+ lua_pushlstring (L, data, datalen);
+ }
+ else {
+ lua_pushnil (L);
+ }
+
+ if (lua_pcall (L, 2, 0, err_idx) != 0) {
+ tb = lua_touserdata (L, -1);
+ msg_err ("call to subprocess callback script failed: %v", tb);
+ lua_pop (L, 1);
+ }
+
+ lua_pop (L, 1); /* Error function */
+}
+
+static gboolean
+rspamd_lua_cld_handler (struct rspamd_worker_signal_handler *sigh, void *ud)
+{
+ struct rspamd_lua_process_cbdata *cbdata = ud;
+ struct rspamd_srv_command srv_cmd;
+ lua_State *L;
+ pid_t died;
+ gint res = 0;
+
+ /* Are we called by a correct children ? */
+ died = waitpid (cbdata->cpid, &res, WNOHANG);
+
+ if (died <= 0) {
+ /* Wait more */
+ return TRUE;
+ }
+
+ L = cbdata->L;
+ msg_info ("handled SIGCHLD from %p", cbdata->cpid);
+
+ if (!cbdata->replied) {
+ /* We still need to call on_complete callback */
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ "Worker has died without reply", NULL, 0);
+ }
+
+ /* Free structures */
+ event_del (&cbdata->ev);
+ close (cbdata->sp[0]);
+ close (cbdata->sp[1]);
+ luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+ luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+ g_string_free (cbdata->io_buf, TRUE);
+ g_free (cbdata);
+
+ /* Notify main */
+ srv_cmd.type = RSPAMD_SRV_ON_FORK;
+ srv_cmd.cmd.on_fork.state = child_dead;
+ srv_cmd.cmd.on_fork.cpid = cbdata->cpid;
+ srv_cmd.cmd.on_fork.ppid = getpid ();
+ rspamd_srv_send_command (cbdata->wrk, cbdata->ev_base, &srv_cmd, -1,
+ NULL, NULL);
+
+ /* We are done with this SIGCHLD */
+ return FALSE;
+}
+
+static void
+rspamd_lua_subprocess_io (gint fd, short what, gpointer ud)
+{
+ struct rspamd_lua_process_cbdata *cbdata = ud;
+ gssize r;
+
+ if (cbdata->io_buf->len < sizeof (guint64)) {
+ guint64 sz;
+
+ /* We read size of reply + flags first */
+ r = read (cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len,
+ sizeof (guint64) - cbdata->io_buf->len);
+
+ if (r == 0) {
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ "Unexpected EOF", NULL, 0);
+ kill (cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ else if (r == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ return;
+ }
+ else {
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ strerror (errno), NULL, 0);
+ kill (cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ }
+
+ cbdata->io_buf->len += r;
+
+ if (cbdata->io_buf->len == sizeof (guint64)) {
+ memcpy ((guchar *)&sz, cbdata->io_buf->str, sizeof (sz));
+
+ if (sz & (1ULL << 63)) {
+ cbdata->is_error = TRUE;
+ sz &= ~(1ULL << 63);
+ }
+
+ cbdata->io_buf->len = 0;
+ cbdata->sz = sz;
+ g_string_set_size (cbdata->io_buf, sz + 1);
+ cbdata->io_buf->len = 0;
+ }
+ }
+ else {
+ /* Read data */
+ r = read (cbdata->sp[0], cbdata->io_buf->str + cbdata->io_buf->len,
+ cbdata->sz - cbdata->io_buf->len);
+
+ if (r == 0) {
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ "Unexpected EOF", NULL, 0);
+ kill (cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ else if (r == -1) {
+ if (errno == EAGAIN || errno == EINTR) {
+ return;
+ }
+ else {
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ strerror (errno), NULL, 0);
+ kill (cbdata->cpid, SIGTERM);
+
+ return;
+ }
+ }
+
+ cbdata->io_buf->len += r;
+
+ if (cbdata->io_buf->len == cbdata->sz) {
+ gchar rep[4];
+
+ /* Finished reading data */
+ if (cbdata->is_error) {
+ cbdata->io_buf->str[cbdata->io_buf->len] = '\0';
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ cbdata->io_buf->str, NULL, 0);
+ }
+ else {
+ rspamd_lua_call_on_complete (cbdata->L, cbdata,
+ NULL, cbdata->io_buf->str, cbdata->io_buf->len);
+ }
+
+ /* Write reply to the child */
+ rspamd_socket_blocking (cbdata->sp[0]);
+ memset (rep, 0, sizeof (rep));
+ (void)write (cbdata->sp[0], rep, sizeof (rep));
+ }
+ }
+}
+
+static gint
+lua_worker_spawn_process (lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker (L, 1);
+ struct rspamd_lua_process_cbdata *cbdata;
+ struct rspamd_abstract_worker_ctx *actx;
+ struct rspamd_srv_command srv_cmd;
+ pid_t pid;
+ GError *err = NULL;
+ gint func_cbref, cb_cbref;
+
+ if (!rspamd_lua_parse_table_arguments (L, 2, &err,
+ "*func=F;*on_complete=F", &func_cbref, &cb_cbref)) {
+ msg_err ("cannot get parameters list: %e", err);
+
+ if (err) {
+ g_error_free (err);
+ }
+
+ return 0;
+ }
+
+ cbdata = g_malloc0 (sizeof (*cbdata));
+ cbdata->cb_cbref = cb_cbref;
+ cbdata->func_cbref = func_cbref;
+
+ if (socketpair (AF_UNIX, SOCK_STREAM, 0, cbdata->sp) == -1) {
+ msg_err ("cannot spawn socketpair: %s", strerror (errno));
+ g_free (cbdata);
+ luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+ luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+
+ return 0;
+ }
+
+ actx = w->ctx;
+ cbdata->wrk = w;
+ cbdata->L = L;
+ cbdata->ev_base = actx->ev_base;
+
+ pid = fork ();
+
+ if (pid == -1) {
+ msg_err ("cannot spawn process: %s", strerror (errno));
+ close (cbdata->sp[0]);
+ close (cbdata->sp[1]);
+ luaL_unref (L, LUA_REGISTRYINDEX, cbdata->func_cbref);
+ luaL_unref (L, LUA_REGISTRYINDEX, cbdata->cb_cbref);
+ g_free (cbdata);
+
+ return 0;
+ }
+ else if (pid == 0) {
+ /* Child */
+ gint rc;
+ gchar inbuf[4];
+
+ rspamd_log_update_pid (w->cf->type, w->srv->logger);
+ rc = ottery_init (w->srv->cfg->libs_ctx->ottery_cfg);
+
+ if (rc != OTTERY_ERR_NONE) {
+ msg_err ("cannot initialize PRNG: %d", rc);
+ abort ();
+ }
+ rspamd_random_seed_fast ();
+#ifdef HAVE_EVUTIL_RNG_INIT
+ evutil_secure_rng_init ();
+#endif
+
+ close (cbdata->sp[0]);
+ /* Here we assume that we can block on writing results */
+ rspamd_socket_blocking (cbdata->sp[1]);
+ event_reinit (cbdata->ev_base);
+ rspamd_lua_execute_subprocess (L, cbdata);
+
+ /* Wait for parent to reply and exit */
+ rc = read (cbdata->sp[1], inbuf, sizeof (inbuf));
+
+ if (memcmp (inbuf, "\0\0\0\0", 4) == 0) {
+ exit (EXIT_SUCCESS);
+ }
+ else {
+ msg_err ("got invalid reply from parent");
+
+ exit (EXIT_FAILURE);
+ }
+
+ }
+
+ cbdata->cpid = pid;
+ cbdata->io_buf = g_string_sized_new (8);
+ /* Notify main */
+ srv_cmd.type = RSPAMD_SRV_ON_FORK;
+ srv_cmd.cmd.on_fork.state = child_create;
+ srv_cmd.cmd.on_fork.cpid = pid;
+ srv_cmd.cmd.on_fork.ppid = getpid ();
+ rspamd_srv_send_command (w, cbdata->ev_base, &srv_cmd, -1, NULL, NULL);
+
+ close (cbdata->sp[1]);
+ rspamd_socket_nonblocking (cbdata->sp[0]);
+ /* Parent */
+ rspamd_worker_set_signal_handler (SIGCHLD, w, cbdata->ev_base,
+ rspamd_lua_cld_handler,
+ cbdata);
+
+ /* Add result pipe waiting */
+ event_set (&cbdata->ev, cbdata->sp[0], EV_READ | EV_PERSIST,
+ rspamd_lua_subprocess_io, cbdata);
+ event_base_set (cbdata->ev_base, &cbdata->ev);
+ /* TODO: maybe add timeout? */
+ event_add (&cbdata->ev, NULL);
+
+ return 0;
+}
+
+
struct rspamd_lua_ref_cbdata {
lua_State *L;
gint cbref;