]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Serialize control commands
author Vsevolod Stakhov <vsevolod@rspamd.com>
Mon, 22 Apr 2024 14:49:47 +0000 (15:49 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Mon, 22 Apr 2024 14:49:47 +0000 (15:49 +0100)
In fact, we cannot send multiple commands and read them through the pipe. It has
caused multiple weird issues in the past, but I can now see clearly how it should
be done. We should serialize all pending requests so that commands are sent one
by one, each only after the reply to the previous command has been received.

src/libserver/rspamd_control.c
src/libutil/libev_helper.c

index 8dd7595f4d47df4072a3b48d885f360b9b4e6f89..c1bc175d6f25d73ebe8df230e41f8bea4f540d88 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include "hyperscan_tools.h"
 #endif
 
+#define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL,                             \
+                                                                                                                        rspamd_control_log_id, "control", NULL, \
+                                                                                                                        RSPAMD_LOG_FUNC,                        \
+                                                                                                                        __VA_ARGS__)
+
 static ev_tstamp io_timeout = 30.0;
 static ev_tstamp worker_io_timeout = 0.5;
 
@@ -40,10 +45,14 @@ struct rspamd_control_reply_elt {
        struct rspamd_control_reply reply;
        struct rspamd_io_ev ev;
        struct ev_loop *event_loop;
+       struct rspamd_worker *worker;
        GQuark wrk_type;
        pid_t wrk_pid;
+       rspamd_ev_cb handler;
        gpointer ud;
        int attached_fd;
+       bool sent;
+       struct rspamd_control_command cmd;
        GHashTable *pending_elts;
        struct rspamd_control_reply_elt *prev, *next;
 };
@@ -76,17 +85,9 @@ static const struct rspamd_control_cmd_match {
 };
 
 static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
+static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
 
-static void
-rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
-{
-       GHashTable *htb;
-       /* It stops event and frees hash */
-       htb = elt->pending_elts;
-       g_hash_table_remove(elt->pending_elts, elt);
-       /* Release hash reference */
-       g_hash_table_unref(htb);
-}
+INIT_LOG_MODULE(control)
 
 void rspamd_control_send_error(struct rspamd_control_session *session,
                                                           int code, const char *error_msg, ...)
@@ -361,6 +362,103 @@ void rspamd_pending_control_free(gpointer p)
        g_free(rep_elt);
 }
 
+static inline void
+rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
+                                                  int attached_fd, struct msghdr *msg)
+{
+       struct cmsghdr *cmsg;
+       struct iovec iov;
+       unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+
+       memset(msg, 0, sizeof(*msg));
+
+       /* Attach fd to the message */
+       if (attached_fd != -1) {
+               memset(fdspace, 0, sizeof(fdspace));
+               msg->msg_control = fdspace;
+               msg->msg_controllen = sizeof(fdspace);
+               cmsg = CMSG_FIRSTHDR(msg);
+               cmsg->cmsg_level = SOL_SOCKET;
+               cmsg->cmsg_type = SCM_RIGHTS;
+               cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+               memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
+       }
+
+       iov.iov_base = cmd;
+       iov.iov_len = sizeof(*cmd);
+       msg->msg_iov = &iov;
+       msg->msg_iovlen = 1;
+}
+
+static void
+rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
+{
+       GHashTable *htb;
+       struct rspamd_main *rspamd_main;
+       gsize pending;
+
+       /* It stops event and frees hash */
+       htb = elt->pending_elts;
+
+       pending = g_hash_table_size(htb);
+       msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
+                                         g_quark_to_string(elt->wrk_type),
+                                         (int) pending);
+
+       if (pending != 0) {
+               /* Invoke another event from the queue */
+               GHashTableIter it;
+               gpointer k, v;
+
+               g_hash_table_iter_init(&it, elt->pending_elts);
+
+               while (g_hash_table_iter_next(&it, &k, &v)) {
+                       struct rspamd_control_reply_elt *cur = v;
+
+                       if (!cur->sent) {
+                               struct msghdr msg;
+
+                               rspamd_main = cur->worker->srv;
+                               rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg);
+                               ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
+
+                               if (r == sizeof(cur->cmd)) {
+                                       msg_debug_control("restarting pending event for %P(%s), %d events pending",
+                                                                         cur->wrk_pid,
+                                                                         g_quark_to_string(cur->wrk_type),
+                                                                         (int) pending - 1);
+                                       rspamd_ev_watcher_init(&cur->ev,
+                                                                                  cur->worker->control_pipe[0],
+                                                                                  EV_READ, cur->handler,
+                                                                                  cur);
+                                       rspamd_ev_watcher_start(cur->event_loop,
+                                                                                       &cur->ev, worker_io_timeout);
+                                       cur->sent = true;
+
+                                       break; /* Exit the outer loop as we have invoked something */
+                               }
+                               else {
+                                       msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+                                                                (int) cur->cmd.type,
+                                                                cur->wrk_pid,
+                                                                g_quark_to_string(cur->wrk_type),
+                                                                cur->worker->control_pipe[0],
+                                                                strerror(errno));
+                                       g_hash_table_remove(elt->pending_elts, cur);
+                                       /* Free one refcounter on a hash table as it was taken by `cur` */
+                                       g_hash_table_unref(htb);
+                               }
+                       }
+               }
+       }
+
+       /* Remove from hash and performs the cleanup */
+       g_hash_table_remove(elt->pending_elts, elt);
+
+       /* Release hash reference */
+       g_hash_table_unref(htb);
+}
+
 static struct rspamd_control_reply_elt *
 rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
                                                         struct rspamd_control_command *cmd,
