aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 14:14:09 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-11-25 14:14:09 +0000
commit840f144b9d399d69f06c921f23e36c13d0b498ff (patch)
treef8f3fda3f8a2178247b07b6f40afe91ea8fe3f2f
parent64839a3bfb364431c1e5640d9e51f84607055d2a (diff)
downloadrspamd-840f144b9d399d69f06c921f23e36c13d0b498ff.tar.gz
rspamd-840f144b9d399d69f06c921f23e36c13d0b498ff.zip
Start work on worker->main pipe interface
-rw-r--r--src/libserver/rspamd_control.c129
-rw-r--r--src/libserver/rspamd_control.h33
-rw-r--r--src/libserver/worker_util.c5
-rw-r--r--src/rspamd.h7
4 files changed, 172 insertions, 2 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 9e17f2120..f16b095ff 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -484,3 +484,132 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
cd->handlers[type].handler = handler;
cd->handlers[type].ud = ud;
}
+
+struct rspamd_srv_reply_data {
+ struct rspamd_worker *worker;
+ gint fd;
+ struct rspamd_srv_reply rep;
+};
+
+static void
+rspamd_srv_handler (gint fd, short what, gpointer ud)
+{
+ struct rspamd_worker *worker;
+ struct rspamd_srv_command cmd;
+ struct rspamd_main *srv;
+ struct rspamd_srv_reply_data *rdata;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ struct iovec iov;
+ guchar fdspace[CMSG_SPACE(sizeof (int))];
+ gint *spair;
+ gchar *nid;
+ gssize r;
+
+ if (what == EV_READ) {
+ worker = ud;
+ srv = worker->srv;
+
+ r = read (fd, &cmd, sizeof (cmd));
+
+ if (r == -1) {
+ msg_err ("cannot read from worker's srv pipe: %s",
+ strerror (errno));
+ }
+ else if (r != sizeof (cmd)) {
+ msg_err ("cannot read from worker's srv pipe incomplete command: %d",
+ (gint) r);
+ }
+ else {
+ rdata = g_slice_alloc0 (sizeof (*rdata));
+ rdata->worker = worker;
+ rdata->rep.id = cmd.id;
+ rdata->rep.type = cmd.type;
+ rdata->fd = -1;
+
+ switch (cmd.type) {
+ case RSPAMD_SRV_SOCKETPAIR:
+ spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id);
+ if (spair == NULL) {
+ spair = g_malloc (sizeof (gint) * 2);
+ if (rspamd_socketpair (spair) == -1) {
+ rdata->rep.reply.spair.code = errno;
+ msg_err ("cannot create socket pair: %s", strerror (errno));
+ }
+ else {
+ nid = g_malloc (sizeof (cmd.cmd.spair.pair_id));
+ memcpy (nid, cmd.cmd.spair.pair_id,
+ sizeof (cmd.cmd.spair.pair_id));
+ g_hash_table_insert (srv->spairs,
+ cmd.cmd.spair.pair_id, spair);
+ rdata->rep.reply.spair.code = 0;
+ rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
+ }
+ }
+ else {
+ rdata->rep.reply.spair.code = 0;
+ rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
+ }
+ break;
+ default:
+ msg_err ("unknown command type: %d", cmd.type);
+ break;
+ }
+
+ /* Now plan write event and send data back */
+ event_del (&worker->srv_ev);
+ event_set (&worker->srv_ev,
+ worker->srv_pipe[0],
+ EV_WRITE,
+ rspamd_srv_handler,
+ rdata);
+ event_add (&worker->srv_ev, NULL);
+ }
+ }
+ else if (what == EV_WRITE) {
+ rdata = ud;
+
+ memset (&msg, 0, sizeof (msg));
+
+ /* Attach fd to the message */
+ if (rdata->fd != -1) {
+ msg.msg_control = fdspace;
+ msg.msg_controllen = sizeof (fdspace);
+ cmsg = CMSG_FIRSTHDR (&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN (sizeof (int));
+ memcpy (CMSG_DATA (cmsg), &rdata->fd, sizeof (int));
+ }
+
+ iov.iov_base = &rdata->rep;
+ iov.iov_len = sizeof (rdata->rep);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ r = sendmsg (fd, &msg, 0);
+
+ if (r == -1) {
+ msg_err ("cannot write to worker's srv pipe: %s",
+ strerror (errno));
+ }
+
+ g_slice_free1 (sizeof (*rdata), rdata);
+ event_del (&worker->srv_ev);
+ event_set (&worker->srv_ev,
+ worker->srv_pipe[0],
+ EV_READ | EV_PERSIST,
+ rspamd_srv_handler,
+ worker);
+ event_add (&worker->srv_ev, NULL);
+ }
+}
+
+void rspamd_main_start_watching (struct rspamd_worker *worker,
+ struct event_base *ev_base)
+{
+ event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
+ rspamd_srv_handler, worker);
+ event_base_set (ev_base, &worker->srv_ev);
+ event_add (&worker->srv_ev, NULL);
+}
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index ddca143cc..24874e785 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -38,6 +38,10 @@ enum rspamd_control_type {
RSPAMD_CONTROL_MAX
};
+enum rspamd_srv_type {
+ RSPAMD_SRV_SOCKETPAIR = 0,
+};
+
struct rspamd_control_command {
enum rspamd_control_type type;
union {
@@ -72,6 +76,29 @@ struct rspamd_control_reply {
} reply;
};
+#define PAIR_ID_LEN 16
+struct rspamd_srv_command {
+ enum rspamd_srv_type type;
+ guint64 id;
+ union {
+ struct {
+ gint af;
+ gchar pair_id[PAIR_ID_LEN];
+ guint pair_num;
+ } spair;
+ } cmd;
+};
+
+struct rspamd_srv_reply {
+ enum rspamd_srv_type type;
+ guint64 id;
+ union {
+ struct {
+ gint code;
+ } spair;
+ } reply;
+};
+
typedef gboolean (*rspamd_worker_control_handler) (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
struct rspamd_control_command *cmd,
@@ -97,4 +124,10 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
rspamd_worker_control_handler handler,
gpointer ud);
+/**
+ * Start watching on srv pipe
+ */
+void rspamd_main_start_watching (struct rspamd_worker *worker,
+ struct event_base *ev_base);
+
#endif
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index f404a3295..4b24ee377 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -448,6 +448,11 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
exit (-errno);
}
+ if (!rspamd_socketpair (cur->srv_pipe)) {
+ msg_err ("socketpair failure: %s", strerror (errno));
+ exit (-errno);
+ }
+
cur->srv = rspamd_main;
cur->type = cf->type;
cur->cf = g_malloc (sizeof (struct rspamd_worker_conf));
diff --git a/src/rspamd.h b/src/rspamd.h
index f10a8e90a..61ef25cd4 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -56,6 +56,9 @@ struct rspamd_worker {
gpointer ctx; /**< worker's specific data */
gint control_pipe[2]; /**< control pipe. [0] is used by main process,
[1] is used by a worker */
+ gint srv_pipe[2]; /**< used by workers to request something from the
+ main process. [0] - main, [1] - worker */
+ struct event srv_ev; /**< used by main for read workers' requests */
gpointer control_data; /**< used by control protocol to handle commands */
};
@@ -160,11 +163,11 @@ struct rspamd_main {
/* Pid file structure */
rspamd_pidfh_t *pfh; /**< struct pidfh for pidfile */
GQuark type; /**< process type */
- guint ev_initialized; /**< is event system is initialized */
struct rspamd_stat *stat; /**< pointer to statistics */
- rspamd_mempool_t *server_pool; /**< server's memory pool */
+ rspamd_mempool_t *server_pool; /**< server's memory pool */
GHashTable *workers; /**< workers pool indexed by pid */
+ GHashTable *spairs; /**< socket pairs requested by workers */
rspamd_logger_t *logger;
uid_t workers_uid; /**< worker's uid running to */
gid_t workers_gid; /**< worker's gid running to */