Browse Source

[Rework] Serialize control commands

In fact, we cannot send multiple commands and read them through the pipe. It has
caused multiple weird issues in the past but I can now see clearly how it should
be done. We should send commands and serialize all requests pending to let them
being sent one by one, after reply for the previous command has been received.
pull/4937/head
Vsevolod Stakhov 1 week ago
parent
commit
6d1762d85e
No account linked to committer's email address
2 changed files with 169 additions and 68 deletions
  1. 158
    57
      src/libserver/rspamd_control.c
  2. 11
    11
      src/libutil/libev_helper.c

+ 158
- 57
src/libserver/rspamd_control.c View File

@@ -1,5 +1,5 @@
/*
* Copyright 2023 Vsevolod Stakhov
* Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,11 @@
#include "hyperscan_tools.h"
#endif

#define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \
rspamd_control_log_id, "control", NULL, \
RSPAMD_LOG_FUNC, \
__VA_ARGS__)

static ev_tstamp io_timeout = 30.0;
static ev_tstamp worker_io_timeout = 0.5;

@@ -40,10 +45,14 @@ struct rspamd_control_reply_elt {
struct rspamd_control_reply reply;
struct rspamd_io_ev ev;
struct ev_loop *event_loop;
struct rspamd_worker *worker;
GQuark wrk_type;
pid_t wrk_pid;
rspamd_ev_cb handler;
gpointer ud;
int attached_fd;
bool sent;
struct rspamd_control_command cmd;
GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};
@@ -76,17 +85,9 @@ static const struct rspamd_control_cmd_match {
};

static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);

static void
rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
{
GHashTable *htb;
/* It stops event and frees hash */
htb = elt->pending_elts;
g_hash_table_remove(elt->pending_elts, elt);
/* Release hash reference */
g_hash_table_unref(htb);
}
INIT_LOG_MODULE(control)

void rspamd_control_send_error(struct rspamd_control_session *session,
int code, const char *error_msg, ...)
@@ -361,6 +362,103 @@ void rspamd_pending_control_free(gpointer p)
g_free(rep_elt);
}

static inline void
rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
int attached_fd, struct msghdr *msg)
{
struct cmsghdr *cmsg;
struct iovec iov;
unsigned char fdspace[CMSG_SPACE(sizeof(int))];

memset(msg, 0, sizeof(*msg));

/* Attach fd to the message */
if (attached_fd != -1) {
memset(fdspace, 0, sizeof(fdspace));
msg->msg_control = fdspace;
msg->msg_controllen = sizeof(fdspace);
cmsg = CMSG_FIRSTHDR(msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
}

iov.iov_base = cmd;
iov.iov_len = sizeof(*cmd);
msg->msg_iov = &iov;
msg->msg_iovlen = 1;
}

static void
rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
{
GHashTable *htb;
struct rspamd_main *rspamd_main;
gsize pending;

/* It stops event and frees hash */
htb = elt->pending_elts;

pending = g_hash_table_size(htb);
msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
g_quark_to_string(elt->wrk_type),
(int) pending);

if (pending != 0) {
/* Invoke another event from the queue */
GHashTableIter it;
gpointer k, v;

g_hash_table_iter_init(&it, elt->pending_elts);

while (g_hash_table_iter_next(&it, &k, &v)) {
struct rspamd_control_reply_elt *cur = v;

if (!cur->sent) {
struct msghdr msg;

rspamd_main = cur->worker->srv;
rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg);
ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);

if (r == sizeof(cur->cmd)) {
msg_debug_control("restarting pending event for %P(%s), %d events pending",
cur->wrk_pid,
g_quark_to_string(cur->wrk_type),
(int) pending - 1);
rspamd_ev_watcher_init(&cur->ev,
cur->worker->control_pipe[0],
EV_READ, cur->handler,
cur);
rspamd_ev_watcher_start(cur->event_loop,
&cur->ev, worker_io_timeout);
cur->sent = true;

break; /* Exit the outer loop as we have invoked something */
}
else {
msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
(int) cur->cmd.type,
cur->wrk_pid,
g_quark_to_string(cur->wrk_type),
cur->worker->control_pipe[0],
strerror(errno));
g_hash_table_remove(elt->pending_elts, cur);
/* Free one refcounter on a hash table as it was taken by `cur` */
g_hash_table_unref(htb);
}
}
}
}

/* Remove from hash and performs the cleanup */
g_hash_table_remove(elt->pending_elts, elt);

/* Release hash reference */
g_hash_table_unref(htb);
}

static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
@@ -374,9 +472,6 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_control_reply_elt *rep_elt, *res = NULL;
gpointer k, v;
struct msghdr msg;
struct cmsghdr *cmsg;
struct iovec iov;
unsigned char fdspace[CMSG_SPACE(sizeof(int))];
gssize r;

g_hash_table_iter_init(&it, rspamd_main->workers);
@@ -398,52 +493,58 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
continue;
}

memset(&msg, 0, sizeof(msg));

