aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 14:43:40 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 14:43:40 +0000
commit6141a5694842f6ba9588eae5cc7cddb10830b9e6 (patch)
treeed872a99e0e428ef350b1dc1b089359d51a72ecf
parent840f144b9d399d69f06c921f23e36c13d0b498ff (diff)
downloadrspamd-6141a5694842f6ba9588eae5cc7cddb10830b9e6.tar.gz
rspamd-6141a5694842f6ba9588eae5cc7cddb10830b9e6.zip
Add method to send data to server pipe.
-rw-r--r--src/libserver/rspamd_control.c106
-rw-r--r--src/libserver/rspamd_control.h15
2 files changed, 119 insertions, 2 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index f16b095ff..685ffabf8 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -568,6 +568,8 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
}
else if (what == EV_WRITE) {
rdata = ud;
+ worker = rdata->worker;
+ srv = worker->srv;
memset (&msg, 0, sizeof (msg));
@@ -605,11 +607,113 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
}
}
-void rspamd_main_start_watching (struct rspamd_worker *worker,
+void
+rspamd_srv_start_watching (struct rspamd_worker *worker,
struct event_base *ev_base)
{
+ g_assert (worker != NULL);
+
event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
rspamd_srv_handler, worker);
event_base_set (ev_base, &worker->srv_ev);
event_add (&worker->srv_ev, NULL);
}
+
+struct rspamd_srv_request_data {
+ struct rspamd_worker *worker;
+ struct rspamd_srv_command cmd;
+ struct rspamd_srv_reply rep;
+ rspamd_srv_reply_handler handler;
+ struct event io_ev;
+ gpointer ud;
+};
+
+static void
+rspamd_srv_request_handler (gint fd, short what, gpointer ud)
+{
+ struct rspamd_srv_request_data *rd = ud;
+ struct msghdr msg;
+ struct iovec iov;
+ struct cmsghdr *cmsg;
+ guchar fdspace[CMSG_SPACE(sizeof (int))];
+ gssize r;
+ gint rfd = -1;
+
+ if (what == EV_WRITE) {
+ /* Send request to server */
+ r = write (fd, &rd->cmd, sizeof (rd->cmd));
+
+ if (r == -1) {
+ msg_err ("cannot write to server pipe: %s", strerror (errno));
+ goto cleanup;
+ }
+
+ event_del (&rd->io_ev);
+ event_set (&rd->io_ev, rd->worker->srv_pipe[1], EV_READ,
+ rspamd_srv_request_handler, rd);
+ event_add (&rd->io_ev, NULL);
+ }
+ else {
+ iov.iov_base = &rd->rep;
+ iov.iov_len = sizeof (rd->rep);
+ memset (&msg, 0, sizeof (msg));
+ msg.msg_control = fdspace;
+ msg.msg_controllen = sizeof (fdspace);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ r = recvmsg (fd, &msg, 0);
+
+ if (r == -1) {
+ msg_err ("cannot read from server pipe: %s", strerror (errno));
+ goto cleanup;
+ }
+
+ if (r < (gint)sizeof (rd->rep)) {
+ msg_err ("cannot read from server pipe, invalid length: %d",
+ (gint)r);
+ goto cleanup;
+ }
+
+ if (msg.msg_controllen >= CMSG_SPACE(sizeof (int))) {
+ cmsg = CMSG_FIRSTHDR (&msg);
+
+ rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR (&msg));
+ }
+
+ goto cleanup;
+ }
+
+ return;
+
+cleanup:
+ rd->handler (rd->worker, &rd->rep, rfd, rd->ud);
+ event_del (&rd->io_ev);
+ g_slice_free1 (sizeof (*rd), rd);
+}
+
+void
+rspamd_srv_send_command (struct rspamd_worker *worker,
+ struct event_base *ev_base,
+ struct rspamd_srv_command *cmd,
+ rspamd_srv_reply_handler handler, gpointer ud)
+{
+ struct rspamd_srv_request_data *rd;
+
+ g_assert (cmd != NULL);
+ g_assert (worker != NULL);
+ g_assert (handler != NULL);
+
+ rd = g_slice_alloc0 (sizeof (*rd));
+ memcpy (&rd->cmd, cmd, sizeof (rd->cmd));
+ rd->handler = handler;
+ rd->ud = ud;
+ rd->worker = worker;
+ rd->rep.id = cmd->id;
+ rd->rep.type = cmd->type;
+
+ event_set (&rd->io_ev, worker->srv_pipe[1], EV_WRITE,
+ rspamd_srv_request_handler, rd);
+ event_base_set (ev_base, &rd->io_ev);
+ event_add (&rd->io_ev, NULL);
+}
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index 24874e785..5e8497a0a 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -104,6 +104,10 @@ typedef gboolean (*rspamd_worker_control_handler) (struct rspamd_main *rspamd_ma
struct rspamd_control_command *cmd,
gpointer ud);
+typedef void (*rspamd_srv_reply_handler) (struct rspamd_worker *worker,
+ struct rspamd_srv_reply *rep, gint rep_fd,
+ gpointer ud);
+
/**
* Process client socket connection
*/
@@ -127,7 +131,16 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
/**
* Start watching on srv pipe
*/
-void rspamd_main_start_watching (struct rspamd_worker *worker,
+void rspamd_srv_start_watching (struct rspamd_worker *worker,
struct event_base *ev_base);
+
+/**
+ * Send command to srv pipe and read reply calling the specified callback at the
+ * end
+ */
+void rspamd_srv_send_command (struct rspamd_worker *worker,
+ struct event_base *ev_base,
+ struct rspamd_srv_command *cmd,
+ rspamd_srv_reply_handler handler, gpointer ud);
#endif