Browse Source

Merge pull request #4937 from rspamd/vstakhov-control-fixes

Serialize control commands
pull/4942/head
Vsevolod Stakhov 1 week ago
parent
commit
980cd40f8d
No account linked to committer's email address
2 changed files with 212 additions and 71 deletions
  1. 201
    60
      src/libserver/rspamd_control.c
  2. 11
    11
      src/libutil/libev_helper.c

+ 201
- 60
src/libserver/rspamd_control.c View File

@@ -1,5 +1,5 @@
/*
* Copyright 2023 Vsevolod Stakhov
* Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,11 @@
#include "hyperscan_tools.h"
#endif

#define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \
rspamd_control_log_id, "control", NULL, \
RSPAMD_LOG_FUNC, \
__VA_ARGS__)

static ev_tstamp io_timeout = 30.0;
static ev_tstamp worker_io_timeout = 0.5;

@@ -40,10 +45,14 @@ struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
struct rspamd_io_ev ev;
struct ev_loop *event_loop;
struct rspamd_worker *worker;
GQuark wrk_type;
pid_t wrk_pid;
rspamd_ev_cb handler;
gpointer ud;
int attached_fd;
bool sent;
struct rspamd_control_command cmd;
GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};
@@ -76,17 +85,9 @@ static const struct rspamd_control_cmd_match {
};

static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);

static void
rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
{
GHashTable *htb;
/* It stops event and frees hash */
htb = elt->pending_elts;
g_hash_table_remove(elt->pending_elts, elt);
/* Release hash reference */
g_hash_table_unref(htb);
}
INIT_LOG_MODULE(control)

void rspamd_control_send_error(struct rspamd_control_session *session,
int code, const char *error_msg, ...)
@@ -357,10 +358,116 @@ void rspamd_pending_control_free(gpointer p)
{
struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p;

rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
if (rep_elt->sent) {
rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
}
else if (rep_elt->attached_fd != -1) {
/* Only for non-sent requests! */
close(rep_elt->attached_fd);
}

g_hash_table_unref(rep_elt->pending_elts);
g_free(rep_elt);
}

static inline void
rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
int attached_fd, struct msghdr *msg,
struct iovec *iov)
{
struct cmsghdr *cmsg;
unsigned char fdspace[CMSG_SPACE(sizeof(int))];

memset(msg, 0, sizeof(*msg));

/* Attach fd to the message */
if (attached_fd != -1) {
memset(fdspace, 0, sizeof(fdspace));
msg->msg_control = fdspace;
msg->msg_controllen = sizeof(fdspace);
cmsg = CMSG_FIRSTHDR(msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
}

iov->iov_base = cmd;
iov->iov_len = sizeof(*cmd);
msg->msg_iov = iov;
msg->msg_iovlen = 1;
}

static void
rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
{
GHashTable *htb;
struct rspamd_main *rspamd_main;
gsize pending;

/* It stops event and frees hash */
htb = elt->pending_elts;

pending = g_hash_table_size(htb);
msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
g_quark_to_string(elt->wrk_type),
(int) pending);

if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) {
/* Invoke another event from the queue */
GHashTableIter it;
gpointer k, v;

g_hash_table_iter_init(&it, elt->pending_elts);

while (g_hash_table_iter_next(&it, &k, &v)) {
struct rspamd_control_reply_elt *cur = v;

if (!cur->sent) {
struct msghdr msg;
struct iovec iov;

rspamd_main = cur->worker->srv;
rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg, &iov);
ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);

if (r == sizeof(cur->cmd)) {
msg_debug_control("restarting pending event for %P(%s), %d events pending",
cur->wrk_pid,
g_quark_to_string(cur->wrk_type),
(int) pending - 1);
rspamd_ev_watcher_init(&cur->ev,
cur->worker->control_pipe[0],
EV_READ, cur->handler,
cur);
rspamd_ev_watcher_start(cur->event_loop,
&cur->ev, worker_io_timeout);
cur->sent = true;
if (cur->attached_fd != -1) {
/* Since `sendmsg` performs `dup` for us, we need to remove our own descriptor */
close(cur->attached_fd);
cur->attached_fd = -1;
}

break; /* Exit the outer loop as we have invoked something */
}
else {
msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
(int) cur->cmd.type,
cur->wrk_pid,
g_quark_to_string(cur->wrk_type),
cur->worker->control_pipe[0],
strerror(errno));
g_hash_table_remove(elt->pending_elts, cur);
}
}
}
}

/* Remove from hash and performs the cleanup */
g_hash_table_remove(elt->pending_elts, elt);
}

static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
@@ -373,10 +480,6 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_worker *wrk;
struct rspamd_control_reply_elt *rep_elt, *res = NULL;
gpointer k, v;
struct msghdr msg;
struct cmsghdr *cmsg;
struct iovec iov;
unsigned char fdspace[CMSG_SPACE(sizeof(int))];
gssize r;

g_hash_table_iter_init(&it, rspamd_main->workers);
@@ -398,51 +501,89 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
continue;
}

memset(&msg, 0, sizeof(msg));

/* Attach fd to the message */
if (attached_fd != -1) {
memset(fdspace, 0, sizeof(fdspace));
msg.msg_control = fdspace;
msg.msg_controllen = sizeof(fdspace);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
}

