aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/rspamd_control.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 14:14:09 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 14:14:09 +0000
commit840f144b9d399d69f06c921f23e36c13d0b498ff (patch)
treef8f3fda3f8a2178247b07b6f40afe91ea8fe3f2f /src/libserver/rspamd_control.c
parent64839a3bfb364431c1e5640d9e51f84607055d2a (diff)
downloadrspamd-840f144b9d399d69f06c921f23e36c13d0b498ff.tar.gz
rspamd-840f144b9d399d69f06c921f23e36c13d0b498ff.zip
Start work on worker->main pipe interface
Diffstat (limited to 'src/libserver/rspamd_control.c')
-rw-r--r--src/libserver/rspamd_control.c129
1 files changed, 129 insertions, 0 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 9e17f2120..f16b095ff 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -484,3 +484,132 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
cd->handlers[type].handler = handler;
cd->handlers[type].ud = ud;
}
+
+struct rspamd_srv_reply_data {
+ struct rspamd_worker *worker;
+ gint fd;
+ struct rspamd_srv_reply rep;
+};
+
+static void
+rspamd_srv_handler (gint fd, short what, gpointer ud)
+{
+ struct rspamd_worker *worker;
+ struct rspamd_srv_command cmd;
+ struct rspamd_main *srv;
+ struct rspamd_srv_reply_data *rdata;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ struct iovec iov;
+ guchar fdspace[CMSG_SPACE(sizeof (int))];
+ gint *spair;
+ gchar *nid;
+ gssize r;
+
+ if (what == EV_READ) {
+ worker = ud;
+ srv = worker->srv;
+
+ r = read (fd, &cmd, sizeof (cmd));
+
+ if (r == -1) {
+ msg_err ("cannot read from worker's srv pipe: %s",
+ strerror (errno));
+ }
+ else if (r != sizeof (cmd)) {
+ msg_err ("cannot read from worker's srv pipe incomplete command: %d",
+ (gint) r);
+ }
+ else {
+ rdata = g_slice_alloc0 (sizeof (*rdata));
+ rdata->worker = worker;
+ rdata->rep.id = cmd.id;
+ rdata->rep.type = cmd.type;
+ rdata->fd = -1;
+
+ switch (cmd.type) {
+ case RSPAMD_SRV_SOCKETPAIR:
+ spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id);
+ if (spair == NULL) {
+ spair = g_malloc (sizeof (gint) * 2);
+ if (rspamd_socketpair (spair) == -1) {
+ rdata->rep.reply.spair.code = errno;
+ msg_err ("cannot create socket pair: %s", strerror (errno));
+ }
+ else {
+ nid = g_malloc (sizeof (cmd.cmd.spair.pair_id));
+ memcpy (nid, cmd.cmd.spair.pair_id,
+ sizeof (cmd.cmd.spair.pair_id));
+ g_hash_table_insert (srv->spairs,
+ cmd.cmd.spair.pair_id, spair);
+ rdata->rep.reply.spair.code = 0;
+ rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
+ }
+ }
+ else {
+ rdata->rep.reply.spair.code = 0;
+ rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
+ }
+ break;
+ default:
+ msg_err ("unknown command type: %d", cmd.type);
+ break;
+ }
+
+ /* Now plan write event and send data back */
+ event_del (&worker->srv_ev);
+ event_set (&worker->srv_ev,
+ worker->srv_pipe[0],
+ EV_WRITE,
+ rspamd_srv_handler,
+ rdata);
+ event_add (&worker->srv_ev, NULL);
+ }
+ }
+ else if (what == EV_WRITE) {
+ rdata = ud;
+
+ memset (&msg, 0, sizeof (msg));
+
+ /* Attach fd to the message */
+ if (rdata->fd != -1) {
+ msg.msg_control = fdspace;
+ msg.msg_controllen = sizeof (fdspace);
+ cmsg = CMSG_FIRSTHDR (&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN (sizeof (int));
+ memcpy (CMSG_DATA (cmsg), &rdata->fd, sizeof (int));
+ }
+
+ iov.iov_base = &rdata->rep;
+ iov.iov_len = sizeof (rdata->rep);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ r = sendmsg (fd, &msg, 0);
+
+ if (r == -1) {
+ msg_err ("cannot write to worker's srv pipe: %s",
+ strerror (errno));
+ }
+
+ g_slice_free1 (sizeof (*rdata), rdata);
+ event_del (&worker->srv_ev);
+ event_set (&worker->srv_ev,
+ worker->srv_pipe[0],
+ EV_READ | EV_PERSIST,
+ rspamd_srv_handler,
+ worker);
+ event_add (&worker->srv_ev, NULL);
+ }
+}
+
+void rspamd_main_start_watching (struct rspamd_worker *worker,
+ struct event_base *ev_base)
+{
+ event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
+ rspamd_srv_handler, worker);
+ event_base_set (ev_base, &worker->srv_ev);
+ event_add (&worker->srv_ev, NULL);
+}