From a3c12be17d74f53465017850644e29226709a910 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Wed, 19 Dec 2018 16:55:29 +0100 Subject: [PATCH] SONARCLOUD-310 Ce task can now fail after a maximum amount of time failure can only happen when going from one step to the other --- .../ce/task/CeTaskCanceledException.java | 31 +++ .../ce/task/CeTaskInterruptedException.java | 56 +++++ .../org/sonar/ce/task/CeTaskInterrupter.java | 44 ++++ .../sonar/ce/task/CeTaskTimeoutException.java | 54 ++++ .../ce/task/step/ComputationStepExecutor.java | 12 +- .../ce/task/CeTaskCanceledExceptionTest.java | 45 ++++ .../task/CeTaskInterruptedExceptionTest.java | 70 ++++++ .../ce/task/CeTaskTimeoutExceptionTest.java | 53 ++++ .../step/ComputationStepExecutorTest.java | 97 ++++++- .../sonar/ce/monitoring/CeTasksMBeanImpl.java | 10 +- .../sonar/ce/queue/InternalCeQueueImpl.java | 2 +- .../CeProcessingSchedulerImpl.java | 4 +- .../CeTaskInterrupterProvider.java | 46 ++++ ...askInterrupterWorkerExecutionListener.java | 44 ++++ .../taskprocessor/CeTaskProcessorModule.java | 8 +- .../org/sonar/ce/taskprocessor/CeWorker.java | 12 + ...ontroller.java => CeWorkerController.java} | 25 +- ...rImpl.java => CeWorkerControllerImpl.java} | 22 +- .../ce/taskprocessor/CeWorkerFactoryImpl.java | 12 +- .../sonar/ce/taskprocessor/CeWorkerImpl.java | 237 +++++++++++++----- .../SimpleCeTaskInterrupter.java | 50 ++++ .../TimeoutCeTaskInterrupter.java | 103 ++++++++ .../ComputeEngineContainerImplTest.java | 2 +- .../ce/monitoring/CeTasksMBeanImplTest.java | 10 +- .../CeProcessingSchedulerImplTest.java | 2 +- .../CeTaskInterrupterProviderTest.java | 127 ++++++++++ ...nterrupterWorkerExecutionListenerTest.java | 56 +++++ .../CeTaskProcessorModuleTest.java | 13 + ...t.java => CeWorkerControllerImplTest.java} | 69 ++++- .../CeWorkerFactoryImplTest.java | 4 +- .../ce/taskprocessor/CeWorkerImplTest.java | 208 +++++++++++++-- .../ce/taskprocessor/ComputingThread.java | 55 ++++ .../SimpleCeTaskInterrupterTest.java | 78 ++++++ .../TimeoutCeTaskInterrupterTest.java | 223 ++++++++++++++++ 34 files changed, 1741 insertions(+), 143 deletions(-) create mode 100644 server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java create mode 100644 server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java create mode 100644 server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java create mode 100644 server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java create mode 100644 server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java create mode 100644 server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java create mode 100644 server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java rename server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/{EnabledCeWorkerController.java => CeWorkerController.java} (70%) rename server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/{EnabledCeWorkerControllerImpl.java => CeWorkerControllerImpl.java} (73%) create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java rename server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/{EnabledCeWorkerControllerImplTest.java => CeWorkerControllerImplTest.java} (57%) create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java new file mode 100644 index 00000000000..dd3d9b68245 --- /dev/null +++ b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java @@ -0,0 +1,31 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +import org.sonar.db.ce.CeActivityDto; + +import static java.lang.String.format; + +public final class CeTaskCanceledException extends CeTaskInterruptedException { + public CeTaskCanceledException(Thread thread) { + super(format("CeWorker executing in Thread '%s' has been interrupted", thread.getName()), + CeActivityDto.Status.CANCELED); + } +} diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java new file mode 100644 index 00000000000..33192cd3e83 --- /dev/null +++ b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java @@ -0,0 +1,56 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +import java.util.Optional; +import org.sonar.db.ce.CeActivityDto; + +import static java.util.Objects.requireNonNull; + +public abstract class CeTaskInterruptedException extends RuntimeException { + private final CeActivityDto.Status status; + + protected CeTaskInterruptedException(String message, CeActivityDto.Status status) { + super(message); + this.status = requireNonNull(status, "status can't be null"); + } + + public CeActivityDto.Status getStatus() { + return status; + } + + public static Optional isTaskInterruptedException(Throwable e) { + if (e instanceof CeTaskInterruptedException) { + return Optional.of((CeTaskInterruptedException) e); + } + return isCauseInterruptedException(e); + } + + private static Optional isCauseInterruptedException(Throwable e) { + Throwable cause = e.getCause(); + if (cause == null || cause == e) { + return Optional.empty(); + } + if (cause instanceof CeTaskInterruptedException) { + return Optional.of((CeTaskInterruptedException) cause); + } + return isCauseInterruptedException(cause); + } +} diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java new file mode 100644 index 00000000000..f3acee8f631 --- /dev/null +++ b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java @@ -0,0 +1,44 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +/** + * Method {@link #check(Thread)} of the {@link CeTaskInterrupter} can be called during the processing of a + * {@link CeTask} to check whether processing of this task must be interrupted. + *

+ * Interruption cause may be user cancelling the task, operator stopping the Compute Engine, execution running for + * too long, ... + */ +public interface CeTaskInterrupter { + /** + * @throws CeTaskInterruptedException if the execution of the task must be interrupted + */ + void check(Thread currentThread) throws CeTaskInterruptedException; + + /** + * Lets the interrupter know that the processing of the specified task has started. + */ + void onStart(CeTask ceTask); + + /** + * Lets the interrupter know that the processing of the specified task has ended. + */ + void onEnd(CeTask ceTask); +} diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java new file mode 100644 index 00000000000..0e4caac5c5d --- /dev/null +++ b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java @@ -0,0 +1,54 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +import org.sonar.db.ce.CeActivityDto; + +/** + * This exception will stop the task and make it be archived with the {@link CeActivityDto.Status#FAILED FAILED} + * status and the error type {@code TIMEOUT}. + *

+ * This exception has no stacktrace: + *

    + *
  • it's irrelevant to the end user
  • + *
  • we don't want to leak any implementation detail
  • + *
+ */ +public final class CeTaskTimeoutException extends CeTaskInterruptedException implements TypedException { + + public CeTaskTimeoutException(String message) { + super(message, CeActivityDto.Status.FAILED); + } + + @Override + public String getType() { + return "TIMEOUT"; + } + + /** + * Does not fill in the stack trace + * + * @see Throwable#fillInStackTrace() + */ + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } +} diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/step/ComputationStepExecutor.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/step/ComputationStepExecutor.java index 8318fe8d8ab..c7a546f235d 100644 --- a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/step/ComputationStepExecutor.java +++ b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/step/ComputationStepExecutor.java @@ -23,6 +23,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nullable; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; +import org.sonar.ce.task.CeTaskInterrupter; import org.sonar.core.util.logs.Profiler; import static com.google.common.base.Preconditions.checkArgument; @@ -33,6 +34,7 @@ public final class ComputationStepExecutor { private static final Logger LOGGER = Loggers.get(ComputationStepExecutor.class); private final ComputationSteps steps; + private final CeTaskInterrupter taskInterrupter; @CheckForNull private final Listener listener; @@ -40,12 +42,13 @@ public final class ComputationStepExecutor { * Used when no {@link ComputationStepExecutor.Listener} is available in pico * container. */ - public ComputationStepExecutor(ComputationSteps steps) { - this(steps, null); + public ComputationStepExecutor(ComputationSteps steps, CeTaskInterrupter taskInterrupter) { + this(steps, taskInterrupter, null); } - public ComputationStepExecutor(ComputationSteps steps, @Nullable Listener listener) { + public ComputationStepExecutor(ComputationSteps steps, CeTaskInterrupter taskInterrupter, @Nullable Listener listener) { this.steps = steps; + this.taskInterrupter = taskInterrupter; this.listener = listener; } @@ -70,10 +73,11 @@ public final class ComputationStepExecutor { } } - private static void executeStep(Profiler stepProfiler, ComputationStep.Context context, ComputationStep step) { + private void executeStep(Profiler stepProfiler, ComputationStep.Context context, ComputationStep step) { String status = "FAILED"; stepProfiler.start(); try { + taskInterrupter.check(Thread.currentThread()); step.execute(context); status = "SUCCESS"; } finally { diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java new file mode 100644 index 00000000000..3d3b5076956 --- /dev/null +++ b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java @@ -0,0 +1,45 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +import org.junit.Test; +import org.sonar.db.ce.CeActivityDto; + +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; +import static org.assertj.core.api.Assertions.assertThat; + +public class CeTaskCanceledExceptionTest { + @Test + public void message_is_based_on_specified_thread_name() { + Thread t = new Thread(); + t.setName(randomAlphabetic(29)); + + CeTaskCanceledException underTest = new CeTaskCanceledException(t); + + assertThat(underTest.getMessage()).isEqualTo("CeWorker executing in Thread '" + t.getName() + "' has been interrupted"); + } + + @Test + public void getStatus_returns_CANCELED() { + CeTaskCanceledException underTest = new CeTaskCanceledException(new Thread()); + + assertThat(underTest.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + } +} diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java new file mode 100644 index 00000000000..84e47e90e53 --- /dev/null +++ b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java @@ -0,0 +1,70 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +import java.util.Random; +import org.junit.Test; +import org.sonar.db.ce.CeActivityDto; + +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; +import static org.assertj.core.api.Assertions.assertThat; +import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException; + +public class CeTaskInterruptedExceptionTest { + + @Test + public void isCauseInterruptedException_returns_CeTaskInterruptedException_or_subclass() { + String message = randomAlphabetic(50); + CeActivityDto.Status status = randomStatus(); + CeTaskInterruptedException e1 = new CeTaskInterruptedException(message, status) { + + }; + CeTaskInterruptedException e2 = new CeTaskInterruptedExceptionSubclass(message, status); + + assertThat(isTaskInterruptedException(e1)).contains(e1); + assertThat(isTaskInterruptedException(e2)).contains(e2); + assertThat(isTaskInterruptedException(new RuntimeException())).isEmpty(); + assertThat(isTaskInterruptedException(new Exception())).isEmpty(); + } + + @Test + public void isCauseInterruptedException_returns_CeTaskInterruptedException_or_subclass_in_cause_chain() { + String message = randomAlphabetic(50); + CeActivityDto.Status status = randomStatus(); + CeTaskInterruptedException e1 = new CeTaskInterruptedException(message, status) { + + }; + CeTaskInterruptedException e2 = new CeTaskInterruptedExceptionSubclass(message, status); + + assertThat(isTaskInterruptedException(new RuntimeException(e1))).contains(e1); + assertThat(isTaskInterruptedException(new Exception(new RuntimeException(e2)))).contains(e2); + } + + private static CeActivityDto.Status randomStatus() { + return CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)]; + } + + private static class CeTaskInterruptedExceptionSubclass extends CeTaskInterruptedException { + public CeTaskInterruptedExceptionSubclass(String message, CeActivityDto.Status status) { + super(message, status); + } + } + +} diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java new file mode 100644 index 00000000000..a8059fd6d0b --- /dev/null +++ b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java @@ -0,0 +1,53 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.task; + +import java.io.PrintWriter; +import java.io.StringWriter; +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; +import org.sonar.db.ce.CeActivityDto; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CeTaskTimeoutExceptionTest { + private String message = RandomStringUtils.randomAlphabetic(50); + private CeTaskTimeoutException underTest = new CeTaskTimeoutException(message); + + @Test + public void verify_message_and_type() { + assertThat(underTest.getMessage()).isEqualTo(message); + assertThat(underTest.getType()).isEqualTo("TIMEOUT"); + } + + @Test + public void getStatus_returns_FAILED() { + assertThat(underTest.getStatus()).isEqualTo(CeActivityDto.Status.FAILED); + } + + + @Test + public void noStacktrace() { + StringWriter stacktrace = new StringWriter(); + underTest.printStackTrace(new PrintWriter(stacktrace)); + assertThat(stacktrace.toString()) + .isEqualTo(CeTaskTimeoutException.class.getCanonicalName() + ": " + message + System.lineSeparator()); + } +} diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/step/ComputationStepExecutorTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/step/ComputationStepExecutorTest.java index 71810a3ce55..3900fd9dfa3 100644 --- a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/step/ComputationStepExecutorTest.java +++ b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/step/ComputationStepExecutorTest.java @@ -27,15 +27,18 @@ import org.junit.rules.ExpectedException; import org.mockito.InOrder; import org.sonar.api.utils.log.LogTester; import org.sonar.api.utils.log.LoggerLevel; +import org.sonar.ce.task.CeTaskInterrupter; import org.sonar.ce.task.ChangeLogLevel; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -47,13 +50,14 @@ public class ComputationStepExecutorTest { public ExpectedException expectedException = ExpectedException.none(); private final ComputationStepExecutor.Listener listener = mock(ComputationStepExecutor.Listener.class); + private final CeTaskInterrupter taskInterrupter = mock(CeTaskInterrupter.class); private final ComputationStep computationStep1 = mockComputationStep("step1"); private final ComputationStep computationStep2 = mockComputationStep("step2"); private final ComputationStep computationStep3 = mockComputationStep("step3"); @Test public void execute_call_execute_on_each_ComputationStep_in_order_returned_by_instances_method() { - new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3)) + new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter) .execute(); InOrder inOrder = inOrder(computationStep1, computationStep2, computationStep3); @@ -75,7 +79,7 @@ public class ComputationStepExecutorTest { .when(computationStep) .execute(any()); - ComputationStepExecutor computationStepExecutor = new ComputationStepExecutor(mockComputationSteps(computationStep)); + ComputationStepExecutor computationStepExecutor = new ComputationStepExecutor(mockComputationSteps(computationStep), taskInterrupter); expectedException.expect(RuntimeException.class); expectedException.expectMessage(message); @@ -93,7 +97,7 @@ public class ComputationStepExecutorTest { ChangeLogLevel logLevel1 = new ChangeLogLevel(step1.getClass(), LoggerLevel.INFO); ChangeLogLevel logLevel2 = new ChangeLogLevel(step2.getClass(), LoggerLevel.INFO); ChangeLogLevel logLevel3 = new ChangeLogLevel(step3.getClass(), LoggerLevel.INFO)) { - new ComputationStepExecutor(mockComputationSteps(step1, step2, step3)).execute(); + new ComputationStepExecutor(mockComputationSteps(step1, step2, step3), taskInterrupter).execute(); List infoLogs = logTester.logs(LoggerLevel.INFO); assertThat(infoLogs).hasSize(3); @@ -114,7 +118,7 @@ public class ComputationStepExecutorTest { super.execute(context); throw expected; } - } ; + }; try (ChangeLogLevel executor = new ChangeLogLevel(ComputationStepExecutor.class, LoggerLevel.INFO); ChangeLogLevel logLevel1 = new ChangeLogLevel(step1.getClass(), LoggerLevel.INFO); @@ -122,7 +126,7 @@ public class ComputationStepExecutorTest { ChangeLogLevel logLevel3 = new ChangeLogLevel(step3.getClass(), LoggerLevel.INFO)) { try { - new ComputationStepExecutor(mockComputationSteps(step1, step2, step3)).execute(); + new ComputationStepExecutor(mockComputationSteps(step1, step2, step3), taskInterrupter).execute(); fail("a RuntimeException should have been thrown"); } catch (RuntimeException e) { List infoLogs = logTester.logs(LoggerLevel.INFO); @@ -142,7 +146,7 @@ public class ComputationStepExecutorTest { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Statistic with key [time] is not accepted"); - new ComputationStepExecutor(mockComputationSteps(step)).execute(); + new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute(); } } @@ -154,7 +158,7 @@ public class ComputationStepExecutorTest { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Statistic with key [foo] is already present"); - new ComputationStepExecutor(mockComputationSteps(step)).execute(); + new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute(); } } @@ -166,7 +170,7 @@ public class ComputationStepExecutorTest { expectedException.expect(NullPointerException.class); expectedException.expectMessage("Statistic has null key"); - new ComputationStepExecutor(mockComputationSteps(step)).execute(); + new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute(); } } @@ -178,13 +182,13 @@ public class ComputationStepExecutorTest { expectedException.expect(NullPointerException.class); expectedException.expectMessage("Statistic with key [bar] has null value"); - new ComputationStepExecutor(mockComputationSteps(step)).execute(); + new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute(); } } @Test public void execute_calls_listener_finished_method_with_all_step_runs() { - new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), listener) + new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), taskInterrupter, listener) .execute(); verify(listener).finished(true); @@ -199,7 +203,7 @@ public class ComputationStepExecutorTest { .execute(any()); try { - new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), listener) + new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), taskInterrupter, listener) .execute(); fail("exception toBeThrown should have been raised"); } catch (RuntimeException e) { @@ -216,7 +220,76 @@ public class ComputationStepExecutorTest { .when(listener) .finished(anyBoolean()); - new ComputationStepExecutor(mockComputationSteps(computationStep1), listener).execute(); + new ComputationStepExecutor(mockComputationSteps(computationStep1), taskInterrupter, listener).execute(); + } + + @Test + public void execute_fails_with_exception_thrown_by_interrupter() throws Throwable { + executeFailsWithExceptionThrownByInterrupter(); + + reset(computationStep1, computationStep2, computationStep3, taskInterrupter); + runInOtherThread(this::executeFailsWithExceptionThrownByInterrupter); + } + + private void executeFailsWithExceptionThrownByInterrupter() { + Thread currentThread = Thread.currentThread(); + ComputationStepExecutor underTest = new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter); + RuntimeException exception = new RuntimeException("mocking fail of method check()"); + doNothing() + .doNothing() + .doThrow(exception) + .when(taskInterrupter) + .check(currentThread); + + try { + underTest.execute(); + fail("execute should have thrown an exception"); + } catch (Exception e) { + assertThat(e).isSameAs(exception); + } + } + + @Test + public void execute_calls_interrupter_with_current_thread_before_each_step() throws Throwable { + executeCallsInterrupterWithCurrentThreadBeforeEachStep(); + + reset(computationStep1, computationStep2, computationStep3, taskInterrupter); + runInOtherThread(this::executeCallsInterrupterWithCurrentThreadBeforeEachStep); + } + + private void executeCallsInterrupterWithCurrentThreadBeforeEachStep() { + InOrder inOrder = inOrder(computationStep1, computationStep2, computationStep3, taskInterrupter); + ComputationStepExecutor underTest = new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter); + + underTest.execute(); + + inOrder.verify(taskInterrupter).check(Thread.currentThread()); + inOrder.verify(computationStep1).execute(any()); + inOrder.verify(computationStep1).getDescription(); + inOrder.verify(taskInterrupter).check(Thread.currentThread()); + inOrder.verify(computationStep2).execute(any()); + inOrder.verify(computationStep2).getDescription(); + inOrder.verify(taskInterrupter).check(Thread.currentThread()); + inOrder.verify(computationStep3).execute(any()); + inOrder.verify(computationStep3).getDescription(); + inOrder.verifyNoMoreInteractions(); + } + + private void runInOtherThread(Runnable r) throws Throwable { + Throwable[] otherThreadException = new Throwable[1]; + Thread t = new Thread(() -> { + try { + r.run(); + } catch (Throwable e) { + otherThreadException[0] = e; + } + }); + t.start(); + t.join(); + + if (otherThreadException[0] != null) { + throw otherThreadException[0]; + } } private static ComputationSteps mockComputationSteps(ComputationStep... computationSteps) { diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java index 08cd1464c45..0b53f4b6adb 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java @@ -25,8 +25,8 @@ import java.util.stream.Collectors; import org.picocontainer.Startable; import org.sonar.ce.configuration.CeConfiguration; import org.sonar.ce.taskprocessor.CeWorker; +import org.sonar.ce.taskprocessor.CeWorkerController; import org.sonar.ce.taskprocessor.CeWorkerFactory; -import org.sonar.ce.taskprocessor.EnabledCeWorkerController; import org.sonar.process.Jmx; import org.sonar.process.systeminfo.SystemInfoSection; import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo; @@ -35,13 +35,13 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect private final CEQueueStatus queueStatus; private final CeConfiguration ceConfiguration; private final CeWorkerFactory ceWorkerFactory; - private final EnabledCeWorkerController enabledCeWorkerController; + private final CeWorkerController ceWorkerController; - public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) { + public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, CeWorkerController CeWorkerController) { this.queueStatus = queueStatus; this.ceConfiguration = ceConfiguration; this.ceWorkerFactory = ceWorkerFactory; - this.enabledCeWorkerController = enabledCeWorkerController; + this.ceWorkerController = CeWorkerController; } @Override @@ -105,7 +105,7 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect public List getEnabledWorkerUuids() { Set workers = ceWorkerFactory.getWorkers(); return workers.stream() - .filter(enabledCeWorkerController::isEnabled) + .filter(ceWorkerController::isEnabled) .map(CeWorker::getUUID) .sorted() .collect(Collectors.toList()); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 17ca3b6f474..6cd77dddbbf 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -37,8 +37,8 @@ import org.sonar.ce.container.ComputeEngineStatus; import org.sonar.ce.monitoring.CEQueueStatus; import org.sonar.ce.task.CeTask; import org.sonar.ce.task.CeTaskResult; -import org.sonar.ce.task.projectanalysis.component.VisitException; import org.sonar.ce.task.TypedException; +import org.sonar.ce.task.projectanalysis.component.VisitException; import org.sonar.core.util.UuidFactory; import org.sonar.db.DbClient; import org.sonar.db.DbSession; diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 2a25ae0cbe0..4ce1f7210ec 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -41,12 +41,12 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler { private final long delayBetweenEnabledTasks; private final TimeUnit timeUnit; private final ChainingCallback[] chainingCallbacks; - private final EnabledCeWorkerController ceWorkerController; + private final CeWorkerController ceWorkerController; private final int gracefulStopTimeoutInMs; public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration, CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory, - EnabledCeWorkerController ceWorkerController) { + CeWorkerController ceWorkerController) { this.executorService = processingExecutorService; this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay(); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java new file mode 100644 index 00000000000..e082dd276c4 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java @@ -0,0 +1,46 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import org.picocontainer.injectors.ProviderAdapter; +import org.sonar.api.config.Configuration; +import org.sonar.api.utils.System2; +import org.sonar.ce.task.CeTaskInterrupter; + +import static com.google.common.base.Preconditions.checkState; + +public class CeTaskInterrupterProvider extends ProviderAdapter { + private static final String PROPERTY_CE_TASK_TIMEOUT = "sonar.ce.task.timeoutSeconds"; + + private CeTaskInterrupter instance; + + public CeTaskInterrupter provide(Configuration configuration, CeWorkerController ceWorkerController, System2 system2) { + if (instance == null) { + instance = configuration.getLong(PROPERTY_CE_TASK_TIMEOUT) + .filter(timeOutInSeconds -> { + checkState(timeOutInSeconds >= 1, "The property '%s' must be a long value >= 1. Got '%s'", PROPERTY_CE_TASK_TIMEOUT, timeOutInSeconds); + return true; + }) + .map(timeOutInSeconds -> (CeTaskInterrupter) new TimeoutCeTaskInterrupter(timeOutInSeconds * 1_000L, ceWorkerController, system2)) + .orElseGet(SimpleCeTaskInterrupter::new); + } + return instance; + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java new file mode 100644 index 00000000000..4cb42c123f4 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java @@ -0,0 +1,44 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import javax.annotation.Nullable; +import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskInterrupter; +import org.sonar.ce.task.CeTaskResult; +import org.sonar.db.ce.CeActivityDto; + +public class CeTaskInterrupterWorkerExecutionListener implements CeWorker.ExecutionListener { + private final CeTaskInterrupter interrupter; + + public CeTaskInterrupterWorkerExecutionListener(CeTaskInterrupter interrupter) { + this.interrupter = interrupter; + } + + @Override + public void onStart(CeTask ceTask) { + interrupter.onStart(ceTask); + } + + @Override + public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { + interrupter.onEnd(ceTask); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java index 4f3b0ba6989..bbd351a844b 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java @@ -29,9 +29,13 @@ public class CeTaskProcessorModule extends Module { CeTaskProcessorRepositoryImpl.class, CeLoggingWorkerExecutionListener.class, ReportAnalysisFailureNotificationExecutionListener.class, + new CeTaskInterrupterProvider(), + CeTaskInterrupterWorkerExecutionListener.class, CeWorkerFactoryImpl.class, - EnabledCeWorkerControllerImpl.class, + CeWorkerControllerImpl.class, CeProcessingSchedulerExecutorServiceImpl.class, - CeProcessingSchedulerImpl.class); + CeProcessingSchedulerImpl.class + + ); } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java index 74b416fd0bb..8bfeffc4017 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java @@ -19,6 +19,7 @@ */ package org.sonar.ce.taskprocessor; +import java.util.Optional; import java.util.concurrent.Callable; import javax.annotation.Nullable; import org.sonar.ce.queue.CeQueue; @@ -32,6 +33,7 @@ import org.sonar.db.ce.CeActivityDto; * {@code false} otherwise. */ public interface CeWorker extends Callable { + enum Result { /** Worker is disabled */ DISABLED, @@ -51,6 +53,16 @@ public interface CeWorker extends Callable { */ String getUUID(); + /** + * @return {@code true} if this CeWorker currently being executed by the specified {@link Thread}. + */ + boolean isExecutedBy(Thread thread); + + /** + * @return the {@link CeTask} currently being executed by this worker, if any. + */ + Optional getCurrentTask(); + /** * Classes implementing will be called a task start and finishes executing. * All classes implementing this interface are guaranted to be called for each event, even if another implementation diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java similarity index 70% rename from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java rename to server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java index e11063d311c..df0d32820bc 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java @@ -19,25 +19,40 @@ */ package org.sonar.ce.taskprocessor; +import java.util.Optional; + /** * This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a * task to process. */ -public interface EnabledCeWorkerController { +public interface CeWorkerController { interface ProcessingRecorderHook extends AutoCloseable { + /** + * Override to not declare any exception thrown. + */ + @Override + void close(); } + /** + * Registers to the controller that the specified {@link CeWorker} + */ + ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker); + /** * Returns {@code true} if the specified {@link CeWorker} is enabled */ boolean isEnabled(CeWorker ceWorker); - ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker); + /** + * @return the {@link CeWorker} running in the specified {@link Thread}, if any. + */ + Optional getCeWorkerIn(Thread thread); /** - * Whether at least one worker is being processed a task or not. - * Returns {@code false} when all workers are waiting for tasks - * or are being stopped. + * Whether at least one worker is processing a task or not. + * + * @return {@code false} when all workers are waiting for tasks or are being stopped. */ boolean hasAtLeastOneProcessingWorker(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java similarity index 73% rename from server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java rename to server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java index 7370647a589..114be1e47f1 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java @@ -19,19 +19,20 @@ */ package org.sonar.ce.taskprocessor; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.configuration.CeConfiguration; -public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController { - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); +public class CeWorkerControllerImpl implements CeWorkerController { + private final ConcurrentHashMap workerStatuses = new ConcurrentHashMap<>(); private final CeConfiguration ceConfiguration; enum Status { PROCESSING, PAUSED } - public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) { + public CeWorkerControllerImpl(CeConfiguration ceConfiguration) { this.ceConfiguration = ceConfiguration; logEnabledWorkerCount(); } @@ -39,10 +40,17 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController private void logEnabledWorkerCount() { int workerCount = ceConfiguration.getWorkerCount(); if (workerCount > 1) { - Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount); + Loggers.get(CeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount); } } + @Override + public Optional getCeWorkerIn(Thread thread) { + return workerStatuses.keySet().stream() + .filter(t -> t.isExecutedBy(thread)) + .findFirst(); + } + @Override public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) { return new ProcessingRecorderHookImpl(ceWorker); @@ -50,7 +58,7 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController @Override public boolean hasAtLeastOneProcessingWorker() { - return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING); + return workerStatuses.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING); } /** @@ -69,12 +77,12 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController private ProcessingRecorderHookImpl(CeWorker ceWorker) { this.ceWorker = ceWorker; - map.put(this.ceWorker, Status.PROCESSING); + workerStatuses.put(this.ceWorker, Status.PROCESSING); } @Override public void close() { - map.put(ceWorker, Status.PAUSED); + workerStatuses.put(ceWorker, Status.PAUSED); } } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java index 5ddff907822..6a508b5118d 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java @@ -30,7 +30,7 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory { private final UuidFactory uuidFactory; private final InternalCeQueue queue; private final CeTaskProcessorRepository taskProcessorRepository; - private final EnabledCeWorkerController enabledCeWorkerController; + private final CeWorkerController ceWorkerController; private final CeWorker.ExecutionListener[] executionListeners; private Set ceWorkers = Collections.emptySet(); @@ -38,24 +38,24 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory { * Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container. */ public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, - UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController) { - this(queue, taskProcessorRepository, uuidFactory, enabledCeWorkerController, new CeWorker.ExecutionListener[0]); + UuidFactory uuidFactory, CeWorkerController ceWorkerController) { + this(queue, taskProcessorRepository, uuidFactory, ceWorkerController, new CeWorker.ExecutionListener[0]); } public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, - UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController, + UuidFactory uuidFactory, CeWorkerController ceWorkerController, CeWorker.ExecutionListener[] executionListeners) { this.queue = queue; this.taskProcessorRepository = taskProcessorRepository; this.uuidFactory = uuidFactory; - this.enabledCeWorkerController = enabledCeWorkerController; + this.ceWorkerController = ceWorkerController; this.executionListeners = executionListeners; } @Override public CeWorker create(int ordinal) { String uuid = uuidFactory.create(); - CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners); + CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, ceWorkerController, executionListeners); ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1)); return ceWorker; } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index f2c775a120a..8bb2c17023e 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -23,7 +23,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -32,6 +34,7 @@ import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.queue.InternalCeQueue; import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskInterruptedException; import org.sonar.ce.task.CeTaskResult; import org.sonar.ce.task.taskprocessor.CeTaskProcessor; import org.sonar.core.util.logs.Profiler; @@ -39,9 +42,11 @@ import org.sonar.db.ce.CeActivityDto; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; +import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException; import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED; import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; +import static org.sonar.db.ce.CeActivityDto.Status.FAILED; public class CeWorkerImpl implements CeWorker { @@ -51,18 +56,19 @@ public class CeWorkerImpl implements CeWorker { private final String uuid; private final InternalCeQueue queue; private final CeTaskProcessorRepository taskProcessorRepository; - private final EnabledCeWorkerController enabledCeWorkerController; + private final CeWorkerController ceWorkerController; private final List listeners; + private final AtomicReference runningState = new AtomicReference<>(); public CeWorkerImpl(int ordinal, String uuid, InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, - EnabledCeWorkerController enabledCeWorkerController, + CeWorkerController ceWorkerController, ExecutionListener... listeners) { this.ordinal = checkOrdinal(ordinal); this.uuid = uuid; this.queue = queue; this.taskProcessorRepository = taskProcessorRepository; - this.enabledCeWorkerController = enabledCeWorkerController; + this.ceWorkerController = ceWorkerController; this.listeners = Arrays.asList(listeners); } @@ -73,22 +79,71 @@ public class CeWorkerImpl implements CeWorker { @Override public Result call() { - return withCustomizedThreadName(this::findAndProcessTask); + try (TrackRunningState trackRunningState = new TrackRunningState(this::findAndProcessTask)) { + return trackRunningState.get(); + } } - private T withCustomizedThreadName(Supplier supplier) { - Thread currentThread = Thread.currentThread(); - String oldName = currentThread.getName(); - try { - currentThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName)); - return supplier.get(); - } finally { - currentThread.setName(oldName); + @Override + public int getOrdinal() { + return ordinal; + } + + @Override + public String getUUID() { + return uuid; + } + + @Override + public boolean isExecutedBy(Thread thread) { + return Optional.ofNullable(runningState.get()) + .filter(state -> state.runningThread.equals(thread)) + .isPresent(); + } + + @Override + public Optional getCurrentTask() { + return Optional.ofNullable(runningState.get()) + .flatMap(RunningState::getTask); + } + + private class TrackRunningState implements AutoCloseable, Supplier { + private final RunningState localRunningState; + private final Function delegate; + private final String oldName; + + private TrackRunningState(Function delegate) { + Thread currentThread = Thread.currentThread(); + localRunningState = new RunningState(currentThread); + if (!runningState.compareAndSet(null, localRunningState)) { + LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " + + "Forcefully updating Workers's running state to new Thread.", + getOrdinal(), getUUID(), currentThread); + runningState.set(localRunningState); + } + this.delegate = delegate; + this.oldName = currentThread.getName(); + } + + @Override + public Result get() { + localRunningState.runningThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName)); + return delegate.apply(localRunningState); + } + + @Override + public void close() { + localRunningState.runningThread.setName(oldName); + if (!runningState.compareAndSet(localRunningState, null)) { + LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." + + " Keeping this new state.", + getOrdinal(), getUUID(), localRunningState.runningThread); + } } } - private Result findAndProcessTask() { - if (!enabledCeWorkerController.isEnabled(this)) { + private Result findAndProcessTask(RunningState localRunningState) { + if (!ceWorkerController.isEnabled(this)) { return DISABLED; } Optional ceTask = tryAndFindTaskToExecute(); @@ -96,8 +151,9 @@ public class CeWorkerImpl implements CeWorker { return NO_TASK; } - try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) { - executeTask(ceTask.get()); + try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this); + ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { + executeTask.run(); } catch (Exception e) { LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); } @@ -113,68 +169,95 @@ public class CeWorkerImpl implements CeWorker { return Optional.empty(); } - @Override - public int getOrdinal() { - return ordinal; - } + private final class ExecuteTask implements Runnable, AutoCloseable { + private final CeTask task; + private final RunningState localRunningState; + private final Profiler ceProfiler; + private CeActivityDto.Status status = FAILED; + private CeTaskResult taskResult = null; + private Throwable error = null; - @Override - public String getUUID() { - return uuid; - } + private ExecuteTask(RunningState localRunningState, CeTask task) { + this.task = task; + this.localRunningState = localRunningState; + this.ceProfiler = startLogProfiler(task); + } - private void executeTask(CeTask task) { - callListeners(t -> t.onStart(task)); - Profiler ceProfiler = startLogProfiler(task); + @Override + public void run() { + beforeExecute(); + executeTask(); + } - CeActivityDto.Status status = CeActivityDto.Status.FAILED; - CeTaskResult taskResult = null; - Throwable error = null; - try { - // TODO delegate the message to the related task processor, according to task type - Optional taskProcessor = taskProcessorRepository.getForCeTask(task); - if (taskProcessor.isPresent()) { - taskResult = taskProcessor.get().process(task); - status = CeActivityDto.Status.SUCCESS; - } else { - LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); - status = CeActivityDto.Status.FAILED; + @Override + public void close() { + afterExecute(); + } + + private void beforeExecute() { + localRunningState.setTask(task); + callListeners(t -> t.onStart(task)); + } + + private void executeTask() { + try { + // TODO delegate the message to the related task processor, according to task type + Optional taskProcessor = taskProcessorRepository.getForCeTask(task); + if (taskProcessor.isPresent()) { + taskResult = taskProcessor.get().process(task); + status = CeActivityDto.Status.SUCCESS; + } else { + LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); + status = FAILED; + } + } catch (MessageException e) { + // error + error = e; + } catch (Throwable e) { + Optional taskInterruptedException = isTaskInterruptedException(e); + if (taskInterruptedException.isPresent()) { + LOG.trace("Task interrupted", e); + CeTaskInterruptedException exception = taskInterruptedException.get(); + CeActivityDto.Status interruptionStatus = exception.getStatus(); + status = interruptionStatus; + error = (interruptionStatus == FAILED ? exception : null); + } else { + // error + LOG.error("Failed to execute task {}", task.getUuid(), e); + error = e; + } } - } catch (MessageException e) { - // error - error = e; - } catch (Throwable e) { - // error - LOG.error("Failed to execute task {}", task.getUuid(), e); - error = e; - } finally { + } + + private void afterExecute() { + localRunningState.setTask(null); finalizeTask(task, ceProfiler, status, taskResult, error); } - } - private void callListeners(Consumer call) { - listeners.forEach(listener -> { + private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status, + @Nullable CeTaskResult taskResult, @Nullable Throwable error) { try { - call.accept(listener); - } catch (Throwable t) { - LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t); + queue.remove(task, status, taskResult, error); + } catch (Exception e) { + if (error != null) { + e.addSuppressed(error); + } + LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e); + } finally { + // finalize + stopLogProfiler(ceProfiler, status); + callListeners(t -> t.onEnd(task, status, taskResult, error)); } - }); - } + } - private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status, - @Nullable CeTaskResult taskResult, @Nullable Throwable error) { - try { - queue.remove(task, status, taskResult, error); - } catch (Exception e) { - if (error != null) { - e.addSuppressed(error); - } - LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e); - } finally { - // finalize - stopLogProfiler(ceProfiler, status); - callListeners(t -> t.onEnd(task, status, taskResult, error)); + private void callListeners(Consumer call) { + listeners.forEach(listener -> { + try { + call.accept(listener); + } catch (Throwable t) { + LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t); + } + }); } } @@ -210,4 +293,22 @@ public class CeWorkerImpl implements CeWorker { profiler.addContext("status", status.name()); profiler.stopInfo("Executed task"); } + + private static final class RunningState { + private final Thread runningThread; + private CeTask task; + + private RunningState(Thread runningThread) { + this.runningThread = runningThread; + } + + public Optional getTask() { + return Optional.ofNullable(task); + } + + public void setTask(@Nullable CeTask task) { + this.task = task; + } + } + } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java new file mode 100644 index 00000000000..0847cbf2248 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java @@ -0,0 +1,50 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskCanceledException; +import org.sonar.ce.task.CeTaskInterruptedException; +import org.sonar.ce.task.CeTaskInterrupter; + +/** + * An implementation of {@link CeTaskInterrupter} which will only interrupt the processing if the current thread + * has been interrupted. + * + * @see Thread#isInterrupted() + */ +public class SimpleCeTaskInterrupter implements CeTaskInterrupter { + @Override + public void check(Thread currentThread) throws CeTaskInterruptedException { + if (currentThread.isInterrupted()) { + throw new CeTaskCanceledException(currentThread); + } + } + + @Override + public void onStart(CeTask ceTask) { + // nothing to do + } + + @Override + public void onEnd(CeTask ceTask) { + // nothing to do + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java new file mode 100644 index 00000000000..1bebdee8819 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java @@ -0,0 +1,103 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.Loggers; +import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskInterruptedException; +import org.sonar.ce.task.CeTaskTimeoutException; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; + +/** + * An implementation of {@link org.sonar.ce.task.CeTaskInterrupter} which interrupts the processing of the task + * if: + *
    + *
  • the thread has been interrupted
  • + *
  • it's been running for more than a certain, configurable, amount of time
  • + *
+ */ +public class TimeoutCeTaskInterrupter extends SimpleCeTaskInterrupter { + private final long taskTimeoutThreshold; + private final CeWorkerController ceWorkerController; + private final System2 system2; + private final Map startTimestampByCeTaskUuid = new HashMap<>(); + + public TimeoutCeTaskInterrupter(long taskTimeoutThreshold, CeWorkerController ceWorkerController, System2 system2) { + checkArgument(taskTimeoutThreshold >= 1, "threshold must be >= 1"); + Loggers.get(TimeoutCeTaskInterrupter.class).info("Compute Engine Task timeout enabled: {} ms", taskTimeoutThreshold); + + this.taskTimeoutThreshold = taskTimeoutThreshold; + this.ceWorkerController = ceWorkerController; + this.system2 = system2; + } + + @Override + public void check(Thread currentThread) throws CeTaskInterruptedException { + super.check(currentThread); + + computeTimeOutOf(taskOf(currentThread)) + .ifPresent(timeout -> { + throw new CeTaskTimeoutException(format("Execution of task timed out after %s ms", timeout)); + }); + } + + private Optional computeTimeOutOf(CeTask ceTask) { + Long startTimestamp = startTimestampByCeTaskUuid.get(ceTask.getUuid()); + checkState(startTimestamp != null, "No start time recorded for task %s", ceTask.getUuid()); + + long duration = system2.now() - startTimestamp; + return Optional.of(duration) + .filter(t -> t > taskTimeoutThreshold); + } + + private CeTask taskOf(Thread currentThread) { + return ceWorkerController.getCeWorkerIn(currentThread) + .flatMap(CeWorker::getCurrentTask) + .orElseThrow(() -> new IllegalStateException(format("Could not find the CeTask being executed in thread '%s'", currentThread.getName()))); + } + + @Override + public void onStart(CeTask ceTask) { + long now = system2.now(); + Long existingTimestamp = startTimestampByCeTaskUuid.put(ceTask.getUuid(), now); + if (existingTimestamp != null) { + Loggers.get(TimeoutCeTaskInterrupter.class) + .warn("Notified of start of execution of task %s but start had already been recorded at %s. Recording new start at %s", + ceTask.getUuid(), existingTimestamp, now); + } + } + + @Override + public void onEnd(CeTask ceTask) { + Long startTimestamp = startTimestampByCeTaskUuid.remove(ceTask.getUuid()); + if (startTimestamp == null) { + Loggers.get(TimeoutCeTaskInterrupter.class) + .warn("Notified of end of execution of task %s but start wasn't recorded", ceTask.getUuid()); + } + } + +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java index 046bfd3d8c0..a05b5ebe74b 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java @@ -103,7 +103,7 @@ public class ComputeEngineContainerImplTest { + 3 // content of CeHttpModule + 3 // content of CeTaskCommonsModule + 4 // content of ProjectAnalysisTaskModule - + 7 // content of CeTaskProcessorModule + + 9 // content of CeTaskProcessorModule + 3 // content of ReportAnalysisFailureNotificationModule + 3 // CeCleaningModule + its content + 4 // WebhookModule diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java index 18cc1c0462a..53c83fc0138 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java @@ -35,8 +35,8 @@ import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import org.sonar.ce.configuration.CeConfiguration; import org.sonar.ce.taskprocessor.CeWorker; +import org.sonar.ce.taskprocessor.CeWorkerController; import org.sonar.ce.taskprocessor.CeWorkerFactory; -import org.sonar.ce.taskprocessor.EnabledCeWorkerController; import org.sonar.core.util.stream.MoreCollectors; import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo; @@ -61,8 +61,8 @@ public class CeTasksMBeanImplTest { }) .collect(MoreCollectors.toSet()); - private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class); - private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController); + private CeWorkerController ceWorkerController = mock(CeWorkerController.class); + private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), ceWorkerController); @Test public void register_and_unregister() throws Exception { @@ -124,9 +124,9 @@ public class CeTasksMBeanImplTest { for (CeWorker worker : WORKERS) { if (i < enabledWorkerCount) { enabledWorkers[i] = worker; - when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true); + when(ceWorkerController.isEnabled(worker)).thenReturn(true); } else { - when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false); + when(ceWorkerController.isEnabled(worker)).thenReturn(false); } i++; } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index 9c1f68683e1..2fdbfdf9029 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -78,7 +78,7 @@ public class CeProcessingSchedulerImplTest { private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS); private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS); private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker); - private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration); + private CeWorkerController ceWorkerController = new CeWorkerControllerImpl(ceConfiguration); private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java new file mode 100644 index 00000000000..05062eb1c76 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java @@ -0,0 +1,127 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.lang.reflect.Field; +import java.util.Random; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.api.config.internal.MapSettings; +import org.sonar.api.utils.System2; +import org.sonar.ce.task.CeTaskInterrupter; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +public class CeTaskInterrupterProviderTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private MapSettings settings = new MapSettings(); + private CeWorkerController ceWorkerController = mock(CeWorkerController.class); + private System2 system2 = mock(System2.class); + private CeTaskInterrupterProvider underTest = new CeTaskInterrupterProvider(); + + @Test + public void provide_returns_a_SimpleCeTaskInterrupter_instance_if_configuration_is_empty() { + CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2); + + assertThat(instance) + .isInstanceOf(SimpleCeTaskInterrupter.class); + } + + @Test + public void provide_always_return_the_same_SimpleCeTaskInterrupter_instance() { + CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2); + + assertThat(instance) + .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2)) + .isSameAs(underTest.provide(new MapSettings().asConfig(), ceWorkerController, system2)); + } + + @Test + public void provide_returns_a_TimeoutCeTaskInterrupter_instance_if_property_taskTimeout_has_a_value() throws IllegalAccessException, NoSuchFieldException { + int timeout = 1 + new Random().nextInt(2222); + settings.setProperty("sonar.ce.task.timeoutSeconds", timeout); + + CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2); + + assertThat(instance) + .isInstanceOf(TimeoutCeTaskInterrupter.class); + + assertThat(readField(instance, "taskTimeoutThreshold")) + .isEqualTo(timeout * 1_000L); + assertThat(readField(instance, "ceWorkerController")) + .isSameAs(ceWorkerController); + assertThat(readField(instance, "system2")) + .isSameAs(system2); + } + + @Test + public void provide_fails_with_ISE_if_property_is_not_a_long() { + settings.setProperty("sonar.ce.task.timeoutSeconds", "foo"); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' is not an long value: For input string: \"foo\""); + + underTest.provide(settings.asConfig(), ceWorkerController, system2); + } + + @Test + public void provide_fails_with_ISE_if_property_is_zero() { + settings.setProperty("sonar.ce.task.timeoutSeconds", "0"); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '0'"); + + underTest.provide(settings.asConfig(), ceWorkerController, system2); + } + + @Test + public void provide_fails_with_ISE_if_property_is_less_than_zero() { + int negativeValue = -(1 + new Random().nextInt(1_212)); + settings.setProperty("sonar.ce.task.timeoutSeconds", negativeValue); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '" + negativeValue + "'"); + + underTest.provide(settings.asConfig(), ceWorkerController, system2); + } + + @Test + public void provide_always_return_the_same_TimeoutCeTaskInterrupter_instance() { + int timeout = 1 + new Random().nextInt(2222); + settings.setProperty("sonar.ce.task.timeoutSeconds", timeout); + + CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2); + + assertThat(instance) + .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2)) + .isSameAs(underTest.provide(new MapSettings().setProperty("sonar.ce.task.timeoutSeconds", 999).asConfig(), ceWorkerController, system2)); + } + + private static Object readField(CeTaskInterrupter instance, String fieldName) throws NoSuchFieldException, IllegalAccessException { + Class clazz = instance.getClass(); + Field timeoutField = clazz.getDeclaredField(fieldName); + timeoutField.setAccessible(true); + return timeoutField.get(instance); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java new file mode 100644 index 00000000000..a2e12caaf77 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java @@ -0,0 +1,56 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.Random; +import org.junit.Test; +import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskInterrupter; +import org.sonar.db.ce.CeActivityDto; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class CeTaskInterrupterWorkerExecutionListenerTest { + private CeTaskInterrupter ceTaskInterrupter = mock(CeTaskInterrupter.class); + private CeTaskInterrupterWorkerExecutionListener underTest = new CeTaskInterrupterWorkerExecutionListener(ceTaskInterrupter); + + + @Test + public void onStart_delegates_to_ceTaskInterrupter_onStart() { + CeTask ceTask = mock(CeTask.class); + + underTest.onStart(ceTask); + + verify(ceTaskInterrupter).onStart(same(ceTask)); + } + + @Test + public void onEnd_delegates_to_ceTaskInterrupter_onEnd() { + CeTask ceTask = mock(CeTask.class); + CeActivityDto.Status randomStatus = CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)]; + + underTest.onEnd(ceTask, randomStatus, null, null); + + verify(ceTaskInterrupter).onEnd(same(ceTask)); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java index f11d88125e8..2f90ceb80d9 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java @@ -21,7 +21,9 @@ package org.sonar.ce.taskprocessor; import org.junit.Test; import org.picocontainer.ComponentAdapter; +import org.sonar.api.config.internal.MapSettings; import org.sonar.ce.notification.ReportAnalysisFailureNotificationExecutionListener; +import org.sonar.ce.task.CeTaskInterrupter; import org.sonar.core.platform.ComponentContainer; import static org.assertj.core.api.Assertions.assertThat; @@ -52,4 +54,15 @@ public class CeTaskProcessorModuleTest { .map(ComponentAdapter::getComponentImplementation)) .contains(ReportAnalysisFailureNotificationExecutionListener.class); } + + @Test + public void defines_CeTaskInterrupterProvider_object() { + ComponentContainer container = new ComponentContainer(); + + underTest.configure(container); + + + assertThat(container.getPicoContainer().getComponentAdapter(CeTaskInterrupter.class)) + .isInstanceOf(CeTaskInterrupterProvider.class); + } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java similarity index 57% rename from server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java rename to server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java index 54c90fb1b7f..14f1ac32a0a 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java @@ -27,10 +27,12 @@ import org.sonar.api.utils.log.LoggerLevel; import org.sonar.ce.configuration.CeConfigurationRule; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.when; -public class EnabledCeWorkerControllerImplTest { +public class CeWorkerControllerImplTest { private Random random = new Random(); /** 1 <= workerCount <= 5 */ private int randomWorkerCount = 1 + random.nextInt(5); @@ -42,7 +44,7 @@ public class EnabledCeWorkerControllerImplTest { public LogTester logTester = new LogTester(); private CeWorker ceWorker = mock(CeWorker.class); - private EnabledCeWorkerControllerImpl underTest = new EnabledCeWorkerControllerImpl(ceConfigurationRule); + private CeWorkerControllerImpl underTest = new CeWorkerControllerImpl(ceConfigurationRule); @Test public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() { @@ -76,7 +78,7 @@ public class EnabledCeWorkerControllerImplTest { ceConfigurationRule.setWorkerCount(1); logTester.clear(); - new EnabledCeWorkerControllerImpl(ceConfigurationRule); + new CeWorkerControllerImpl(ceConfigurationRule); assertThat(logTester.logs()).isEmpty(); } @@ -87,7 +89,7 @@ public class EnabledCeWorkerControllerImplTest { ceConfigurationRule.setWorkerCount(newWorkerCount); logTester.clear(); - new EnabledCeWorkerControllerImpl(ceConfigurationRule); + new CeWorkerControllerImpl(ceConfigurationRule); verifyInfoLog(newWorkerCount); } @@ -103,6 +105,65 @@ public class EnabledCeWorkerControllerImplTest { assertThat(underTest.isEnabled(ceWorker)).isTrue(); } + @Test + public void getCeWorkerIn_returns_empty_if_worker_is_unregistered_in_CeWorkerController() { + CeWorker ceWorker = mock(CeWorker.class); + Thread currentThread = Thread.currentThread(); + Thread otherThread = new Thread(); + + mockWorkerIsRunningOnNoThread(ceWorker); + assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty(); + assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty(); + + mockWorkerIsRunningOnThread(ceWorker, currentThread); + assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty(); + assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty(); + + mockWorkerIsRunningOnThread(ceWorker, otherThread); + assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty(); + assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty(); + } + + @Test + public void getCeWorkerIn_returns_empty_if_worker_registered_in_CeWorkerController_but_has_no_current_thread() { + CeWorker ceWorker = mock(CeWorker.class); + Thread currentThread = Thread.currentThread(); + Thread otherThread = new Thread(); + + underTest.registerProcessingFor(ceWorker); + + mockWorkerIsRunningOnNoThread(ceWorker); + assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty(); + assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty(); + } + + @Test + public void getCeWorkerIn_returns_thread_if_worker_registered_in_CeWorkerController_but_has_a_current_thread() { + CeWorker ceWorker = mock(CeWorker.class); + Thread currentThread = Thread.currentThread(); + Thread otherThread = new Thread(); + + underTest.registerProcessingFor(ceWorker); + + mockWorkerIsRunningOnThread(ceWorker, currentThread); + assertThat(underTest.getCeWorkerIn(currentThread)).contains(ceWorker); + assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty(); + + mockWorkerIsRunningOnThread(ceWorker, otherThread); + assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty(); + assertThat(underTest.getCeWorkerIn(otherThread)).contains(ceWorker); + } + + private void mockWorkerIsRunningOnThread(CeWorker ceWorker, Thread thread) { + reset(ceWorker); + when(ceWorker.isExecutedBy(thread)).thenReturn(true); + } + + private void mockWorkerIsRunningOnNoThread(CeWorker ceWorker) { + reset(ceWorker); + when(ceWorker.isExecutedBy(any())).thenReturn(false); + } + private void verifyInfoLog(int workerCount) { assertThat(logTester.logs()).hasSize(1); assertThat(logTester.logs(LoggerLevel.INFO)) diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java index 4f0603ef59f..b88fbd4f7df 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java @@ -35,7 +35,7 @@ import static org.mockito.Mockito.mock; public class CeWorkerFactoryImplTest { private int randomOrdinal = new Random().nextInt(20); private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), - mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class)); + mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class)); @Test public void create_return_CeWorker_object_with_specified_ordinal() { @@ -49,7 +49,7 @@ public class CeWorkerFactoryImplTest { CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class); CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class); CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class), - mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class), + mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class), new CeWorker.ExecutionListener[] {executionListener1, executionListener2}); CeWorker ceWorker = underTest.create(randomOrdinal); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java index 0b15cca7ef3..db549d4c779 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java @@ -24,10 +24,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nullable; -import org.apache.commons.lang.RandomStringUtils; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -35,6 +37,7 @@ import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.sonar.api.utils.MessageException; import org.sonar.api.utils.System2; import org.sonar.api.utils.internal.TestSystem2; @@ -45,6 +48,7 @@ import org.sonar.ce.queue.InternalCeQueue; import org.sonar.ce.task.CeTask; import org.sonar.ce.task.CeTaskResult; import org.sonar.ce.task.projectanalysis.taskprocessor.ReportTaskProcessor; +import org.sonar.ce.task.taskprocessor.CeTaskProcessor; import org.sonar.db.DbSession; import org.sonar.db.DbTester; import org.sonar.db.ce.CeActivityDto; @@ -53,6 +57,7 @@ import org.sonar.db.user.UserDto; import org.sonar.db.user.UserTesting; import org.sonar.server.organization.BillingValidations; +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -85,19 +90,19 @@ public class CeWorkerImplTest { private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); private CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class); private CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class); - private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class); + private CeWorkerController ceWorkerController = mock(CeWorkerController.class); private ArgumentCaptor workerUuidCaptor = ArgumentCaptor.forClass(String.class); private int randomOrdinal = new Random().nextInt(50); private String workerUuid = UUID.randomUUID().toString(); - private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController, + private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController, executionListener1, executionListener2); - private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController); + private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController); private InOrder inOrder = Mockito.inOrder(taskProcessor, queue, executionListener1, executionListener2); private final CeTask.User submitter = new CeTask.User("UUID_USER_1", "LOGIN_1"); @Before public void setUp() { - when(enabledCeWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true); + when(ceWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true); } @Test @@ -105,20 +110,20 @@ public class CeWorkerImplTest { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Ordinal must be >= 0"); - new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, enabledCeWorkerController); + new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, ceWorkerController); } @Test public void getUUID_must_return_the_uuid_of_constructor() { String uuid = UUID.randomUUID().toString(); - CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController); + CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, ceWorkerController); assertThat(underTest.getUUID()).isEqualTo(uuid); } @Test public void worker_disabled() throws Exception { - reset(enabledCeWorkerController); - when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false); + reset(ceWorkerController); + when(ceWorkerController.isEnabled(underTest)).thenReturn(false); assertThat(underTest.call()).isEqualTo(DISABLED); @@ -127,8 +132,8 @@ public class CeWorkerImplTest { @Test public void worker_disabled_no_listener() throws Exception { - reset(enabledCeWorkerController); - when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false); + reset(ceWorkerController); + when(ceWorkerController.isEnabled(underTest)).thenReturn(false); assertThat(underTestNoListener.call()).isEqualTo(DISABLED); @@ -391,7 +396,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception { - String threadName = RandomStringUtils.randomAlphabetic(3); + String threadName = randomAlphabetic(3); when(queue.peek(anyString())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); @@ -405,7 +410,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception { - String threadName = RandomStringUtils.randomAlphabetic(3); + String threadName = randomAlphabetic(3); when(queue.peek(anyString())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); @@ -420,7 +425,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception { - String threadName = RandomStringUtils.randomAlphabetic(3); + String threadName = randomAlphabetic(3); CeTask ceTask = createCeTask(submitter); when(queue.peek(anyString())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) @@ -437,10 +442,10 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_worker_is_disabled() throws Exception { - reset(enabledCeWorkerController); - when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false); + reset(ceWorkerController); + when(ceWorkerController.isEnabled(underTest)).thenReturn(false); - String threadName = RandomStringUtils.randomAlphabetic(3); + String threadName = randomAlphabetic(3); Thread newThread = createThreadNameVerifyingThread(threadName); newThread.start(); @@ -561,6 +566,166 @@ public class CeWorkerImplTest { assertThat(((Exception) arg1).getSuppressed()).containsOnly(ex); } + @Test + public void isExecutedBy_returns_false_when_no_interaction_with_instance() { + assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse(); + assertThat(underTest.isExecutedBy(new Thread())).isFalse(); + } + + @Test + public void isExecutedBy_returns_false_unless_a_thread_is_currently_calling_call() throws InterruptedException { + CountDownLatch inCallLatch = new CountDownLatch(1); + CountDownLatch assertionsDoneLatch = new CountDownLatch(1); + // mock long running peek(String) call => Thread is executing call() but not running a task + when(queue.peek(anyString())).thenAnswer((Answer>) invocation -> { + inCallLatch.countDown(); + try { + assertionsDoneLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Optional.empty(); + }); + Thread t = callCallInNewThread(underTest); + + try { + t.start(); + + inCallLatch.await(10, TimeUnit.SECONDS); + assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse(); + assertThat(underTest.isExecutedBy(new Thread())).isFalse(); + assertThat(underTest.isExecutedBy(t)).isTrue(); + } finally { + assertionsDoneLatch.countDown(); + t.join(); + } + + assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse(); + assertThat(underTest.isExecutedBy(new Thread())).isFalse(); + assertThat(underTest.isExecutedBy(t)).isFalse(); + } + + @Test + public void isExecutedBy_returns_false_unless_a_thread_is_currently_executing_a_task() throws InterruptedException { + CountDownLatch inCallLatch = new CountDownLatch(1); + CountDownLatch assertionsDoneLatch = new CountDownLatch(1); + String taskType = randomAlphabetic(12); + CeTask ceTask = mock(CeTask.class); + when(ceTask.getType()).thenReturn(taskType); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { + @CheckForNull + @Override + public CeTaskResult process(CeTask task) { + inCallLatch.countDown(); + try { + assertionsDoneLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + } + }); + Thread t = callCallInNewThread(underTest); + + try { + t.start(); + + inCallLatch.await(10, TimeUnit.SECONDS); + assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse(); + assertThat(underTest.isExecutedBy(new Thread())).isFalse(); + assertThat(underTest.isExecutedBy(t)).isTrue(); + } finally { + assertionsDoneLatch.countDown(); + t.join(); + } + + assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse(); + assertThat(underTest.isExecutedBy(new Thread())).isFalse(); + assertThat(underTest.isExecutedBy(t)).isFalse(); + } + + @Test + public void getCurrentTask_returns_empty_when_no_interaction_with_instance() { + assertThat(underTest.getCurrentTask()).isEmpty(); + } + + @Test + public void getCurrentTask_returns_empty_when_a_thread_is_currently_calling_call_but_not_executing_a_task() throws InterruptedException { + CountDownLatch inCallLatch = new CountDownLatch(1); + CountDownLatch assertionsDoneLatch = new CountDownLatch(1); + // mock long running peek(String) call => Thread is executing call() but not running a task + when(queue.peek(anyString())).thenAnswer((Answer>) invocation -> { + inCallLatch.countDown(); + try { + assertionsDoneLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Optional.empty(); + }); + Thread t = callCallInNewThread(underTest); + + try { + t.start(); + + inCallLatch.await(10, TimeUnit.SECONDS); + assertThat(underTest.getCurrentTask()).isEmpty(); + } finally { + assertionsDoneLatch.countDown(); + t.join(); + } + + assertThat(underTest.getCurrentTask()).isEmpty(); + } + + @Test + public void getCurrentTask_returns_empty_unless_a_thread_is_currently_executing_a_task() throws InterruptedException { + CountDownLatch inCallLatch = new CountDownLatch(1); + CountDownLatch assertionsDoneLatch = new CountDownLatch(1); + String taskType = randomAlphabetic(12); + CeTask ceTask = mock(CeTask.class); + when(ceTask.getType()).thenReturn(taskType); + when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { + + @CheckForNull + @Override + public CeTaskResult process(CeTask task) { + inCallLatch.countDown(); + try { + assertionsDoneLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + } + }); + Thread t = callCallInNewThread(underTest); + + try { + t.start(); + + inCallLatch.await(10, TimeUnit.SECONDS); + assertThat(underTest.getCurrentTask()).contains(ceTask); + } finally { + assertionsDoneLatch.countDown(); + t.join(); + } + + assertThat(underTest.getCurrentTask()).isEmpty(); + } + + private Thread callCallInNewThread(CeWorker underTest) { + return new Thread(() -> { + try { + underTest.call(); + } catch (Exception e) { + throw new RuntimeException("call to call() failed and this is unexpected. Fix the UT.", e); + } + }); + } + private Thread createThreadNameVerifyingThread(String threadName) { return new Thread(() -> { verifyUnchangedThreadName(threadName); @@ -597,7 +762,7 @@ public class CeWorkerImplTest { .setCharacteristics(characteristicMap) .build(); } - + private UserDto insertRandomUser() { UserDto userDto = UserTesting.newUserDto(); db.getDbClient().userDao().insert(session, userDto); @@ -617,4 +782,11 @@ public class CeWorkerImplTest { doThrow(t).when(taskProcessor).process(task); return t; } + + private static abstract class SimpleCeTaskProcessor implements CeTaskProcessor { + @Override + public Set getHandledCeTaskTypes() { + throw new UnsupportedOperationException("getHandledCeTaskTypes should not be called"); + } + } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java new file mode 100644 index 00000000000..3bb0a82186a --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java @@ -0,0 +1,55 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +class ComputingThread extends Thread { + private boolean kill = false; + + public ComputingThread(String name) { + setName(name); + } + + private long fibo(int i) { + if (kill) { + return i; + } + if (i == 0) { + return 0; + } + if (i == 1) { + return 1; + } + return fibo(i - 1) + fibo(i - 2); + } + + @Override + public void run() { + for (int i = 2; i < 50; i++) { + fibo(i); + if (kill) { + break; + } + } + } + + public void kill() { + this.kill = true; + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java new file mode 100644 index 00000000000..c8f40e9e9fb --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java @@ -0,0 +1,78 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskCanceledException; + +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class SimpleCeTaskInterrupterTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private SimpleCeTaskInterrupter underTest = new SimpleCeTaskInterrupter(); + + @Test + public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException { + String threadName = randomAlphabetic(30); + ComputingThread t = new ComputingThread(threadName); + + try { + t.start(); + + // will not fail + underTest.check(t); + + t.interrupt(); + + expectedException.expect(CeTaskCanceledException.class); + expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted"); + + underTest.check(t); + } finally { + t.kill(); + t.join(1_000); + } + } + + @Test + public void onStart_has_no_effect() { + CeTask ceTask = mock(CeTask.class); + + underTest.onStart(ceTask); + + verifyZeroInteractions(ceTask); + } + + @Test + public void onEnd_has_no_effect() { + CeTask ceTask = mock(CeTask.class); + + underTest.onEnd(ceTask); + + verifyZeroInteractions(ceTask); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java new file mode 100644 index 00000000000..a49c8120565 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java @@ -0,0 +1,223 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.Optional; +import java.util.Random; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.LogTester; +import org.sonar.api.utils.log.LoggerLevel; +import org.sonar.ce.task.CeTask; +import org.sonar.ce.task.CeTaskCanceledException; +import org.sonar.ce.task.CeTaskTimeoutException; + +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TimeoutCeTaskInterrupterTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule + public LogTester logTester = new LogTester(); + + private int timeoutInSeconds = 1 + new Random().nextInt(20); + private int timeoutInMs = timeoutInSeconds * 1_000; + private CeWorkerController ceWorkerController = mock(CeWorkerController.class); + private System2 system2 = mock(System2.class); + private CeWorker ceWorker = mock(CeWorker.class); + private CeTask ceTask = mock(CeTask.class); + private TimeoutCeTaskInterrupter underTest = new TimeoutCeTaskInterrupter(timeoutInMs, ceWorkerController, system2); + + @Test + public void constructor_fails_with_IAE_if_timeout_is_0() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("threshold must be >= 1"); + + new TimeoutCeTaskInterrupter(0, ceWorkerController, system2); + } + + @Test + public void constructor_fails_with_IAE_if_timeout_is_less_than_0() { + long timeout = - (1 + new Random().nextInt(299)); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("threshold must be >= 1"); + + new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2); + } + + @Test + public void constructor_log_timeout_in_ms_at_INFO_level() { + int timeout = 1 + new Random().nextInt(9_999); + + new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2); + + assertThat(logTester.logs()).hasSize(1); + assertThat(logTester.logs(LoggerLevel.INFO)) + .containsExactly("Compute Engine Task timeout enabled: " + timeout + " ms"); + } + + @Test + public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker() { + Thread t = newThreadWithRandomName(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'"); + + underTest.check(t); + } + + @Test + public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker_with_no_current_CeTask() { + Thread t = newThreadWithRandomName(); + mockWorkerOnThread(t, ceWorker); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'"); + + underTest.check(t); + } + + @Test + public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_has_not_been_called_on_it() { + String taskUuid = randomAlphabetic(15); + Thread t = new Thread(); + mockWorkerOnThread(t, ceWorker); + mockWorkerWithTask(ceTask); + when(ceTask.getUuid()).thenReturn(taskUuid); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("No start time recorded for task " + taskUuid); + + underTest.check(t); + } + + @Test + public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_and_on_end_have_not_been_called_on_it() { + String taskUuid = randomAlphabetic(15); + Thread t = new Thread(); + mockWorkerOnThread(t, ceWorker); + mockWorkerWithTask(ceTask); + when(ceTask.getUuid()).thenReturn(taskUuid); + underTest.onStart(this.ceTask); + underTest.onEnd(this.ceTask); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("No start time recorded for task " + taskUuid); + + underTest.check(t); + } + + @Test + public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException { + String threadName = randomAlphabetic(30); + ComputingThread t = new ComputingThread(threadName); + mockWorkerOnThread(t, ceWorker); + mockWorkerWithTask(ceTask); + underTest.onStart(ceTask); + + try { + t.start(); + + // will not fail as thread is not interrupted nor timed out + underTest.check(t); + + t.interrupt(); + + expectedException.expect(CeTaskCanceledException.class); + expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted"); + + underTest.check(t); + } finally { + t.kill(); + t.join(1_000); + } + } + + @Test + public void check_throws_CeTaskTimeoutException_if_check_called_later_than_timeout_milliseconds_after_on_start() { + Thread thread = newThreadWithRandomName(); + mockWorkerOnThread(thread, ceWorker); + mockWorkerWithTask(ceTask); + long now = 3_776_663_999L; + when(system2.now()).thenReturn(now); + underTest.onStart(ceTask); + + // timeout not passed => no exception thrown + int beforeTimeoutOffset = 1 + new Random().nextInt(timeoutInMs - 1); + when(system2.now()).thenReturn(now + timeoutInMs - beforeTimeoutOffset); + underTest.check(thread); + + int afterTimeoutOffset = new Random().nextInt(7_112); + when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset); + + expectedException.expect(CeTaskTimeoutException.class); + expectedException.expectMessage("Execution of task timed out after " + (timeoutInMs + afterTimeoutOffset) + " ms"); + + underTest.check(thread); + } + + @Test + public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted_even_if_timed_out() throws InterruptedException { + String threadName = randomAlphabetic(30); + ComputingThread t = new ComputingThread(threadName); + mockWorkerOnThread(t, ceWorker); + mockWorkerWithTask(ceTask); + long now = 3_776_663_999L; + when(system2.now()).thenReturn(now); + underTest.onStart(ceTask); + + try { + t.start(); + t.interrupt(); + + // will not fail as thread is not interrupted nor timed out + int afterTimeoutOffset = new Random().nextInt(7_112); + when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset); + + expectedException.expect(CeTaskCanceledException.class); + expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted"); + + underTest.check(t); + } finally { + t.kill(); + t.join(1_000); + } + } + + private static Thread newThreadWithRandomName() { + String threadName = randomAlphabetic(30); + Thread t = new Thread(); + t.setName(threadName); + return t; + } + + private void mockWorkerOnThread(Thread t, CeWorker ceWorker) { + when(ceWorkerController.getCeWorkerIn(t)).thenReturn(Optional.of(ceWorker)); + } + + private void mockWorkerWithTask(CeTask ceTask) { + when(ceWorker.getCurrentTask()).thenReturn(Optional.of(ceTask)); + } +} -- 2.39.5