iov.iov_base = cmd;
iov.iov_len = sizeof(*cmd);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

r = sendmsg(wrk->control_pipe[0], &msg, 0);

if (r == sizeof(*cmd)) {
rep_elt = g_malloc0(sizeof(*rep_elt));
rep_elt->wrk_pid = wrk->pid;
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
rspamd_ev_watcher_init(&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start(rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);

DL_APPEND(res, rep_elt);
rep_elt = g_malloc0(sizeof(*rep_elt));
rep_elt->worker = wrk;
rep_elt->wrk_pid = wrk->pid;
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
rep_elt->handler = handler;
memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
rep_elt->sent = false;
rep_elt->attached_fd = -1;

if (g_hash_table_size(wrk->control_events_pending) == 0) {
/* We can send command */
struct msghdr msg;
struct iovec iov;

rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov);
r = sendmsg(wrk->control_pipe[0], &msg, 0);

if (r == sizeof(*cmd)) {
rspamd_ev_watcher_init(&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start(rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
rep_elt->sent = true;
rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);

DL_APPEND(res, rep_elt);
msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
(int) cmd->type,
wrk->pid,
g_quark_to_string(wrk->type),
wrk->control_pipe[0]);
}
else {
msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
(int) cmd->type,
wrk->pid,
g_quark_to_string(wrk->type),
wrk->control_pipe[0],
strerror(errno));
g_free(rep_elt);
}
}
else {
msg_err_main("cannot write command %d(%z) to the worker %P(%s), fd: %d: %s",
(int) cmd->type, iov.iov_len,
wrk->pid,
g_quark_to_string(wrk->type),
wrk->control_pipe[0],
strerror(errno));
/* We need to wait till the last command is processed, or it will mess up all serialization */
msg_debug_control("pending event for %P(%s), %d events pending",
wrk->pid,
g_quark_to_string(wrk->type),
(int) g_hash_table_size(wrk->control_events_pending));
rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
/*
* Here are dragons:
* If we have a descriptor to send, the callee expects that we follow
* sendmsg semantics that performs `dup` on it. So we need to clone fd and keep it there.
*/
if (attached_fd != -1) {
rep_elt->attached_fd = dup(attached_fd);

if (rep_elt->attached_fd == -1) {
/*
* We have a problem: file descriptors limit is reached, so we cannot really deal with this
* request
*/
msg_err_main("cannot duplicate file descriptor to send command to worker %P(%s): %s; failed to send command",
wrk->pid,
g_quark_to_string(wrk->type),
strerror(errno));
g_hash_table_unref(rep_elt->pending_elts);
g_free(rep_elt);
}
else {
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
DL_APPEND(res, rep_elt);
}
}
else {
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
DL_APPEND(res, rep_elt);
}
}
}

@@ -737,7 +878,7 @@ rspamd_control_ignore_io_handler(int fd, short what, void *ud)

/* At this point we just ignore replies from the workers */
if (read(fd, &rep, sizeof(rep)) == -1) {
msg_debug("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
}
rspamd_control_stop_pending(elt);
}

+ 11
- 11
src/libutil/libev_helper.c View File

@@ -1,11 +1,11 @@
/*-
* Copyright 2019 Vsevolod Stakhov
/*
* Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -57,7 +57,7 @@ void rspamd_ev_watcher_start(struct ev_loop *loop,
{
g_assert(ev->cb != NULL);

ev_io_start(EV_A_ & ev->io);
ev_io_start(EV_A, &ev->io);

if (timeout > 0) {
/* Update timestamp to avoid timers running early */
@@ -65,7 +65,7 @@ void rspamd_ev_watcher_start(struct ev_loop *loop,

ev->timeout = timeout;
ev_timer_set(&ev->tm, timeout, 0.0);
ev_timer_start(EV_A_ & ev->tm);
ev_timer_start(EV_A, &ev->tm);
}
}

@@ -73,11 +73,11 @@ void rspamd_ev_watcher_stop(struct ev_loop *loop,
struct rspamd_io_ev *ev)
{
if (ev_can_stop(&ev->io)) {
ev_io_stop(EV_A_ & ev->io);
ev_io_stop(EV_A, &ev->io);
}

if (ev->timeout > 0) {
ev_timer_stop(EV_A_ & ev->tm);
ev_timer_stop(EV_A, &ev->tm);
}
}

@@ -88,14 +88,14 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
g_assert(ev->cb != NULL);

if (ev_can_stop(&ev->io)) {
ev_io_stop(EV_A_ & ev->io);
ev_io_stop(EV_A, &ev->io);
ev_io_set(&ev->io, ev->io.fd, what);
ev_io_start(EV_A_ & ev->io);
ev_io_start(EV_A, &ev->io);
}
else {
ev->io.data = ev;
ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
ev_io_start(EV_A_ & ev->io);
ev_io_start(EV_A, &ev->io);
}

if (ev->timeout > 0) {
@@ -105,7 +105,7 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,

ev->tm.data = ev;
ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, ev->timeout, 0.0);
ev_timer_start(EV_A_ & ev->tm);
ev_timer_start(EV_A, &ev->tm);
}
}
}

Loading…
Cancel
Save