/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include "hyperscan_tools.h"
#endif
+#define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_control_log_id, "control", NULL, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
+
static ev_tstamp io_timeout = 30.0;
static ev_tstamp worker_io_timeout = 0.5;
struct rspamd_control_reply reply;
struct rspamd_io_ev ev;
struct ev_loop *event_loop;
+ struct rspamd_worker *worker;
GQuark wrk_type;
pid_t wrk_pid;
+ rspamd_ev_cb handler;
gpointer ud;
int attached_fd;
+ bool sent;
+ struct rspamd_control_command cmd;
GHashTable *pending_elts;
struct rspamd_control_reply_elt *prev, *next;
};
};
static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
+static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
-static void
-rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
-{
- GHashTable *htb;
- /* It stops event and frees hash */
- htb = elt->pending_elts;
- g_hash_table_remove(elt->pending_elts, elt);
- /* Release hash reference */
- g_hash_table_unref(htb);
-}
+INIT_LOG_MODULE(control)
void rspamd_control_send_error(struct rspamd_control_session *session,
int code, const char *error_msg, ...)
g_free(rep_elt);
}
+static inline void
+rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
+ int attached_fd, struct msghdr *msg)
+{
+ struct cmsghdr *cmsg;
+ struct iovec iov;
+ unsigned char fdspace[CMSG_SPACE(sizeof(int))];
+
+ memset(msg, 0, sizeof(*msg));
+
+ /* Attach fd to the message */
+ if (attached_fd != -1) {
+ memset(fdspace, 0, sizeof(fdspace));
+ msg->msg_control = fdspace;
+ msg->msg_controllen = sizeof(fdspace);
+ cmsg = CMSG_FIRSTHDR(msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
+ }
+
+ iov.iov_base = cmd;
+ iov.iov_len = sizeof(*cmd);
+ msg->msg_iov = &iov;
+ msg->msg_iovlen = 1;
+}
+
+static void
+rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
+{
+ GHashTable *htb;
+ struct rspamd_main *rspamd_main;
+ gsize pending;
+
+ /* It stops event and frees hash */
+ htb = elt->pending_elts;
+
+ pending = g_hash_table_size(htb);
+ msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
+ g_quark_to_string(elt->wrk_type),
+ (int) pending);
+
+ if (pending != 0) {
+ /* Invoke another event from the queue */
+ GHashTableIter it;
+ gpointer k, v;
+
+ g_hash_table_iter_init(&it, elt->pending_elts);
+
+ while (g_hash_table_iter_next(&it, &k, &v)) {
+ struct rspamd_control_reply_elt *cur = v;
+
+ if (!cur->sent) {
+ struct msghdr msg;
+
+ rspamd_main = cur->worker->srv;
+ rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg);
+ ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
+
+ if (r == sizeof(cur->cmd)) {
+ msg_debug_control("restarting pending event for %P(%s), %d events pending",
+ cur->wrk_pid,
+ g_quark_to_string(cur->wrk_type),
+ (int) pending - 1);
+ rspamd_ev_watcher_init(&cur->ev,
+ cur->worker->control_pipe[0],
+ EV_READ, cur->handler,
+ cur);
+ rspamd_ev_watcher_start(cur->event_loop,
+ &cur->ev, worker_io_timeout);
+ cur->sent = true;
+
+ break; /* Exit the outer loop as we have invoked something */
+ }
+ else {
+ msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+ (int) cur->cmd.type,
+ cur->wrk_pid,
+ g_quark_to_string(cur->wrk_type),
+ cur->worker->control_pipe[0],
+ strerror(errno));
+ g_hash_table_remove(elt->pending_elts, cur);
+ /* Free one refcounter on a hash table as it was taken by `cur` */
+ g_hash_table_unref(htb);
+ }
+ }
+ }
+ }
+
+ /* Remove from hash and performs the cleanup */
+ g_hash_table_remove(elt->pending_elts, elt);
+
+ /* Release hash reference */
+ g_hash_table_unref(htb);
+}
+
static struct rspamd_control_reply_elt *
rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
struct rspamd_control_command *cmd,
struct rspamd_control_reply_elt *rep_elt, *res = NULL;
gpointer k, v;
struct msghdr msg;
- struct cmsghdr *cmsg;
- struct iovec iov;
- unsigned char fdspace[CMSG_SPACE(sizeof(int))];
gssize r;
g_hash_table_iter_init(&it, rspamd_main->workers);
continue;
}
- memset(&msg, 0, sizeof(msg));
-
- /* Attach fd to the message */
- if (attached_fd != -1) {
- memset(fdspace, 0, sizeof(fdspace));
- msg.msg_control = fdspace;
- msg.msg_controllen = sizeof(fdspace);
- cmsg = CMSG_FIRSTHDR(&msg);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = CMSG_LEN(sizeof(int));
- memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
+ rep_elt = g_malloc0(sizeof(*rep_elt));
+ rep_elt->worker = wrk;
+ rep_elt->wrk_pid = wrk->pid;
+ rep_elt->wrk_type = wrk->type;
+ rep_elt->event_loop = rspamd_main->event_loop;
+ rep_elt->ud = ud;
+ rep_elt->handler = handler;
+ rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
+ memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
+ rep_elt->sent = false;
+
+ if (g_hash_table_size(wrk->control_events_pending) == 0) {
+ /* We can send command */
+ rspamd_control_fill_msghdr(cmd, attached_fd, &msg);
+ r = sendmsg(wrk->control_pipe[0], &msg, 0);
+
+ if (r == sizeof(*cmd)) {
+ rspamd_ev_watcher_init(&rep_elt->ev,
+ wrk->control_pipe[0],
+ EV_READ, handler,
+ rep_elt);
+ rspamd_ev_watcher_start(rspamd_main->event_loop,
+ &rep_elt->ev, worker_io_timeout);
+ g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
+ rep_elt->sent = true;
+
+ DL_APPEND(res, rep_elt);
+ msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
+ (int) cmd->type,
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ wrk->control_pipe[0]);
+ }
+ else {
+ msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
+ (int) cmd->type,
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ wrk->control_pipe[0],
+ strerror(errno));
+ g_free(rep_elt);
+ }
}
-
- iov.iov_base = cmd;
- iov.iov_len = sizeof(*cmd);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
-
- r = sendmsg(wrk->control_pipe[0], &msg, 0);
-
- if (r == sizeof(*cmd)) {
- rep_elt = g_malloc0(sizeof(*rep_elt));
- rep_elt->wrk_pid = wrk->pid;
- rep_elt->wrk_type = wrk->type;
- rep_elt->event_loop = rspamd_main->event_loop;
- rep_elt->ud = ud;
- rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
- rspamd_ev_watcher_init(&rep_elt->ev,
- wrk->control_pipe[0],
- EV_READ, handler,
- rep_elt);
- rspamd_ev_watcher_start(rspamd_main->event_loop,
- &rep_elt->ev, worker_io_timeout);
+ else {
+ /* We need to wait till the last command is processed, or it will mess up all serialization */
+ msg_debug_control("pending event for %P(%s), %d events pending",
+ wrk->pid,
+ g_quark_to_string(wrk->type),
+ (int) g_hash_table_size(wrk->control_events_pending));
g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
-
DL_APPEND(res, rep_elt);
}
- else {
- msg_err_main("cannot write command %d(%z) to the worker %P(%s), fd: %d: %s",
- (int) cmd->type, iov.iov_len,
- wrk->pid,
- g_quark_to_string(wrk->type),
- wrk->control_pipe[0],
- strerror(errno));
- }
}
return res;
/* At this point we just ignore replies from the workers */
if (read(fd, &rep, sizeof(rep)) == -1) {
- msg_debug("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
+ msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
}
rspamd_control_stop_pending(elt);
}
-/*-
- * Copyright 2019 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
{
g_assert(ev->cb != NULL);
- ev_io_start(EV_A_ & ev->io);
+ ev_io_start(EV_A, &ev->io);
if (timeout > 0) {
/* Update timestamp to avoid timers running early */
ev->timeout = timeout;
ev_timer_set(&ev->tm, timeout, 0.0);
- ev_timer_start(EV_A_ & ev->tm);
+ ev_timer_start(EV_A, &ev->tm);
}
}
struct rspamd_io_ev *ev)
{
if (ev_can_stop(&ev->io)) {
- ev_io_stop(EV_A_ & ev->io);
+ ev_io_stop(EV_A, &ev->io);
}
if (ev->timeout > 0) {
- ev_timer_stop(EV_A_ & ev->tm);
+ ev_timer_stop(EV_A, &ev->tm);
}
}
g_assert(ev->cb != NULL);
if (ev_can_stop(&ev->io)) {
- ev_io_stop(EV_A_ & ev->io);
+ ev_io_stop(EV_A, &ev->io);
ev_io_set(&ev->io, ev->io.fd, what);
- ev_io_start(EV_A_ & ev->io);
+ ev_io_start(EV_A, &ev->io);
}
else {
ev->io.data = ev;
ev_io_init(&ev->io, rspamd_ev_watcher_io_cb, ev->io.fd, what);
- ev_io_start(EV_A_ & ev->io);
+ ev_io_start(EV_A, &ev->io);
}
if (ev->timeout > 0) {
ev->tm.data = ev;
ev_timer_init(&ev->tm, rspamd_ev_watcher_timer_cb, ev->timeout, 0.0);
- ev_timer_start(EV_A_ & ev->tm);
+ ev_timer_start(EV_A, &ev->tm);
}
}
}
\ No newline at end of file