aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/rspamd_control.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/rspamd_control.c')
-rw-r--r--src/libserver/rspamd_control.c261
1 files changed, 201 insertions, 60 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 8dd7595f4..1bff2ff12 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,11 @@
#include "hyperscan_tools.h"
#endif
+#define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_control_log_id, "control", NULL, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+
static ev_tstamp io_timeout = 30.0;
static ev_tstamp worker_io_timeout = 0.5;
@@ -40,10 +45,14 @@ struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
struct rspamd_io_ev ev;
struct ev_loop *event_loop;
+ struct rspamd_worker *worker;
GQuark wrk_type;
pid_t wrk_pid;
+ rspamd_ev_cb handler;
gpointer ud;
int attached_fd;
+ bool sent;
+ struct rspamd_control_command cmd;
GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};
@@ -76,17 +85,9 @@ static const struct rspamd_control_cmd_match {
};
static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
+static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
-static void
-rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
-{
- GHashTable *htb;
- /* It stops event and frees hash */
- htb = elt->pending_elts;
- g_hash_table_remove(elt->pending_elts, elt);
- /* Release hash reference */
- g_hash_table_unref(htb);
-}
+INIT_LOG_MODULE(control)
void rspamd_control_send_error(struct rspamd_control_session *session,
int code, const char *error_msg, ...)
@@ -357,10 +358,116 @@ void rspamd_pending_control_free(gpointer p)
{
struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p;
- rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
+ if (rep_elt->sent) {
+ rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
+ }
+ else if (rep_elt->attached_fd != -1) {
+ /* Only for non-sent requests! */
+ close(rep_elt->attached_fd);
+ }
+
+ g_hash_table_unref(rep_elt->pending_elts);
g_free(rep_elt);
}
+static inline void
+rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
+ int attached_fd, struct msghdr *msg,
+ struct iovec *iov)
+{
+ struct cmsghdr *cmsg;
+ unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+
+ memset(msg, 0, sizeof(*msg));
+
+ /* Attach fd to the message */
+ if (attached_fd != -1) {
+ memset(fdspace, 0, sizeof(fdspace));
+ msg->msg_control = fdspace;
+ msg->msg_controllen = sizeof(fdspace);
+ cmsg = CMSG_FIRSTHDR(msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
+ }
+
+ iov->iov_base = cmd;
+ iov->iov_len = sizeof(*cmd);
+ msg->msg_iov = iov;
+ msg->msg_iovlen = 1;
+}
+
+static void
+rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
+{
+ GHashTable *htb;
+ struct rspamd_main *rspamd_main;
+ gsize pending;
+
+ /* It stops event and frees hash */
+ htb = elt->pending_elts;
+
+ pending = g_hash_table_size(htb);
+ msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
+ g_quark_to_string(elt->wrk_type),
+ (int) pending);
+
+ if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) {
+ /* Invoke another event from the queue */
+ GHashTableIter it;
+ gpointer k, v;
+
+ g_hash_table_iter_init(&it, elt->pending_elts);
+
+ while (g_hash_table_iter_next(&it, &k, &v)) {
+ struct rspamd_control_reply_elt *cur = v;
+
+ if (!cur->sent) {
+ struct msghdr msg;
+ struct iovec iov;
+
+ rspamd_main = cur->worker->srv;
+ rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg, &iov);
+ ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
+
+ if (r == sizeof(cur->cmd)) {
+ msg_debug_control("restarting pending event for %P(%s), %d events pending",
+ cur->wrk_pid,
+ g_quark_to_string(cur->wrk_type),
+ (int) pending - 1);
+ rspamd_ev_watcher_init(&cur->ev,
+ cur->worker->control_pipe[0],
+ EV_READ, cur->handler,
+ cur);
+ rspamd_ev_watcher_start(cur->event_loop,
+ &cur->ev, worker_io_timeout);
+ cur->sent = true;
+ if (cur->attached_fd != -1) {
+ /* Since `sendmsg` performs `dup` for us, we need to remove our own descriptor */
+ close(cur->attached_fd);
+ cur->attached_fd = -1;
+ }
+
+ break; /* Exit the outer loop as we have invoked something */
+ }
+ else {
+ msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+ (int) cur->cmd.type,
+ cur->wrk_pid,
+ g_quark_to_string(cur->wrk_type),
+ cur->worker->control_pipe[0],
+ strerror(errno));
+ g_hash_table_remove(elt->pending_elts, cur);
+ }
+ }
+ }
+ }
+
+ /* Remove from hash and performs the cleanup */
+ g_hash_table_remove(elt->pending_elts, elt);
+}
+
static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
@@ -373,10 +480,6 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_worker *wrk;
struct rspamd_control_reply_elt *rep_elt, *res = NULL;
gpointer k, v;
- struct msghdr msg;
- struct cmsghdr *cmsg;
- struct iovec iov;
- unsigned char fdspace[CMSG_SPACE(sizeof(int))];
gssize r;
g_hash_table_iter_init(&it, rspamd_main->workers);
@@ -398,51 +501,89 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
continue;
}
- memset(&msg, 0, sizeof(msg));
-
- /* Attach fd to the message */
- if (attached_fd != -1) {
- memset(fdspace, 0, sizeof(fdspace));
- msg.msg_control = fdspace;
- msg.msg_controllen = sizeof(fdspace);
- cmsg = CMSG_FIRSTHDR(&msg);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = CMSG_LEN(sizeof(int));
- memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
- }
-
- iov.iov_base = cmd;
- iov.iov_len = sizeof(*cmd);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
-
- r = sendmsg(wrk->control_pipe[0], &msg, 0);
-
- if (r == sizeof(*cmd)) {
- rep_elt = g_malloc0(sizeof(*rep_elt));
- rep_elt->wrk_pid = wrk->pid;
- rep_elt->wrk_type = wrk->type;
- rep_elt->event_loop = rspamd_main->event_loop;
- rep_elt->ud = ud;
- rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
- rspamd_ev_watcher_init(&rep_elt->ev,
- wrk->control_pipe[0],
- EV_READ, handler,
- rep_elt);
- rspamd_ev_watcher_start(rspamd_main->event_loop,
- &rep_elt->ev, worker_io_timeout);
- g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-
- DL_APPEND(res, rep_elt);
+ rep_elt = g_malloc0(sizeof(*rep_elt));
+ rep_elt->worker = wrk;
+ rep_elt->wrk_pid = wrk->pid;
+ rep_elt->wrk_type = wrk->type;
+ rep_elt->event_loop = rspamd_main->event_loop;
+ rep_elt->ud = ud;
+ rep_elt->handler = handler;
+ memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
+ rep_elt->sent = false;
+ rep_elt->attached_fd = -1;
+
+ if (g_hash_table_size(wrk->control_events_pending) == 0) {
+ /* We can send command */
+ struct msghdr msg;
+ struct iovec iov;
+
+ rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov);
+ r = sendmsg(wrk->control_pipe[0], &msg, 0);
+
+ if (r == sizeof(*cmd)) {
+ rspamd_ev_watcher_init(&rep_elt->ev,
+ wrk->control_pipe[0],
+ EV_READ, handler,
+ rep_elt);
+ rspamd_ev_watcher_start(rspamd_main->event_loop,
+ &rep_elt->ev, worker_io_timeout);
+ rep_elt->sent = true;
+ rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
+ g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
+
+ DL_APPEND(res, rep_elt);
+ msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
+ (int) cmd->type,
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ wrk->control_pipe[0]);
+ }
+ else {
+ msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+ (int) cmd->type,
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ wrk->control_pipe[0],
+ strerror(errno));
+ g_free(rep_elt);
+ }
}
else {
- msg_err_main("cannot write command %d(%z) to the worker %P(%s), fd: %d: %s",
- (int) cmd->type, iov.iov_len,
- wrk->pid,
- g_quark_to_string(wrk->type),
- wrk->control_pipe[0],
- strerror(errno));
+ /* We need to wait till the last command is processed, or it will mess up all serialization */
+ msg_debug_control("pending event for %P(%s), %d events pending",
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ (int) g_hash_table_size(wrk->control_events_pending));
+ rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
+ /*
+ * Here are dragons:
+ * If we have a descriptor to send, the callee expects that we follow
+ * sendmsg semantics that performs `dup` on it. So we need to clone fd and keep it there.
+ */
+ if (attached_fd != -1) {
+ rep_elt->attached_fd = dup(attached_fd);
+
+ if (rep_elt->attached_fd == -1) {
+ /*
+ * We have a problem: file descriptors limit is reached, so we cannot really deal with this
+ * request
+ */
+ msg_err_main("cannot duplicate file descriptor to send command to worker %P(%s): %s; failed to send command",
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ strerror(errno));
+ g_hash_table_unref(rep_elt->pending_elts);
+ g_free(rep_elt);
+ }
+ else {
+ g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
+ DL_APPEND(res, rep_elt);
+ }
+ }
+ else {
+ g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
+ DL_APPEND(res, rep_elt);
+ }
}
}
@@ -737,7 +878,7 @@ rspamd_control_ignore_io_handler(int fd, short what, void *ud)
/* At this point we just ignore replies from the workers */
if (read(fd, &rep, sizeof(rep)) == -1) {
- msg_debug("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
+ msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
}
rspamd_control_stop_pending(elt);
}