@@ -374,9 +472,6 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
        struct rspamd_control_reply_elt *rep_elt, *res = NULL;
        gpointer k, v;
        struct msghdr msg;
-       struct cmsghdr *cmsg;
-       struct iovec iov;
-       unsigned char fdspace[CMSG_SPACE(sizeof(int))];
        gssize r;
 
        g_hash_table_iter_init(&it, rspamd_main->workers);
@@ -398,52 +493,58 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
                        continue;
                }
 
-               memset(&msg, 0, sizeof(msg));
-
-               /* Attach fd to the message */
-               if (attached_fd != -1) {
-                       memset(fdspace, 0, sizeof(fdspace));
-                       msg.msg_control = fdspace;
-                       msg.msg_controllen = sizeof(fdspace);
-                       cmsg = CMSG_FIRSTHDR(&msg);
-                       cmsg->cmsg_level = SOL_SOCKET;
-                       cmsg->cmsg_type = SCM_RIGHTS;
-                       cmsg->cmsg_len = CMSG_LEN(sizeof(int));
-                       memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
+               rep_elt = g_malloc0(sizeof(*rep_elt));
+               rep_elt->worker = wrk;
+               rep_elt->wrk_pid = wrk->pid;
+               rep_elt->wrk_type = wrk->type;
+               rep_elt->event_loop = rspamd_main->event_loop;
+               rep_elt->ud = ud;
+               rep_elt->handler = handler;
+               rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
+               memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
+               rep_elt->sent = false;
+
+               if (g_hash_table_size(wrk->control_events_pending) == 0) {
+                       /* We can send command */
+                       rspamd_control_fill_msghdr(cmd, attached_fd, &msg);
+                       r = sendmsg(wrk->control_pipe[0], &msg, 0);
+
+                       if (r == sizeof(*cmd)) {
+                               rspamd_ev_watcher_init(&rep_elt->ev,
+                                                                          wrk->control_pipe[0],
+                                                                          EV_READ, handler,
+                                                                          rep_elt);
+                               rspamd_ev_watcher_start(rspamd_main->event_loop,
+                                                                               &rep_elt->ev, worker_io_timeout);
+                               g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
+                               rep_elt->sent = true;
+
+                               DL_APPEND(res, rep_elt);
+                               msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
+                                                                 (int) cmd->type,
+                                                                 wrk->pid,
+                                                                 g_quark_to_string(wrk->type),
+                                                                 wrk->control_pipe[0]);
+                       }
+                       else {
+                               msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+                                                        (int) cmd->type,
+                                                        wrk->pid,
+                                                        g_quark_to_string(wrk->type),
+                                                        wrk->control_pipe[0],
+                                                        strerror(errno));
+                               g_free(rep_elt);
+                       }
                }