/* Attach fd to the message */
if (attached_fd != -1) {
memset(fdspace, 0, sizeof(fdspace));
msg.msg_control = fdspace;
msg.msg_controllen = sizeof(fdspace);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
rep_elt = g_malloc0(sizeof(*rep_elt));
rep_elt->worker = wrk;
rep_elt->wrk_pid = wrk->pid;
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
rep_elt->handler = handler;
rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
rep_elt->sent = false;

if (g_hash_table_size(wrk->control_events_pending) == 0) {
/* We can send command */
rspamd_control_fill_msghdr(cmd, attached_fd, &msg);
r = sendmsg(wrk->control_pipe[0], &msg, 0);

if (r == sizeof(*cmd)) {
rspamd_ev_watcher_init(&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start(rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
rep_elt->sent = true;

DL_APPEND(res, rep_elt);
msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
(int) cmd->type,
wrk->pid,
g_quark_to_string(wrk->type),
wrk->control_pipe[0]);
}
else {
msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
(int) cmd->type,
wrk->pid,
g_quark_to_string(wrk->type),
wrk->control_pipe[0],
strerror(errno));
g_free(rep_elt);
}
}

iov.iov_base = cmd;
iov.iov_len = sizeof(*cmd);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

r = sendmsg(wrk->control_pipe[0], &msg, 0);

if (r == sizeof(*cmd)) {
rep_elt = g_malloc0(sizeof(*rep_elt));
rep_elt->wrk_pid = wrk->pid;
rep_elt->wrk_type = wrk->type;
rep_elt->event_loop = rspamd_main->event_loop;
rep_elt->ud = ud;
rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
rspamd_ev_watcher_init(&rep_elt->ev,
wrk->control_pipe[0],
EV_READ, handler,
rep_elt);
rspamd_ev_watcher_start(rspamd_main->event_loop,
&rep_elt->ev, worker_io_timeout);
else {
/* We need to wait till the last command is processed, or it will mess up all serialization */
msg_debug_control("pending event for %P(%s), %d events pending",
wrk->pid,
g_quark_to_string(wrk->type),
(int) g_hash_table_size(wrk->control_events_pending));
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);

DL_APPEND(res, rep_elt);
}
else {
msg_err_main("cannot write command %d(%z) to the worker %P(%s), fd: %d: %s",
(int) cmd->type, iov.iov_len,
wrk->pid,
g_quark_to_string(wrk->type),
wrk->control_pipe[0],
strerror(errno));
}
}

return res;
@@ -737,7 +838,7 @@ rspamd_control_ignore_io_handler(int fd, short what, void *ud)

/* At this point we just ignore replies from the workers */
if (read(fd, &rep, sizeof(rep)) == -1) {
msg_debug("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
}
rspamd_control_stop_pending(elt);
}

+ 11
- 11
src/libutil/libev_helper.c View File

@@ -1,11 +1,11 @@
/*-
* Copyright 2019 Vsevolod Stakhov
/*
* Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -57,7 +57,7 @@ void rspamd_ev_watcher_start(struct ev_loop *loop,
{
g_assert(ev->cb != NULL);

ev_io_start(EV_A_ & ev->io);
ev_io_start(EV_A, &ev->io);

if (timeout > 0) {
/* Update timestamp to avoid timers running early */
@@ -65,7 +65,7 @@ void rspamd_ev_watcher_start(struct ev_loop *loop,

ev->timeout = timeout;
ev_timer_set(&ev->tm, timeout, 0.0);
ev_timer_start(EV_A_ & ev->tm);
ev_timer_start(EV_A, &ev->tm);
}
}

@@ -73,11 +73,11 @@ void rspamd_ev_watcher_stop(struct ev_loop *loop,
struct rspamd_io_ev *ev)
{
if (ev_can_stop(&ev->io)) {
ev_io_stop(EV_A_ & ev->io);
ev_io_stop(EV_A, &ev->io);
}

if (ev->timeout > 0) {
ev_timer_stop(EV_A_ & ev->tm);
ev_timer_stop(EV_A, &ev->tm);
}
}

@@ -88,14 +88,14 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,
g_assert(ev->cb != NULL);

if (ev_can_stop(&ev->io)) {
ev_io_stop(EV_A_ & ev->io);
ev_io_stop(EV_A, &ev->io);
ev_io_set(&ev->io, ev->io.fd, what);
ev_io_start(EV_A_ & ev->io);
ev_io_start(EV_A, &ev->io);
}
else {
ev->io.data = ev;
ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
ev_io_start(EV_A_ & ev->io);
ev_io_start(EV_A, &ev->io);
}

if (ev->timeout > 0) {
@@ -105,7 +105,7 @@ void rspamd_ev_watcher_reschedule(struct ev_loop *loop,

ev->tm.data = ev;
ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, ev->timeout, 0.0);
ev_timer_start(EV_A_ & ev->tm);
ev_timer_start(EV_A, &ev->tm);
}
}
}

Loading…
Cancel
Save