From f8fe3b1e2ed54ef05b718875da8d5dc1a76e9d1e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sun, 11 Oct 2015 21:07:07 +0100 Subject: [PATCH] Broadcast control command to workers. --- src/libserver/rspamd_control.c | 58 ++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index aa2edaf74..8d9067053 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -34,11 +34,13 @@ static struct timeval io_timeout = { .tv_usec = 0 }; +struct rspamd_control_session; + struct rspamd_control_reply_elt { struct rspamd_control_reply reply; struct event io_ev; - struct event tm_ev; struct rspamd_worker *wrk; + struct rspamd_control_session *session; struct rspamd_control_reply_elt *prev, *next; }; @@ -48,6 +50,7 @@ struct rspamd_control_session { struct rspamd_http_connection *conn; struct rspamd_control_command cmd; struct rspamd_control_reply_elt *replies; + guint replies_remain; gboolean is_reply; }; @@ -128,6 +131,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session) struct rspamd_control_reply_elt *elt, *telt; DL_FOREACH_SAFE (session->replies, elt, telt) { + event_del (&elt->io_ev); g_slice_free1 (sizeof (*elt), elt); } @@ -136,6 +140,26 @@ rspamd_control_connection_close (struct rspamd_control_session *session) g_slice_free1 (sizeof (*session), session); } +static void +rspamd_control_wrk_io (gint fd, short what, gpointer ud) +{ + struct rspamd_control_reply_elt *elt = ud; + + if (read (elt->wrk->control_pipe[1], &elt->reply, sizeof (elt->reply)) != + sizeof (elt->reply)) { + msg_err ("cannot read request from the worker %p (%s): %s", + elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno)); + } + + elt->session->replies_remain --; + event_del (&elt->io_ev); + + if (elt->session->replies_remain == 0) { + /* TODO: add reply logic */ + rspamd_control_connection_close (elt->session); + } +} + static void rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err) { @@ -159,6 +183,10 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn, rspamd_ftok_t srch; guint i; gboolean found = FALSE; + GHashTableIter it; + struct rspamd_worker *wrk; + struct rspamd_control_reply_elt *rep_elt; + gpointer k, v; if (!session->is_reply) { if (msg->url == NULL) { @@ -184,7 +212,33 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn, rspamd_control_send_error (session, 404, "Command not defined"); } else { - rspamd_control_send_error (session, 500, "Not implemented yet"); + /* Send command to all workers */ + g_hash_table_iter_init (&it, session->rspamd_main->workers); + + while (g_hash_table_iter_next (&it, &k, &v)) { + wrk = v; + + if (write (wrk->control_pipe[0], &session->cmd, + sizeof (session->cmd)) == sizeof (session->cmd)) { + + rep_elt = g_slice_alloc0 (sizeof (*rep_elt)); + rep_elt->wrk = wrk; + rep_elt->session = session; + event_set (&rep_elt->io_ev, wrk->control_pipe[0], + EV_READ | EV_PERSIST, rspamd_control_wrk_io, + rep_elt); + event_base_set (session->rspamd_main->ev_base, + &rep_elt->io_ev); + event_add (&rep_elt->io_ev, &io_timeout); + + DL_APPEND (session->replies, rep_elt); + session->replies_remain ++; + } + else { + msg_err ("cannot write request to the worker %p (%s): %s", + wrk->pid, g_quark_to_string (wrk->type), strerror (errno)); + } + } } } else { -- 2.39.5