-
-               iov.iov_base = cmd;
-               iov.iov_len = sizeof(*cmd);
-               msg.msg_iov = &iov;
-               msg.msg_iovlen = 1;
-
-               r = sendmsg(wrk->control_pipe[0], &msg, 0);
-
-               if (r == sizeof(*cmd)) {
-                       rep_elt = g_malloc0(sizeof(*rep_elt));
-                       rep_elt->wrk_pid = wrk->pid;
-                       rep_elt->wrk_type = wrk->type;
-                       rep_elt->event_loop = rspamd_main->event_loop;
-                       rep_elt->ud = ud;
-                       rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
-                       rspamd_ev_watcher_init(&rep_elt->ev,
-                                                                  wrk->control_pipe[0],
-                                                                  EV_READ, handler,
-                                                                  rep_elt);
-                       rspamd_ev_watcher_start(rspamd_main->event_loop,
-                                                                       &rep_elt->ev, worker_io_timeout);
+               else {
+                       /* We need to wait till the last command is processed, or it will mess up all serialization */
+                       msg_debug_control("pending event for %P(%s), %d events pending",
+                                                         wrk->pid,
+                                                         g_quark_to_string(wrk->type),
+                                                         (int) g_hash_table_size(wrk->control_events_pending));
                        g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-
                        DL_APPEND(res, rep_elt);
                }
-               else {
-                       msg_err_main("cannot write command %d(%z) to the worker %P(%s), fd: %d: %s",
-                                                (int) cmd->type, iov.iov_len,
-                                                wrk->pid,
-                                                g_quark_to_string(wrk->type),
-                                                wrk->control_pipe[0],
-                                                strerror(errno));
-               }
        }
 
        return res;
@@ -737,7 +838,7 @@ rspamd_control_ignore_io_handler(int fd, short what, void *ud)
 
        /* At this point we just ignore replies from the workers */
        if (read(fd, &rep, sizeof(rep)) == -1) {
-               msg_debug("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
+               msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
        }
        rspamd_control_stop_pending(elt);
 }
index 770964b7f2aa9cd42e1c7f7f1e310ebd49d691e9..3b880aaa269c9e32e3c024d62b2abf028a40f939 100644 (file)
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2019 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -57,7 +57,7 @@ void rspamd_ev_watcher_start(struct ev_loop *loop,
 {
        g_assert(ev->cb != NULL);
 
-       ev_io_start(EV_A_ & ev->io);
+       ev_io_start(EV_A, &ev->io);
 
        if (timeout > 0) {
                /* Update timestamp to avoid timers running early */
@@ -65,7 +65,7 @@ void rspamd_ev_watcher_start(struct ev_loop *loop,
 
                ev->timeout = timeout;
                ev_timer_set(&ev->tm, timeout, 0.0);
-               ev_timer_start(EV_A_ & ev->tm);
+               ev_timer_start(EV_A, &ev->tm);
        }
 }
 
@@ -73,11 +73,11 @@ void rspamd_ev_watcher_stop(struct ev_loop *loop,
                                                        struct rspamd_io_ev *ev)
 {
        if (ev_can_stop(&ev->io)) {
-               ev_io_stop(EV_A_ & ev->io);
+               ev_io_stop(EV_A, &ev->io);
        }
 
        if (ev->timeout > 0) {
-               ev_timer_stop(EV_A_ & ev->tm);
+               ev_timer_stop(EV_A, &ev->tm);
        }
 }
 
@@ -88,14 +88,14 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
        g_assert(ev->cb != NULL);
 
        if (ev_can_stop(&ev->io)) {
-               ev_io_stop(EV_A_ & ev->io);
+               ev_io_stop(EV_A, &ev->io);
                ev_io_set(&ev->io, ev->io.fd, what);
-               ev_io_start(EV_A_ & ev->io);
+               ev_io_start(EV_A, &ev->io);
        }
        else {
                ev->io.data = ev;
                ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
-               ev_io_start(EV_A_ & ev->io);
+               ev_io_start(EV_A, &ev->io);
        }
 
        if (ev->timeout > 0) {
@@ -105,7 +105,7 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
 
                        ev->tm.data = ev;
                        ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, ev->timeout, 0.0);
-                       ev_timer_start(EV_A_ & ev->tm);
+                       ev_timer_start(EV_A, &ev->tm);
                }
        }
 }
\ No newline at end of file