Browse Source

[Fix] Do not cleanup hyperscan files unless new ones are loaded

tags/3.8.0
Vsevolod Stakhov 6 months ago
parent
commit
be7504e393
No account linked to committer's email address
3 changed files with 58 additions and 34 deletions
  1. 15
    1
      src/libserver/hyperscan_tools.cxx
  2. 8
    3
      src/libserver/hyperscan_tools.h
  3. 35
    30
      src/libserver/rspamd_control.c

+ 15
- 1
src/libserver/hyperscan_tools.cxx View File

ankerl::svector<std::string, 4> cache_dirs; ankerl::svector<std::string, 4> cache_dirs;
ankerl::svector<std::string, 8> cache_extensions; ankerl::svector<std::string, 8> cache_extensions;
ankerl::unordered_dense::set<std::string> known_cached_files; ankerl::unordered_dense::set<std::string> known_cached_files;
bool loaded = false;


private: private:
hs_known_files_cache() = default; hs_known_files_cache() = default;
{ {
auto env_cleanup_disable = std::getenv("RSPAMD_NO_CLEANUP"); auto env_cleanup_disable = std::getenv("RSPAMD_NO_CLEANUP");
/* We clean dir merely if we are running from the main process */ /* We clean dir merely if we are running from the main process */
if (rspamd_current_worker == nullptr && env_cleanup_disable == nullptr) {
if (rspamd_current_worker == nullptr && env_cleanup_disable == nullptr && loaded) {
const auto *log_func = RSPAMD_LOG_FUNC; const auto *log_func = RSPAMD_LOG_FUNC;
auto cleanup_dir = [&](std::string_view dir) -> void { auto cleanup_dir = [&](std::string_view dir) -> void {
for (const auto &ext: cache_extensions) { for (const auto &ext: cache_extensions) {
else if (rspamd_current_worker == nullptr && env_cleanup_disable != nullptr) { else if (rspamd_current_worker == nullptr && env_cleanup_disable != nullptr) {
msg_debug_hyperscan("disable hyperscan cleanup: env variable RSPAMD_NO_CLEANUP is set"); msg_debug_hyperscan("disable hyperscan cleanup: env variable RSPAMD_NO_CLEANUP is set");
} }
else if (!loaded) {
msg_debug_hyperscan("disable hyperscan cleanup: not loaded");
}
}

auto notice_loaded() -> void
{
loaded = true;
} }
}; };


rspamd::util::hs_known_files_cache::get().cleanup_maybe(); rspamd::util::hs_known_files_cache::get().cleanup_maybe();
} }


void rspamd_hyperscan_notice_loaded(void)
{
rspamd::util::hs_known_files_cache::get().notice_loaded();
}

#endif// WITH_HYPERSCAN #endif// WITH_HYPERSCAN

+ 8
- 3
src/libserver/hyperscan_tools.h View File

/*-
* Copyright 2022 Vsevolod Stakhov
/*
* Copyright 2023 Vsevolod Stakhov
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
*/ */
void rspamd_hyperscan_notice_known(const char *fname); void rspamd_hyperscan_notice_known(const char *fname);


/**
* Notice that hyperscan files are all loaded (e.g. in the main process), so we can cleanup old files on termination
*/
void rspamd_hyperscan_notice_loaded(void);

/** /**
* Cleans up old files. This method should be called on config free (in the main process) * Cleans up old files. This method should be called on config free (in the main process)
*/ */

+ 35
- 30
src/libserver/rspamd_control.c View File

/*-
* Copyright 2016 Vsevolod Stakhov
/*
* Copyright 2023 Vsevolod Stakhov
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
{ {
struct rspamd_worker *worker; struct rspamd_worker *worker;
static struct rspamd_srv_command cmd; static struct rspamd_srv_command cmd;
struct rspamd_main *srv;
struct rspamd_main *rspamd_main;
struct rspamd_srv_reply_data *rdata; struct rspamd_srv_reply_data *rdata;
struct msghdr msg; struct msghdr msg;
struct cmsghdr *cmsg; struct cmsghdr *cmsg;


if (revents == EV_READ) { if (revents == EV_READ) {
worker = (struct rspamd_worker *) w->data; worker = (struct rspamd_worker *) w->data;
srv = worker->srv;
rspamd_main = worker->srv;
iov.iov_base = &cmd; iov.iov_base = &cmd;
iov.iov_len = sizeof(cmd); iov.iov_len = sizeof(cmd);
memset(&msg, 0, sizeof(msg)); memset(&msg, 0, sizeof(msg));


if (r == -1) { if (r == -1) {
if (errno != EAGAIN) { if (errno != EAGAIN) {
msg_err("cannot read from worker's srv pipe: %s",
strerror(errno));
msg_err_main("cannot read from worker's srv pipe: %s",
strerror(errno));
} }
else { else {
return; return;
* Usually this means that a worker is dead, so do not try to read * Usually this means that a worker is dead, so do not try to read
* anything * anything
*/ */
msg_err("cannot read from worker's srv pipe connection closed; command = %s",
rspamd_srv_command_to_string(cmd.type));
msg_err_main("cannot read from worker's srv pipe connection closed; command = %s",
rspamd_srv_command_to_string(cmd.type));
ev_io_stop(EV_A_ w); ev_io_stop(EV_A_ w);
} }
else if (r != sizeof(cmd)) { else if (r != sizeof(cmd)) {
msg_err("cannot read from worker's srv pipe incomplete command: %d != %d; command = %s",
(gint) r, (gint) sizeof(cmd), rspamd_srv_command_to_string(cmd.type));
msg_err_main("cannot read from worker's srv pipe incomplete command: %d != %d; command = %s",
(gint) r, (gint) sizeof(cmd), rspamd_srv_command_to_string(cmd.type));
} }
else { else {
rdata = g_malloc0(sizeof(*rdata)); rdata = g_malloc0(sizeof(*rdata));
rdata->worker = worker; rdata->worker = worker;
rdata->srv = srv;
rdata->srv = rspamd_main;
rdata->rep.id = cmd.id; rdata->rep.id = cmd.id;
rdata->rep.type = cmd.type; rdata->rep.type = cmd.type;
rdata->fd = -1; rdata->fd = -1;


switch (cmd.type) { switch (cmd.type) {
case RSPAMD_SRV_SOCKETPAIR: case RSPAMD_SRV_SOCKETPAIR:
spair = g_hash_table_lookup(srv->spairs, cmd.cmd.spair.pair_id);
spair = g_hash_table_lookup(rspamd_main->spairs, cmd.cmd.spair.pair_id);
if (spair == NULL) { if (spair == NULL) {
spair = g_malloc(sizeof(gint) * 2); spair = g_malloc(sizeof(gint) * 2);


if (rspamd_socketpair(spair, cmd.cmd.spair.af) == -1) { if (rspamd_socketpair(spair, cmd.cmd.spair.af) == -1) {
rdata->rep.reply.spair.code = errno; rdata->rep.reply.spair.code = errno;
msg_err("cannot create socket pair: %s", strerror(errno));
msg_err_main("cannot create socket pair: %s", strerror(errno));
} }
else { else {
nid = g_malloc(sizeof(cmd.cmd.spair.pair_id)); nid = g_malloc(sizeof(cmd.cmd.spair.pair_id));
memcpy(nid, cmd.cmd.spair.pair_id, memcpy(nid, cmd.cmd.spair.pair_id,
sizeof(cmd.cmd.spair.pair_id)); sizeof(cmd.cmd.spair.pair_id));
g_hash_table_insert(srv->spairs, nid, spair);
g_hash_table_insert(rspamd_main->spairs, nid, spair);
rdata->rep.reply.spair.code = 0; rdata->rep.reply.spair.code = 0;
rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0]; rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
} }
break; break;
case RSPAMD_SRV_HYPERSCAN_LOADED: case RSPAMD_SRV_HYPERSCAN_LOADED:
/* Load RE cache to provide it for new forks */ /* Load RE cache to provide it for new forks */
if (rspamd_re_cache_is_hs_loaded(srv->cfg->re_cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
if (rspamd_re_cache_is_hs_loaded(rspamd_main->cfg->re_cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
cmd.cmd.hs_loaded.forced) { cmd.cmd.hs_loaded.forced) {
rspamd_re_cache_load_hyperscan( rspamd_re_cache_load_hyperscan(
srv->cfg->re_cache,
rspamd_main->cfg->re_cache,
cmd.cmd.hs_loaded.cache_dir, cmd.cmd.hs_loaded.cache_dir,
false); false);
} }


/* After getting this notice, we can clean up old hyperscan files */
rspamd_hyperscan_notice_loaded();
msg_info_main("received hyperscan cache loaded from %s",
cmd.cmd.hs_loaded.cache_dir);

/* Broadcast command to all workers */ /* Broadcast command to all workers */
memset(&wcmd, 0, sizeof(wcmd)); memset(&wcmd, 0, sizeof(wcmd));
wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED; wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
cmd.cmd.hs_loaded.cache_dir, cmd.cmd.hs_loaded.cache_dir,
sizeof(wcmd.cmd.hs_loaded.cache_dir)); sizeof(wcmd.cmd.hs_loaded.cache_dir));
wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced; wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
rspamd_control_broadcast_cmd(srv, &wcmd, rfd,
rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
rspamd_control_ignore_io_handler, NULL, worker->pid); rspamd_control_ignore_io_handler, NULL, worker->pid);
break; break;
case RSPAMD_SRV_MONITORED_CHANGE: case RSPAMD_SRV_MONITORED_CHANGE:
sizeof(wcmd.cmd.monitored_change.tag)); sizeof(wcmd.cmd.monitored_change.tag));
wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive; wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender; wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
rspamd_control_broadcast_cmd(srv, &wcmd, rfd,
rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
rspamd_control_ignore_io_handler, NULL, 0); rspamd_control_ignore_io_handler, NULL, 0);
break; break;
case RSPAMD_SRV_LOG_PIPE: case RSPAMD_SRV_LOG_PIPE:
memset(&wcmd, 0, sizeof(wcmd)); memset(&wcmd, 0, sizeof(wcmd));
wcmd.type = RSPAMD_CONTROL_LOG_PIPE; wcmd.type = RSPAMD_CONTROL_LOG_PIPE;
wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type; wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
rspamd_control_broadcast_cmd(srv, &wcmd, rfd,
rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
rspamd_control_log_pipe_io_handler, NULL, 0); rspamd_control_log_pipe_io_handler, NULL, 0);
break; break;
case RSPAMD_SRV_ON_FORK: case RSPAMD_SRV_ON_FORK:
rdata->rep.reply.on_fork.status = 0; rdata->rep.reply.on_fork.status = 0;
rspamd_control_handle_on_fork(&cmd, srv);
rspamd_control_handle_on_fork(&cmd, rspamd_main);
break; break;
case RSPAMD_SRV_HEARTBEAT: case RSPAMD_SRV_HEARTBEAT:
worker->hb.last_event = ev_time(); worker->hb.last_event = ev_time();
rdata->rep.reply.heartbeat.status = 0; rdata->rep.reply.heartbeat.status = 0;
break; break;
case RSPAMD_SRV_HEALTH: case RSPAMD_SRV_HEALTH:
rspamd_fill_health_reply(srv, &rdata->rep);
rspamd_fill_health_reply(rspamd_main, &rdata->rep);
break; break;
case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE: case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
#ifdef WITH_HYPERSCAN #ifdef WITH_HYPERSCAN
/* Ensure that memcpy is safe */ /* Ensure that memcpy is safe */
G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked)); G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked)); memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
rspamd_control_broadcast_cmd(srv, &wcmd, rfd,
rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
rspamd_control_ignore_io_handler, NULL, worker->pid); rspamd_control_ignore_io_handler, NULL, worker->pid);
break; break;
default: default:
msg_err("unknown command type: %d", cmd.type);
msg_err_main("unknown command type: %d", cmd.type);
break; break;
} }


