about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Vsevolod Stakhov <vsevolod@rspamd.com>, 2025-07-01 21:27:23 +0100
committer: Vsevolod Stakhov <vsevolod@rspamd.com>, 2025-07-01 21:27:23 +0100
commit: 6e932f18359d29419a9beeb7be562bdec486a29f (patch)
tree: 87db1d1c47ee8eb9ef032f7015c8b81715b9be33
parent: 40df8199246e0c4e07a05dd35e8af59a69c5c8bd (diff)
download: rspamd-6e932f18359d29419a9beeb7be562bdec486a29f.tar.gz
download: rspamd-6e932f18359d29419a9beeb7be562bdec486a29f.zip
[Feature] Add a signal from main to workers for workers ready state
-rw-r--r--  src/hs_helper.c                 | 164
-rw-r--r--  src/libserver/rspamd_control.c  |  16
-rw-r--r--  src/libserver/rspamd_control.h  |  17
-rw-r--r--  src/rspamd.c                    |  23
4 files changed, 153 insertions, 67 deletions
diff --git a/src/hs_helper.c b/src/hs_helper.c
index 3bd2040f8..55dbb53f6 100644
--- a/src/hs_helper.c
+++ b/src/hs_helper.c
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
+/*
+ * Copyright 2025 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -250,12 +250,12 @@ struct rspamd_hs_helper_compile_cbdata {
unsigned int total_compiled;
unsigned int scopes_remaining;
gboolean forced;
+ gboolean workers_ready;
};
static void
-rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents)
+rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd)
{
- struct rspamd_hs_helper_compile_cbdata *cbd = (struct rspamd_hs_helper_compile_cbdata *) w->data;
struct rspamd_worker *worker = cbd->worker;
struct hs_helper_ctx *ctx = cbd->ctx;
static struct rspamd_srv_command srv_cmd;
@@ -269,11 +269,12 @@ rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents)
rspamd_srv_send_command(worker,
ctx->event_loop, &srv_cmd, -1, NULL, NULL);
- ev_timer_stop(EV_A_ w);
- g_free(w);
- g_free(cbd);
- ev_timer_again(EV_A_ & ctx->recompile_timer);
+ msg_info("sent final hyperscan loaded notification (%d total expressions compiled)",
+ cbd->total_compiled);
+
+ g_free(cbd);
+ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
}
static void
@@ -320,44 +321,32 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e
/* Check if all scopes are done */
if (compile_cbd->scopes_remaining == 0) {
- ev_timer *tm;
- ev_tstamp when = 0.0;
-
- /*
- * Do not send notification unless all other workers are started
- * XXX: now we just sleep for 1 seconds to ensure that
- */
- if (!ctx->loaded) {
- when = 1.0; /* Postpone */
- ctx->loaded = TRUE;
+ if (compile_cbd->workers_ready) {
+ /* Workers are ready, send notification immediately */
msg_info("compiled %d total regular expressions to the hyperscan tree, "
- "postpone final notification for %.0f seconds to avoid races",
- compile_cbd->total_compiled,
- when);
+ "send final notification",
+ compile_cbd->total_compiled);
+ rspamd_rs_send_final_notification(compile_cbd);
}
else {
+ /* Workers not ready yet, notification will be sent when workers_spawned event is received */
msg_info("compiled %d total regular expressions to the hyperscan tree, "
- "send final notification",
+ "waiting for workers to be ready before sending notification",
compile_cbd->total_compiled);
+ ctx->loaded = TRUE;
}
-
- tm = g_malloc0(sizeof(*tm));
- tm->data = (void *) compile_cbd;
- ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, when, 0);
- ev_timer_start(ctx->event_loop, tm);
}
}
struct rspamd_hs_helper_single_compile_cbdata { /* State used by rspamd_rs_compile_cb and rspamd_rs_send_single_notification */
struct rspamd_worker *worker; /* Worker that rspamd_rs_compile() was called with */
gboolean forced; /* Forced flag of the request; rspamd_rs_compile_cb forces it to TRUE in its copy when ncompiled > 0 */
+ gboolean workers_ready; /* Value of ctx->loaded captured by rspamd_rs_compile() when compilation was started */
};
static void
-rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents)
+rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata *cbd)
{
- struct rspamd_hs_helper_single_compile_cbdata *cbd =
- (struct rspamd_hs_helper_single_compile_cbdata *) w->data;
struct rspamd_worker *worker = cbd->worker;
static struct rspamd_srv_command srv_cmd;
struct hs_helper_ctx *ctx;
@@ -372,11 +361,11 @@ rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents)
rspamd_srv_send_command(worker,
ctx->event_loop, &srv_cmd, -1, NULL, NULL);
- ev_timer_stop(EV_A_ w);
- g_free(w);
- g_free(cbd);
- ev_timer_again(EV_A_ & ctx->recompile_timer);
+ msg_info("sent hyperscan loaded notification");
+
+ g_free(cbd);
+ ev_timer_again(ctx->event_loop, &ctx->recompile_timer);
}
static void
@@ -385,8 +374,6 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd)
struct rspamd_hs_helper_single_compile_cbdata *compile_cbd =
(struct rspamd_hs_helper_single_compile_cbdata *) cbd;
struct rspamd_worker *worker = compile_cbd->worker;
- ev_timer *tm;
- ev_tstamp when = 0.0;
struct hs_helper_ctx *ctx;
struct rspamd_hs_helper_single_compile_cbdata *timer_cbd;
@@ -399,33 +386,26 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd)
return;
}
- /*
- * Do not send notification unless all other workers are started
- * XXX: now we just sleep for 1 seconds to ensure that
- */
- if (!ctx->loaded) {
- when = 1.0; /* Postpone */
- ctx->loaded = TRUE;
+ timer_cbd = g_malloc0(sizeof(*timer_cbd));
+ timer_cbd->worker = worker;
+ timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced;
+ timer_cbd->workers_ready = compile_cbd->workers_ready;
+
+ if (timer_cbd->workers_ready) {
+ /* Workers are ready, send notification immediately */
msg_info("compiled %d regular expressions to the hyperscan tree, "
- "postpone loaded notification for %.0f seconds to avoid races",
- ncompiled,
- when);
+ "send loaded notification",
+ ncompiled);
+ rspamd_rs_send_single_notification(timer_cbd);
}
else {
+ /* Workers not ready yet, notification will be sent when workers_spawned event is received */
msg_info("compiled %d regular expressions to the hyperscan tree, "
- "send loaded notification",
+ "waiting for workers to be ready before sending notification",
ncompiled);
+ ctx->loaded = TRUE;
}
- timer_cbd = g_malloc0(sizeof(*timer_cbd));
- timer_cbd->worker = worker;
- timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced;
-
- tm = g_malloc0(sizeof(*tm));
- tm->data = (void *) timer_cbd;
- ev_timer_init(tm, rspamd_rs_delayed_cb, when, 0);
- ev_timer_start(ctx->event_loop, tm);
-
g_free(compile_cbd);
}
@@ -453,6 +433,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
g_malloc0(sizeof(*single_cbd));
single_cbd->worker = worker;
single_cbd->forced = forced;
+ single_cbd->workers_ready = ctx->loaded;
rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
ctx->hs_dir, ctx->max_time, !forced,
@@ -472,6 +453,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
g_malloc0(sizeof(*single_cbd));
single_cbd->worker = worker;
single_cbd->forced = forced;
+ single_cbd->workers_ready = ctx->loaded;
rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache,
ctx->hs_dir, ctx->max_time, !forced,
@@ -489,6 +471,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
compile_cbd->total_compiled = 0;
compile_cbd->scopes_remaining = names_count;
compile_cbd->forced = forced;
+ compile_cbd->workers_ready = ctx->loaded;
/* Compile each scope */
for (unsigned int i = 0; i < names_count; i++) {
@@ -510,10 +493,12 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
/* Check if we're done */
if (compile_cbd->scopes_remaining == 0) {
/* No scopes to compile, send final notification immediately */
- ev_timer *tm = g_malloc0(sizeof(*tm));
- tm->data = (void *) compile_cbd;
- ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, 0.0, 0);
- ev_timer_start(ctx->event_loop, tm);
+ if (compile_cbd->workers_ready) {
+ rspamd_rs_send_final_notification(compile_cbd);
+ }
+ else {
+ ctx->loaded = TRUE;
+ }
}
}
}
@@ -545,11 +530,61 @@ rspamd_hs_helper_reload(struct rspamd_main *rspamd_main,
/* Stop recompile */
ev_timer_stop(ctx->event_loop, &ctx->recompile_timer);
+ ctx->loaded = FALSE; /* Reset flag for forced recompile */
rspamd_rs_compile(ctx, worker, TRUE);
return TRUE;
}
+static gboolean
+rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, int fd,
+ int attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{ /*
+   * Control-command handler registered for RSPAMD_CONTROL_WORKERS_SPAWNED.
+   * It logs the reported worker count, writes a zero-status reply to `fd`,
+   * and, when ctx->loaded is set, sends RSPAMD_SRV_HYPERSCAN_LOADED for
+   * ctx->hs_dir and then clears ctx->loaded. `attached_fd` is closed if it
+   * is not -1. Always returns TRUE.
+   */
+ struct rspamd_control_reply rep;
+ struct hs_helper_ctx *ctx = ud; /* `ud` is the ctx pointer supplied when the handler is registered */
+
+ msg_info("received workers_spawned notification (%d workers); hyperscan ready: %s",
+ cmd->cmd.workers_spawned.workers_count,
+ ctx->loaded ? "yes" : "no");
+
+ memset(&rep, 0, sizeof(rep));
+ rep.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+ rep.reply.workers_spawned.status = 0;
+
+ /* Write reply; a failed or short write is only logged and handling continues */
+ if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) {
+ msg_err("cannot write reply to the control socket: %s",
+ strerror(errno));
+ }
+
+ /*
+  * If hyperscan compilation has finished but we were waiting for workers, trigger notification now.
+  * NOTE(review): when this command arrives while ctx->loaded is still FALSE, nothing is sent
+  * from here; confirm that another path delivers the notification in that ordering.
+  */
+ if (ctx->loaded) {
+ static struct rspamd_srv_command srv_cmd;
+
+ memset(&srv_cmd, 0, sizeof(srv_cmd));
+ srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED;
+ rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir,
+ sizeof(srv_cmd.cmd.hs_loaded.cache_dir));
+ srv_cmd.cmd.hs_loaded.forced = FALSE; /* NOTE(review): always FALSE here, even if the finished compile was a forced one; confirm intended */
+ srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* Empty (zero-length) scope means all scopes */
+
+ rspamd_srv_send_command(worker,
+ ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+
+ msg_info("sent delayed hyperscan loaded notification after workers spawned");
+ ctx->loaded = FALSE; /* Reset to avoid duplicate notifications */
+ }
+
+ if (attached_fd != -1) { /* The attached descriptor is not used by this handler, so it is just closed */
+ close(attached_fd);
+ }
+
+ return TRUE;
+}
+
static void
rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents)
{
@@ -583,13 +618,10 @@ start_hs_helper(struct rspamd_worker *worker)
"hs_helper",
NULL);
- if (!rspamd_rs_compile(ctx, worker, FALSE)) {
- /* Tell main not to respawn more workers */
- exit(EXIT_SUCCESS);
- }
-
rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_RECOMPILE,
rspamd_hs_helper_reload, ctx);
+ rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_WORKERS_SPAWNED,
+ rspamd_hs_helper_workers_spawned, ctx);
ctx->recompile_timer.data = worker;
tim = rspamd_time_jitter(ctx->recompile_time, 0);
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index deab5064b..e212f7e91 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -724,6 +724,9 @@ rspamd_control_default_cmd_handler(int fd,
case RSPAMD_CONTROL_CHILD_CHANGE:
case RSPAMD_CONTROL_FUZZY_BLOCKED:
break;
+ case RSPAMD_CONTROL_WORKERS_SPAWNED:
+ rep.reply.workers_spawned.status = 0;
+ break;
case RSPAMD_CONTROL_RERESOLVE:
if (cd->worker->srv->cfg) {
REF_RETAIN(cd->worker->srv->cfg);
@@ -1165,6 +1168,10 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents)
rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
rspamd_control_ignore_io_handler, NULL, worker->pid);
break;
+ case RSPAMD_SRV_WORKERS_SPAWNED:
+ /* No need to broadcast, this is just a notification from main to specific workers */
+ rdata->rep.reply.workers_spawned.status = 0;
+ break;
default:
msg_err_main("unknown command type: %d", cmd.type);
break;
@@ -1418,6 +1425,9 @@ rspamd_control_command_from_string(const char *str)
else if (g_ascii_strcasecmp(str, "child_change") == 0) {
ret = RSPAMD_CONTROL_CHILD_CHANGE;
}
+ else if (g_ascii_strcasecmp(str, "workers_spawned") == 0) {
+ ret = RSPAMD_CONTROL_WORKERS_SPAWNED;
+ }
return ret;
}
@@ -1458,6 +1468,9 @@ rspamd_control_command_to_string(enum rspamd_control_type cmd)
case RSPAMD_CONTROL_CHILD_CHANGE:
reply = "child_change";
break;
+ case RSPAMD_CONTROL_WORKERS_SPAWNED:
+ reply = "workers_spawned";
+ break;
default:
break;
}
@@ -1497,6 +1510,9 @@ const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd)
case RSPAMD_SRV_FUZZY_BLOCKED:
reply = "fuzzy_blocked";
break;
+ case RSPAMD_SRV_WORKERS_SPAWNED:
+ reply = "workers_spawned";
+ break;
}
return reply;
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index 92bdec85d..81603cab2 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -37,6 +37,7 @@ enum rspamd_control_type {
RSPAMD_CONTROL_MONITORED_CHANGE,
RSPAMD_CONTROL_CHILD_CHANGE,
RSPAMD_CONTROL_FUZZY_BLOCKED,
+ RSPAMD_CONTROL_WORKERS_SPAWNED,
RSPAMD_CONTROL_MAX
};
@@ -49,7 +50,8 @@ enum rspamd_srv_type {
RSPAMD_SRV_HEARTBEAT,
RSPAMD_SRV_HEALTH,
RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE,
- RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
+ RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */
+ RSPAMD_SRV_WORKERS_SPAWNED, /* Used to notify workers that all workers have been spawned */
};
enum rspamd_log_pipe_type {
@@ -107,6 +109,9 @@ struct rspamd_control_command {
} addr;
sa_family_t af;
} fuzzy_blocked;
+ struct {
+ unsigned int workers_count;
+ } workers_spawned;
} cmd;
};
@@ -148,6 +153,9 @@ struct rspamd_control_reply {
struct {
unsigned int status;
} fuzzy_blocked;
+ struct {
+ unsigned int status;
+ } workers_spawned;
} reply;
};
@@ -203,6 +211,10 @@ struct rspamd_srv_command {
} addr;
sa_family_t af;
} fuzzy_blocked;
+ /* Sent when all workers have been spawned */
+ struct {
+ unsigned int workers_count;
+ } workers_spawned;
} cmd;
};
@@ -240,6 +252,9 @@ struct rspamd_srv_reply {
struct {
int unused;
} fuzzy_blocked;
+ struct {
+ int status;
+ } workers_spawned;
} reply;
};
diff --git a/src/rspamd.c b/src/rspamd.c
index dafd9aebe..ba1ea1fb8 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -1155,6 +1155,18 @@ rspamd_hup_handler(struct ev_loop *loop, ev_signal *w, int revents)
msg_info_main("spawn workers with a new config");
spawn_workers(rspamd_main, rspamd_main->event_loop);
msg_info_main("workers spawning has been finished");
+
+ /* Notify all workers that spawning is complete */
+ {
+ struct rspamd_control_command wcmd;
+ memset(&wcmd, 0, sizeof(wcmd));
+ wcmd.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+ wcmd.cmd.workers_spawned.workers_count = g_hash_table_size(rspamd_main->workers);
+ rspamd_control_broadcast_srv_cmd(rspamd_main, &wcmd, 0);
+ msg_info_main("notified workers that spawning is complete after reload (%d workers)",
+ wcmd.cmd.workers_spawned.workers_count);
+ }
+
/* Kill marked */
msg_info_main("kill old workers");
g_hash_table_foreach(rspamd_main->workers, kill_old_workers, NULL);
@@ -1687,6 +1699,17 @@ int main(int argc, char **argv, char **env)
spawn_workers(rspamd_main, event_loop);
rspamd_mempool_unlock_mutex(rspamd_main->start_mtx);
+ /* Notify all workers that spawning is complete */
+ {
+ struct rspamd_control_command wcmd;
+ memset(&wcmd, 0, sizeof(wcmd));
+ wcmd.type = RSPAMD_CONTROL_WORKERS_SPAWNED;
+ wcmd.cmd.workers_spawned.workers_count = g_hash_table_size(rspamd_main->workers);
+ rspamd_control_broadcast_srv_cmd(rspamd_main, &wcmd, 0);
+ msg_info_main("notified workers that spawning is complete (%d workers)",
+ wcmd.cmd.workers_spawned.workers_count);
+ }
+
rspamd_main->http_ctx = rspamd_http_context_create(rspamd_main->cfg,
event_loop, rspamd_main->cfg->ups_ctx);