diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2025-07-01 21:27:23 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2025-07-01 21:27:23 +0100 |
commit | 6e932f18359d29419a9beeb7be562bdec486a29f (patch) | |
tree | 87db1d1c47ee8eb9ef032f7015c8b81715b9be33 | |
parent | 40df8199246e0c4e07a05dd35e8af59a69c5c8bd (diff) | |
download | rspamd-6e932f18359d29419a9beeb7be562bdec486a29f.tar.gz rspamd-6e932f18359d29419a9beeb7be562bdec486a29f.zip |
[Feature] Add a signal from main to workers for workers ready state
-rw-r--r-- | src/hs_helper.c | 164 | ||||
-rw-r--r-- | src/libserver/rspamd_control.c | 16 | ||||
-rw-r--r-- | src/libserver/rspamd_control.h | 17 | ||||
-rw-r--r-- | src/rspamd.c | 23 |
4 files changed, 153 insertions, 67 deletions
diff --git a/src/hs_helper.c b/src/hs_helper.c index 3bd2040f8..55dbb53f6 100644 --- a/src/hs_helper.c +++ b/src/hs_helper.c @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -250,12 +250,12 @@ struct rspamd_hs_helper_compile_cbdata { unsigned int total_compiled; unsigned int scopes_remaining; gboolean forced; + gboolean workers_ready; }; static void -rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents) +rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd) { - struct rspamd_hs_helper_compile_cbdata *cbd = (struct rspamd_hs_helper_compile_cbdata *) w->data; struct rspamd_worker *worker = cbd->worker; struct hs_helper_ctx *ctx = cbd->ctx; static struct rspamd_srv_command srv_cmd; @@ -269,11 +269,12 @@ rspamd_rs_delayed_scoped_cb(EV_P_ ev_timer *w, int revents) rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL); - ev_timer_stop(EV_A_ w); - g_free(w); - g_free(cbd); - ev_timer_again(EV_A_ & ctx->recompile_timer); + msg_info("sent final hyperscan loaded notification (%d total expressions compiled)", + cbd->total_compiled); + + g_free(cbd); + ev_timer_stop(ctx->event_loop, &ctx->recompile_timer); } static void @@ -320,44 +321,32 @@ rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *e /* Check if all scopes are done */ if (compile_cbd->scopes_remaining == 0) { - ev_timer *tm; - ev_tstamp when = 0.0; - - /* - * Do not send notification unless all other workers are started - * XXX: now we just sleep for 1 seconds to ensure that - */ - if (!ctx->loaded) { - when = 1.0; /* Postpone */ - ctx->loaded = TRUE; + if (compile_cbd->workers_ready) { + /* Workers are ready, send notification immediately */ msg_info("compiled %d total regular expressions to the hyperscan tree, " - "postpone final notification for %.0f seconds to avoid races", - compile_cbd->total_compiled, - when); + "send final notification", + compile_cbd->total_compiled); + rspamd_rs_send_final_notification(compile_cbd); } else { + /* Workers not ready yet, notification will be sent when workers_spawned event is received */ msg_info("compiled %d total regular expressions to the hyperscan tree, " - "send final notification", + "waiting for workers to be ready before sending notification", compile_cbd->total_compiled); + ctx->loaded = TRUE; } - - tm = g_malloc0(sizeof(*tm)); - tm->data = (void *) compile_cbd; - ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, when, 0); - ev_timer_start(ctx->event_loop, tm); } } struct rspamd_hs_helper_single_compile_cbdata { struct rspamd_worker *worker; gboolean forced; + gboolean workers_ready; }; static void -rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents) +rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata *cbd) { - struct rspamd_hs_helper_single_compile_cbdata *cbd = - (struct rspamd_hs_helper_single_compile_cbdata *) w->data; struct rspamd_worker *worker = cbd->worker; static struct rspamd_srv_command srv_cmd; struct hs_helper_ctx *ctx; @@ -372,11 +361,11 @@ rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents) rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL); - ev_timer_stop(EV_A_ w); - g_free(w); - g_free(cbd); - ev_timer_again(EV_A_ & ctx->recompile_timer); + msg_info("sent hyperscan loaded notification"); + + g_free(cbd); + ev_timer_again(ctx->event_loop, &ctx->recompile_timer); } static void @@ -385,8 +374,6 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd) struct rspamd_hs_helper_single_compile_cbdata *compile_cbd = (struct rspamd_hs_helper_single_compile_cbdata *) cbd; struct rspamd_worker *worker = compile_cbd->worker; - ev_timer *tm; - ev_tstamp when = 0.0; struct hs_helper_ctx *ctx; struct rspamd_hs_helper_single_compile_cbdata *timer_cbd; @@ -399,33 +386,26 @@ rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd) return; } - /* - * Do not send notification unless all other workers are started - * XXX: now we just sleep for 1 seconds to ensure that - */ - if (!ctx->loaded) { - when = 1.0; /* Postpone */ - ctx->loaded = TRUE; + timer_cbd = g_malloc0(sizeof(*timer_cbd)); + timer_cbd->worker = worker; + timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced; + timer_cbd->workers_ready = compile_cbd->workers_ready; + + if (timer_cbd->workers_ready) { + /* Workers are ready, send notification immediately */ msg_info("compiled %d regular expressions to the hyperscan tree, " - "postpone loaded notification for %.0f seconds to avoid races", - ncompiled, - when); + "send loaded notification", + ncompiled); + rspamd_rs_send_single_notification(timer_cbd); } else { + /* Workers not ready yet, notification will be sent when workers_spawned event is received */ msg_info("compiled %d regular expressions to the hyperscan tree, " - "send loaded notification", + "waiting for workers to be ready before sending notification", ncompiled); + ctx->loaded = TRUE; } - timer_cbd = g_malloc0(sizeof(*timer_cbd)); - timer_cbd->worker = worker; - timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced; - - tm = g_malloc0(sizeof(*tm)); - tm->data = (void *) timer_cbd; - ev_timer_init(tm, rspamd_rs_delayed_cb, when, 0); - ev_timer_start(ctx->event_loop, tm); - g_free(compile_cbd); } @@ -453,6 +433,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker, g_malloc0(sizeof(*single_cbd)); single_cbd->worker = worker; single_cbd->forced = forced; + single_cbd->workers_ready = ctx->loaded; rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache, ctx->hs_dir, ctx->max_time, !forced, @@ -472,6 +453,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker, g_malloc0(sizeof(*single_cbd)); single_cbd->worker = worker; single_cbd->forced = forced; + single_cbd->workers_ready = ctx->loaded; rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache, ctx->hs_dir, ctx->max_time, !forced, @@ -489,6 +471,7 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker, compile_cbd->total_compiled = 0; compile_cbd->scopes_remaining = names_count; compile_cbd->forced = forced; + compile_cbd->workers_ready = ctx->loaded; /* Compile each scope */ for (unsigned int i = 0; i < names_count; i++) { @@ -510,10 +493,12 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker, /* Check if we're done */ if (compile_cbd->scopes_remaining == 0) { /* No scopes to compile, send final notification immediately */ - ev_timer *tm = g_malloc0(sizeof(*tm)); - tm->data = (void *) compile_cbd; - ev_timer_init(tm, rspamd_rs_delayed_scoped_cb, 0.0, 0); - ev_timer_start(ctx->event_loop, tm); + if (compile_cbd->workers_ready) { + rspamd_rs_send_final_notification(compile_cbd); + } + else { + ctx->loaded = TRUE; + } } } } @@ -545,11 +530,61 @@ rspamd_hs_helper_reload(struct rspamd_main *rspamd_main, /* Stop recompile */ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer); + ctx->loaded = FALSE; /* Reset flag for forced recompile */ rspamd_rs_compile(ctx, worker, TRUE); return TRUE; } +static gboolean +rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, int fd, + int attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ + struct rspamd_control_reply rep; + struct hs_helper_ctx *ctx = ud; + + msg_info("received workers_spawned notification (%d workers); hyperscan ready: %s", + cmd->cmd.workers_spawned.workers_count, + ctx->loaded ? "yes" : "no"); + + memset(&rep, 0, sizeof(rep)); + rep.type = RSPAMD_CONTROL_WORKERS_SPAWNED; + rep.reply.workers_spawned.status = 0; + + /* Write reply */ + if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) { + msg_err("cannot write reply to the control socket: %s", + strerror(errno)); + } + + /* If hyperscan compilation has finished but we were waiting for workers, trigger notification now */ + if (ctx->loaded) { + static struct rspamd_srv_command srv_cmd; + + memset(&srv_cmd, 0, sizeof(srv_cmd)); + srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED; + rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir, + sizeof(srv_cmd.cmd.hs_loaded.cache_dir)); + srv_cmd.cmd.hs_loaded.forced = FALSE; + srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */ + + rspamd_srv_send_command(worker, + ctx->event_loop, &srv_cmd, -1, NULL, NULL); + + msg_info("sent delayed hyperscan loaded notification after workers spawned"); + ctx->loaded = FALSE; /* Reset to avoid duplicate notifications */ + } + + if (attached_fd != -1) { + close(attached_fd); + } + + return TRUE; +} + static void rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents) { @@ -583,13 +618,10 @@ start_hs_helper(struct rspamd_worker *worker) "hs_helper", NULL); - if (!rspamd_rs_compile(ctx, worker, FALSE)) { - /* Tell main not to respawn more workers */ - exit(EXIT_SUCCESS); - } - rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_RECOMPILE, rspamd_hs_helper_reload, ctx); + rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_WORKERS_SPAWNED, + rspamd_hs_helper_workers_spawned, ctx); ctx->recompile_timer.data = worker; tim = rspamd_time_jitter(ctx->recompile_time, 0); diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index deab5064b..e212f7e91 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -724,6 +724,9 @@ rspamd_control_default_cmd_handler(int fd, case RSPAMD_CONTROL_CHILD_CHANGE: case RSPAMD_CONTROL_FUZZY_BLOCKED: break; + case RSPAMD_CONTROL_WORKERS_SPAWNED: + rep.reply.workers_spawned.status = 0; + break; case RSPAMD_CONTROL_RERESOLVE: if (cd->worker->srv->cfg) { REF_RETAIN(cd->worker->srv->cfg); @@ -1165,6 +1168,10 @@ rspamd_srv_handler(EV_P_ ev_io *w, int revents) rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd, rspamd_control_ignore_io_handler, NULL, worker->pid); break; + case RSPAMD_SRV_WORKERS_SPAWNED: + /* No need to broadcast, this is just a notification from main to specific workers */ + rdata->rep.reply.workers_spawned.status = 0; + break; default: msg_err_main("unknown command type: %d", cmd.type); break; @@ -1418,6 +1425,9 @@ rspamd_control_command_from_string(const char *str) else if (g_ascii_strcasecmp(str, "child_change") == 0) { ret = RSPAMD_CONTROL_CHILD_CHANGE; } + else if (g_ascii_strcasecmp(str, "workers_spawned") == 0) { + ret = RSPAMD_CONTROL_WORKERS_SPAWNED; + } return ret; } @@ -1458,6 +1468,9 @@ rspamd_control_command_to_string(enum rspamd_control_type cmd) case RSPAMD_CONTROL_CHILD_CHANGE: reply = "child_change"; break; + case RSPAMD_CONTROL_WORKERS_SPAWNED: + reply = "workers_spawned"; + break; default: break; } @@ -1497,6 +1510,9 @@ const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd) case RSPAMD_SRV_FUZZY_BLOCKED: reply = "fuzzy_blocked"; break; + case RSPAMD_SRV_WORKERS_SPAWNED: + reply = "workers_spawned"; + break; } return reply; diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index 92bdec85d..81603cab2 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -37,6 +37,7 @@ enum rspamd_control_type { RSPAMD_CONTROL_MONITORED_CHANGE, RSPAMD_CONTROL_CHILD_CHANGE, RSPAMD_CONTROL_FUZZY_BLOCKED, + RSPAMD_CONTROL_WORKERS_SPAWNED, RSPAMD_CONTROL_MAX }; @@ -49,7 +50,8 @@ enum rspamd_srv_type { RSPAMD_SRV_HEARTBEAT, RSPAMD_SRV_HEALTH, RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE, - RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */ + RSPAMD_SRV_FUZZY_BLOCKED, /* Used to notify main process about a blocked ip */ + RSPAMD_SRV_WORKERS_SPAWNED, /* Used to notify workers that all workers have been spawned */ }; enum rspamd_log_pipe_type { @@ -107,6 +109,9 @@ struct rspamd_control_command { } addr; sa_family_t af; } fuzzy_blocked; + struct { + unsigned int workers_count; + } workers_spawned; } cmd; }; @@ -148,6 +153,9 @@ struct rspamd_control_reply { struct { unsigned int status; } fuzzy_blocked; + struct { + unsigned int status; + } workers_spawned; } reply; }; @@ -203,6 +211,10 @@ struct rspamd_srv_command { } addr; sa_family_t af; } fuzzy_blocked; + /* Sent when all workers have been spawned */ + struct { + unsigned int workers_count; + } workers_spawned; } cmd; }; @@ -240,6 +252,9 @@ struct rspamd_srv_reply { struct { int unused; } fuzzy_blocked; + struct { + int status; + } workers_spawned; } reply; }; diff --git a/src/rspamd.c b/src/rspamd.c index dafd9aebe..ba1ea1fb8 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -1155,6 +1155,18 @@ rspamd_hup_handler(struct ev_loop *loop, ev_signal *w, int revents) msg_info_main("spawn workers with a new config"); spawn_workers(rspamd_main, rspamd_main->event_loop); msg_info_main("workers spawning has been finished"); + + /* Notify all workers that spawning is complete */ + { + struct rspamd_control_command wcmd; + memset(&wcmd, 0, sizeof(wcmd)); + wcmd.type = RSPAMD_CONTROL_WORKERS_SPAWNED; + wcmd.cmd.workers_spawned.workers_count = g_hash_table_size(rspamd_main->workers); + rspamd_control_broadcast_srv_cmd(rspamd_main, &wcmd, 0); + msg_info_main("notified workers that spawning is complete after reload (%d workers)", + wcmd.cmd.workers_spawned.workers_count); + } + /* Kill marked */ msg_info_main("kill old workers"); g_hash_table_foreach(rspamd_main->workers, kill_old_workers, NULL); @@ -1687,6 +1699,17 @@ int main(int argc, char **argv, char **env) spawn_workers(rspamd_main, event_loop); rspamd_mempool_unlock_mutex(rspamd_main->start_mtx); + /* Notify all workers that spawning is complete */ + { + struct rspamd_control_command wcmd; + memset(&wcmd, 0, sizeof(wcmd)); + wcmd.type = RSPAMD_CONTROL_WORKERS_SPAWNED; + wcmd.cmd.workers_spawned.workers_count = g_hash_table_size(rspamd_main->workers); + rspamd_control_broadcast_srv_cmd(rspamd_main, &wcmd, 0); + msg_info_main("notified workers that spawning is complete (%d workers)", + wcmd.cmd.workers_spawned.workers_count); + } + rspamd_main->http_ctx = rspamd_http_context_create(rspamd_main->cfg, event_loop, rspamd_main->cfg->ups_ctx); |