aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libserver/rspamd_control.c58
1 files changed, 56 insertions, 2 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index aa2edaf74..8d9067053 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -34,11 +34,13 @@ static struct timeval io_timeout = {
.tv_usec = 0
};
+struct rspamd_control_session;
+
struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
struct event io_ev;
- struct event tm_ev;
struct rspamd_worker *wrk;
+ struct rspamd_control_session *session;
struct rspamd_control_reply_elt *prev, *next;
};
@@ -48,6 +50,7 @@ struct rspamd_control_session {
struct rspamd_http_connection *conn;
struct rspamd_control_command cmd;
struct rspamd_control_reply_elt *replies;
+ guint replies_remain;
gboolean is_reply;
};
@@ -128,6 +131,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
struct rspamd_control_reply_elt *elt, *telt;
DL_FOREACH_SAFE (session->replies, elt, telt) {
+ event_del (&elt->io_ev);
g_slice_free1 (sizeof (*elt), elt);
}
@@ -137,6 +141,26 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
}
static void
+rspamd_control_wrk_io (gint fd, short what, gpointer ud)
+{
+ struct rspamd_control_reply_elt *elt = ud;
+
+ if (read (elt->wrk->control_pipe[1], &elt->reply, sizeof (elt->reply)) !=
+ sizeof (elt->reply)) {
+ msg_err ("cannot read request from the worker %p (%s): %s",
+ elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno));
+ }
+
+ elt->session->replies_remain --;
+ event_del (&elt->io_ev);
+
+ if (elt->session->replies_remain == 0) {
+ /* TODO: add reply logic */
+ rspamd_control_connection_close (elt->session);
+ }
+}
+
+static void
rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct rspamd_control_session *session = conn->ud;
@@ -159,6 +183,10 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
rspamd_ftok_t srch;
guint i;
gboolean found = FALSE;
+ GHashTableIter it;
+ struct rspamd_worker *wrk;
+ struct rspamd_control_reply_elt *rep_elt;
+ gpointer k, v;
if (!session->is_reply) {
if (msg->url == NULL) {
@@ -184,7 +212,33 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
rspamd_control_send_error (session, 404, "Command not defined");
}
else {
- rspamd_control_send_error (session, 500, "Not implemented yet");
+ /* Send command to all workers */
+ g_hash_table_iter_init (&it, session->rspamd_main->workers);
+
+ while (g_hash_table_iter_next (&it, &k, &v)) {
+ wrk = v;
+
+ if (write (wrk->control_pipe[0], &session->cmd,
+ sizeof (session->cmd)) == sizeof (session->cmd)) {
+
+ rep_elt = g_slice_alloc0 (sizeof (*rep_elt));
+ rep_elt->wrk = wrk;
+ rep_elt->session = session;
+ event_set (&rep_elt->io_ev, wrk->control_pipe[0],
+ EV_READ | EV_PERSIST, rspamd_control_wrk_io,
+ rep_elt);
+ event_base_set (session->rspamd_main->ev_base,
+ &rep_elt->io_ev);
+ event_add (&rep_elt->io_ev, &io_timeout);
+
+ DL_APPEND (session->replies, rep_elt);
+ session->replies_remain ++;
+ }
+ else {
+ msg_err ("cannot write request to the worker %p (%s): %s",
+ wrk->pid, g_quark_to_string (wrk->type), strerror (errno));
+ }
+ }
}
}
else {