]> source.dussan.org Git - rspamd.git/commitdiff
Broadcast control command to workers.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 11 Oct 2015 20:07:07 +0000 (21:07 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 11 Oct 2015 20:07:07 +0000 (21:07 +0100)
src/libserver/rspamd_control.c

index aa2edaf74f91d3b8f69ea64437e063e74ff55c45..8d90670539fa4a9d88dc5d0a4f00320070baa0b3 100644 (file)
@@ -34,11 +34,13 @@ static struct timeval io_timeout = {
                .tv_usec = 0
 };
 
+struct rspamd_control_session;
+
 struct rspamd_control_reply_elt {
        struct rspamd_control_reply reply;
        struct event io_ev;
-       struct event tm_ev;
        struct rspamd_worker *wrk;
+       struct rspamd_control_session *session;
        struct rspamd_control_reply_elt *prev, *next;
 };
 
@@ -48,6 +50,7 @@ struct rspamd_control_session {
        struct rspamd_http_connection *conn;
        struct rspamd_control_command cmd;
        struct rspamd_control_reply_elt *replies;
+       guint replies_remain;
        gboolean is_reply;
 };
 
@@ -128,6 +131,7 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
        struct rspamd_control_reply_elt *elt, *telt;
 
        DL_FOREACH_SAFE (session->replies, elt, telt) {
+               event_del (&elt->io_ev);
                g_slice_free1 (sizeof (*elt), elt);
        }
 
@@ -136,6 +140,26 @@ rspamd_control_connection_close (struct rspamd_control_session *session)
        g_slice_free1 (sizeof (*session), session);
 }
 
+static void
+rspamd_control_wrk_io (gint fd, short what, gpointer ud)
+{
+       struct rspamd_control_reply_elt *elt = ud;
+
+       if (read (elt->wrk->control_pipe[1], &elt->reply, sizeof (elt->reply)) !=
+                               sizeof (elt->reply)) {
+               msg_err ("cannot read request from the worker %p (%s): %s",
+                               elt->wrk->pid, g_quark_to_string (elt->wrk->type), strerror (errno));
+       }
+
+       elt->session->replies_remain --;
+       event_del (&elt->io_ev);
+
+       if (elt->session->replies_remain == 0) {
+               /* TODO: add reply logic */
+               rspamd_control_connection_close (elt->session);
+       }
+}
+
 static void
 rspamd_control_error_handler (struct rspamd_http_connection *conn, GError *err)
 {
@@ -159,6 +183,10 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
        rspamd_ftok_t srch;
        guint i;
        gboolean found = FALSE;
+       GHashTableIter it;
+       struct rspamd_worker *wrk;
+       struct rspamd_control_reply_elt *rep_elt;
+       gpointer k, v;
 
        if (!session->is_reply) {
                if (msg->url == NULL) {
@@ -184,7 +212,33 @@ rspamd_control_finish_hadler (struct rspamd_http_connection *conn,
                        rspamd_control_send_error (session, 404, "Command not defined");
                }
                else {
-                       rspamd_control_send_error (session, 500, "Not implemented yet");
+                       /* Send command to all workers */
+                       g_hash_table_iter_init (&it, session->rspamd_main->workers);
+
+                       while (g_hash_table_iter_next (&it, &k, &v)) {
+                               wrk = v;
+
+                               if (write (wrk->control_pipe[0], &session->cmd,
+                                               sizeof (session->cmd)) == sizeof (session->cmd)) {
+
+                                       rep_elt = g_slice_alloc0 (sizeof (*rep_elt));
+                                       rep_elt->wrk = wrk;
+                                       rep_elt->session = session;
+                                       event_set (&rep_elt->io_ev, wrk->control_pipe[0],
+                                                       EV_READ | EV_PERSIST, rspamd_control_wrk_io,
+                                                       rep_elt);
+                                       event_base_set (session->rspamd_main->ev_base,
+                                                       &rep_elt->io_ev);
+                                       event_add (&rep_elt->io_ev, &io_timeout);
+
+                                       DL_APPEND (session->replies, rep_elt);
+                                       session->replies_remain ++;
+                               }
+                               else {
+                                       msg_err ("cannot write request to the worker %p (%s): %s",
+                                               wrk->pid, g_quark_to_string (wrk->type), strerror (errno));
+                               }
+                       }
                }
        }
        else {