From 840f144b9d399d69f06c921f23e36c13d0b498ff Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 25 Nov 2015 14:14:09 +0000 Subject: [PATCH] Start work on worker->main pipe interface --- src/libserver/rspamd_control.c | 129 +++++++++++++++++++++++++++++++++ src/libserver/rspamd_control.h | 33 +++++++++ src/libserver/worker_util.c | 5 ++ src/rspamd.h | 7 +- 4 files changed, 172 insertions(+), 2 deletions(-) diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 9e17f2120..f16b095ff 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -484,3 +484,132 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker, cd->handlers[type].handler = handler; cd->handlers[type].ud = ud; } + +struct rspamd_srv_reply_data { + struct rspamd_worker *worker; + gint fd; + struct rspamd_srv_reply rep; +}; + +static void +rspamd_srv_handler (gint fd, short what, gpointer ud) +{ + struct rspamd_worker *worker; + struct rspamd_srv_command cmd; + struct rspamd_main *srv; + struct rspamd_srv_reply_data *rdata; + struct msghdr msg; + struct cmsghdr *cmsg; + struct iovec iov; + guchar fdspace[CMSG_SPACE(sizeof (int))]; + gint *spair; + gchar *nid; + gssize r; + + if (what == EV_READ) { + worker = ud; + srv = worker->srv; + + r = read (fd, &cmd, sizeof (cmd)); + + if (r == -1) { + msg_err ("cannot read from worker's srv pipe: %s", + strerror (errno)); + } + else if (r != sizeof (cmd)) { + msg_err ("cannot read from worker's srv pipe incomplete command: %d", + (gint) r); + } + else { + rdata = g_slice_alloc0 (sizeof (*rdata)); + rdata->worker = worker; + rdata->rep.id = cmd.id; + rdata->rep.type = cmd.type; + rdata->fd = -1; + + switch (cmd.type) { + case RSPAMD_SRV_SOCKETPAIR: + spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id); + if (spair == NULL) { + spair = g_malloc (sizeof (gint) * 2); + if (rspamd_socketpair (spair) == -1) { + rdata->rep.reply.spair.code = errno; + msg_err ("cannot create socket pair: %s", strerror (errno)); + } + else { + nid = g_malloc (sizeof (cmd.cmd.spair.pair_id)); + memcpy (nid, cmd.cmd.spair.pair_id, + sizeof (cmd.cmd.spair.pair_id)); + g_hash_table_insert (srv->spairs, + cmd.cmd.spair.pair_id, spair); + rdata->rep.reply.spair.code = 0; + rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0]; + } + } + else { + rdata->rep.reply.spair.code = 0; + rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0]; + } + break; + default: + msg_err ("unknown command type: %d", cmd.type); + break; + } + + /* Now plan write event and send data back */ + event_del (&worker->srv_ev); + event_set (&worker->srv_ev, + worker->srv_pipe[0], + EV_WRITE, + rspamd_srv_handler, + rdata); + event_add (&worker->srv_ev, NULL); + } + } + else if (what == EV_WRITE) { + rdata = ud; + + memset (&msg, 0, sizeof (msg)); + + /* Attach fd to the message */ + if (rdata->fd != -1) { + msg.msg_control = fdspace; + msg.msg_controllen = sizeof (fdspace); + cmsg = CMSG_FIRSTHDR (&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN (sizeof (int)); + memcpy (CMSG_DATA (cmsg), &rdata->fd, sizeof (int)); + } + + iov.iov_base = &rdata->rep; + iov.iov_len = sizeof (rdata->rep); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + r = sendmsg (fd, &msg, 0); + + if (r == -1) { + msg_err ("cannot write to worker's srv pipe: %s", + strerror (errno)); + } + + g_slice_free1 (sizeof (*rdata), rdata); + event_del (&worker->srv_ev); + event_set (&worker->srv_ev, + worker->srv_pipe[0], + EV_READ | EV_PERSIST, + rspamd_srv_handler, + worker); + event_add (&worker->srv_ev, NULL); + } +} + +void rspamd_main_start_watching (struct rspamd_worker *worker, + struct event_base *ev_base) +{ + event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST, + rspamd_srv_handler, worker); + event_base_set (ev_base, &worker->srv_ev); + event_add (&worker->srv_ev, NULL); +} diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index ddca143cc..24874e785 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -38,6 +38,10 @@ enum rspamd_control_type { RSPAMD_CONTROL_MAX }; +enum rspamd_srv_type { + RSPAMD_SRV_SOCKETPAIR = 0, +}; + struct rspamd_control_command { enum rspamd_control_type type; union { @@ -72,6 +76,29 @@ struct rspamd_control_reply { } reply; }; +#define PAIR_ID_LEN 16 +struct rspamd_srv_command { + enum rspamd_srv_type type; + guint64 id; + union { + struct { + gint af; + gchar pair_id[PAIR_ID_LEN]; + guint pair_num; + } spair; + } cmd; +}; + +struct rspamd_srv_reply { + enum rspamd_srv_type type; + guint64 id; + union { + struct { + gint code; + } spair; + } reply; +}; + typedef gboolean (*rspamd_worker_control_handler) (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, struct rspamd_control_command *cmd, @@ -97,4 +124,10 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker, rspamd_worker_control_handler handler, gpointer ud); +/** + * Start watching on srv pipe + */ +void rspamd_main_start_watching (struct rspamd_worker *worker, + struct event_base *ev_base); + #endif diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index f404a3295..4b24ee377 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -448,6 +448,11 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, exit (-errno); } + if (!rspamd_socketpair (cur->srv_pipe)) { + msg_err ("socketpair failure: %s", strerror (errno)); + exit (-errno); + } + cur->srv = rspamd_main; cur->type = cf->type; cur->cf = g_malloc (sizeof (struct rspamd_worker_conf)); diff --git a/src/rspamd.h b/src/rspamd.h index f10a8e90a..61ef25cd4 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -56,6 +56,9 @@ struct rspamd_worker { gpointer ctx; /**< worker's specific data */ gint control_pipe[2]; /**< control pipe. [0] is used by main process, [1] is used by a worker */ + gint srv_pipe[2]; /**< used by workers to request something from the + main process. [0] - main, [1] - worker */ + struct event srv_ev; /**< used by main for read workers' requests */ gpointer control_data; /**< used by control protocol to handle commands */ }; @@ -160,11 +163,11 @@ struct rspamd_main { /* Pid file structure */ rspamd_pidfh_t *pfh; /**< struct pidfh for pidfile */ GQuark type; /**< process type */ - guint ev_initialized; /**< is event system is initialized */ struct rspamd_stat *stat; /**< pointer to statistics */ - rspamd_mempool_t *server_pool; /**< server's memory pool */ + rspamd_mempool_t *server_pool; /**< server's memory pool */ GHashTable *workers; /**< workers pool indexed by pid */ + GHashTable *spairs; /**< socket pairs requested by workers */ rspamd_logger_t *logger; uid_t workers_uid; /**< worker's uid running to */ gid_t workers_gid; /**< worker's gid running to */ -- 2.39.5