From 6141a5694842f6ba9588eae5cc7cddb10830b9e6 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 25 Nov 2015 14:43:40 +0000 Subject: Add method to send data to server pipe. --- src/libserver/rspamd_control.c | 106 ++++++++++++++++++++++++++++++++++++++++- src/libserver/rspamd_control.h | 15 +++++- 2 files changed, 119 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index f16b095ff..685ffabf8 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -568,6 +568,8 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) } else if (what == EV_WRITE) { rdata = ud; + worker = rdata->worker; + srv = worker->srv; memset (&msg, 0, sizeof (msg)); @@ -605,11 +607,113 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) } } -void rspamd_main_start_watching (struct rspamd_worker *worker, +void +rspamd_srv_start_watching (struct rspamd_worker *worker, struct event_base *ev_base) { + g_assert (worker != NULL); + event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST, rspamd_srv_handler, worker); event_base_set (ev_base, &worker->srv_ev); event_add (&worker->srv_ev, NULL); } + +struct rspamd_srv_request_data { + struct rspamd_worker *worker; + struct rspamd_srv_command cmd; + struct rspamd_srv_reply rep; + rspamd_srv_reply_handler handler; + struct event io_ev; + gpointer ud; +}; + +static void +rspamd_srv_request_handler (gint fd, short what, gpointer ud) +{ + struct rspamd_srv_request_data *rd = ud; + struct msghdr msg; + struct iovec iov; + struct cmsghdr *cmsg; + guchar fdspace[CMSG_SPACE(sizeof (int))]; + gssize r; + gint rfd = -1; + + if (what == EV_WRITE) { + /* Send request to server */ + r = write (fd, &rd->cmd, sizeof (rd->cmd)); + + if (r == -1) { + msg_err ("cannot write to server pipe: %s", strerror (errno)); + goto cleanup; + } + + event_del (&rd->io_ev); + event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ, + rspamd_srv_request_handler, rd); + event_add (&rd->io_ev, NULL); + } + else { + iov.iov_base = &rd->rep; + iov.iov_len = sizeof (rd->rep); + memset (&msg, 0, sizeof (msg)); + msg.msg_control = fdspace; + msg.msg_controllen = sizeof (fdspace); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + r = recvmsg (fd, &msg, 0); + + if (r == -1) { + msg_err ("cannot read from server pipe: %s", strerror (errno)); + goto cleanup; + } + + if (r < (gint)sizeof (rd->rep)) { + msg_err ("cannot read from server pipe, invalid length: %d", + (gint)r); + goto cleanup; + } + + if (msg.msg_controllen >= CMSG_SPACE(sizeof (int))) { + cmsg = CMSG_FIRSTHDR (&msg); + + rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR (&msg)); + } + + goto cleanup; + } + + return; + +cleanup: + rd->handler (rd->worker, &rd->rep, rfd, rd->ud); + event_del (&rd->io_ev); + g_slice_free1 (sizeof (*rd), rd); +} + +void +rspamd_srv_send_command (struct rspamd_worker *worker, + struct event_base *ev_base, + struct rspamd_srv_command *cmd, + rspamd_srv_reply_handler handler, gpointer ud) +{ + struct rspamd_srv_request_data *rd; + + g_assert (cmd != NULL); + g_assert (worker != NULL); + g_assert (handler != NULL); + + rd = g_slice_alloc0 (sizeof (*rd)); + memcpy (&rd->cmd, cmd, sizeof (rd->cmd)); + rd->handler = handler; + rd->ud = ud; + rd->worker = worker; + rd->rep.id = cmd->id; + rd->rep.type = cmd->type; + + event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE, + rspamd_srv_request_handler, rd); + event_base_set (ev_base, &rd->io_ev); + event_add (&rd->io_ev, NULL); +} diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index 24874e785..5e8497a0a 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -104,6 +104,10 @@ typedef gboolean (*rspamd_worker_control_handler) (struct rspamd_main *rspamd_ma struct rspamd_control_command *cmd, gpointer ud); +typedef void (*rspamd_srv_reply_handler) (struct rspamd_worker *worker, + struct rspamd_srv_reply *rep, gint rep_fd, + gpointer ud); + /** * Process client socket connection */ @@ -127,7 +131,16 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker, /** * Start watching on srv pipe */ -void rspamd_main_start_watching (struct rspamd_worker *worker, +void rspamd_srv_start_watching (struct rspamd_worker *worker, struct event_base *ev_base); + +/** + * Send command to srv pipe and read reply calling the specified callback at the + * end + */ +void rspamd_srv_send_command (struct rspamd_worker *worker, + struct event_base *ev_base, + struct rspamd_srv_command *cmd, + rspamd_srv_reply_handler handler, gpointer ud); #endif -- cgit v1.2.3