diff options
Diffstat (limited to 'src/libserver/rspamd_control.c')
-rw-r--r-- | src/libserver/rspamd_control.c | 261 |
1 files changed, 201 insertions, 60 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 8dd7595f4..1bff2ff12 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,11 @@ #include "hyperscan_tools.h" #endif +#define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_control_log_id, "control", NULL, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) + static ev_tstamp io_timeout = 30.0; static ev_tstamp worker_io_timeout = 0.5; @@ -40,10 +45,14 @@ struct rspamd_control_reply_elt { struct rspamd_control_reply reply; struct rspamd_io_ev ev; struct ev_loop *event_loop; + struct rspamd_worker *worker; GQuark wrk_type; pid_t wrk_pid; + rspamd_ev_cb handler; gpointer ud; int attached_fd; + bool sent; + struct rspamd_control_command cmd; GHashTable *pending_elts; struct rspamd_control_reply_elt *prev, *next; }; @@ -76,17 +85,9 @@ static const struct rspamd_control_cmd_match { }; static void rspamd_control_ignore_io_handler(int fd, short what, void *ud); +static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt); -static void -rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt) -{ - GHashTable *htb; - /* It stops event and frees hash */ - htb = elt->pending_elts; - g_hash_table_remove(elt->pending_elts, elt); - /* Release hash reference */ - g_hash_table_unref(htb); -} +INIT_LOG_MODULE(control) void rspamd_control_send_error(struct rspamd_control_session *session, int code, const char *error_msg, ...) @@ -357,10 +358,116 @@ void rspamd_pending_control_free(gpointer p) { struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p; - rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev); + if (rep_elt->sent) { + rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev); + } + else if (rep_elt->attached_fd != -1) { + /* Only for non-sent requests! */ + close(rep_elt->attached_fd); + } + + g_hash_table_unref(rep_elt->pending_elts); g_free(rep_elt); } +static inline void +rspamd_control_fill_msghdr(struct rspamd_control_command *cmd, + int attached_fd, struct msghdr *msg, + struct iovec *iov) +{ + struct cmsghdr *cmsg; + unsigned char fdspace[CMSG_SPACE(sizeof(int))]; + + memset(msg, 0, sizeof(*msg)); + + /* Attach fd to the message */ + if (attached_fd != -1) { + memset(fdspace, 0, sizeof(fdspace)); + msg->msg_control = fdspace; + msg->msg_controllen = sizeof(fdspace); + cmsg = CMSG_FIRSTHDR(msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int)); + } + + iov->iov_base = cmd; + iov->iov_len = sizeof(*cmd); + msg->msg_iov = iov; + msg->msg_iovlen = 1; +} + +static void +rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt) +{ + GHashTable *htb; + struct rspamd_main *rspamd_main; + gsize pending; + + /* It stops event and frees hash */ + htb = elt->pending_elts; + + pending = g_hash_table_size(htb); + msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid, + g_quark_to_string(elt->wrk_type), + (int) pending); + + if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) { + /* Invoke another event from the queue */ + GHashTableIter it; + gpointer k, v; + + g_hash_table_iter_init(&it, elt->pending_elts); + + while (g_hash_table_iter_next(&it, &k, &v)) { + struct rspamd_control_reply_elt *cur = v; + + if (!cur->sent) { + struct msghdr msg; + struct iovec iov; + + rspamd_main = cur->worker->srv; + rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg, &iov); + ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0); + + if (r == sizeof(cur->cmd)) { + msg_debug_control("restarting pending event for %P(%s), %d events pending", + cur->wrk_pid, + g_quark_to_string(cur->wrk_type), + (int) pending - 1); + rspamd_ev_watcher_init(&cur->ev, + cur->worker->control_pipe[0], + EV_READ, cur->handler, + cur); + rspamd_ev_watcher_start(cur->event_loop, + &cur->ev, worker_io_timeout); + cur->sent = true; + if (cur->attached_fd != -1) { + /* Since `sendmsg` performs `dup` for us, we need to remove our own descriptor */ + close(cur->attached_fd); + cur->attached_fd = -1; + } + + break; /* Exit the outer loop as we have invoked something */ + } + else { + msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s", + (int) cur->cmd.type, + cur->wrk_pid, + g_quark_to_string(cur->wrk_type), + cur->worker->control_pipe[0], + strerror(errno)); + g_hash_table_remove(elt->pending_elts, cur); + } + } + } + } + + /* Remove from hash and performs the cleanup */ + g_hash_table_remove(elt->pending_elts, elt); +} + static struct rspamd_control_reply_elt * rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main, struct rspamd_control_command *cmd, @@ -373,10 +480,6 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main, struct rspamd_worker *wrk; struct rspamd_control_reply_elt *rep_elt, *res = NULL; gpointer k, v; - struct msghdr msg; - struct cmsghdr *cmsg; - struct iovec iov; - unsigned char fdspace[CMSG_SPACE(sizeof(int))]; gssize r; g_hash_table_iter_init(&it, rspamd_main->workers); @@ -398,51 +501,89 @@ rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main, continue; } - memset(&msg, 0, sizeof(msg)); - - /* Attach fd to the message */ - if (attached_fd != -1) { - memset(fdspace, 0, sizeof(fdspace)); - msg.msg_control = fdspace; - msg.msg_controllen = sizeof(fdspace); - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN(sizeof(int)); - memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int)); - } - - iov.iov_base = cmd; - iov.iov_len = sizeof(*cmd); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - - r = sendmsg(wrk->control_pipe[0], &msg, 0); - - if (r == sizeof(*cmd)) { - rep_elt = g_malloc0(sizeof(*rep_elt)); - rep_elt->wrk_pid = wrk->pid; - rep_elt->wrk_type = wrk->type; - rep_elt->event_loop = rspamd_main->event_loop; - rep_elt->ud = ud; - rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending); - rspamd_ev_watcher_init(&rep_elt->ev, - wrk->control_pipe[0], - EV_READ, handler, - rep_elt); - rspamd_ev_watcher_start(rspamd_main->event_loop, - &rep_elt->ev, worker_io_timeout); - g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt); - - DL_APPEND(res, rep_elt); + rep_elt = g_malloc0(sizeof(*rep_elt)); + rep_elt->worker = wrk; + rep_elt->wrk_pid = wrk->pid; + rep_elt->wrk_type = wrk->type; + rep_elt->event_loop = rspamd_main->event_loop; + rep_elt->ud = ud; + rep_elt->handler = handler; + memcpy(&rep_elt->cmd, cmd, sizeof(*cmd)); + rep_elt->sent = false; + rep_elt->attached_fd = -1; + + if (g_hash_table_size(wrk->control_events_pending) == 0) { + /* We can send command */ + struct msghdr msg; + struct iovec iov; + + rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov); + r = sendmsg(wrk->control_pipe[0], &msg, 0); + + if (r == sizeof(*cmd)) { + rspamd_ev_watcher_init(&rep_elt->ev, + wrk->control_pipe[0], + EV_READ, handler, + rep_elt); + rspamd_ev_watcher_start(rspamd_main->event_loop, + &rep_elt->ev, worker_io_timeout); + rep_elt->sent = true; + rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending); + g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt); + + DL_APPEND(res, rep_elt); + msg_debug_control("sent command %d to the worker %P(%s), fd: %d", + (int) cmd->type, + wrk->pid, + g_quark_to_string(wrk->type), + wrk->control_pipe[0]); + } + else { + msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s", + (int) cmd->type, + wrk->pid, + g_quark_to_string(wrk->type), + wrk->control_pipe[0], + strerror(errno)); + g_free(rep_elt); + } } else { - msg_err_main("cannot write command %d(%z) to the worker %P(%s), fd: %d: %s", - (int) cmd->type, iov.iov_len, - wrk->pid, - g_quark_to_string(wrk->type), - wrk->control_pipe[0], - strerror(errno)); + /* We need to wait till the last command is processed, or it will mess up all serialization */ + msg_debug_control("pending event for %P(%s), %d events pending", + wrk->pid, + g_quark_to_string(wrk->type), + (int) g_hash_table_size(wrk->control_events_pending)); + rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending); + /* + * Here are dragons: + * If we have a descriptor to send, the callee expects that we follow + * sendmsg semantics that performs `dup` on it. So we need to clone fd and keep it there. + */ + if (attached_fd != -1) { + rep_elt->attached_fd = dup(attached_fd); + + if (rep_elt->attached_fd == -1) { + /* + * We have a problem: file descriptors limit is reached, so we cannot really deal with this + * request + */ + msg_err_main("cannot duplicate file descriptor to send command to worker %P(%s): %s; failed to send command", + wrk->pid, + g_quark_to_string(wrk->type), + strerror(errno)); + g_hash_table_unref(rep_elt->pending_elts); + g_free(rep_elt); + } + else { + g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt); + DL_APPEND(res, rep_elt); + } + } + else { + g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt); + DL_APPEND(res, rep_elt); + } } } @@ -737,7 +878,7 @@ rspamd_control_ignore_io_handler(int fd, short what, void *ud) /* At this point we just ignore replies from the workers */ if (read(fd, &rep, sizeof(rep)) == -1) { - msg_debug("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno)); + msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno)); } rspamd_control_stop_pending(elt); } |