/* * SonarQube * Copyright (C) 2009-2025 SonarSource SA * mailto:info AT sonarsource DOT com * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 3 of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ package org.sonar.ce.taskprocessor; import java.util.HashMap; import java.util.Map; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonar.api.utils.System2; import org.sonar.ce.task.CeTask; import org.sonar.ce.task.CeTaskInterruptedException; import org.sonar.ce.task.CeTaskTimeoutException; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; /** * An implementation of {@link org.sonar.ce.task.CeTaskInterrupter} which interrupts the processing of the task * if: * */ public class TimeoutCeTaskInterrupter extends SimpleCeTaskInterrupter { private static final Logger LOG = LoggerFactory.getLogger(TimeoutCeTaskInterrupter.class); private final long taskTimeoutThreshold; private final CeWorkerController ceWorkerController; private final System2 system2; private final Map startTimestampByCeTaskUuid = new HashMap<>(); public TimeoutCeTaskInterrupter(long taskTimeoutThreshold, CeWorkerController ceWorkerController, System2 system2) { checkArgument(taskTimeoutThreshold >= 1, "threshold must be >= 1"); LOG.info("Compute Engine Task timeout enabled: {} ms", taskTimeoutThreshold); this.taskTimeoutThreshold = taskTimeoutThreshold; this.ceWorkerController = ceWorkerController; this.system2 = system2; } @Override public void check(Thread currentThread) throws CeTaskInterruptedException { super.check(currentThread); computeTimeOutOf(taskOf(currentThread)) .ifPresent(timeout -> { throw new CeTaskTimeoutException(format("Execution of task timed out after %s ms", timeout)); }); } private Optional computeTimeOutOf(CeTask ceTask) { Long startTimestamp = startTimestampByCeTaskUuid.get(ceTask.getUuid()); checkState(startTimestamp != null, "No start time recorded for task %s", ceTask.getUuid()); long duration = system2.now() - startTimestamp; return Optional.of(duration) .filter(t -> t > taskTimeoutThreshold); } private CeTask taskOf(Thread currentThread) { return ceWorkerController.getCeWorkerIn(currentThread) .flatMap(CeWorker::getCurrentTask) .orElseThrow(() -> new IllegalStateException(format("Could not find the CeTask being executed in thread '%s'", currentThread.getName()))); } @Override public void onStart(CeTask ceTask) { long now = system2.now(); Long existingTimestamp = startTimestampByCeTaskUuid.put(ceTask.getUuid(), now); if (existingTimestamp != null) { LOG.warn( "Notified of start of execution of task {} but start had already been recorded at {}. Recording new start at {}", ceTask.getUuid(), existingTimestamp, now); } } @Override public void onEnd(CeTask ceTask) { Long startTimestamp = startTimestampByCeTaskUuid.remove(ceTask.getUuid()); if (startTimestamp == null) { LOG.warn("Notified of end of execution of task {} but start wasn't recorded", ceTask.getUuid()); } } }