diff options
Diffstat (limited to 'src/hs_helper.c')
-rw-r--r-- | src/hs_helper.c | 328 |
1 files changed, 282 insertions, 46 deletions
diff --git a/src/hs_helper.c b/src/hs_helper.c index 26d57528f..f3edbd64b 100644 --- a/src/hs_helper.c +++ b/src/hs_helper.c @@ -1,11 +1,11 @@ -/*- - * Copyright 2016 Vsevolod Stakhov +/* + * Copyright 2025 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -55,6 +55,7 @@ struct hs_helper_ctx { /* END OF COMMON PART */ char *hs_dir; gboolean loaded; + gboolean workers_ready; double max_time; double recompile_time; ev_timer recompile_timer; @@ -72,6 +73,8 @@ init_hs_helper(struct rspamd_config *cfg) ctx->magic = rspamd_hs_helper_magic; ctx->cfg = cfg; ctx->hs_dir = NULL; + ctx->loaded = FALSE; + ctx->workers_ready = FALSE; ctx->max_time = default_max_time; ctx->recompile_time = default_recompile_time; @@ -243,13 +246,111 @@ rspamd_hs_helper_cleanup_dir(struct hs_helper_ctx *ctx, gboolean forced) return ret; } -/* Bad hack, but who cares */ -static gboolean hack_global_forced; + +struct rspamd_hs_helper_compile_cbdata { + struct rspamd_worker *worker; + struct hs_helper_ctx *ctx; + unsigned int total_compiled; + unsigned int scopes_remaining; + gboolean forced; + gboolean workers_ready; +}; static void -rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents) +rspamd_rs_send_final_notification(struct rspamd_hs_helper_compile_cbdata *cbd) { - struct rspamd_worker *worker = (struct rspamd_worker *) w->data; + struct rspamd_worker *worker = cbd->worker; + struct hs_helper_ctx *ctx = cbd->ctx; + static struct rspamd_srv_command srv_cmd; + + memset(&srv_cmd, 0, sizeof(srv_cmd)); + srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED; + rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir, + sizeof(srv_cmd.cmd.hs_loaded.cache_dir)); + srv_cmd.cmd.hs_loaded.forced = cbd->forced; + srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */ + + rspamd_srv_send_command(worker, + ctx->event_loop, &srv_cmd, -1, NULL, NULL); + + msg_info("sent final hyperscan loaded notification (%d total expressions compiled)", + cbd->total_compiled); + + g_free(cbd); + ev_timer_stop(ctx->event_loop, &ctx->recompile_timer); +} + +static void +rspamd_rs_compile_scoped_cb(const char *scope, unsigned int ncompiled, GError *err, void *cbd) +{ + struct rspamd_hs_helper_compile_cbdata *compile_cbd = + (struct rspamd_hs_helper_compile_cbdata *) cbd; + struct rspamd_worker *worker = compile_cbd->worker; + struct hs_helper_ctx *ctx = compile_cbd->ctx; + static struct rspamd_srv_command srv_cmd; + + if (err != NULL) { + /* Failed to compile: log and continue */ + msg_err("cannot compile Hyperscan database for scope %s: %e", + scope ? scope : "default", err); + } + else { + if (ncompiled > 0) { + compile_cbd->total_compiled += ncompiled; + + /* Send notification for this specific scope */ + memset(&srv_cmd, 0, sizeof(srv_cmd)); + srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED; + rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir, + sizeof(srv_cmd.cmd.hs_loaded.cache_dir)); + srv_cmd.cmd.hs_loaded.forced = compile_cbd->forced; + if (scope) { + rspamd_strlcpy(srv_cmd.cmd.hs_loaded.scope, scope, + sizeof(srv_cmd.cmd.hs_loaded.scope)); + } + else { + srv_cmd.cmd.hs_loaded.scope[0] = '\0'; + } + + rspamd_srv_send_command(worker, + ctx->event_loop, &srv_cmd, -1, NULL, NULL); + + msg_info("compiled %d regular expressions for scope %s", + ncompiled, scope ? scope : "default"); + } + } + + compile_cbd->scopes_remaining--; + + /* Check if all scopes are done */ + if (compile_cbd->scopes_remaining == 0) { + if (compile_cbd->workers_ready) { + /* Workers are ready, send notification immediately */ + msg_info("compiled %d total regular expressions to the hyperscan tree, " + "send final notification", + compile_cbd->total_compiled); + rspamd_rs_send_final_notification(compile_cbd); + } + else { + /* Workers not ready yet, notification will be sent when workers_spawned event is received */ + msg_info("compiled %d total regular expressions to the hyperscan tree, " + "waiting for workers to be ready before sending notification", + compile_cbd->total_compiled); + ctx->loaded = TRUE; + } + } +} + +struct rspamd_hs_helper_single_compile_cbdata { + struct rspamd_worker *worker; + gboolean forced; + gboolean workers_ready; +}; + +static void +rspamd_rs_send_single_notification(struct rspamd_hs_helper_single_compile_cbdata *cbd) +{ + struct rspamd_worker *worker = cbd->worker; static struct rspamd_srv_command srv_cmd; struct hs_helper_ctx *ctx; @@ -258,67 +359,66 @@ rspamd_rs_delayed_cb(EV_P_ ev_timer *w, int revents) srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED; rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir, sizeof(srv_cmd.cmd.hs_loaded.cache_dir)); - srv_cmd.cmd.hs_loaded.forced = hack_global_forced; - hack_global_forced = FALSE; + srv_cmd.cmd.hs_loaded.forced = cbd->forced; + srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */ rspamd_srv_send_command(worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL); - ev_timer_stop(EV_A_ w); - g_free(w); - ev_timer_again(EV_A_ & ctx->recompile_timer); + msg_info("sent hyperscan loaded notification"); + + g_free(cbd); + ev_timer_again(ctx->event_loop, &ctx->recompile_timer); } static void rspamd_rs_compile_cb(unsigned int ncompiled, GError *err, void *cbd) { - struct rspamd_worker *worker = (struct rspamd_worker *) cbd; - ev_timer *tm; - ev_tstamp when = 0.0; + struct rspamd_hs_helper_single_compile_cbdata *compile_cbd = + (struct rspamd_hs_helper_single_compile_cbdata *) cbd; + struct rspamd_worker *worker = compile_cbd->worker; struct hs_helper_ctx *ctx; + struct rspamd_hs_helper_single_compile_cbdata *timer_cbd; ctx = (struct hs_helper_ctx *) worker->ctx; if (err != NULL) { /* Failed to compile: log and go out */ msg_err("cannot compile Hyperscan database: %e", err); - + g_free(compile_cbd); return; } - if (ncompiled > 0) { - /* Enforce update for other workers */ - hack_global_forced = TRUE; - } + timer_cbd = g_malloc0(sizeof(*timer_cbd)); + timer_cbd->worker = worker; + timer_cbd->forced = (ncompiled > 0) ? TRUE : compile_cbd->forced; + timer_cbd->workers_ready = compile_cbd->workers_ready; - /* - * Do not send notification unless all other workers are started - * XXX: now we just sleep for 1 seconds to ensure that - */ - if (!ctx->loaded) { - when = 1.0; /* Postpone */ - ctx->loaded = TRUE; + if (timer_cbd->workers_ready) { + /* Workers are ready, send notification immediately */ msg_info("compiled %d regular expressions to the hyperscan tree, " - "postpone loaded notification for %.0f seconds to avoid races", - ncompiled, - when); + "send loaded notification", + ncompiled); + rspamd_rs_send_single_notification(timer_cbd); } else { + /* Workers not ready yet, notification will be sent when workers_spawned event is received */ msg_info("compiled %d regular expressions to the hyperscan tree, " - "send loaded notification", + "waiting for workers to be ready before sending notification", ncompiled); + ctx->loaded = TRUE; } - tm = g_malloc0(sizeof(*tm)); - tm->data = (void *) worker; - ev_timer_init(tm, rspamd_rs_delayed_cb, when, 0); - ev_timer_start(ctx->event_loop, tm); + g_free(compile_cbd); } static gboolean rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker, gboolean forced) { + msg_info("starting hyperscan compilation (forced: %s, workers_ready: %s)", + forced ? "yes" : "no", ctx->workers_ready ? "yes" : "no"); + #if !defined(__aarch64__) && !defined(__powerpc64__) if (!(ctx->cfg->libs_ctx->crypto_ctx->cpu_config & CPUID_SSSE3)) { msg_warn("CPU doesn't have SSSE3 instructions set " @@ -331,13 +431,84 @@ rspamd_rs_compile(struct hs_helper_ctx *ctx, struct rspamd_worker *worker, msg_warn("cannot cleanup cache dir '%s'", ctx->hs_dir); } - hack_global_forced = forced; /* killmeplease */ - rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache, - ctx->hs_dir, ctx->max_time, !forced, - ctx->event_loop, - rspamd_rs_compile_cb, - (void *) worker); + /* Check if we have any scopes */ + unsigned int scope_count = rspamd_re_cache_count_scopes(ctx->cfg->re_cache); + if (scope_count == 0) { + /* No additional scopes, just default scope - use standard compilation */ + struct rspamd_hs_helper_single_compile_cbdata *single_cbd = + g_malloc0(sizeof(*single_cbd)); + single_cbd->worker = worker; + single_cbd->forced = forced; + single_cbd->workers_ready = ctx->workers_ready; + + rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache, + ctx->hs_dir, ctx->max_time, !forced, + ctx->event_loop, + rspamd_rs_compile_cb, + (void *) single_cbd); + return TRUE; + } + /* Count scopes and prepare compilation data */ + struct rspamd_re_cache *scope; + unsigned int total_scopes = 0; + + /* Count valid scopes first */ + for (scope = rspamd_re_cache_scope_first(ctx->cfg->re_cache); + scope != NULL; + scope = rspamd_re_cache_scope_next(scope)) { + const char *scope_name = rspamd_re_cache_scope_name(scope); + const char *scope_for_check = (strcmp(scope_name, "default") == 0) ? NULL : scope_name; + + if (rspamd_re_cache_is_loaded(ctx->cfg->re_cache, scope_for_check)) { + total_scopes++; + } + } + + if (total_scopes == 0) { + /* No loaded scopes, use standard compilation for default scope */ + struct rspamd_hs_helper_single_compile_cbdata *single_cbd = + g_malloc0(sizeof(*single_cbd)); + single_cbd->worker = worker; + single_cbd->forced = forced; + single_cbd->workers_ready = ctx->workers_ready; + + rspamd_re_cache_compile_hyperscan(ctx->cfg->re_cache, + ctx->hs_dir, ctx->max_time, !forced, + ctx->event_loop, + rspamd_rs_compile_cb, + (void *) single_cbd); + return TRUE; + } + + /* Prepare compilation callback data */ + struct rspamd_hs_helper_compile_cbdata *compile_cbd = + g_malloc0(sizeof(*compile_cbd)); + compile_cbd->worker = worker; + compile_cbd->ctx = ctx; + compile_cbd->total_compiled = 0; + compile_cbd->scopes_remaining = total_scopes; + compile_cbd->forced = forced; + compile_cbd->workers_ready = ctx->workers_ready; + + /* Compile each loaded scope */ + for (scope = rspamd_re_cache_scope_first(ctx->cfg->re_cache); + scope != NULL; + scope = rspamd_re_cache_scope_next(scope)) { + const char *scope_name = rspamd_re_cache_scope_name(scope); + const char *scope_for_compile = (strcmp(scope_name, "default") == 0) ? NULL : scope_name; + + if (rspamd_re_cache_is_loaded(ctx->cfg->re_cache, scope_for_compile)) { + rspamd_re_cache_compile_hyperscan_scoped_single(scope, scope_for_compile, + ctx->hs_dir, ctx->max_time, !forced, + ctx->event_loop, + rspamd_rs_compile_scoped_cb, + compile_cbd); + } + else { + msg_debug("skipping unloaded scope: %s", scope_name); + } + } return TRUE; } @@ -364,11 +535,71 @@ rspamd_hs_helper_reload(struct rspamd_main *rspamd_main, /* Stop recompile */ ev_timer_stop(ctx->event_loop, &ctx->recompile_timer); + ctx->loaded = FALSE; /* Reset flag for forced recompile */ rspamd_rs_compile(ctx, worker, TRUE); return TRUE; } +static gboolean +rspamd_hs_helper_workers_spawned(struct rspamd_main *rspamd_main, + struct rspamd_worker *worker, int fd, + int attached_fd, + struct rspamd_control_command *cmd, + gpointer ud) +{ + struct rspamd_control_reply rep; + struct hs_helper_ctx *ctx = ud; + + msg_info("received workers_spawned notification (%d workers); hyperscan compilation finished: %s", + cmd->cmd.workers_spawned.workers_count, + ctx->loaded ? "yes" : "no"); + + /* Mark that workers are ready */ + ctx->workers_ready = TRUE; + + memset(&rep, 0, sizeof(rep)); + rep.type = RSPAMD_CONTROL_WORKERS_SPAWNED; + rep.reply.workers_spawned.status = 0; + + /* Write reply */ + if (write(fd, &rep, sizeof(rep)) != sizeof(rep)) { + msg_err("cannot write reply to the control socket: %s", + strerror(errno)); + } + + /* If hyperscan compilation has finished but we were waiting for workers, trigger notification now */ + if (ctx->loaded) { + static struct rspamd_srv_command srv_cmd; + + memset(&srv_cmd, 0, sizeof(srv_cmd)); + srv_cmd.type = RSPAMD_SRV_HYPERSCAN_LOADED; + rspamd_strlcpy(srv_cmd.cmd.hs_loaded.cache_dir, ctx->hs_dir, + sizeof(srv_cmd.cmd.hs_loaded.cache_dir)); + srv_cmd.cmd.hs_loaded.forced = FALSE; + srv_cmd.cmd.hs_loaded.scope[0] = '\0'; /* NULL scope means all scopes */ + + rspamd_srv_send_command(worker, + ctx->event_loop, &srv_cmd, -1, NULL, NULL); + + msg_info("sent delayed hyperscan loaded notification after workers spawned"); + ctx->loaded = FALSE; /* Reset to avoid duplicate notifications */ + } + else { + /* Start initial compilation now that workers are ready */ + msg_info("starting initial hyperscan compilation after workers spawned"); + if (!rspamd_rs_compile(ctx, worker, FALSE)) { + msg_warn("initial hyperscan compilation failed or not needed"); + } + } + + if (attached_fd != -1) { + close(attached_fd); + } + + return TRUE; +} + static void rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents) { @@ -379,6 +610,9 @@ rspamd_hs_helper_timer(EV_P_ ev_timer *w, int revents) ctx = worker->ctx; tim = rspamd_time_jitter(ctx->recompile_time, 0); w->repeat = tim; + + msg_info("periodic recompilation timer triggered (workers_ready: %s)", + ctx->workers_ready ? "yes" : "no"); rspamd_rs_compile(ctx, worker, FALSE); } @@ -398,23 +632,25 @@ start_hs_helper(struct rspamd_worker *worker) ctx->hs_dir = RSPAMD_DBDIR "/"; } + msg_info("hs_helper starting: cache_dir=%s, recompile_time=%.1f, workers_ready=%s", + ctx->hs_dir, ctx->recompile_time, ctx->workers_ready ? "yes" : "no"); + ctx->event_loop = rspamd_prepare_worker(worker, "hs_helper", NULL); - if (!rspamd_rs_compile(ctx, worker, FALSE)) { - /* Tell main not to respawn more workers */ - exit(EXIT_SUCCESS); - } - rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_RECOMPILE, rspamd_hs_helper_reload, ctx); + rspamd_control_worker_add_cmd_handler(worker, RSPAMD_CONTROL_WORKERS_SPAWNED, + rspamd_hs_helper_workers_spawned, ctx); ctx->recompile_timer.data = worker; tim = rspamd_time_jitter(ctx->recompile_time, 0); + msg_info("setting up recompile timer for %.1f seconds", tim); ev_timer_init(&ctx->recompile_timer, rspamd_hs_helper_timer, tim, 0.0); ev_timer_start(ctx->event_loop, &ctx->recompile_timer); + msg_info("hs_helper starting event loop"); ev_loop(ctx->event_loop, 0); rspamd_worker_block_signals(); |