From 840f144b9d399d69f06c921f23e36c13d0b498ff Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 25 Nov 2015 14:14:09 +0000 Subject: Start work on worker->main pipe interface --- src/libserver/rspamd_control.c | 129 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) (limited to 'src/libserver/rspamd_control.c') diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 9e17f2120..f16b095ff 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -484,3 +484,132 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker, cd->handlers[type].handler = handler; cd->handlers[type].ud = ud; } + +struct rspamd_srv_reply_data { + struct rspamd_worker *worker; + gint fd; + struct rspamd_srv_reply rep; +}; + +static void +rspamd_srv_handler (gint fd, short what, gpointer ud) +{ + struct rspamd_worker *worker; + struct rspamd_srv_command cmd; + struct rspamd_main *srv; + struct rspamd_srv_reply_data *rdata; + struct msghdr msg; + struct cmsghdr *cmsg; + struct iovec iov; + guchar fdspace[CMSG_SPACE(sizeof (int))]; + gint *spair; + gchar *nid; + gssize r; + + if (what == EV_READ) { + worker = ud; + srv = worker->srv; + + r = read (fd, &cmd, sizeof (cmd)); + + if (r == -1) { + msg_err ("cannot read from worker's srv pipe: %s", + strerror (errno)); + } + else if (r != sizeof (cmd)) { + msg_err ("cannot read from worker's srv pipe incomplete command: %d", + (gint) r); + } + else { + rdata = g_slice_alloc0 (sizeof (*rdata)); + rdata->worker = worker; + rdata->rep.id = cmd.id; + rdata->rep.type = cmd.type; + rdata->fd = -1; + + switch (cmd.type) { + case RSPAMD_SRV_SOCKETPAIR: + spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id); + if (spair == NULL) { + spair = g_malloc (sizeof (gint) * 2); + if (rspamd_socketpair (spair) == -1) { + rdata->rep.reply.spair.code = errno; + msg_err ("cannot create socket pair: %s", strerror (errno)); + } + else { + nid = g_malloc (sizeof (cmd.cmd.spair.pair_id)); + memcpy (nid, cmd.cmd.spair.pair_id, + sizeof (cmd.cmd.spair.pair_id)); + g_hash_table_insert (srv->spairs, + cmd.cmd.spair.pair_id, spair); + rdata->rep.reply.spair.code = 0; + rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0]; + } + } + else { + rdata->rep.reply.spair.code = 0; + rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0]; + } + break; + default: + msg_err ("unknown command type: %d", cmd.type); + break; + } + + /* Now plan write event and send data back */ + event_del (&worker->srv_ev); + event_set (&worker->srv_ev, + worker->srv_pipe[0], + EV_WRITE, + rspamd_srv_handler, + rdata); + event_add (&worker->srv_ev, NULL); + } + } + else if (what == EV_WRITE) { + rdata = ud; + + memset (&msg, 0, sizeof (msg)); + + /* Attach fd to the message */ + if (rdata->fd != -1) { + msg.msg_control = fdspace; + msg.msg_controllen = sizeof (fdspace); + cmsg = CMSG_FIRSTHDR (&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN (sizeof (int)); + memcpy (CMSG_DATA (cmsg), &rdata->fd, sizeof (int)); + } + + iov.iov_base = &rdata->rep; + iov.iov_len = sizeof (rdata->rep); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + r = sendmsg (fd, &msg, 0); + + if (r == -1) { + msg_err ("cannot write to worker's srv pipe: %s", + strerror (errno)); + } + + g_slice_free1 (sizeof (*rdata), rdata); + event_del (&worker->srv_ev); + event_set (&worker->srv_ev, + worker->srv_pipe[0], + EV_READ | EV_PERSIST, + rspamd_srv_handler, + worker); + event_add (&worker->srv_ev, NULL); + } +} + +void rspamd_main_start_watching (struct rspamd_worker *worker, + struct event_base *ev_base) +{ + event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST, + rspamd_srv_handler, worker); + event_base_set (ev_base, &worker->srv_ev); + event_add (&worker->srv_ev, NULL); +} -- cgit v1.2.3