rdata = (struct rspamd_srv_reply_data *) w->data; rdata = (struct rspamd_srv_reply_data *) w->data;
worker = rdata->worker; worker = rdata->worker;
worker->tmp_data = NULL; /* Avoid race */ worker->tmp_data = NULL; /* Avoid race */
srv = rdata->srv;
rspamd_main = rdata->srv;


memset(&msg, 0, sizeof(msg)); memset(&msg, 0, sizeof(msg));


r = sendmsg(w->fd, &msg, 0); r = sendmsg(w->fd, &msg, 0);


if (r == -1) { if (r == -1) {
msg_err("cannot write to worker's srv pipe when writing reply: %s; command = %s",
strerror(errno), rspamd_srv_command_to_string(rdata->rep.type));
msg_err_main("cannot write to worker's srv pipe when writing reply: %s; command = %s",
strerror(errno), rspamd_srv_command_to_string(rdata->rep.type));
} }
else if (r != sizeof(rdata->rep)) { else if (r != sizeof(rdata->rep)) {
msg_err("cannot write to worker's srv pipe: %d != %d; command = %s",
(int) r, (int) sizeof(rdata->rep),
rspamd_srv_command_to_string(rdata->rep.type));
msg_err_main("cannot write to worker's srv pipe: %d != %d; command = %s",
(int) r, (int) sizeof(rdata->rep),
rspamd_srv_command_to_string(rdata->rep.type));
} }


g_free(rdata); g_free(rdata);

Loading…
Cancel
Save