--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.lang.String.format;
+
+public final class CeTaskCanceledException extends CeTaskInterruptedException {
+ public CeTaskCanceledException(Thread thread) {
+ super(format("CeWorker executing in Thread '%s' has been interrupted", thread.getName()),
+ CeActivityDto.Status.CANCELED);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import java.util.Optional;
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.util.Objects.requireNonNull;
+
+public abstract class CeTaskInterruptedException extends RuntimeException {
+ private final CeActivityDto.Status status;
+
+ protected CeTaskInterruptedException(String message, CeActivityDto.Status status) {
+ super(message);
+ this.status = requireNonNull(status, "status can't be null");
+ }
+
+ public CeActivityDto.Status getStatus() {
+ return status;
+ }
+
+ public static Optional<CeTaskInterruptedException> isTaskInterruptedException(Throwable e) {
+ if (e instanceof CeTaskInterruptedException) {
+ return Optional.of((CeTaskInterruptedException) e);
+ }
+ return isCauseInterruptedException(e);
+ }
+
+ private static Optional<CeTaskInterruptedException> isCauseInterruptedException(Throwable e) {
+ Throwable cause = e.getCause();
+ if (cause == null || cause == e) {
+ return Optional.empty();
+ }
+ if (cause instanceof CeTaskInterruptedException) {
+ return Optional.of((CeTaskInterruptedException) cause);
+ }
+ return isCauseInterruptedException(cause);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+/**
+ * Method {@link #check(Thread)} of the {@link CeTaskInterrupter} can be called during the processing of a
+ * {@link CeTask} to check whether processing of this task must be interrupted.
+ * <p>
+ * Interruption cause may be user cancelling the task, operator stopping the Compute Engine, execution running for
+ * too long, ...
+ */
+public interface CeTaskInterrupter {
+ /**
+ * @throws CeTaskInterruptedException if the execution of the task must be interrupted
+ */
+ void check(Thread currentThread) throws CeTaskInterruptedException;
+
+ /**
+ * Lets the interrupter know that the processing of the specified task has started.
+ */
+ void onStart(CeTask ceTask);
+
+ /**
+ * Lets the interrupter know that the processing of the specified task has ended.
+ */
+ void onEnd(CeTask ceTask);
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import org.sonar.db.ce.CeActivityDto;
+
+/**
+ * This exception will stop the task and make it be archived with the {@link CeActivityDto.Status#FAILED FAILED}
+ * status and the error type {@code TIMEOUT}.
+ * <p>
+ * This exception has no stacktrace:
+ * <ul>
+ * <li>it's irrelevant to the end user</li>
+ * <li>we don't want to leak any implementation detail</li>
+ * </ul>
+ */
+public final class CeTaskTimeoutException extends CeTaskInterruptedException implements TypedException {
+
+ public CeTaskTimeoutException(String message) {
+ super(message, CeActivityDto.Status.FAILED);
+ }
+
+ @Override
+ public String getType() {
+ return "TIMEOUT";
+ }
+
+ /**
+ * Does not fill in the stack trace
+ *
+ * @see Throwable#fillInStackTrace()
+ */
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+}
import javax.annotation.Nullable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.task.CeTaskInterrupter;
import org.sonar.core.util.logs.Profiler;
import static com.google.common.base.Preconditions.checkArgument;
private static final Logger LOGGER = Loggers.get(ComputationStepExecutor.class);
private final ComputationSteps steps;
+ private final CeTaskInterrupter taskInterrupter;
@CheckForNull
private final Listener listener;
* Used when no {@link ComputationStepExecutor.Listener} is available in pico
* container.
*/
- public ComputationStepExecutor(ComputationSteps steps) {
- this(steps, null);
+ public ComputationStepExecutor(ComputationSteps steps, CeTaskInterrupter taskInterrupter) {
+ this(steps, taskInterrupter, null);
}
- public ComputationStepExecutor(ComputationSteps steps, @Nullable Listener listener) {
+ public ComputationStepExecutor(ComputationSteps steps, CeTaskInterrupter taskInterrupter, @Nullable Listener listener) {
this.steps = steps;
+ this.taskInterrupter = taskInterrupter;
this.listener = listener;
}
}
}
- private static void executeStep(Profiler stepProfiler, ComputationStep.Context context, ComputationStep step) {
+ private void executeStep(Profiler stepProfiler, ComputationStep.Context context, ComputationStep step) {
String status = "FAILED";
stepProfiler.start();
try {
+ taskInterrupter.check(Thread.currentThread());
step.execute(context);
status = "SUCCESS";
} finally {
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import org.junit.Test;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeTaskCanceledExceptionTest {
+ @Test
+ public void message_is_based_on_specified_thread_name() {
+ Thread t = new Thread();
+ t.setName(randomAlphabetic(29));
+
+ CeTaskCanceledException underTest = new CeTaskCanceledException(t);
+
+ assertThat(underTest.getMessage()).isEqualTo("CeWorker executing in Thread '" + t.getName() + "' has been interrupted");
+ }
+
+ @Test
+ public void getStatus_returns_CANCELED() {
+ CeTaskCanceledException underTest = new CeTaskCanceledException(new Thread());
+
+ assertThat(underTest.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import java.util.Random;
+import org.junit.Test;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException;
+
+public class CeTaskInterruptedExceptionTest {
+
+ @Test
+ public void isCauseInterruptedException_returns_CeTaskInterruptedException_or_subclass() {
+ String message = randomAlphabetic(50);
+ CeActivityDto.Status status = randomStatus();
+ CeTaskInterruptedException e1 = new CeTaskInterruptedException(message, status) {
+
+ };
+ CeTaskInterruptedException e2 = new CeTaskInterruptedExceptionSubclass(message, status);
+
+ assertThat(isTaskInterruptedException(e1)).contains(e1);
+ assertThat(isTaskInterruptedException(e2)).contains(e2);
+ assertThat(isTaskInterruptedException(new RuntimeException())).isEmpty();
+ assertThat(isTaskInterruptedException(new Exception())).isEmpty();
+ }
+
+ @Test
+ public void isCauseInterruptedException_returns_CeTaskInterruptedException_or_subclass_in_cause_chain() {
+ String message = randomAlphabetic(50);
+ CeActivityDto.Status status = randomStatus();
+ CeTaskInterruptedException e1 = new CeTaskInterruptedException(message, status) {
+
+ };
+ CeTaskInterruptedException e2 = new CeTaskInterruptedExceptionSubclass(message, status);
+
+ assertThat(isTaskInterruptedException(new RuntimeException(e1))).contains(e1);
+ assertThat(isTaskInterruptedException(new Exception(new RuntimeException(e2)))).contains(e2);
+ }
+
+ private static CeActivityDto.Status randomStatus() {
+ return CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)];
+ }
+
+ private static class CeTaskInterruptedExceptionSubclass extends CeTaskInterruptedException {
+ public CeTaskInterruptedExceptionSubclass(String message, CeActivityDto.Status status) {
+ super(message, status);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Test;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeTaskTimeoutExceptionTest {
+ private String message = RandomStringUtils.randomAlphabetic(50);
+ private CeTaskTimeoutException underTest = new CeTaskTimeoutException(message);
+
+ @Test
+ public void verify_message_and_type() {
+ assertThat(underTest.getMessage()).isEqualTo(message);
+ assertThat(underTest.getType()).isEqualTo("TIMEOUT");
+ }
+
+ @Test
+ public void getStatus_returns_FAILED() {
+ assertThat(underTest.getStatus()).isEqualTo(CeActivityDto.Status.FAILED);
+ }
+
+
+ @Test
+ public void noStacktrace() {
+ StringWriter stacktrace = new StringWriter();
+ underTest.printStackTrace(new PrintWriter(stacktrace));
+ assertThat(stacktrace.toString())
+ .isEqualTo(CeTaskTimeoutException.class.getCanonicalName() + ": " + message + System.lineSeparator());
+ }
+}
import org.mockito.InOrder;
import org.sonar.api.utils.log.LogTester;
import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.task.CeTaskInterrupter;
import org.sonar.ce.task.ChangeLogLevel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public ExpectedException expectedException = ExpectedException.none();
private final ComputationStepExecutor.Listener listener = mock(ComputationStepExecutor.Listener.class);
+ private final CeTaskInterrupter taskInterrupter = mock(CeTaskInterrupter.class);
private final ComputationStep computationStep1 = mockComputationStep("step1");
private final ComputationStep computationStep2 = mockComputationStep("step2");
private final ComputationStep computationStep3 = mockComputationStep("step3");
@Test
public void execute_call_execute_on_each_ComputationStep_in_order_returned_by_instances_method() {
- new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3))
+ new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter)
.execute();
InOrder inOrder = inOrder(computationStep1, computationStep2, computationStep3);
.when(computationStep)
.execute(any());
- ComputationStepExecutor computationStepExecutor = new ComputationStepExecutor(mockComputationSteps(computationStep));
+ ComputationStepExecutor computationStepExecutor = new ComputationStepExecutor(mockComputationSteps(computationStep), taskInterrupter);
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(message);
ChangeLogLevel logLevel1 = new ChangeLogLevel(step1.getClass(), LoggerLevel.INFO);
ChangeLogLevel logLevel2 = new ChangeLogLevel(step2.getClass(), LoggerLevel.INFO);
ChangeLogLevel logLevel3 = new ChangeLogLevel(step3.getClass(), LoggerLevel.INFO)) {
- new ComputationStepExecutor(mockComputationSteps(step1, step2, step3)).execute();
+ new ComputationStepExecutor(mockComputationSteps(step1, step2, step3), taskInterrupter).execute();
List<String> infoLogs = logTester.logs(LoggerLevel.INFO);
assertThat(infoLogs).hasSize(3);
super.execute(context);
throw expected;
}
- } ;
+ };
try (ChangeLogLevel executor = new ChangeLogLevel(ComputationStepExecutor.class, LoggerLevel.INFO);
ChangeLogLevel logLevel1 = new ChangeLogLevel(step1.getClass(), LoggerLevel.INFO);
ChangeLogLevel logLevel3 = new ChangeLogLevel(step3.getClass(), LoggerLevel.INFO)) {
try {
- new ComputationStepExecutor(mockComputationSteps(step1, step2, step3)).execute();
+ new ComputationStepExecutor(mockComputationSteps(step1, step2, step3), taskInterrupter).execute();
fail("a RuntimeException should have been thrown");
} catch (RuntimeException e) {
List<String> infoLogs = logTester.logs(LoggerLevel.INFO);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Statistic with key [time] is not accepted");
- new ComputationStepExecutor(mockComputationSteps(step)).execute();
+ new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
}
}
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Statistic with key [foo] is already present");
- new ComputationStepExecutor(mockComputationSteps(step)).execute();
+ new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
}
}
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("Statistic has null key");
- new ComputationStepExecutor(mockComputationSteps(step)).execute();
+ new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
}
}
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("Statistic with key [bar] has null value");
- new ComputationStepExecutor(mockComputationSteps(step)).execute();
+ new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
}
}
@Test
public void execute_calls_listener_finished_method_with_all_step_runs() {
- new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), listener)
+ new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), taskInterrupter, listener)
.execute();
verify(listener).finished(true);
.execute(any());
try {
- new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), listener)
+ new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), taskInterrupter, listener)
.execute();
fail("exception toBeThrown should have been raised");
} catch (RuntimeException e) {
.when(listener)
.finished(anyBoolean());
- new ComputationStepExecutor(mockComputationSteps(computationStep1), listener).execute();
+ new ComputationStepExecutor(mockComputationSteps(computationStep1), taskInterrupter, listener).execute();
+ }
+
+ @Test
+ public void execute_fails_with_exception_thrown_by_interrupter() throws Throwable {
+ executeFailsWithExceptionThrownByInterrupter();
+
+ reset(computationStep1, computationStep2, computationStep3, taskInterrupter);
+ runInOtherThread(this::executeFailsWithExceptionThrownByInterrupter);
+ }
+
+ private void executeFailsWithExceptionThrownByInterrupter() {
+ Thread currentThread = Thread.currentThread();
+ ComputationStepExecutor underTest = new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter);
+ RuntimeException exception = new RuntimeException("mocking fail of method check()");
+ doNothing()
+ .doNothing()
+ .doThrow(exception)
+ .when(taskInterrupter)
+ .check(currentThread);
+
+ try {
+ underTest.execute();
+ fail("execute should have thrown an exception");
+ } catch (Exception e) {
+ assertThat(e).isSameAs(exception);
+ }
+ }
+
+ @Test
+ public void execute_calls_interrupter_with_current_thread_before_each_step() throws Throwable {
+ executeCallsInterrupterWithCurrentThreadBeforeEachStep();
+
+ reset(computationStep1, computationStep2, computationStep3, taskInterrupter);
+ runInOtherThread(this::executeCallsInterrupterWithCurrentThreadBeforeEachStep);
+ }
+
+ private void executeCallsInterrupterWithCurrentThreadBeforeEachStep() {
+ InOrder inOrder = inOrder(computationStep1, computationStep2, computationStep3, taskInterrupter);
+ ComputationStepExecutor underTest = new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter);
+
+ underTest.execute();
+
+ inOrder.verify(taskInterrupter).check(Thread.currentThread());
+ inOrder.verify(computationStep1).execute(any());
+ inOrder.verify(computationStep1).getDescription();
+ inOrder.verify(taskInterrupter).check(Thread.currentThread());
+ inOrder.verify(computationStep2).execute(any());
+ inOrder.verify(computationStep2).getDescription();
+ inOrder.verify(taskInterrupter).check(Thread.currentThread());
+ inOrder.verify(computationStep3).execute(any());
+ inOrder.verify(computationStep3).getDescription();
+ inOrder.verifyNoMoreInteractions();
+ }
+
+ private void runInOtherThread(Runnable r) throws Throwable {
+ Throwable[] otherThreadException = new Throwable[1];
+ Thread t = new Thread(() -> {
+ try {
+ r.run();
+ } catch (Throwable e) {
+ otherThreadException[0] = e;
+ }
+ });
+ t.start();
+ t.join();
+
+ if (otherThreadException[0] != null) {
+ throw otherThreadException[0];
+ }
}
private static ComputationSteps mockComputationSteps(ComputationStep... computationSteps) {
import org.picocontainer.Startable;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerController;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
-import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
import org.sonar.process.Jmx;
import org.sonar.process.systeminfo.SystemInfoSection;
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
private final CEQueueStatus queueStatus;
private final CeConfiguration ceConfiguration;
private final CeWorkerFactory ceWorkerFactory;
- private final EnabledCeWorkerController enabledCeWorkerController;
+ private final CeWorkerController ceWorkerController;
- public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) {
+ public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, CeWorkerController CeWorkerController) {
this.queueStatus = queueStatus;
this.ceConfiguration = ceConfiguration;
this.ceWorkerFactory = ceWorkerFactory;
- this.enabledCeWorkerController = enabledCeWorkerController;
+ this.ceWorkerController = CeWorkerController;
}
@Override
public List<String> getEnabledWorkerUuids() {
Set<CeWorker> workers = ceWorkerFactory.getWorkers();
return workers.stream()
- .filter(enabledCeWorkerController::isEnabled)
+ .filter(ceWorkerController::isEnabled)
.map(CeWorker::getUUID)
.sorted()
.collect(Collectors.toList());
import org.sonar.ce.monitoring.CEQueueStatus;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskResult;
-import org.sonar.ce.task.projectanalysis.component.VisitException;
import org.sonar.ce.task.TypedException;
+import org.sonar.ce.task.projectanalysis.component.VisitException;
import org.sonar.core.util.UuidFactory;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
private final long delayBetweenEnabledTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
- private final EnabledCeWorkerController ceWorkerController;
+ private final CeWorkerController ceWorkerController;
private final int gracefulStopTimeoutInMs;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
- EnabledCeWorkerController ceWorkerController) {
+ CeWorkerController ceWorkerController) {
this.executorService = processingExecutorService;
this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.picocontainer.injectors.ProviderAdapter;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class CeTaskInterrupterProvider extends ProviderAdapter {
+ private static final String PROPERTY_CE_TASK_TIMEOUT = "sonar.ce.task.timeoutSeconds";
+
+ private CeTaskInterrupter instance;
+
+ public CeTaskInterrupter provide(Configuration configuration, CeWorkerController ceWorkerController, System2 system2) {
+ if (instance == null) {
+ instance = configuration.getLong(PROPERTY_CE_TASK_TIMEOUT)
+ .filter(timeOutInSeconds -> {
+ checkState(timeOutInSeconds >= 1, "The property '%s' must be a long value >= 1. Got '%s'", PROPERTY_CE_TASK_TIMEOUT, timeOutInSeconds);
+ return true;
+ })
+ .map(timeOutInSeconds -> (CeTaskInterrupter) new TimeoutCeTaskInterrupter(timeOutInSeconds * 1_000L, ceWorkerController, system2))
+ .orElseGet(SimpleCeTaskInterrupter::new);
+ }
+ return instance;
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import javax.annotation.Nullable;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterrupter;
+import org.sonar.ce.task.CeTaskResult;
+import org.sonar.db.ce.CeActivityDto;
+
+public class CeTaskInterrupterWorkerExecutionListener implements CeWorker.ExecutionListener {
+ private final CeTaskInterrupter interrupter;
+
+ public CeTaskInterrupterWorkerExecutionListener(CeTaskInterrupter interrupter) {
+ this.interrupter = interrupter;
+ }
+
+ @Override
+ public void onStart(CeTask ceTask) {
+ interrupter.onStart(ceTask);
+ }
+
+ @Override
+ public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ interrupter.onEnd(ceTask);
+ }
+}
CeTaskProcessorRepositoryImpl.class,
CeLoggingWorkerExecutionListener.class,
ReportAnalysisFailureNotificationExecutionListener.class,
+ new CeTaskInterrupterProvider(),
+ CeTaskInterrupterWorkerExecutionListener.class,
CeWorkerFactoryImpl.class,
- EnabledCeWorkerControllerImpl.class,
+ CeWorkerControllerImpl.class,
CeProcessingSchedulerExecutorServiceImpl.class,
- CeProcessingSchedulerImpl.class);
+ CeProcessingSchedulerImpl.class
+
+ );
}
}
*/
package org.sonar.ce.taskprocessor;
+import java.util.Optional;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.sonar.ce.queue.CeQueue;
* {@code false} otherwise.
*/
public interface CeWorker extends Callable<CeWorker.Result> {
+
enum Result {
/** Worker is disabled */
DISABLED,
*/
String getUUID();
+ /**
+ * @return {@code true} if this CeWorker currently being executed by the specified {@link Thread}.
+ */
+ boolean isExecutedBy(Thread thread);
+
+ /**
+ * @return the {@link CeTask} currently being executed by this worker, if any.
+ */
+ Optional<CeTask> getCurrentTask();
+
/**
* Classes implementing will be called a task start and finishes executing.
* All classes implementing this interface are guaranted to be called for each event, even if another implementation
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+
+/**
+ * This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a
+ * task to process.
+ */
+public interface CeWorkerController {
+ interface ProcessingRecorderHook extends AutoCloseable {
+ /**
+ * Override to not declare any exception thrown.
+ */
+ @Override
+ void close();
+ }
+
+ /**
+ * Registers to the controller that the specified {@link CeWorker}
+ */
+ ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+
+ /**
+ * Returns {@code true} if the specified {@link CeWorker} is enabled
+ */
+ boolean isEnabled(CeWorker ceWorker);
+
+ /**
+ * @return the {@link CeWorker} running in the specified {@link Thread}, if any.
+ */
+ Optional<CeWorker> getCeWorkerIn(Thread thread);
+
+ /**
+ * Whether at least one worker is processing a task or not.
+ *
+ * @return {@code false} when all workers are waiting for tasks or are being stopped.
+ */
+ boolean hasAtLeastOneProcessingWorker();
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.configuration.CeConfiguration;
+
+public class CeWorkerControllerImpl implements CeWorkerController {
+ private final ConcurrentHashMap<CeWorker, Status> workerStatuses = new ConcurrentHashMap<>();
+ private final CeConfiguration ceConfiguration;
+
+ enum Status {
+ PROCESSING, PAUSED
+ }
+
+ public CeWorkerControllerImpl(CeConfiguration ceConfiguration) {
+ this.ceConfiguration = ceConfiguration;
+ logEnabledWorkerCount();
+ }
+
+ private void logEnabledWorkerCount() {
+ int workerCount = ceConfiguration.getWorkerCount();
+ if (workerCount > 1) {
+ Loggers.get(CeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
+ }
+ }
+
+ @Override
+ public Optional<CeWorker> getCeWorkerIn(Thread thread) {
+ return workerStatuses.keySet().stream()
+ .filter(t -> t.isExecutedBy(thread))
+ .findFirst();
+ }
+
+ @Override
+ public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
+ return new ProcessingRecorderHookImpl(ceWorker);
+ }
+
+ @Override
+ public boolean hasAtLeastOneProcessingWorker() {
+ return workerStatuses.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
+ }
+
+ /**
+ * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
+ * {@link CeConfiguration#getWorkerCount()}.
+ *
+ * This method does not fail if ordinal is invalid (ie. < 0).
+ */
+ @Override
+ public boolean isEnabled(CeWorker ceWorker) {
+ return ceWorker.getOrdinal() < ceConfiguration.getWorkerCount();
+ }
+
+ private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
+ private final CeWorker ceWorker;
+
+ private ProcessingRecorderHookImpl(CeWorker ceWorker) {
+ this.ceWorker = ceWorker;
+ workerStatuses.put(this.ceWorker, Status.PROCESSING);
+ }
+
+ @Override
+ public void close() {
+ workerStatuses.put(ceWorker, Status.PAUSED);
+ }
+ }
+}
private final UuidFactory uuidFactory;
private final InternalCeQueue queue;
private final CeTaskProcessorRepository taskProcessorRepository;
- private final EnabledCeWorkerController enabledCeWorkerController;
+ private final CeWorkerController ceWorkerController;
private final CeWorker.ExecutionListener[] executionListeners;
private Set<CeWorker> ceWorkers = Collections.emptySet();
* Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container.
*/
public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController) {
- this(queue, taskProcessorRepository, uuidFactory, enabledCeWorkerController, new CeWorker.ExecutionListener[0]);
+ UuidFactory uuidFactory, CeWorkerController ceWorkerController) {
+ this(queue, taskProcessorRepository, uuidFactory, ceWorkerController, new CeWorker.ExecutionListener[0]);
}
public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController,
+ UuidFactory uuidFactory, CeWorkerController ceWorkerController,
CeWorker.ExecutionListener[] executionListeners) {
this.queue = queue;
this.taskProcessorRepository = taskProcessorRepository;
this.uuidFactory = uuidFactory;
- this.enabledCeWorkerController = enabledCeWorkerController;
+ this.ceWorkerController = ceWorkerController;
this.executionListeners = executionListeners;
}
@Override
public CeWorker create(int ordinal) {
String uuid = uuidFactory.create();
- CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+ CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, ceWorkerController, executionListeners);
ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1));
return ceWorker;
}
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.queue.InternalCeQueue;
import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterruptedException;
import org.sonar.ce.task.CeTaskResult;
import org.sonar.ce.task.taskprocessor.CeTaskProcessor;
import org.sonar.core.util.logs.Profiler;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
+import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException;
import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED;
import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
+import static org.sonar.db.ce.CeActivityDto.Status.FAILED;
public class CeWorkerImpl implements CeWorker {
private final String uuid;
private final InternalCeQueue queue;
private final CeTaskProcessorRepository taskProcessorRepository;
- private final EnabledCeWorkerController enabledCeWorkerController;
+ private final CeWorkerController ceWorkerController;
private final List<ExecutionListener> listeners;
+ private final AtomicReference<RunningState> runningState = new AtomicReference<>();
public CeWorkerImpl(int ordinal, String uuid,
InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- EnabledCeWorkerController enabledCeWorkerController,
+ CeWorkerController ceWorkerController,
ExecutionListener... listeners) {
this.ordinal = checkOrdinal(ordinal);
this.uuid = uuid;
this.queue = queue;
this.taskProcessorRepository = taskProcessorRepository;
- this.enabledCeWorkerController = enabledCeWorkerController;
+ this.ceWorkerController = ceWorkerController;
this.listeners = Arrays.asList(listeners);
}
@Override
public Result call() {
- return withCustomizedThreadName(this::findAndProcessTask);
+ try (TrackRunningState trackRunningState = new TrackRunningState(this::findAndProcessTask)) {
+ return trackRunningState.get();
+ }
}
- private <T> T withCustomizedThreadName(Supplier<T> supplier) {
- Thread currentThread = Thread.currentThread();
- String oldName = currentThread.getName();
- try {
- currentThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
- return supplier.get();
- } finally {
- currentThread.setName(oldName);
+ @Override
+ public int getOrdinal() {
+ return ordinal;
+ }
+
+ @Override
+ public String getUUID() {
+ return uuid;
+ }
+
+ @Override
+ public boolean isExecutedBy(Thread thread) {
+ return Optional.ofNullable(runningState.get())
+ .filter(state -> state.runningThread.equals(thread))
+ .isPresent();
+ }
+
+ @Override
+ public Optional<CeTask> getCurrentTask() {
+ return Optional.ofNullable(runningState.get())
+ .flatMap(RunningState::getTask);
+ }
+
+ private class TrackRunningState implements AutoCloseable, Supplier<Result> {
+ private final RunningState localRunningState;
+ private final Function<RunningState, Result> delegate;
+ private final String oldName;
+
+ private TrackRunningState(Function<RunningState, Result> delegate) {
+ Thread currentThread = Thread.currentThread();
+ localRunningState = new RunningState(currentThread);
+ if (!runningState.compareAndSet(null, localRunningState)) {
+ LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " +
+ "Forcefully updating Workers's running state to new Thread.",
+ getOrdinal(), getUUID(), currentThread);
+ runningState.set(localRunningState);
+ }
+ this.delegate = delegate;
+ this.oldName = currentThread.getName();
+ }
+
+ @Override
+ public Result get() {
+ localRunningState.runningThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
+ return delegate.apply(localRunningState);
+ }
+
+ @Override
+ public void close() {
+ localRunningState.runningThread.setName(oldName);
+ if (!runningState.compareAndSet(localRunningState, null)) {
+ LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." +
+ " Keeping this new state.",
+ getOrdinal(), getUUID(), localRunningState.runningThread);
+ }
}
}
- private Result findAndProcessTask() {
- if (!enabledCeWorkerController.isEnabled(this)) {
+ private Result findAndProcessTask(RunningState localRunningState) {
+ if (!ceWorkerController.isEnabled(this)) {
return DISABLED;
}
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
return NO_TASK;
}
- try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) {
- executeTask(ceTask.get());
+ try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this);
+ ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
+ executeTask.run();
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
}
return Optional.empty();
}
- @Override
- public int getOrdinal() {
- return ordinal;
- }
+ private final class ExecuteTask implements Runnable, AutoCloseable {
+ private final CeTask task;
+ private final RunningState localRunningState;
+ private final Profiler ceProfiler;
+ private CeActivityDto.Status status = FAILED;
+ private CeTaskResult taskResult = null;
+ private Throwable error = null;
- @Override
- public String getUUID() {
- return uuid;
- }
+ private ExecuteTask(RunningState localRunningState, CeTask task) {
+ this.task = task;
+ this.localRunningState = localRunningState;
+ this.ceProfiler = startLogProfiler(task);
+ }
- private void executeTask(CeTask task) {
- callListeners(t -> t.onStart(task));
- Profiler ceProfiler = startLogProfiler(task);
+ @Override
+ public void run() {
+ beforeExecute();
+ executeTask();
+ }
- CeActivityDto.Status status = CeActivityDto.Status.FAILED;
- CeTaskResult taskResult = null;
- Throwable error = null;
- try {
- // TODO delegate the message to the related task processor, according to task type
- Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
- if (taskProcessor.isPresent()) {
- taskResult = taskProcessor.get().process(task);
- status = CeActivityDto.Status.SUCCESS;
- } else {
- LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
- status = CeActivityDto.Status.FAILED;
+ @Override
+ public void close() {
+ afterExecute();
+ }
+
+ private void beforeExecute() {
+ localRunningState.setTask(task);
+ callListeners(t -> t.onStart(task));
+ }
+
+ private void executeTask() {
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+ if (taskProcessor.isPresent()) {
+ taskResult = taskProcessor.get().process(task);
+ status = CeActivityDto.Status.SUCCESS;
+ } else {
+ LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+ status = FAILED;
+ }
+ } catch (MessageException e) {
+ // error
+ error = e;
+ } catch (Throwable e) {
+ Optional<CeTaskInterruptedException> taskInterruptedException = isTaskInterruptedException(e);
+ if (taskInterruptedException.isPresent()) {
+ LOG.trace("Task interrupted", e);
+ CeTaskInterruptedException exception = taskInterruptedException.get();
+ CeActivityDto.Status interruptionStatus = exception.getStatus();
+ status = interruptionStatus;
+ error = (interruptionStatus == FAILED ? exception : null);
+ } else {
+ // error
+ LOG.error("Failed to execute task {}", task.getUuid(), e);
+ error = e;
+ }
}
- } catch (MessageException e) {
- // error
- error = e;
- } catch (Throwable e) {
- // error
- LOG.error("Failed to execute task {}", task.getUuid(), e);
- error = e;
- } finally {
+ }
+
+ private void afterExecute() {
+ localRunningState.setTask(null);
finalizeTask(task, ceProfiler, status, taskResult, error);
}
- }
- private void callListeners(Consumer<ExecutionListener> call) {
- listeners.forEach(listener -> {
+ private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
try {
- call.accept(listener);
- } catch (Throwable t) {
- LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t);
+ queue.remove(task, status, taskResult, error);
+ } catch (Exception e) {
+ if (error != null) {
+ e.addSuppressed(error);
+ }
+ LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+ } finally {
+ // finalize
+ stopLogProfiler(ceProfiler, status);
+ callListeners(t -> t.onEnd(task, status, taskResult, error));
}
- });
- }
+ }
- private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
- @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
- try {
- queue.remove(task, status, taskResult, error);
- } catch (Exception e) {
- if (error != null) {
- e.addSuppressed(error);
- }
- LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
- } finally {
- // finalize
- stopLogProfiler(ceProfiler, status);
- callListeners(t -> t.onEnd(task, status, taskResult, error));
+ private void callListeners(Consumer<ExecutionListener> call) {
+ listeners.forEach(listener -> {
+ try {
+ call.accept(listener);
+ } catch (Throwable t) {
+ LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t);
+ }
+ });
}
}
profiler.addContext("status", status.name());
profiler.stopInfo("Executed task");
}
+
+ private static final class RunningState {
+ private final Thread runningThread;
+ private CeTask task;
+
+ private RunningState(Thread runningThread) {
+ this.runningThread = runningThread;
+ }
+
+ public Optional<CeTask> getTask() {
+ return Optional.ofNullable(task);
+ }
+
+ public void setTask(@Nullable CeTask task) {
+ this.task = task;
+ }
+ }
+
}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-/**
- * This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a
- * task to process.
- */
-public interface EnabledCeWorkerController {
- interface ProcessingRecorderHook extends AutoCloseable {
- }
-
- /**
- * Returns {@code true} if the specified {@link CeWorker} is enabled
- */
- boolean isEnabled(CeWorker ceWorker);
-
- ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
-
- /**
- * Whether at least one worker is being processed a task or not.
- * Returns {@code false} when all workers are waiting for tasks
- * or are being stopped.
- */
- boolean hasAtLeastOneProcessingWorker();
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.concurrent.ConcurrentHashMap;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.ce.configuration.CeConfiguration;
-
-public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
- private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
- private final CeConfiguration ceConfiguration;
-
- enum Status {
- PROCESSING, PAUSED
- }
-
- public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
- this.ceConfiguration = ceConfiguration;
- logEnabledWorkerCount();
- }
-
- private void logEnabledWorkerCount() {
- int workerCount = ceConfiguration.getWorkerCount();
- if (workerCount > 1) {
- Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
- }
- }
-
- @Override
- public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
- return new ProcessingRecorderHookImpl(ceWorker);
- }
-
- @Override
- public boolean hasAtLeastOneProcessingWorker() {
- return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
- }
-
- /**
- * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
- * {@link CeConfiguration#getWorkerCount()}.
- *
- * This method does not fail if ordinal is invalid (ie. < 0).
- */
- @Override
- public boolean isEnabled(CeWorker ceWorker) {
- return ceWorker.getOrdinal() < ceConfiguration.getWorkerCount();
- }
-
- private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
- private final CeWorker ceWorker;
-
- private ProcessingRecorderHookImpl(CeWorker ceWorker) {
- this.ceWorker = ceWorker;
- map.put(this.ceWorker, Status.PROCESSING);
- }
-
- @Override
- public void close() {
- map.put(ceWorker, Status.PAUSED);
- }
- }
-}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+import org.sonar.ce.task.CeTaskInterruptedException;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+/**
+ * An implementation of {@link CeTaskInterrupter} which will only interrupt the processing if the current thread
+ * has been interrupted.
+ *
+ * @see Thread#isInterrupted()
+ */
+public class SimpleCeTaskInterrupter implements CeTaskInterrupter {
+ @Override
+ public void check(Thread currentThread) throws CeTaskInterruptedException {
+ if (currentThread.isInterrupted()) {
+ throw new CeTaskCanceledException(currentThread);
+ }
+ }
+
+ @Override
+ public void onStart(CeTask ceTask) {
+ // nothing to do
+ }
+
+ @Override
+ public void onEnd(CeTask ceTask) {
+ // nothing to do
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterruptedException;
+import org.sonar.ce.task.CeTaskTimeoutException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+
+/**
+ * An implementation of {@link org.sonar.ce.task.CeTaskInterrupter} which interrupts the processing of the task
+ * if:
+ * <ul>
+ * <li>the thread has been interrupted</li>
+ * <li>it's been running for more than a certain, configurable, amount of time</li>
+ * </ul>
+ */
+public class TimeoutCeTaskInterrupter extends SimpleCeTaskInterrupter {
+ private final long taskTimeoutThreshold;
+ private final CeWorkerController ceWorkerController;
+ private final System2 system2;
+ private final Map<String, Long> startTimestampByCeTaskUuid = new HashMap<>();
+
+ public TimeoutCeTaskInterrupter(long taskTimeoutThreshold, CeWorkerController ceWorkerController, System2 system2) {
+ checkArgument(taskTimeoutThreshold >= 1, "threshold must be >= 1");
+ Loggers.get(TimeoutCeTaskInterrupter.class).info("Compute Engine Task timeout enabled: {} ms", taskTimeoutThreshold);
+
+ this.taskTimeoutThreshold = taskTimeoutThreshold;
+ this.ceWorkerController = ceWorkerController;
+ this.system2 = system2;
+ }
+
+ @Override
+ public void check(Thread currentThread) throws CeTaskInterruptedException {
+ super.check(currentThread);
+
+ computeTimeOutOf(taskOf(currentThread))
+ .ifPresent(timeout -> {
+ throw new CeTaskTimeoutException(format("Execution of task timed out after %s ms", timeout));
+ });
+ }
+
+ private Optional<Long> computeTimeOutOf(CeTask ceTask) {
+ Long startTimestamp = startTimestampByCeTaskUuid.get(ceTask.getUuid());
+ checkState(startTimestamp != null, "No start time recorded for task %s", ceTask.getUuid());
+
+ long duration = system2.now() - startTimestamp;
+ return Optional.of(duration)
+ .filter(t -> t > taskTimeoutThreshold);
+ }
+
+ private CeTask taskOf(Thread currentThread) {
+ return ceWorkerController.getCeWorkerIn(currentThread)
+ .flatMap(CeWorker::getCurrentTask)
+ .orElseThrow(() -> new IllegalStateException(format("Could not find the CeTask being executed in thread '%s'", currentThread.getName())));
+ }
+
+ @Override
+ public void onStart(CeTask ceTask) {
+ long now = system2.now();
+ Long existingTimestamp = startTimestampByCeTaskUuid.put(ceTask.getUuid(), now);
+ if (existingTimestamp != null) {
+ Loggers.get(TimeoutCeTaskInterrupter.class)
+ .warn("Notified of start of execution of task %s but start had already been recorded at %s. Recording new start at %s",
+ ceTask.getUuid(), existingTimestamp, now);
+ }
+ }
+
+ @Override
+ public void onEnd(CeTask ceTask) {
+ Long startTimestamp = startTimestampByCeTaskUuid.remove(ceTask.getUuid());
+ if (startTimestamp == null) {
+ Loggers.get(TimeoutCeTaskInterrupter.class)
+ .warn("Notified of end of execution of task %s but start wasn't recorded", ceTask.getUuid());
+ }
+ }
+
+}
+ 3 // content of CeHttpModule
+ 3 // content of CeTaskCommonsModule
+ 4 // content of ProjectAnalysisTaskModule
- + 7 // content of CeTaskProcessorModule
+ + 9 // content of CeTaskProcessorModule
+ 3 // content of ReportAnalysisFailureNotificationModule
+ 3 // CeCleaningModule + its content
+ 4 // WebhookModule
import org.junit.Test;
import org.sonar.ce.configuration.CeConfiguration;
import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerController;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
-import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
import org.sonar.core.util.stream.MoreCollectors;
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
})
.collect(MoreCollectors.toSet());
- private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
- private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController);
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+ private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), ceWorkerController);
@Test
public void register_and_unregister() throws Exception {
for (CeWorker worker : WORKERS) {
if (i < enabledWorkerCount) {
enabledWorkers[i] = worker;
- when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true);
+ when(ceWorkerController.isEnabled(worker)).thenReturn(true);
} else {
- when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false);
+ when(ceWorkerController.isEnabled(worker)).thenReturn(false);
}
i++;
}
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
- private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration);
+ private CeWorkerController ceWorkerController = new CeWorkerControllerImpl(ceConfiguration);
private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.lang.reflect.Field;
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.config.internal.MapSettings;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class CeTaskInterrupterProviderTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private MapSettings settings = new MapSettings();
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+ private System2 system2 = mock(System2.class);
+ private CeTaskInterrupterProvider underTest = new CeTaskInterrupterProvider();
+
+ @Test
+ public void provide_returns_a_SimpleCeTaskInterrupter_instance_if_configuration_is_empty() {
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isInstanceOf(SimpleCeTaskInterrupter.class);
+ }
+
+ @Test
+ public void provide_always_return_the_same_SimpleCeTaskInterrupter_instance() {
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2))
+ .isSameAs(underTest.provide(new MapSettings().asConfig(), ceWorkerController, system2));
+ }
+
+ @Test
+ public void provide_returns_a_TimeoutCeTaskInterrupter_instance_if_property_taskTimeout_has_a_value() throws IllegalAccessException, NoSuchFieldException {
+ int timeout = 1 + new Random().nextInt(2222);
+ settings.setProperty("sonar.ce.task.timeoutSeconds", timeout);
+
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isInstanceOf(TimeoutCeTaskInterrupter.class);
+
+ assertThat(readField(instance, "taskTimeoutThreshold"))
+ .isEqualTo(timeout * 1_000L);
+ assertThat(readField(instance, "ceWorkerController"))
+ .isSameAs(ceWorkerController);
+ assertThat(readField(instance, "system2"))
+ .isSameAs(system2);
+ }
+
+ @Test
+ public void provide_fails_with_ISE_if_property_is_not_a_long() {
+ settings.setProperty("sonar.ce.task.timeoutSeconds", "foo");
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' is not an long value: For input string: \"foo\"");
+
+ underTest.provide(settings.asConfig(), ceWorkerController, system2);
+ }
+
+ @Test
+ public void provide_fails_with_ISE_if_property_is_zero() {
+ settings.setProperty("sonar.ce.task.timeoutSeconds", "0");
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '0'");
+
+ underTest.provide(settings.asConfig(), ceWorkerController, system2);
+ }
+
+ @Test
+ public void provide_fails_with_ISE_if_property_is_less_than_zero() {
+ int negativeValue = -(1 + new Random().nextInt(1_212));
+ settings.setProperty("sonar.ce.task.timeoutSeconds", negativeValue);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '" + negativeValue + "'");
+
+ underTest.provide(settings.asConfig(), ceWorkerController, system2);
+ }
+
+ @Test
+ public void provide_always_return_the_same_TimeoutCeTaskInterrupter_instance() {
+ int timeout = 1 + new Random().nextInt(2222);
+ settings.setProperty("sonar.ce.task.timeoutSeconds", timeout);
+
+ CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+ assertThat(instance)
+ .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2))
+ .isSameAs(underTest.provide(new MapSettings().setProperty("sonar.ce.task.timeoutSeconds", 999).asConfig(), ceWorkerController, system2));
+ }
+
+ private static Object readField(CeTaskInterrupter instance, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ Class<?> clazz = instance.getClass();
+ Field timeoutField = clazz.getDeclaredField(fieldName);
+ timeoutField.setAccessible(true);
+ return timeoutField.get(instance);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Random;
+import org.junit.Test;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterrupter;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class CeTaskInterrupterWorkerExecutionListenerTest {
+ private CeTaskInterrupter ceTaskInterrupter = mock(CeTaskInterrupter.class);
+ private CeTaskInterrupterWorkerExecutionListener underTest = new CeTaskInterrupterWorkerExecutionListener(ceTaskInterrupter);
+
+
+ @Test
+ public void onStart_delegates_to_ceTaskInterrupter_onStart() {
+ CeTask ceTask = mock(CeTask.class);
+
+ underTest.onStart(ceTask);
+
+ verify(ceTaskInterrupter).onStart(same(ceTask));
+ }
+
+ @Test
+ public void onEnd_delegates_to_ceTaskInterrupter_onEnd() {
+ CeTask ceTask = mock(CeTask.class);
+ CeActivityDto.Status randomStatus = CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)];
+
+ underTest.onEnd(ceTask, randomStatus, null, null);
+
+ verify(ceTaskInterrupter).onEnd(same(ceTask));
+ }
+}
import org.junit.Test;
import org.picocontainer.ComponentAdapter;
+import org.sonar.api.config.internal.MapSettings;
import org.sonar.ce.notification.ReportAnalysisFailureNotificationExecutionListener;
+import org.sonar.ce.task.CeTaskInterrupter;
import org.sonar.core.platform.ComponentContainer;
import static org.assertj.core.api.Assertions.assertThat;
.map(ComponentAdapter::getComponentImplementation))
.contains(ReportAnalysisFailureNotificationExecutionListener.class);
}
+
+ @Test
+ public void defines_CeTaskInterrupterProvider_object() {
+ ComponentContainer container = new ComponentContainer();
+
+ underTest.configure(container);
+
+
+ assertThat(container.getPicoContainer().getComponentAdapter(CeTaskInterrupter.class))
+ .isInstanceOf(CeTaskInterrupterProvider.class);
+ }
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.configuration.CeConfigurationRule;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerControllerImplTest {
+ private Random random = new Random();
+ /** 1 <= workerCount <= 5 */
+ private int randomWorkerCount = 1 + random.nextInt(5);
+
+ @Rule
+ public CeConfigurationRule ceConfigurationRule = new CeConfigurationRule()
+ .setWorkerCount(randomWorkerCount);
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private CeWorker ceWorker = mock(CeWorker.class);
+ private CeWorkerControllerImpl underTest = new CeWorkerControllerImpl(ceConfigurationRule);
+
+ @Test
+ public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() {
+ int ordinal = randomWorkerCount + Math.min(-1, -random.nextInt(randomWorkerCount));
+ when(ceWorker.getOrdinal()).thenReturn(ordinal);
+
+ assertThat(underTest.isEnabled(ceWorker))
+ .as("For ordinal " + ordinal + " and workerCount " + randomWorkerCount)
+ .isTrue();
+ }
+
+ @Test
+ public void isEnabled_returns_false_if_worker_ordinal_is_equal_to_CeConfiguration_workerCount() {
+ when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
+
+ assertThat(underTest.isEnabled(ceWorker)).isFalse();
+ }
+
+ @Test
+ public void isEnabled_returns_true_if_ordinal_is_invalid() {
+ int ordinal = -1 - random.nextInt(3);
+ when(ceWorker.getOrdinal()).thenReturn(ordinal);
+
+ assertThat(underTest.isEnabled(ceWorker))
+ .as("For invalid ordinal " + ordinal + " and workerCount " + randomWorkerCount)
+ .isTrue();
+ }
+
+ @Test
+ public void constructor_writes_no_info_log_if_workerCount_is_1() {
+ ceConfigurationRule.setWorkerCount(1);
+ logTester.clear();
+
+ new CeWorkerControllerImpl(ceConfigurationRule);
+
+ assertThat(logTester.logs()).isEmpty();
+ }
+
+ @Test
+ public void constructor_writes_info_log_if_workerCount_is_greater_than_1() {
+ int newWorkerCount = randomWorkerCount + 1;
+ ceConfigurationRule.setWorkerCount(newWorkerCount);
+ logTester.clear();
+
+ new CeWorkerControllerImpl(ceConfigurationRule);
+
+ verifyInfoLog(newWorkerCount);
+ }
+
+ @Test
+ public void workerCount_is_always_reloaded() {
+ when(ceWorker.getOrdinal()).thenReturn(1);
+
+ ceConfigurationRule.setWorkerCount(1);
+ assertThat(underTest.isEnabled(ceWorker)).isFalse();
+
+ ceConfigurationRule.setWorkerCount(2);
+ assertThat(underTest.isEnabled(ceWorker)).isTrue();
+ }
+
+ @Test
+ public void getCeWorkerIn_returns_empty_if_worker_is_unregistered_in_CeWorkerController() {
+ CeWorker ceWorker = mock(CeWorker.class);
+ Thread currentThread = Thread.currentThread();
+ Thread otherThread = new Thread();
+
+ mockWorkerIsRunningOnNoThread(ceWorker);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+ mockWorkerIsRunningOnThread(ceWorker, currentThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+ mockWorkerIsRunningOnThread(ceWorker, otherThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+ }
+
+ @Test
+ public void getCeWorkerIn_returns_empty_if_worker_registered_in_CeWorkerController_but_has_no_current_thread() {
+ CeWorker ceWorker = mock(CeWorker.class);
+ Thread currentThread = Thread.currentThread();
+ Thread otherThread = new Thread();
+
+ underTest.registerProcessingFor(ceWorker);
+
+ mockWorkerIsRunningOnNoThread(ceWorker);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+ }
+
+ @Test
+ public void getCeWorkerIn_returns_thread_if_worker_registered_in_CeWorkerController_but_has_a_current_thread() {
+ CeWorker ceWorker = mock(CeWorker.class);
+ Thread currentThread = Thread.currentThread();
+ Thread otherThread = new Thread();
+
+ underTest.registerProcessingFor(ceWorker);
+
+ mockWorkerIsRunningOnThread(ceWorker, currentThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).contains(ceWorker);
+ assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+ mockWorkerIsRunningOnThread(ceWorker, otherThread);
+ assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+ assertThat(underTest.getCeWorkerIn(otherThread)).contains(ceWorker);
+ }
+
+ private void mockWorkerIsRunningOnThread(CeWorker ceWorker, Thread thread) {
+ reset(ceWorker);
+ when(ceWorker.isExecutedBy(thread)).thenReturn(true);
+ }
+
+ private void mockWorkerIsRunningOnNoThread(CeWorker ceWorker) {
+ reset(ceWorker);
+ when(ceWorker.isExecutedBy(any())).thenReturn(false);
+ }
+
+ private void verifyInfoLog(int workerCount) {
+ assertThat(logTester.logs()).hasSize(1);
+ assertThat(logTester.logs(LoggerLevel.INFO))
+ .containsOnly("Compute Engine will use " + workerCount + " concurrent workers to process tasks");
+ }
+}
public class CeWorkerFactoryImplTest {
private int randomOrdinal = new Random().nextInt(20);
private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class),
- mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class));
+ mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class));
@Test
public void create_return_CeWorker_object_with_specified_ordinal() {
CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class);
CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class);
CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class),
- mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class),
+ mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class),
new CeWorker.ExecutionListener[] {executionListener1, executionListener2});
CeWorker ceWorker = underTest.create(randomOrdinal);
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
-import org.apache.commons.lang.RandomStringUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.sonar.api.utils.MessageException;
import org.sonar.api.utils.System2;
import org.sonar.api.utils.internal.TestSystem2;
import org.sonar.ce.task.CeTask;
import org.sonar.ce.task.CeTaskResult;
import org.sonar.ce.task.projectanalysis.taskprocessor.ReportTaskProcessor;
+import org.sonar.ce.task.taskprocessor.CeTaskProcessor;
import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.user.UserTesting;
import org.sonar.server.organization.BillingValidations;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
private CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class);
private CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class);
- private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
private ArgumentCaptor<String> workerUuidCaptor = ArgumentCaptor.forClass(String.class);
private int randomOrdinal = new Random().nextInt(50);
private String workerUuid = UUID.randomUUID().toString();
- private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController,
+ private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController,
executionListener1, executionListener2);
- private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController);
+ private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController);
private InOrder inOrder = Mockito.inOrder(taskProcessor, queue, executionListener1, executionListener2);
private final CeTask.User submitter = new CeTask.User("UUID_USER_1", "LOGIN_1");
@Before
public void setUp() {
- when(enabledCeWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true);
+ when(ceWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true);
}
@Test
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Ordinal must be >= 0");
- new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, enabledCeWorkerController);
+ new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, ceWorkerController);
}
@Test
public void getUUID_must_return_the_uuid_of_constructor() {
String uuid = UUID.randomUUID().toString();
- CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController);
+ CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, ceWorkerController);
assertThat(underTest.getUUID()).isEqualTo(uuid);
}
@Test
public void worker_disabled() throws Exception {
- reset(enabledCeWorkerController);
- when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+ reset(ceWorkerController);
+ when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
assertThat(underTest.call()).isEqualTo(DISABLED);
@Test
public void worker_disabled_no_listener() throws Exception {
- reset(enabledCeWorkerController);
- when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+ reset(ceWorkerController);
+ when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
assertThat(underTestNoListener.call()).isEqualTo(DISABLED);
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
when(queue.peek(anyString())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
when(queue.peek(anyString())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
CeTask ceTask = createCeTask(submitter);
when(queue.peek(anyString())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_worker_is_disabled() throws Exception {
- reset(enabledCeWorkerController);
- when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+ reset(ceWorkerController);
+ when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
- String threadName = RandomStringUtils.randomAlphabetic(3);
+ String threadName = randomAlphabetic(3);
Thread newThread = createThreadNameVerifyingThread(threadName);
newThread.start();
assertThat(((Exception) arg1).getSuppressed()).containsOnly(ex);
}
+ @Test
+ public void isExecutedBy_returns_false_when_no_interaction_with_instance() {
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ }
+
+ @Test
+ public void isExecutedBy_returns_false_unless_a_thread_is_currently_calling_call() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ // mock long running peek(String) call => Thread is executing call() but not running a task
+ when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Optional.empty();
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isTrue();
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isFalse();
+ }
+
+ @Test
+ public void isExecutedBy_returns_false_unless_a_thread_is_currently_executing_a_task() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ String taskType = randomAlphabetic(12);
+ CeTask ceTask = mock(CeTask.class);
+ when(ceTask.getType()).thenReturn(taskType);
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
+ @CheckForNull
+ @Override
+ public CeTaskResult process(CeTask task) {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isTrue();
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+ assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+ assertThat(underTest.isExecutedBy(t)).isFalse();
+ }
+
+ @Test
+ public void getCurrentTask_returns_empty_when_no_interaction_with_instance() {
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ }
+
+ @Test
+ public void getCurrentTask_returns_empty_when_a_thread_is_currently_calling_call_but_not_executing_a_task() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ // mock long running peek(String) call => Thread is executing call() but not running a task
+ when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Optional.empty();
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ }
+
+ @Test
+ public void getCurrentTask_returns_empty_unless_a_thread_is_currently_executing_a_task() throws InterruptedException {
+ CountDownLatch inCallLatch = new CountDownLatch(1);
+ CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+ String taskType = randomAlphabetic(12);
+ CeTask ceTask = mock(CeTask.class);
+ when(ceTask.getType()).thenReturn(taskType);
+ when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
+
+ @CheckForNull
+ @Override
+ public CeTaskResult process(CeTask task) {
+ inCallLatch.countDown();
+ try {
+ assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ });
+ Thread t = callCallInNewThread(underTest);
+
+ try {
+ t.start();
+
+ inCallLatch.await(10, TimeUnit.SECONDS);
+ assertThat(underTest.getCurrentTask()).contains(ceTask);
+ } finally {
+ assertionsDoneLatch.countDown();
+ t.join();
+ }
+
+ assertThat(underTest.getCurrentTask()).isEmpty();
+ }
+
+ private Thread callCallInNewThread(CeWorker underTest) {
+ return new Thread(() -> {
+ try {
+ underTest.call();
+ } catch (Exception e) {
+ throw new RuntimeException("call to call() failed and this is unexpected. Fix the UT.", e);
+ }
+ });
+ }
+
private Thread createThreadNameVerifyingThread(String threadName) {
return new Thread(() -> {
verifyUnchangedThreadName(threadName);
.setCharacteristics(characteristicMap)
.build();
}
-
+
private UserDto insertRandomUser() {
UserDto userDto = UserTesting.newUserDto();
db.getDbClient().userDao().insert(session, userDto);
doThrow(t).when(taskProcessor).process(task);
return t;
}
+
+ private static abstract class SimpleCeTaskProcessor implements CeTaskProcessor {
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ throw new UnsupportedOperationException("getHandledCeTaskTypes should not be called");
+ }
+ }
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+class ComputingThread extends Thread {
+ private boolean kill = false;
+
+ public ComputingThread(String name) {
+ setName(name);
+ }
+
+ private long fibo(int i) {
+ if (kill) {
+ return i;
+ }
+ if (i == 0) {
+ return 0;
+ }
+ if (i == 1) {
+ return 1;
+ }
+ return fibo(i - 1) + fibo(i - 2);
+ }
+
+ @Override
+ public void run() {
+ for (int i = 2; i < 50; i++) {
+ fibo(i);
+ if (kill) {
+ break;
+ }
+ }
+ }
+
+ public void kill() {
+ this.kill = true;
+ }
+}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.Random;
-import org.junit.Rule;
-import org.junit.Test;
-import org.sonar.api.utils.log.LogTester;
-import org.sonar.api.utils.log.LoggerLevel;
-import org.sonar.ce.configuration.CeConfigurationRule;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class EnabledCeWorkerControllerImplTest {
- private Random random = new Random();
- /** 1 <= workerCount <= 5 */
- private int randomWorkerCount = 1 + random.nextInt(5);
-
- @Rule
- public CeConfigurationRule ceConfigurationRule = new CeConfigurationRule()
- .setWorkerCount(randomWorkerCount);
- @Rule
- public LogTester logTester = new LogTester();
-
- private CeWorker ceWorker = mock(CeWorker.class);
- private EnabledCeWorkerControllerImpl underTest = new EnabledCeWorkerControllerImpl(ceConfigurationRule);
-
- @Test
- public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() {
- int ordinal = randomWorkerCount + Math.min(-1, -random.nextInt(randomWorkerCount));
- when(ceWorker.getOrdinal()).thenReturn(ordinal);
-
- assertThat(underTest.isEnabled(ceWorker))
- .as("For ordinal " + ordinal + " and workerCount " + randomWorkerCount)
- .isTrue();
- }
-
- @Test
- public void isEnabled_returns_false_if_worker_ordinal_is_equal_to_CeConfiguration_workerCount() {
- when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
-
- assertThat(underTest.isEnabled(ceWorker)).isFalse();
- }
-
- @Test
- public void isEnabled_returns_true_if_ordinal_is_invalid() {
- int ordinal = -1 - random.nextInt(3);
- when(ceWorker.getOrdinal()).thenReturn(ordinal);
-
- assertThat(underTest.isEnabled(ceWorker))
- .as("For invalid ordinal " + ordinal + " and workerCount " + randomWorkerCount)
- .isTrue();
- }
-
- @Test
- public void constructor_writes_no_info_log_if_workerCount_is_1() {
- ceConfigurationRule.setWorkerCount(1);
- logTester.clear();
-
- new EnabledCeWorkerControllerImpl(ceConfigurationRule);
-
- assertThat(logTester.logs()).isEmpty();
- }
-
- @Test
- public void constructor_writes_info_log_if_workerCount_is_greater_than_1() {
- int newWorkerCount = randomWorkerCount + 1;
- ceConfigurationRule.setWorkerCount(newWorkerCount);
- logTester.clear();
-
- new EnabledCeWorkerControllerImpl(ceConfigurationRule);
-
- verifyInfoLog(newWorkerCount);
- }
-
- @Test
- public void workerCount_is_always_reloaded() {
- when(ceWorker.getOrdinal()).thenReturn(1);
-
- ceConfigurationRule.setWorkerCount(1);
- assertThat(underTest.isEnabled(ceWorker)).isFalse();
-
- ceConfigurationRule.setWorkerCount(2);
- assertThat(underTest.isEnabled(ceWorker)).isTrue();
- }
-
- private void verifyInfoLog(int workerCount) {
- assertThat(logTester.logs()).hasSize(1);
- assertThat(logTester.logs(LoggerLevel.INFO))
- .containsOnly("Compute Engine will use " + workerCount + " concurrent workers to process tasks");
- }
-}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class SimpleCeTaskInterrupterTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private SimpleCeTaskInterrupter underTest = new SimpleCeTaskInterrupter();
+
+ @Test
+ public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException {
+ String threadName = randomAlphabetic(30);
+ ComputingThread t = new ComputingThread(threadName);
+
+ try {
+ t.start();
+
+ // will not fail
+ underTest.check(t);
+
+ t.interrupt();
+
+ expectedException.expect(CeTaskCanceledException.class);
+ expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+ underTest.check(t);
+ } finally {
+ t.kill();
+ t.join(1_000);
+ }
+ }
+
+ @Test
+ public void onStart_has_no_effect() {
+ CeTask ceTask = mock(CeTask.class);
+
+ underTest.onStart(ceTask);
+
+ verifyZeroInteractions(ceTask);
+ }
+
+ @Test
+ public void onEnd_has_no_effect() {
+ CeTask ceTask = mock(CeTask.class);
+
+ underTest.onEnd(ceTask);
+
+ verifyZeroInteractions(ceTask);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+import org.sonar.ce.task.CeTaskTimeoutException;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TimeoutCeTaskInterrupterTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private int timeoutInSeconds = 1 + new Random().nextInt(20);
+ private int timeoutInMs = timeoutInSeconds * 1_000;
+ private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+ private System2 system2 = mock(System2.class);
+ private CeWorker ceWorker = mock(CeWorker.class);
+ private CeTask ceTask = mock(CeTask.class);
+ private TimeoutCeTaskInterrupter underTest = new TimeoutCeTaskInterrupter(timeoutInMs, ceWorkerController, system2);
+
+ @Test
+ public void constructor_fails_with_IAE_if_timeout_is_0() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("threshold must be >= 1");
+
+ new TimeoutCeTaskInterrupter(0, ceWorkerController, system2);
+ }
+
+ @Test
+ public void constructor_fails_with_IAE_if_timeout_is_less_than_0() {
+ long timeout = - (1 + new Random().nextInt(299));
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("threshold must be >= 1");
+
+ new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2);
+ }
+
+ @Test
+ public void constructor_log_timeout_in_ms_at_INFO_level() {
+ int timeout = 1 + new Random().nextInt(9_999);
+
+ new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2);
+
+ assertThat(logTester.logs()).hasSize(1);
+ assertThat(logTester.logs(LoggerLevel.INFO))
+ .containsExactly("Compute Engine Task timeout enabled: " + timeout + " ms");
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker() {
+ Thread t = newThreadWithRandomName();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'");
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker_with_no_current_CeTask() {
+ Thread t = newThreadWithRandomName();
+ mockWorkerOnThread(t, ceWorker);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'");
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_has_not_been_called_on_it() {
+ String taskUuid = randomAlphabetic(15);
+ Thread t = new Thread();
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ when(ceTask.getUuid()).thenReturn(taskUuid);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("No start time recorded for task " + taskUuid);
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_and_on_end_have_not_been_called_on_it() {
+ String taskUuid = randomAlphabetic(15);
+ Thread t = new Thread();
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ when(ceTask.getUuid()).thenReturn(taskUuid);
+ underTest.onStart(this.ceTask);
+ underTest.onEnd(this.ceTask);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("No start time recorded for task " + taskUuid);
+
+ underTest.check(t);
+ }
+
+ @Test
+ public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException {
+ String threadName = randomAlphabetic(30);
+ ComputingThread t = new ComputingThread(threadName);
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ underTest.onStart(ceTask);
+
+ try {
+ t.start();
+
+ // will not fail as thread is not interrupted nor timed out
+ underTest.check(t);
+
+ t.interrupt();
+
+ expectedException.expect(CeTaskCanceledException.class);
+ expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+ underTest.check(t);
+ } finally {
+ t.kill();
+ t.join(1_000);
+ }
+ }
+
+ @Test
+ public void check_throws_CeTaskTimeoutException_if_check_called_later_than_timeout_milliseconds_after_on_start() {
+ Thread thread = newThreadWithRandomName();
+ mockWorkerOnThread(thread, ceWorker);
+ mockWorkerWithTask(ceTask);
+ long now = 3_776_663_999L;
+ when(system2.now()).thenReturn(now);
+ underTest.onStart(ceTask);
+
+ // timeout not passed => no exception thrown
+ int beforeTimeoutOffset = 1 + new Random().nextInt(timeoutInMs - 1);
+ when(system2.now()).thenReturn(now + timeoutInMs - beforeTimeoutOffset);
+ underTest.check(thread);
+
+ int afterTimeoutOffset = new Random().nextInt(7_112);
+ when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset);
+
+ expectedException.expect(CeTaskTimeoutException.class);
+ expectedException.expectMessage("Execution of task timed out after " + (timeoutInMs + afterTimeoutOffset) + " ms");
+
+ underTest.check(thread);
+ }
+
+ @Test
+ public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted_even_if_timed_out() throws InterruptedException {
+ String threadName = randomAlphabetic(30);
+ ComputingThread t = new ComputingThread(threadName);
+ mockWorkerOnThread(t, ceWorker);
+ mockWorkerWithTask(ceTask);
+ long now = 3_776_663_999L;
+ when(system2.now()).thenReturn(now);
+ underTest.onStart(ceTask);
+
+ try {
+ t.start();
+ t.interrupt();
+
+ // will not fail as thread is not interrupted nor timed out
+ int afterTimeoutOffset = new Random().nextInt(7_112);
+ when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset);
+
+ expectedException.expect(CeTaskCanceledException.class);
+ expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+ underTest.check(t);
+ } finally {
+ t.kill();
+ t.join(1_000);
+ }
+ }
+
+ private static Thread newThreadWithRandomName() {
+ String threadName = randomAlphabetic(30);
+ Thread t = new Thread();
+ t.setName(threadName);
+ return t;
+ }
+
+ private void mockWorkerOnThread(Thread t, CeWorker ceWorker) {
+ when(ceWorkerController.getCeWorkerIn(t)).thenReturn(Optional.of(ceWorker));
+ }
+
+ private void mockWorkerWithTask(CeTask ceTask) {
+ when(ceWorker.getCurrentTask()).thenReturn(Optional.of(ceTask));
+ }
+}