]> source.dussan.org Git - sonarqube.git/commitdiff
SONARCLOUD-310 Ce task can now fail after a maximum amount of time
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Wed, 19 Dec 2018 15:55:29 +0000 (16:55 +0100)
committerSonarTech <sonartech@sonarsource.com>
Fri, 21 Dec 2018 19:21:03 +0000 (20:21 +0100)
failure can only happen when going from one step to the other

37 files changed:
server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java [new file with mode: 0644]
server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java [new file with mode: 0644]
server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java [new file with mode: 0644]
server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java [new file with mode: 0644]
server/sonar-ce-task/src/main/java/org/sonar/ce/task/step/ComputationStepExecutor.java
server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java [new file with mode: 0644]
server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java [new file with mode: 0644]
server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java [new file with mode: 0644]
server/sonar-ce-task/src/test/java/org/sonar/ce/task/step/ComputationStepExecutorTest.java
server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java [deleted file]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java [deleted file]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java [new file with mode: 0644]
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorModuleTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java [deleted file]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java [new file with mode: 0644]
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java [new file with mode: 0644]

diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskCanceledException.java
new file mode 100644 (file)
index 0000000..dd3d9b6
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.lang.String.format;
+
+public final class CeTaskCanceledException extends CeTaskInterruptedException {
+  public CeTaskCanceledException(Thread thread) {
+    super(format("CeWorker executing in Thread '%s' has been interrupted", thread.getName()),
+      CeActivityDto.Status.CANCELED);
+  }
+}
diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterruptedException.java
new file mode 100644 (file)
index 0000000..33192cd
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import java.util.Optional;
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.util.Objects.requireNonNull;
+
+public abstract class CeTaskInterruptedException extends RuntimeException {
+  private final CeActivityDto.Status status;
+
+  protected CeTaskInterruptedException(String message, CeActivityDto.Status status) {
+    super(message);
+    this.status = requireNonNull(status, "status can't be null");
+  }
+
+  public CeActivityDto.Status getStatus() {
+    return status;
+  }
+
+  public static Optional<CeTaskInterruptedException> isTaskInterruptedException(Throwable e) {
+    if (e instanceof CeTaskInterruptedException) {
+      return Optional.of((CeTaskInterruptedException) e);
+    }
+    return isCauseInterruptedException(e);
+  }
+
+  private static Optional<CeTaskInterruptedException> isCauseInterruptedException(Throwable e) {
+    Throwable cause = e.getCause();
+    if (cause == null || cause == e) {
+      return Optional.empty();
+    }
+    if (cause instanceof CeTaskInterruptedException) {
+      return Optional.of((CeTaskInterruptedException) cause);
+    }
+    return isCauseInterruptedException(cause);
+  }
+}
diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskInterrupter.java
new file mode 100644 (file)
index 0000000..f3acee8
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+/**
+ * Method {@link #check(Thread)} of the {@link CeTaskInterrupter} can be called during the processing of a
+ * {@link CeTask} to check whether processing of this task must be interrupted.
+ * <p>
+ * Interruption cause may be user cancelling the task, operator stopping the Compute Engine, execution running for
+ * too long, ...
+ */
+public interface CeTaskInterrupter {
+  /**
+   * @throws CeTaskInterruptedException if the execution of the task must be interrupted
+   */
+  void check(Thread currentThread) throws CeTaskInterruptedException;
+
+  /**
+   * Lets the interrupter know that the processing of the specified task has started.
+   */
+  void onStart(CeTask ceTask);
+
+  /**
+   * Lets the interrupter know that the processing of the specified task has ended.
+   */
+  void onEnd(CeTask ceTask);
+}
diff --git a/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java b/server/sonar-ce-task/src/main/java/org/sonar/ce/task/CeTaskTimeoutException.java
new file mode 100644 (file)
index 0000000..0e4caac
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import org.sonar.db.ce.CeActivityDto;
+
+/**
+ * This exception will stop the task and make it be archived with the {@link CeActivityDto.Status#FAILED FAILED}
+ * status and the error type {@code TIMEOUT}.
+ * <p>
+ * This exception has no stacktrace:
+ * <ul>
+ *   <li>it's irrelevant to the end user</li>
+ *   <li>we don't want to leak any implementation detail</li>
+ * </ul>
+ */
+public final class CeTaskTimeoutException extends CeTaskInterruptedException implements TypedException {
+
+  public CeTaskTimeoutException(String message) {
+    super(message, CeActivityDto.Status.FAILED);
+  }
+
+  @Override
+  public String getType() {
+    return "TIMEOUT";
+  }
+
+  /**
+   * Does not fill in the stack trace
+   *
+   * @see Throwable#fillInStackTrace()
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+}
index 8318fe8d8ab36a7970c5e200ff30dcc5bdb06dc7..c7a546f235d9c86cb61da077129ce741c0ed686a 100644 (file)
@@ -23,6 +23,7 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.task.CeTaskInterrupter;
 import org.sonar.core.util.logs.Profiler;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -33,6 +34,7 @@ public final class ComputationStepExecutor {
   private static final Logger LOGGER = Loggers.get(ComputationStepExecutor.class);
 
   private final ComputationSteps steps;
+  private final CeTaskInterrupter taskInterrupter;
   @CheckForNull
   private final Listener listener;
 
@@ -40,12 +42,13 @@ public final class ComputationStepExecutor {
    * Used when no {@link ComputationStepExecutor.Listener} is available in pico
    * container.
    */
-  public ComputationStepExecutor(ComputationSteps steps) {
-    this(steps, null);
+  public ComputationStepExecutor(ComputationSteps steps, CeTaskInterrupter taskInterrupter) {
+    this(steps, taskInterrupter, null);
   }
 
-  public ComputationStepExecutor(ComputationSteps steps, @Nullable Listener listener) {
+  public ComputationStepExecutor(ComputationSteps steps, CeTaskInterrupter taskInterrupter, @Nullable Listener listener) {
     this.steps = steps;
+    this.taskInterrupter = taskInterrupter;
     this.listener = listener;
   }
 
@@ -70,10 +73,11 @@ public final class ComputationStepExecutor {
     }
   }
 
-  private static void executeStep(Profiler stepProfiler, ComputationStep.Context context, ComputationStep step) {
+  private void executeStep(Profiler stepProfiler, ComputationStep.Context context, ComputationStep step) {
     String status = "FAILED";
     stepProfiler.start();
     try {
+      taskInterrupter.check(Thread.currentThread());
       step.execute(context);
       status = "SUCCESS";
     } finally {
diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskCanceledExceptionTest.java
new file mode 100644 (file)
index 0000000..3d3b507
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import org.junit.Test;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeTaskCanceledExceptionTest {
+  @Test
+  public void message_is_based_on_specified_thread_name() {
+    Thread t = new Thread();
+    t.setName(randomAlphabetic(29));
+
+    CeTaskCanceledException underTest = new CeTaskCanceledException(t);
+
+    assertThat(underTest.getMessage()).isEqualTo("CeWorker executing in Thread '" + t.getName() + "' has been interrupted");
+  }
+
+  @Test
+  public void getStatus_returns_CANCELED() {
+    CeTaskCanceledException underTest = new CeTaskCanceledException(new Thread());
+
+    assertThat(underTest.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+  }
+}
diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskInterruptedExceptionTest.java
new file mode 100644 (file)
index 0000000..84e47e9
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import java.util.Random;
+import org.junit.Test;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException;
+
+public class CeTaskInterruptedExceptionTest {
+
+  @Test
+  public void isCauseInterruptedException_returns_CeTaskInterruptedException_or_subclass() {
+    String message = randomAlphabetic(50);
+    CeActivityDto.Status status = randomStatus();
+    CeTaskInterruptedException e1 = new CeTaskInterruptedException(message, status) {
+
+    };
+    CeTaskInterruptedException e2 = new CeTaskInterruptedExceptionSubclass(message, status);
+
+    assertThat(isTaskInterruptedException(e1)).contains(e1);
+    assertThat(isTaskInterruptedException(e2)).contains(e2);
+    assertThat(isTaskInterruptedException(new RuntimeException())).isEmpty();
+    assertThat(isTaskInterruptedException(new Exception())).isEmpty();
+  }
+
+  @Test
+  public void isCauseInterruptedException_returns_CeTaskInterruptedException_or_subclass_in_cause_chain() {
+    String message = randomAlphabetic(50);
+    CeActivityDto.Status status = randomStatus();
+    CeTaskInterruptedException e1 = new CeTaskInterruptedException(message, status) {
+
+    };
+    CeTaskInterruptedException e2 = new CeTaskInterruptedExceptionSubclass(message, status);
+
+    assertThat(isTaskInterruptedException(new RuntimeException(e1))).contains(e1);
+    assertThat(isTaskInterruptedException(new Exception(new RuntimeException(e2)))).contains(e2);
+  }
+
+  private static CeActivityDto.Status randomStatus() {
+    return CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)];
+  }
+
+  private static class CeTaskInterruptedExceptionSubclass extends CeTaskInterruptedException {
+    public CeTaskInterruptedExceptionSubclass(String message, CeActivityDto.Status status) {
+      super(message, status);
+    }
+  }
+
+}
diff --git a/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java b/server/sonar-ce-task/src/test/java/org/sonar/ce/task/CeTaskTimeoutExceptionTest.java
new file mode 100644 (file)
index 0000000..a8059fd
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.task;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Test;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeTaskTimeoutExceptionTest {
+  private String message = RandomStringUtils.randomAlphabetic(50);
+  private CeTaskTimeoutException underTest = new CeTaskTimeoutException(message);
+
+  @Test
+  public void verify_message_and_type() {
+    assertThat(underTest.getMessage()).isEqualTo(message);
+    assertThat(underTest.getType()).isEqualTo("TIMEOUT");
+  }
+
+  @Test
+  public void getStatus_returns_FAILED() {
+    assertThat(underTest.getStatus()).isEqualTo(CeActivityDto.Status.FAILED);
+  }
+
+
+  @Test
+  public void noStacktrace() {
+    StringWriter stacktrace = new StringWriter();
+    underTest.printStackTrace(new PrintWriter(stacktrace));
+    assertThat(stacktrace.toString())
+      .isEqualTo(CeTaskTimeoutException.class.getCanonicalName() + ": " + message + System.lineSeparator());
+  }
+}
index 71810a3ce55a6af81fa1919a08f8f7b9eeaaf8cc..3900fd9dfa3580e67d9877d1c3676b3ec6f3b768 100644 (file)
@@ -27,15 +27,18 @@ import org.junit.rules.ExpectedException;
 import org.mockito.InOrder;
 import org.sonar.api.utils.log.LogTester;
 import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.task.CeTaskInterrupter;
 import org.sonar.ce.task.ChangeLogLevel;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
@@ -47,13 +50,14 @@ public class ComputationStepExecutorTest {
   public ExpectedException expectedException = ExpectedException.none();
 
   private final ComputationStepExecutor.Listener listener = mock(ComputationStepExecutor.Listener.class);
+  private final CeTaskInterrupter taskInterrupter = mock(CeTaskInterrupter.class);
   private final ComputationStep computationStep1 = mockComputationStep("step1");
   private final ComputationStep computationStep2 = mockComputationStep("step2");
   private final ComputationStep computationStep3 = mockComputationStep("step3");
 
   @Test
   public void execute_call_execute_on_each_ComputationStep_in_order_returned_by_instances_method() {
-    new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3))
+    new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter)
       .execute();
 
     InOrder inOrder = inOrder(computationStep1, computationStep2, computationStep3);
@@ -75,7 +79,7 @@ public class ComputationStepExecutorTest {
       .when(computationStep)
       .execute(any());
 
-    ComputationStepExecutor computationStepExecutor = new ComputationStepExecutor(mockComputationSteps(computationStep));
+    ComputationStepExecutor computationStepExecutor = new ComputationStepExecutor(mockComputationSteps(computationStep), taskInterrupter);
 
     expectedException.expect(RuntimeException.class);
     expectedException.expectMessage(message);
@@ -93,7 +97,7 @@ public class ComputationStepExecutorTest {
       ChangeLogLevel logLevel1 = new ChangeLogLevel(step1.getClass(), LoggerLevel.INFO);
       ChangeLogLevel logLevel2 = new ChangeLogLevel(step2.getClass(), LoggerLevel.INFO);
       ChangeLogLevel logLevel3 = new ChangeLogLevel(step3.getClass(), LoggerLevel.INFO)) {
-      new ComputationStepExecutor(mockComputationSteps(step1, step2, step3)).execute();
+      new ComputationStepExecutor(mockComputationSteps(step1, step2, step3), taskInterrupter).execute();
 
       List<String> infoLogs = logTester.logs(LoggerLevel.INFO);
       assertThat(infoLogs).hasSize(3);
@@ -114,7 +118,7 @@ public class ComputationStepExecutorTest {
         super.execute(context);
         throw expected;
       }
-    } ;
+    };
 
     try (ChangeLogLevel executor = new ChangeLogLevel(ComputationStepExecutor.class, LoggerLevel.INFO);
       ChangeLogLevel logLevel1 = new ChangeLogLevel(step1.getClass(), LoggerLevel.INFO);
@@ -122,7 +126,7 @@ public class ComputationStepExecutorTest {
       ChangeLogLevel logLevel3 = new ChangeLogLevel(step3.getClass(), LoggerLevel.INFO)) {
 
       try {
-        new ComputationStepExecutor(mockComputationSteps(step1, step2, step3)).execute();
+        new ComputationStepExecutor(mockComputationSteps(step1, step2, step3), taskInterrupter).execute();
         fail("a RuntimeException should have been thrown");
       } catch (RuntimeException e) {
         List<String> infoLogs = logTester.logs(LoggerLevel.INFO);
@@ -142,7 +146,7 @@ public class ComputationStepExecutorTest {
       expectedException.expect(IllegalArgumentException.class);
       expectedException.expectMessage("Statistic with key [time] is not accepted");
 
-      new ComputationStepExecutor(mockComputationSteps(step)).execute();
+      new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
     }
   }
 
@@ -154,7 +158,7 @@ public class ComputationStepExecutorTest {
       expectedException.expect(IllegalArgumentException.class);
       expectedException.expectMessage("Statistic with key [foo] is already present");
 
-      new ComputationStepExecutor(mockComputationSteps(step)).execute();
+      new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
     }
   }
 
@@ -166,7 +170,7 @@ public class ComputationStepExecutorTest {
       expectedException.expect(NullPointerException.class);
       expectedException.expectMessage("Statistic has null key");
 
-      new ComputationStepExecutor(mockComputationSteps(step)).execute();
+      new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
     }
   }
 
@@ -178,13 +182,13 @@ public class ComputationStepExecutorTest {
       expectedException.expect(NullPointerException.class);
       expectedException.expectMessage("Statistic with key [bar] has null value");
 
-      new ComputationStepExecutor(mockComputationSteps(step)).execute();
+      new ComputationStepExecutor(mockComputationSteps(step), taskInterrupter).execute();
     }
   }
 
   @Test
   public void execute_calls_listener_finished_method_with_all_step_runs() {
-    new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), listener)
+    new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), taskInterrupter, listener)
       .execute();
 
     verify(listener).finished(true);
@@ -199,7 +203,7 @@ public class ComputationStepExecutorTest {
       .execute(any());
 
     try {
-      new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), listener)
+      new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2), taskInterrupter, listener)
         .execute();
       fail("exception toBeThrown should have been raised");
     } catch (RuntimeException e) {
@@ -216,7 +220,76 @@ public class ComputationStepExecutorTest {
       .when(listener)
       .finished(anyBoolean());
 
-    new ComputationStepExecutor(mockComputationSteps(computationStep1), listener).execute();
+    new ComputationStepExecutor(mockComputationSteps(computationStep1), taskInterrupter, listener).execute();
+  }
+
+  @Test
+  public void execute_fails_with_exception_thrown_by_interrupter() throws Throwable {
+    executeFailsWithExceptionThrownByInterrupter();
+
+    reset(computationStep1, computationStep2, computationStep3, taskInterrupter);
+    runInOtherThread(this::executeFailsWithExceptionThrownByInterrupter);
+  }
+
+  private void executeFailsWithExceptionThrownByInterrupter() {
+    Thread currentThread = Thread.currentThread();
+    ComputationStepExecutor underTest = new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter);
+    RuntimeException exception = new RuntimeException("mocking fail of method check()");
+    doNothing()
+      .doNothing()
+      .doThrow(exception)
+      .when(taskInterrupter)
+      .check(currentThread);
+
+    try {
+      underTest.execute();
+      fail("execute should have thrown an exception");
+    } catch (Exception e) {
+      assertThat(e).isSameAs(exception);
+    }
+  }
+
+  @Test
+  public void execute_calls_interrupter_with_current_thread_before_each_step() throws Throwable {
+    executeCallsInterrupterWithCurrentThreadBeforeEachStep();
+
+    reset(computationStep1, computationStep2, computationStep3, taskInterrupter);
+    runInOtherThread(this::executeCallsInterrupterWithCurrentThreadBeforeEachStep);
+  }
+
+  private void executeCallsInterrupterWithCurrentThreadBeforeEachStep() {
+    InOrder inOrder = inOrder(computationStep1, computationStep2, computationStep3, taskInterrupter);
+    ComputationStepExecutor underTest = new ComputationStepExecutor(mockComputationSteps(computationStep1, computationStep2, computationStep3), taskInterrupter);
+
+    underTest.execute();
+
+    inOrder.verify(taskInterrupter).check(Thread.currentThread());
+    inOrder.verify(computationStep1).execute(any());
+    inOrder.verify(computationStep1).getDescription();
+    inOrder.verify(taskInterrupter).check(Thread.currentThread());
+    inOrder.verify(computationStep2).execute(any());
+    inOrder.verify(computationStep2).getDescription();
+    inOrder.verify(taskInterrupter).check(Thread.currentThread());
+    inOrder.verify(computationStep3).execute(any());
+    inOrder.verify(computationStep3).getDescription();
+    inOrder.verifyNoMoreInteractions();
+  }
+
+  private void runInOtherThread(Runnable r) throws Throwable {
+    Throwable[] otherThreadException = new Throwable[1];
+    Thread t = new Thread(() -> {
+      try {
+        r.run();
+      } catch (Throwable e) {
+        otherThreadException[0] = e;
+      }
+    });
+    t.start();
+    t.join();
+
+    if (otherThreadException[0] != null) {
+      throw otherThreadException[0];
+    }
   }
 
   private static ComputationSteps mockComputationSteps(ComputationStep... computationSteps) {
index 08cd1464c457a90aa74328a5b8fec25ce33f5c4a..0b53f4b6adb26997cfb9b8d15624beafce2bfa4e 100644 (file)
@@ -25,8 +25,8 @@ import java.util.stream.Collectors;
 import org.picocontainer.Startable;
 import org.sonar.ce.configuration.CeConfiguration;
 import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerController;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
-import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
 import org.sonar.process.Jmx;
 import org.sonar.process.systeminfo.SystemInfoSection;
 import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
@@ -35,13 +35,13 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect
   private final CEQueueStatus queueStatus;
   private final CeConfiguration ceConfiguration;
   private final CeWorkerFactory ceWorkerFactory;
-  private final EnabledCeWorkerController enabledCeWorkerController;
+  private final CeWorkerController ceWorkerController;
 
-  public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) {
+  public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, CeWorkerController CeWorkerController) {
     this.queueStatus = queueStatus;
     this.ceConfiguration = ceConfiguration;
     this.ceWorkerFactory = ceWorkerFactory;
-    this.enabledCeWorkerController = enabledCeWorkerController;
+    this.ceWorkerController = CeWorkerController;
   }
 
   @Override
@@ -105,7 +105,7 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect
   public List<String> getEnabledWorkerUuids() {
     Set<CeWorker> workers = ceWorkerFactory.getWorkers();
     return workers.stream()
-      .filter(enabledCeWorkerController::isEnabled)
+      .filter(ceWorkerController::isEnabled)
       .map(CeWorker::getUUID)
       .sorted()
       .collect(Collectors.toList());
index 17ca3b6f47493b6e57dcd006facad78fc2f4195a..6cd77dddbbf9b9835309d5cef95883ae6706eae6 100644 (file)
@@ -37,8 +37,8 @@ import org.sonar.ce.container.ComputeEngineStatus;
 import org.sonar.ce.monitoring.CEQueueStatus;
 import org.sonar.ce.task.CeTask;
 import org.sonar.ce.task.CeTaskResult;
-import org.sonar.ce.task.projectanalysis.component.VisitException;
 import org.sonar.ce.task.TypedException;
+import org.sonar.ce.task.projectanalysis.component.VisitException;
 import org.sonar.core.util.UuidFactory;
 import org.sonar.db.DbClient;
 import org.sonar.db.DbSession;
index 2a25ae0cbe09e0e0d7c6fc2b172a8c6db969bea5..4ce1f7210ecb73d5b322105fc961ae401afabc5e 100644 (file)
@@ -41,12 +41,12 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
   private final long delayBetweenEnabledTasks;
   private final TimeUnit timeUnit;
   private final ChainingCallback[] chainingCallbacks;
-  private final EnabledCeWorkerController ceWorkerController;
+  private final CeWorkerController ceWorkerController;
   private final int gracefulStopTimeoutInMs;
 
   public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
     CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
-    EnabledCeWorkerController ceWorkerController) {
+    CeWorkerController ceWorkerController) {
     this.executorService = processingExecutorService;
 
     this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProvider.java
new file mode 100644 (file)
index 0000000..e082dd2
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.picocontainer.injectors.ProviderAdapter;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class CeTaskInterrupterProvider extends ProviderAdapter {
+  private static final String PROPERTY_CE_TASK_TIMEOUT = "sonar.ce.task.timeoutSeconds";
+
+  private CeTaskInterrupter instance;
+
+  public CeTaskInterrupter provide(Configuration configuration, CeWorkerController ceWorkerController, System2 system2) {
+    if (instance == null) {
+      instance = configuration.getLong(PROPERTY_CE_TASK_TIMEOUT)
+        .filter(timeOutInSeconds -> {
+          checkState(timeOutInSeconds >= 1, "The property '%s' must be a long value >= 1. Got '%s'", PROPERTY_CE_TASK_TIMEOUT, timeOutInSeconds);
+          return true;
+        })
+        .map(timeOutInSeconds -> (CeTaskInterrupter) new TimeoutCeTaskInterrupter(timeOutInSeconds * 1_000L, ceWorkerController, system2))
+        .orElseGet(SimpleCeTaskInterrupter::new);
+    }
+    return instance;
+  }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListener.java
new file mode 100644 (file)
index 0000000..4cb42c1
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import javax.annotation.Nullable;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterrupter;
+import org.sonar.ce.task.CeTaskResult;
+import org.sonar.db.ce.CeActivityDto;
+
+public class CeTaskInterrupterWorkerExecutionListener implements CeWorker.ExecutionListener {
+  private final CeTaskInterrupter interrupter;
+
+  public CeTaskInterrupterWorkerExecutionListener(CeTaskInterrupter interrupter) {
+    this.interrupter = interrupter;
+  }
+
+  @Override
+  public void onStart(CeTask ceTask) {
+    interrupter.onStart(ceTask);
+  }
+
+  @Override
+  public void onEnd(CeTask ceTask, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+    interrupter.onEnd(ceTask);
+  }
+}
index 4f3b0ba6989325b3d85f3719f2064944a4a20c36..bbd351a844bf3d26384db7e4732e64e277e9dc4b 100644 (file)
@@ -29,9 +29,13 @@ public class CeTaskProcessorModule extends Module {
       CeTaskProcessorRepositoryImpl.class,
       CeLoggingWorkerExecutionListener.class,
       ReportAnalysisFailureNotificationExecutionListener.class,
+      new CeTaskInterrupterProvider(),
+      CeTaskInterrupterWorkerExecutionListener.class,
       CeWorkerFactoryImpl.class,
-      EnabledCeWorkerControllerImpl.class,
+      CeWorkerControllerImpl.class,
       CeProcessingSchedulerExecutorServiceImpl.class,
-      CeProcessingSchedulerImpl.class);
+      CeProcessingSchedulerImpl.class
+
+    );
   }
 }
index 74b416fd0bba5a9cbe3c49354561041ffae10d1c..8bfeffc401762820301445cfb4575bc31518cbb7 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.sonar.ce.taskprocessor;
 
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.sonar.ce.queue.CeQueue;
@@ -32,6 +33,7 @@ import org.sonar.db.ce.CeActivityDto;
  * {@code false} otherwise.
  */
 public interface CeWorker extends Callable<CeWorker.Result> {
+
   enum Result {
     /** Worker is disabled */
     DISABLED,
@@ -51,6 +53,16 @@ public interface CeWorker extends Callable<CeWorker.Result> {
    */
   String getUUID();
 
+  /**
+   * @return {@code true} if this CeWorker currently being executed by the specified {@link Thread}.
+   */
+  boolean isExecutedBy(Thread thread);
+
+  /**
+   * @return the {@link CeTask} currently being executed by this worker, if any.
+   */
+  Optional<CeTask> getCurrentTask();
+
   /**
    * Classes implementing will be called a task start and finishes executing.
    * All classes implementing this interface are guaranted to be called for each event, even if another implementation
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerController.java
new file mode 100644 (file)
index 0000000..df0d328
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+
+/**
+ * This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a
+ * task to process.
+ */
+public interface CeWorkerController {
+  interface ProcessingRecorderHook extends AutoCloseable {
+    /**
+     * Override to not declare any exception thrown.
+     */
+    @Override
+    void close();
+  }
+
+  /**
+   * Registers to the controller that the specified {@link CeWorker}
+   */
+  ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+
+  /**
+   * Returns {@code true} if the specified {@link CeWorker} is enabled
+   */
+  boolean isEnabled(CeWorker ceWorker);
+
+  /**
+   * @return the {@link CeWorker} running in the specified {@link Thread}, if any.
+   */
+  Optional<CeWorker> getCeWorkerIn(Thread thread);
+
+  /**
+   * Whether at least one worker is processing a task or not.
+   *
+   * @return {@code false} when all workers are waiting for tasks or are being stopped.
+   */
+  boolean hasAtLeastOneProcessingWorker();
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerControllerImpl.java
new file mode 100644 (file)
index 0000000..114be1e
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.configuration.CeConfiguration;
+
+public class CeWorkerControllerImpl implements CeWorkerController {
+  private final ConcurrentHashMap<CeWorker, Status> workerStatuses = new ConcurrentHashMap<>();
+  private final CeConfiguration ceConfiguration;
+
+  enum Status {
+    PROCESSING, PAUSED
+  }
+
+  public CeWorkerControllerImpl(CeConfiguration ceConfiguration) {
+    this.ceConfiguration = ceConfiguration;
+    logEnabledWorkerCount();
+  }
+
+  private void logEnabledWorkerCount() {
+    int workerCount = ceConfiguration.getWorkerCount();
+    if (workerCount > 1) {
+      Loggers.get(CeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
+    }
+  }
+
+  @Override
+  public Optional<CeWorker> getCeWorkerIn(Thread thread) {
+    return workerStatuses.keySet().stream()
+      .filter(t -> t.isExecutedBy(thread))
+      .findFirst();
+  }
+
+  @Override
+  public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
+    return new ProcessingRecorderHookImpl(ceWorker);
+  }
+
+  @Override
+  public boolean hasAtLeastOneProcessingWorker() {
+    return workerStatuses.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
+  }
+
+  /**
+   * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
+   * {@link CeConfiguration#getWorkerCount()}.
+   *
+   * This method does not fail if ordinal is invalid (ie. < 0).
+   */
+  @Override
+  public boolean isEnabled(CeWorker ceWorker) {
+    return ceWorker.getOrdinal() < ceConfiguration.getWorkerCount();
+  }
+
+  private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
+    private final CeWorker ceWorker;
+
+    private ProcessingRecorderHookImpl(CeWorker ceWorker) {
+      this.ceWorker = ceWorker;
+      workerStatuses.put(this.ceWorker, Status.PROCESSING);
+    }
+
+    @Override
+    public void close() {
+      workerStatuses.put(ceWorker, Status.PAUSED);
+    }
+  }
+}
index 5ddff9078225964cd4e3964484dfd6c79fb397c2..6a508b5118d41f2fbad6f970c1509f2c7d9c74e3 100644 (file)
@@ -30,7 +30,7 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory {
   private final UuidFactory uuidFactory;
   private final InternalCeQueue queue;
   private final CeTaskProcessorRepository taskProcessorRepository;
-  private final EnabledCeWorkerController enabledCeWorkerController;
+  private final CeWorkerController ceWorkerController;
   private final CeWorker.ExecutionListener[] executionListeners;
   private Set<CeWorker> ceWorkers = Collections.emptySet();
 
@@ -38,24 +38,24 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory {
    * Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container.
    */
   public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
-    UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController) {
-    this(queue, taskProcessorRepository, uuidFactory, enabledCeWorkerController, new CeWorker.ExecutionListener[0]);
+    UuidFactory uuidFactory, CeWorkerController ceWorkerController) {
+    this(queue, taskProcessorRepository, uuidFactory, ceWorkerController, new CeWorker.ExecutionListener[0]);
   }
 
   public CeWorkerFactoryImpl(InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
-    UuidFactory uuidFactory, EnabledCeWorkerController enabledCeWorkerController,
+    UuidFactory uuidFactory, CeWorkerController ceWorkerController,
     CeWorker.ExecutionListener[] executionListeners) {
     this.queue = queue;
     this.taskProcessorRepository = taskProcessorRepository;
     this.uuidFactory = uuidFactory;
-    this.enabledCeWorkerController = enabledCeWorkerController;
+    this.ceWorkerController = ceWorkerController;
     this.executionListeners = executionListeners;
   }
 
   @Override
   public CeWorker create(int ordinal) {
     String uuid = uuidFactory.create();
-    CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+    CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, ceWorkerController, executionListeners);
     ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1));
     return ceWorker;
   }
index f2c775a120af9b1f6e8a521af3fc32beb778cdcf..8bb2c17023e96cef901ddcc560ea2d876894c50a 100644 (file)
@@ -23,7 +23,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
@@ -32,6 +34,7 @@ import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
 import org.sonar.ce.queue.InternalCeQueue;
 import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterruptedException;
 import org.sonar.ce.task.CeTaskResult;
 import org.sonar.ce.task.taskprocessor.CeTaskProcessor;
 import org.sonar.core.util.logs.Profiler;
@@ -39,9 +42,11 @@ import org.sonar.db.ce.CeActivityDto;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.String.format;
+import static org.sonar.ce.task.CeTaskInterruptedException.isTaskInterruptedException;
 import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED;
 import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
 import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
+import static org.sonar.db.ce.CeActivityDto.Status.FAILED;
 
 public class CeWorkerImpl implements CeWorker {
 
@@ -51,18 +56,19 @@ public class CeWorkerImpl implements CeWorker {
   private final String uuid;
   private final InternalCeQueue queue;
   private final CeTaskProcessorRepository taskProcessorRepository;
-  private final EnabledCeWorkerController enabledCeWorkerController;
+  private final CeWorkerController ceWorkerController;
   private final List<ExecutionListener> listeners;
+  private final AtomicReference<RunningState> runningState = new AtomicReference<>();
 
   public CeWorkerImpl(int ordinal, String uuid,
     InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
-    EnabledCeWorkerController enabledCeWorkerController,
+    CeWorkerController ceWorkerController,
     ExecutionListener... listeners) {
     this.ordinal = checkOrdinal(ordinal);
     this.uuid = uuid;
     this.queue = queue;
     this.taskProcessorRepository = taskProcessorRepository;
-    this.enabledCeWorkerController = enabledCeWorkerController;
+    this.ceWorkerController = ceWorkerController;
     this.listeners = Arrays.asList(listeners);
   }
 
@@ -73,22 +79,71 @@ public class CeWorkerImpl implements CeWorker {
 
   @Override
   public Result call() {
-    return withCustomizedThreadName(this::findAndProcessTask);
+    try (TrackRunningState trackRunningState = new TrackRunningState(this::findAndProcessTask)) {
+      return trackRunningState.get();
+    }
   }
 
-  private <T> T withCustomizedThreadName(Supplier<T> supplier) {
-    Thread currentThread = Thread.currentThread();
-    String oldName = currentThread.getName();
-    try {
-      currentThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
-      return supplier.get();
-    } finally {
-      currentThread.setName(oldName);
+  @Override
+  public int getOrdinal() {
+    return ordinal;
+  }
+
+  @Override
+  public String getUUID() {
+    return uuid;
+  }
+
+  @Override
+  public boolean isExecutedBy(Thread thread) {
+    return Optional.ofNullable(runningState.get())
+      .filter(state -> state.runningThread.equals(thread))
+      .isPresent();
+  }
+
+  @Override
+  public Optional<CeTask> getCurrentTask() {
+    return Optional.ofNullable(runningState.get())
+      .flatMap(RunningState::getTask);
+  }
+
+  private class TrackRunningState implements AutoCloseable, Supplier<Result> {
+    private final RunningState localRunningState;
+    private final Function<RunningState, Result> delegate;
+    private final String oldName;
+
+    private TrackRunningState(Function<RunningState, Result> delegate) {
+      Thread currentThread = Thread.currentThread();
+      localRunningState = new RunningState(currentThread);
+      if (!runningState.compareAndSet(null, localRunningState)) {
+        LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " +
+          "Forcefully updating Workers's running state to new Thread.",
+          getOrdinal(), getUUID(), currentThread);
+        runningState.set(localRunningState);
+      }
+      this.delegate = delegate;
+      this.oldName = currentThread.getName();
+    }
+
+    @Override
+    public Result get() {
+      localRunningState.runningThread.setName(String.format("Worker %s (UUID=%s) on %s", getOrdinal(), getUUID(), oldName));
+      return delegate.apply(localRunningState);
+    }
+
+    @Override
+    public void close() {
+      localRunningState.runningThread.setName(oldName);
+      if (!runningState.compareAndSet(localRunningState, null)) {
+        LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." +
+          " Keeping this new state.",
+          getOrdinal(), getUUID(), localRunningState.runningThread);
+      }
     }
   }
 
-  private Result findAndProcessTask() {
-    if (!enabledCeWorkerController.isEnabled(this)) {
+  private Result findAndProcessTask(RunningState localRunningState) {
+    if (!ceWorkerController.isEnabled(this)) {
       return DISABLED;
     }
     Optional<CeTask> ceTask = tryAndFindTaskToExecute();
@@ -96,8 +151,9 @@ public class CeWorkerImpl implements CeWorker {
       return NO_TASK;
     }
 
-    try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) {
-      executeTask(ceTask.get());
+    try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this);
+      ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
+      executeTask.run();
     } catch (Exception e) {
       LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
     }
@@ -113,68 +169,95 @@ public class CeWorkerImpl implements CeWorker {
     return Optional.empty();
   }
 
-  @Override
-  public int getOrdinal() {
-    return ordinal;
-  }
+  private final class ExecuteTask implements Runnable, AutoCloseable {
+    private final CeTask task;
+    private final RunningState localRunningState;
+    private final Profiler ceProfiler;
+    private CeActivityDto.Status status = FAILED;
+    private CeTaskResult taskResult = null;
+    private Throwable error = null;
 
-  @Override
-  public String getUUID() {
-    return uuid;
-  }
+    private ExecuteTask(RunningState localRunningState, CeTask task) {
+      this.task = task;
+      this.localRunningState = localRunningState;
+      this.ceProfiler = startLogProfiler(task);
+    }
 
-  private void executeTask(CeTask task) {
-    callListeners(t -> t.onStart(task));
-    Profiler ceProfiler = startLogProfiler(task);
+    @Override
+    public void run() {
+      beforeExecute();
+      executeTask();
+    }
 
-    CeActivityDto.Status status = CeActivityDto.Status.FAILED;
-    CeTaskResult taskResult = null;
-    Throwable error = null;
-    try {
-      // TODO delegate the message to the related task processor, according to task type
-      Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
-      if (taskProcessor.isPresent()) {
-        taskResult = taskProcessor.get().process(task);
-        status = CeActivityDto.Status.SUCCESS;
-      } else {
-        LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
-        status = CeActivityDto.Status.FAILED;
+    @Override
+    public void close() {
+      afterExecute();
+    }
+
+    private void beforeExecute() {
+      localRunningState.setTask(task);
+      callListeners(t -> t.onStart(task));
+    }
+
+    private void executeTask() {
+      try {
+        // TODO delegate the message to the related task processor, according to task type
+        Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+        if (taskProcessor.isPresent()) {
+          taskResult = taskProcessor.get().process(task);
+          status = CeActivityDto.Status.SUCCESS;
+        } else {
+          LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+          status = FAILED;
+        }
+      } catch (MessageException e) {
+        // error
+        error = e;
+      } catch (Throwable e) {
+        Optional<CeTaskInterruptedException> taskInterruptedException = isTaskInterruptedException(e);
+        if (taskInterruptedException.isPresent()) {
+          LOG.trace("Task interrupted", e);
+          CeTaskInterruptedException exception = taskInterruptedException.get();
+          CeActivityDto.Status interruptionStatus = exception.getStatus();
+          status = interruptionStatus;
+          error = (interruptionStatus == FAILED ? exception : null);
+        } else {
+          // error
+          LOG.error("Failed to execute task {}", task.getUuid(), e);
+          error = e;
+        }
       }
-    } catch (MessageException e) {
-      // error
-      error = e;
-    } catch (Throwable e) {
-      // error
-      LOG.error("Failed to execute task {}", task.getUuid(), e);
-      error = e;
-    } finally {
+    }
+
+    private void afterExecute() {
+      localRunningState.setTask(null);
       finalizeTask(task, ceProfiler, status, taskResult, error);
     }
-  }
 
-  private void callListeners(Consumer<ExecutionListener> call) {
-    listeners.forEach(listener -> {
+    private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
+      @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
       try {
-        call.accept(listener);
-      } catch (Throwable t) {
-        LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t);
+        queue.remove(task, status, taskResult, error);
+      } catch (Exception e) {
+        if (error != null) {
+          e.addSuppressed(error);
+        }
+        LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+      } finally {
+        // finalize
+        stopLogProfiler(ceProfiler, status);
+        callListeners(t -> t.onEnd(task, status, taskResult, error));
       }
-    });
-  }
+    }
 
-  private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
-    @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
-    try {
-      queue.remove(task, status, taskResult, error);
-    } catch (Exception e) {
-      if (error != null) {
-        e.addSuppressed(error);
-      }
-      LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
-    } finally {
-      // finalize
-      stopLogProfiler(ceProfiler, status);
-      callListeners(t -> t.onEnd(task, status, taskResult, error));
+    private void callListeners(Consumer<ExecutionListener> call) {
+      listeners.forEach(listener -> {
+        try {
+          call.accept(listener);
+        } catch (Throwable t) {
+          LOG.error(format("Call to listener %s failed.", listener.getClass().getSimpleName()), t);
+        }
+      });
     }
   }
 
@@ -210,4 +293,22 @@ public class CeWorkerImpl implements CeWorker {
     profiler.addContext("status", status.name());
     profiler.stopInfo("Executed task");
   }
+
+  private static final class RunningState {
+    private final Thread runningThread;
+    private CeTask task;
+
+    private RunningState(Thread runningThread) {
+      this.runningThread = runningThread;
+    }
+
+    public Optional<CeTask> getTask() {
+      return Optional.ofNullable(task);
+    }
+
+    public void setTask(@Nullable CeTask task) {
+      this.task = task;
+    }
+  }
+
 }
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java
deleted file mode 100644 (file)
index e11063d..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-/**
- * This class is responsible of knowing/deciding which {@link CeWorker} is enabled and should actually try and find a
- * task to process.
- */
-public interface EnabledCeWorkerController {
-  interface ProcessingRecorderHook extends AutoCloseable {
-  }
-
-  /**
-   * Returns {@code true} if the specified {@link CeWorker} is enabled
-   */
-  boolean isEnabled(CeWorker ceWorker);
-
-  ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
-
-  /**
-   * Whether at least one worker is being processed a task or not.
-   * Returns {@code false} when all workers are waiting for tasks
-   * or are being stopped.
-   */
-  boolean hasAtLeastOneProcessingWorker();
-}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java
deleted file mode 100644 (file)
index 7370647..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.concurrent.ConcurrentHashMap;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.ce.configuration.CeConfiguration;
-
-public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
-  private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
-  private final CeConfiguration ceConfiguration;
-
-  enum Status {
-    PROCESSING, PAUSED
-  }
-
-  public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
-    this.ceConfiguration = ceConfiguration;
-    logEnabledWorkerCount();
-  }
-
-  private void logEnabledWorkerCount() {
-    int workerCount = ceConfiguration.getWorkerCount();
-    if (workerCount > 1) {
-      Loggers.get(EnabledCeWorkerController.class).info("Compute Engine will use {} concurrent workers to process tasks", workerCount);
-    }
-  }
-
-  @Override
-  public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
-    return new ProcessingRecorderHookImpl(ceWorker);
-  }
-
-  @Override
-  public boolean hasAtLeastOneProcessingWorker() {
-    return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
-  }
-
-  /**
-   * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
-   * {@link CeConfiguration#getWorkerCount()}.
-   *
-   * This method does not fail if ordinal is invalid (ie. < 0).
-   */
-  @Override
-  public boolean isEnabled(CeWorker ceWorker) {
-    return ceWorker.getOrdinal() < ceConfiguration.getWorkerCount();
-  }
-
-  private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
-    private final CeWorker ceWorker;
-
-    private ProcessingRecorderHookImpl(CeWorker ceWorker) {
-      this.ceWorker = ceWorker;
-      map.put(this.ceWorker, Status.PROCESSING);
-    }
-
-    @Override
-    public void close() {
-      map.put(ceWorker, Status.PAUSED);
-    }
-  }
-}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupter.java
new file mode 100644 (file)
index 0000000..0847cbf
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+import org.sonar.ce.task.CeTaskInterruptedException;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+/**
+ * An implementation of {@link CeTaskInterrupter} which will only interrupt the processing if the current thread
+ * has been interrupted.
+ *
+ * @see Thread#isInterrupted()
+ */
+public class SimpleCeTaskInterrupter implements CeTaskInterrupter {
+  @Override
+  public void check(Thread currentThread) throws CeTaskInterruptedException {
+    if (currentThread.isInterrupted()) {
+      throw new CeTaskCanceledException(currentThread);
+    }
+  }
+
+  @Override
+  public void onStart(CeTask ceTask) {
+    // nothing to do
+  }
+
+  @Override
+  public void onEnd(CeTask ceTask) {
+    // nothing to do
+  }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupter.java
new file mode 100644 (file)
index 0000000..1bebdee
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterruptedException;
+import org.sonar.ce.task.CeTaskTimeoutException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+
+/**
+ * An implementation of {@link org.sonar.ce.task.CeTaskInterrupter} which interrupts the processing of the task
+ * if:
+ * <ul>
+ *   <li>the thread has been interrupted</li>
+ *   <li>it's been running for more than a certain, configurable, amount of time</li>
+ * </ul>
+ */
+public class TimeoutCeTaskInterrupter extends SimpleCeTaskInterrupter {
+  private final long taskTimeoutThreshold;
+  private final CeWorkerController ceWorkerController;
+  private final System2 system2;
+  private final Map<String, Long> startTimestampByCeTaskUuid = new HashMap<>();
+
+  public TimeoutCeTaskInterrupter(long taskTimeoutThreshold, CeWorkerController ceWorkerController, System2 system2) {
+    checkArgument(taskTimeoutThreshold >= 1, "threshold must be >= 1");
+    Loggers.get(TimeoutCeTaskInterrupter.class).info("Compute Engine Task timeout enabled: {} ms", taskTimeoutThreshold);
+
+    this.taskTimeoutThreshold = taskTimeoutThreshold;
+    this.ceWorkerController = ceWorkerController;
+    this.system2 = system2;
+  }
+
+  @Override
+  public void check(Thread currentThread) throws CeTaskInterruptedException {
+    super.check(currentThread);
+
+    computeTimeOutOf(taskOf(currentThread))
+      .ifPresent(timeout -> {
+        throw new CeTaskTimeoutException(format("Execution of task timed out after %s ms", timeout));
+      });
+  }
+
+  private Optional<Long> computeTimeOutOf(CeTask ceTask) {
+    Long startTimestamp = startTimestampByCeTaskUuid.get(ceTask.getUuid());
+    checkState(startTimestamp != null, "No start time recorded for task %s", ceTask.getUuid());
+
+    long duration = system2.now() - startTimestamp;
+    return Optional.of(duration)
+      .filter(t -> t > taskTimeoutThreshold);
+  }
+
+  private CeTask taskOf(Thread currentThread) {
+    return ceWorkerController.getCeWorkerIn(currentThread)
+      .flatMap(CeWorker::getCurrentTask)
+      .orElseThrow(() -> new IllegalStateException(format("Could not find the CeTask being executed in thread '%s'", currentThread.getName())));
+  }
+
+  @Override
+  public void onStart(CeTask ceTask) {
+    long now = system2.now();
+    Long existingTimestamp = startTimestampByCeTaskUuid.put(ceTask.getUuid(), now);
+    if (existingTimestamp != null) {
+      Loggers.get(TimeoutCeTaskInterrupter.class)
+        .warn("Notified of start of execution of task %s but start had already been recorded at %s. Recording new start at %s",
+          ceTask.getUuid(), existingTimestamp, now);
+    }
+  }
+
+  @Override
+  public void onEnd(CeTask ceTask) {
+    Long startTimestamp = startTimestampByCeTaskUuid.remove(ceTask.getUuid());
+    if (startTimestamp == null) {
+      Loggers.get(TimeoutCeTaskInterrupter.class)
+        .warn("Notified of end of execution of task %s but start wasn't recorded", ceTask.getUuid());
+    }
+  }
+
+}
index 046bfd3d8c0a6f72f42cc6cb0d8e93eabe534ce9..a05b5ebe74babb2cca64cc6a1d8a6a5815788952 100644 (file)
@@ -103,7 +103,7 @@ public class ComputeEngineContainerImplTest {
             + 3 // content of CeHttpModule
             + 3 // content of CeTaskCommonsModule
             + 4 // content of ProjectAnalysisTaskModule
-            + 7 // content of CeTaskProcessorModule
+            + 9 // content of CeTaskProcessorModule
             + 3 // content of ReportAnalysisFailureNotificationModule
             + 3 // CeCleaningModule + its content
             + 4 // WebhookModule
index 18cc1c0462a210a5d5c3a2f4c676bd78d807acaa..53c83fc01387ad12e0716e38112d536f13ee984d 100644 (file)
@@ -35,8 +35,8 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.junit.Test;
 import org.sonar.ce.configuration.CeConfiguration;
 import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerController;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
-import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
 import org.sonar.core.util.stream.MoreCollectors;
 import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
 
@@ -61,8 +61,8 @@ public class CeTasksMBeanImplTest {
     })
     .collect(MoreCollectors.toSet());
 
-  private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
-  private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController);
+  private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+  private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), ceWorkerController);
 
   @Test
   public void register_and_unregister() throws Exception {
@@ -124,9 +124,9 @@ public class CeTasksMBeanImplTest {
     for (CeWorker worker : WORKERS) {
       if (i < enabledWorkerCount) {
         enabledWorkers[i] = worker;
-        when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true);
+        when(ceWorkerController.isEnabled(worker)).thenReturn(true);
       } else {
-        when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false);
+        when(ceWorkerController.isEnabled(worker)).thenReturn(false);
       }
       i++;
     }
index 9c1f68683e1b4d70d6da056e8562732afcb85816..2fdbfdf9029b213a8bedcb26c811fbd0b818a55e 100644 (file)
@@ -78,7 +78,7 @@ public class CeProcessingSchedulerImplTest {
   private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
   private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
   private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
-  private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration);
+  private CeWorkerController ceWorkerController = new CeWorkerControllerImpl(ceConfiguration);
 
   private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
 
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterProviderTest.java
new file mode 100644 (file)
index 0000000..05062eb
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.lang.reflect.Field;
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.config.internal.MapSettings;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.task.CeTaskInterrupter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class CeTaskInterrupterProviderTest {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private MapSettings settings = new MapSettings();
+  private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+  private System2 system2 = mock(System2.class);
+  private CeTaskInterrupterProvider underTest = new CeTaskInterrupterProvider();
+
+  @Test
+  public void provide_returns_a_SimpleCeTaskInterrupter_instance_if_configuration_is_empty() {
+    CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+    assertThat(instance)
+      .isInstanceOf(SimpleCeTaskInterrupter.class);
+  }
+
+  @Test
+  public void provide_always_return_the_same_SimpleCeTaskInterrupter_instance() {
+    CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+    assertThat(instance)
+      .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2))
+      .isSameAs(underTest.provide(new MapSettings().asConfig(), ceWorkerController, system2));
+  }
+
+  @Test
+  public void provide_returns_a_TimeoutCeTaskInterrupter_instance_if_property_taskTimeout_has_a_value() throws IllegalAccessException, NoSuchFieldException {
+    int timeout = 1 + new Random().nextInt(2222);
+    settings.setProperty("sonar.ce.task.timeoutSeconds", timeout);
+
+    CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+    assertThat(instance)
+      .isInstanceOf(TimeoutCeTaskInterrupter.class);
+
+    assertThat(readField(instance, "taskTimeoutThreshold"))
+      .isEqualTo(timeout * 1_000L);
+    assertThat(readField(instance, "ceWorkerController"))
+      .isSameAs(ceWorkerController);
+    assertThat(readField(instance, "system2"))
+      .isSameAs(system2);
+  }
+
+  @Test
+  public void provide_fails_with_ISE_if_property_is_not_a_long() {
+    settings.setProperty("sonar.ce.task.timeoutSeconds", "foo");
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' is not an long value: For input string: \"foo\"");
+
+    underTest.provide(settings.asConfig(), ceWorkerController, system2);
+  }
+
+  @Test
+  public void provide_fails_with_ISE_if_property_is_zero() {
+    settings.setProperty("sonar.ce.task.timeoutSeconds", "0");
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '0'");
+
+    underTest.provide(settings.asConfig(), ceWorkerController, system2);
+  }
+
+  @Test
+  public void provide_fails_with_ISE_if_property_is_less_than_zero() {
+    int negativeValue = -(1 + new Random().nextInt(1_212));
+    settings.setProperty("sonar.ce.task.timeoutSeconds", negativeValue);
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("The property 'sonar.ce.task.timeoutSeconds' must be a long value >= 1. Got '" + negativeValue + "'");
+
+    underTest.provide(settings.asConfig(), ceWorkerController, system2);
+  }
+
+  @Test
+  public void provide_always_return_the_same_TimeoutCeTaskInterrupter_instance() {
+    int timeout = 1 + new Random().nextInt(2222);
+    settings.setProperty("sonar.ce.task.timeoutSeconds", timeout);
+
+    CeTaskInterrupter instance = underTest.provide(settings.asConfig(), ceWorkerController, system2);
+
+    assertThat(instance)
+      .isSameAs(underTest.provide(settings.asConfig(), ceWorkerController, system2))
+      .isSameAs(underTest.provide(new MapSettings().setProperty("sonar.ce.task.timeoutSeconds", 999).asConfig(), ceWorkerController, system2));
+  }
+
+  private static Object readField(CeTaskInterrupter instance, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+    Class<?> clazz = instance.getClass();
+    Field timeoutField = clazz.getDeclaredField(fieldName);
+    timeoutField.setAccessible(true);
+    return timeoutField.get(instance);
+  }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskInterrupterWorkerExecutionListenerTest.java
new file mode 100644 (file)
index 0000000..a2e12ca
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Random;
+import org.junit.Test;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskInterrupter;
+import org.sonar.db.ce.CeActivityDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class CeTaskInterrupterWorkerExecutionListenerTest {
+  private CeTaskInterrupter ceTaskInterrupter = mock(CeTaskInterrupter.class);
+  private CeTaskInterrupterWorkerExecutionListener underTest = new CeTaskInterrupterWorkerExecutionListener(ceTaskInterrupter);
+
+
+  @Test
+  public void onStart_delegates_to_ceTaskInterrupter_onStart() {
+    CeTask ceTask = mock(CeTask.class);
+
+    underTest.onStart(ceTask);
+
+    verify(ceTaskInterrupter).onStart(same(ceTask));
+  }
+
+  @Test
+  public void onEnd_delegates_to_ceTaskInterrupter_onEnd() {
+    CeTask ceTask = mock(CeTask.class);
+    CeActivityDto.Status randomStatus = CeActivityDto.Status.values()[new Random().nextInt(CeActivityDto.Status.values().length)];
+
+    underTest.onEnd(ceTask, randomStatus, null, null);
+
+    verify(ceTaskInterrupter).onEnd(same(ceTask));
+  }
+}
index f11d88125e8fc5c9b8d872e6a408a0d8a1539604..2f90ceb80d9a23566dda18e96b3ac2bf11afc7de 100644 (file)
@@ -21,7 +21,9 @@ package org.sonar.ce.taskprocessor;
 
 import org.junit.Test;
 import org.picocontainer.ComponentAdapter;
+import org.sonar.api.config.internal.MapSettings;
 import org.sonar.ce.notification.ReportAnalysisFailureNotificationExecutionListener;
+import org.sonar.ce.task.CeTaskInterrupter;
 import org.sonar.core.platform.ComponentContainer;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -52,4 +54,15 @@ public class CeTaskProcessorModuleTest {
       .map(ComponentAdapter::getComponentImplementation))
         .contains(ReportAnalysisFailureNotificationExecutionListener.class);
   }
+
+  @Test
+  public void defines_CeTaskInterrupterProvider_object() {
+    ComponentContainer container = new ComponentContainer();
+
+    underTest.configure(container);
+
+
+    assertThat(container.getPicoContainer().getComponentAdapter(CeTaskInterrupter.class))
+      .isInstanceOf(CeTaskInterrupterProvider.class);
+  }
 }
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerControllerImplTest.java
new file mode 100644 (file)
index 0000000..14f1ac3
--- /dev/null
@@ -0,0 +1,172 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.configuration.CeConfigurationRule;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerControllerImplTest {
+  private Random random = new Random();
+  /** 1 <= workerCount <= 5 */
+  private int randomWorkerCount = 1 + random.nextInt(5);
+
+  @Rule
+  public CeConfigurationRule ceConfigurationRule = new CeConfigurationRule()
+    .setWorkerCount(randomWorkerCount);
+  @Rule
+  public LogTester logTester = new LogTester();
+
+  private CeWorker ceWorker = mock(CeWorker.class);
+  private CeWorkerControllerImpl underTest = new CeWorkerControllerImpl(ceConfigurationRule);
+
+  @Test
+  public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() {
+    int ordinal = randomWorkerCount + Math.min(-1, -random.nextInt(randomWorkerCount));
+    when(ceWorker.getOrdinal()).thenReturn(ordinal);
+
+    assertThat(underTest.isEnabled(ceWorker))
+      .as("For ordinal " + ordinal + " and workerCount " + randomWorkerCount)
+      .isTrue();
+  }
+
+  @Test
+  public void isEnabled_returns_false_if_worker_ordinal_is_equal_to_CeConfiguration_workerCount() {
+    when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
+
+    assertThat(underTest.isEnabled(ceWorker)).isFalse();
+  }
+
+  @Test
+  public void isEnabled_returns_true_if_ordinal_is_invalid() {
+    int ordinal = -1 - random.nextInt(3);
+    when(ceWorker.getOrdinal()).thenReturn(ordinal);
+
+    assertThat(underTest.isEnabled(ceWorker))
+      .as("For invalid ordinal " + ordinal + " and workerCount " + randomWorkerCount)
+      .isTrue();
+  }
+
+  @Test
+  public void constructor_writes_no_info_log_if_workerCount_is_1() {
+    ceConfigurationRule.setWorkerCount(1);
+    logTester.clear();
+
+    new CeWorkerControllerImpl(ceConfigurationRule);
+
+    assertThat(logTester.logs()).isEmpty();
+  }
+
+  @Test
+  public void constructor_writes_info_log_if_workerCount_is_greater_than_1() {
+    int newWorkerCount = randomWorkerCount + 1;
+    ceConfigurationRule.setWorkerCount(newWorkerCount);
+    logTester.clear();
+
+    new CeWorkerControllerImpl(ceConfigurationRule);
+
+    verifyInfoLog(newWorkerCount);
+  }
+
+  @Test
+  public void workerCount_is_always_reloaded() {
+    when(ceWorker.getOrdinal()).thenReturn(1);
+
+    ceConfigurationRule.setWorkerCount(1);
+    assertThat(underTest.isEnabled(ceWorker)).isFalse();
+
+    ceConfigurationRule.setWorkerCount(2);
+    assertThat(underTest.isEnabled(ceWorker)).isTrue();
+  }
+
+  @Test
+  public void getCeWorkerIn_returns_empty_if_worker_is_unregistered_in_CeWorkerController() {
+    CeWorker ceWorker = mock(CeWorker.class);
+    Thread currentThread = Thread.currentThread();
+    Thread otherThread = new Thread();
+
+    mockWorkerIsRunningOnNoThread(ceWorker);
+    assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+    assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+    mockWorkerIsRunningOnThread(ceWorker, currentThread);
+    assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+    assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+    mockWorkerIsRunningOnThread(ceWorker, otherThread);
+    assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+    assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+  }
+
+  @Test
+  public void getCeWorkerIn_returns_empty_if_worker_registered_in_CeWorkerController_but_has_no_current_thread() {
+    CeWorker ceWorker = mock(CeWorker.class);
+    Thread currentThread = Thread.currentThread();
+    Thread otherThread = new Thread();
+
+    underTest.registerProcessingFor(ceWorker);
+
+    mockWorkerIsRunningOnNoThread(ceWorker);
+    assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+    assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+  }
+
+  @Test
+  public void getCeWorkerIn_returns_thread_if_worker_registered_in_CeWorkerController_but_has_a_current_thread() {
+    CeWorker ceWorker = mock(CeWorker.class);
+    Thread currentThread = Thread.currentThread();
+    Thread otherThread = new Thread();
+
+    underTest.registerProcessingFor(ceWorker);
+
+    mockWorkerIsRunningOnThread(ceWorker, currentThread);
+    assertThat(underTest.getCeWorkerIn(currentThread)).contains(ceWorker);
+    assertThat(underTest.getCeWorkerIn(otherThread)).isEmpty();
+
+    mockWorkerIsRunningOnThread(ceWorker, otherThread);
+    assertThat(underTest.getCeWorkerIn(currentThread)).isEmpty();
+    assertThat(underTest.getCeWorkerIn(otherThread)).contains(ceWorker);
+  }
+
+  private void mockWorkerIsRunningOnThread(CeWorker ceWorker, Thread thread) {
+    reset(ceWorker);
+    when(ceWorker.isExecutedBy(thread)).thenReturn(true);
+  }
+
+  private void mockWorkerIsRunningOnNoThread(CeWorker ceWorker) {
+    reset(ceWorker);
+    when(ceWorker.isExecutedBy(any())).thenReturn(false);
+  }
+
+  private void verifyInfoLog(int workerCount) {
+    assertThat(logTester.logs()).hasSize(1);
+    assertThat(logTester.logs(LoggerLevel.INFO))
+      .containsOnly("Compute Engine will use " + workerCount + " concurrent workers to process tasks");
+  }
+}
index 4f0603ef59f283146daa294b368572bf423b3d54..b88fbd4f7df3689186f1f013cc08fc09bc531257 100644 (file)
@@ -35,7 +35,7 @@ import static org.mockito.Mockito.mock;
 public class CeWorkerFactoryImplTest {
   private int randomOrdinal = new Random().nextInt(20);
   private CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class),
-    mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class));
+    mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class));
 
   @Test
   public void create_return_CeWorker_object_with_specified_ordinal() {
@@ -49,7 +49,7 @@ public class CeWorkerFactoryImplTest {
     CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class);
     CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class);
     CeWorkerFactoryImpl underTest = new CeWorkerFactoryImpl(mock(InternalCeQueue.class),
-      mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(EnabledCeWorkerController.class),
+      mock(CeTaskProcessorRepository.class), UuidFactoryImpl.INSTANCE, mock(CeWorkerController.class),
       new CeWorker.ExecutionListener[] {executionListener1, executionListener2});
 
     CeWorker ceWorker = underTest.create(randomOrdinal);
index 0b15cca7ef3978a047affe9db1d687dc0b748f69..db549d4c779d94b4fb10ecea1cc1a71260cdeab5 100644 (file)
@@ -24,10 +24,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
-import org.apache.commons.lang.RandomStringUtils;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,6 +37,7 @@ import org.junit.rules.ExpectedException;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.sonar.api.utils.MessageException;
 import org.sonar.api.utils.System2;
 import org.sonar.api.utils.internal.TestSystem2;
@@ -45,6 +48,7 @@ import org.sonar.ce.queue.InternalCeQueue;
 import org.sonar.ce.task.CeTask;
 import org.sonar.ce.task.CeTaskResult;
 import org.sonar.ce.task.projectanalysis.taskprocessor.ReportTaskProcessor;
+import org.sonar.ce.task.taskprocessor.CeTaskProcessor;
 import org.sonar.db.DbSession;
 import org.sonar.db.DbTester;
 import org.sonar.db.ce.CeActivityDto;
@@ -53,6 +57,7 @@ import org.sonar.db.user.UserDto;
 import org.sonar.db.user.UserTesting;
 import org.sonar.server.organization.BillingValidations;
 
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -85,19 +90,19 @@ public class CeWorkerImplTest {
   private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
   private CeWorker.ExecutionListener executionListener1 = mock(CeWorker.ExecutionListener.class);
   private CeWorker.ExecutionListener executionListener2 = mock(CeWorker.ExecutionListener.class);
-  private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
+  private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
   private ArgumentCaptor<String> workerUuidCaptor = ArgumentCaptor.forClass(String.class);
   private int randomOrdinal = new Random().nextInt(50);
   private String workerUuid = UUID.randomUUID().toString();
-  private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController,
+  private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController,
     executionListener1, executionListener2);
-  private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, enabledCeWorkerController);
+  private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController);
   private InOrder inOrder = Mockito.inOrder(taskProcessor, queue, executionListener1, executionListener2);
   private final CeTask.User submitter = new CeTask.User("UUID_USER_1", "LOGIN_1");
 
   @Before
   public void setUp() {
-    when(enabledCeWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true);
+    when(ceWorkerController.isEnabled(any(CeWorker.class))).thenReturn(true);
   }
 
   @Test
@@ -105,20 +110,20 @@ public class CeWorkerImplTest {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Ordinal must be >= 0");
 
-    new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, enabledCeWorkerController);
+    new CeWorkerImpl(-1 - new Random().nextInt(20), workerUuid, queue, taskProcessorRepository, ceWorkerController);
   }
 
   @Test
   public void getUUID_must_return_the_uuid_of_constructor() {
     String uuid = UUID.randomUUID().toString();
-    CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController);
+    CeWorker underTest = new CeWorkerImpl(randomOrdinal, uuid, queue, taskProcessorRepository, ceWorkerController);
     assertThat(underTest.getUUID()).isEqualTo(uuid);
   }
 
   @Test
   public void worker_disabled() throws Exception {
-    reset(enabledCeWorkerController);
-    when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+    reset(ceWorkerController);
+    when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
 
     assertThat(underTest.call()).isEqualTo(DISABLED);
 
@@ -127,8 +132,8 @@ public class CeWorkerImplTest {
 
   @Test
   public void worker_disabled_no_listener() throws Exception {
-    reset(enabledCeWorkerController);
-    when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+    reset(ceWorkerController);
+    when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
 
     assertThat(underTestNoListener.call()).isEqualTo(DISABLED);
 
@@ -391,7 +396,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
-    String threadName = RandomStringUtils.randomAlphabetic(3);
+    String threadName = randomAlphabetic(3);
     when(queue.peek(anyString())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
@@ -405,7 +410,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
-    String threadName = RandomStringUtils.randomAlphabetic(3);
+    String threadName = randomAlphabetic(3);
     when(queue.peek(anyString())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
@@ -420,7 +425,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
-    String threadName = RandomStringUtils.randomAlphabetic(3);
+    String threadName = randomAlphabetic(3);
     CeTask ceTask = createCeTask(submitter);
     when(queue.peek(anyString())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
@@ -437,10 +442,10 @@ public class CeWorkerImplTest {
 
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_worker_is_disabled() throws Exception {
-    reset(enabledCeWorkerController);
-    when(enabledCeWorkerController.isEnabled(underTest)).thenReturn(false);
+    reset(ceWorkerController);
+    when(ceWorkerController.isEnabled(underTest)).thenReturn(false);
 
-    String threadName = RandomStringUtils.randomAlphabetic(3);
+    String threadName = randomAlphabetic(3);
     Thread newThread = createThreadNameVerifyingThread(threadName);
 
     newThread.start();
@@ -561,6 +566,166 @@ public class CeWorkerImplTest {
     assertThat(((Exception) arg1).getSuppressed()).containsOnly(ex);
   }
 
+  @Test
+  public void isExecutedBy_returns_false_when_no_interaction_with_instance() {
+    assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+    assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+  }
+
+  @Test
+  public void isExecutedBy_returns_false_unless_a_thread_is_currently_calling_call() throws InterruptedException {
+    CountDownLatch inCallLatch = new CountDownLatch(1);
+    CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+    // mock long running peek(String) call => Thread is executing call() but not running a task
+    when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+      inCallLatch.countDown();
+      try {
+        assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Optional.empty();
+    });
+    Thread t = callCallInNewThread(underTest);
+
+    try {
+      t.start();
+
+      inCallLatch.await(10, TimeUnit.SECONDS);
+      assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+      assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+      assertThat(underTest.isExecutedBy(t)).isTrue();
+    } finally {
+      assertionsDoneLatch.countDown();
+      t.join();
+    }
+
+    assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+    assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+    assertThat(underTest.isExecutedBy(t)).isFalse();
+  }
+
+  @Test
+  public void isExecutedBy_returns_false_unless_a_thread_is_currently_executing_a_task() throws InterruptedException {
+    CountDownLatch inCallLatch = new CountDownLatch(1);
+    CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+    String taskType = randomAlphabetic(12);
+    CeTask ceTask = mock(CeTask.class);
+    when(ceTask.getType()).thenReturn(taskType);
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+    taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
+      @CheckForNull
+      @Override
+      public CeTaskResult process(CeTask task) {
+        inCallLatch.countDown();
+        try {
+          assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        return null;
+      }
+    });
+    Thread t = callCallInNewThread(underTest);
+
+    try {
+      t.start();
+
+      inCallLatch.await(10, TimeUnit.SECONDS);
+      assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+      assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+      assertThat(underTest.isExecutedBy(t)).isTrue();
+    } finally {
+      assertionsDoneLatch.countDown();
+      t.join();
+    }
+
+    assertThat(underTest.isExecutedBy(Thread.currentThread())).isFalse();
+    assertThat(underTest.isExecutedBy(new Thread())).isFalse();
+    assertThat(underTest.isExecutedBy(t)).isFalse();
+  }
+
+  @Test
+  public void getCurrentTask_returns_empty_when_no_interaction_with_instance() {
+    assertThat(underTest.getCurrentTask()).isEmpty();
+  }
+
+  @Test
+  public void getCurrentTask_returns_empty_when_a_thread_is_currently_calling_call_but_not_executing_a_task() throws InterruptedException {
+    CountDownLatch inCallLatch = new CountDownLatch(1);
+    CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+    // mock long running peek(String) call => Thread is executing call() but not running a task
+    when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+      inCallLatch.countDown();
+      try {
+        assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Optional.empty();
+    });
+    Thread t = callCallInNewThread(underTest);
+
+    try {
+      t.start();
+
+      inCallLatch.await(10, TimeUnit.SECONDS);
+      assertThat(underTest.getCurrentTask()).isEmpty();
+    } finally {
+      assertionsDoneLatch.countDown();
+      t.join();
+    }
+
+    assertThat(underTest.getCurrentTask()).isEmpty();
+  }
+
+  @Test
+  public void getCurrentTask_returns_empty_unless_a_thread_is_currently_executing_a_task() throws InterruptedException {
+    CountDownLatch inCallLatch = new CountDownLatch(1);
+    CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
+    String taskType = randomAlphabetic(12);
+    CeTask ceTask = mock(CeTask.class);
+    when(ceTask.getType()).thenReturn(taskType);
+    when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+    taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
+
+      @CheckForNull
+      @Override
+      public CeTaskResult process(CeTask task) {
+        inCallLatch.countDown();
+        try {
+          assertionsDoneLatch.await(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        return null;
+      }
+    });
+    Thread t = callCallInNewThread(underTest);
+
+    try {
+      t.start();
+
+      inCallLatch.await(10, TimeUnit.SECONDS);
+      assertThat(underTest.getCurrentTask()).contains(ceTask);
+    } finally {
+      assertionsDoneLatch.countDown();
+      t.join();
+    }
+
+    assertThat(underTest.getCurrentTask()).isEmpty();
+  }
+
+  private Thread callCallInNewThread(CeWorker underTest) {
+    return new Thread(() -> {
+      try {
+        underTest.call();
+      } catch (Exception e) {
+        throw new RuntimeException("call to call() failed and this is unexpected. Fix the UT.", e);
+      }
+    });
+  }
+
   private Thread createThreadNameVerifyingThread(String threadName) {
     return new Thread(() -> {
       verifyUnchangedThreadName(threadName);
@@ -597,7 +762,7 @@ public class CeWorkerImplTest {
       .setCharacteristics(characteristicMap)
       .build();
   }
-  
+
   private UserDto insertRandomUser() {
     UserDto userDto = UserTesting.newUserDto();
     db.getDbClient().userDao().insert(session, userDto);
@@ -617,4 +782,11 @@ public class CeWorkerImplTest {
     doThrow(t).when(taskProcessor).process(task);
     return t;
   }
+
+  private static abstract class SimpleCeTaskProcessor implements CeTaskProcessor {
+    @Override
+    public Set<String> getHandledCeTaskTypes() {
+      throw new UnsupportedOperationException("getHandledCeTaskTypes should not be called");
+    }
+  }
 }
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/ComputingThread.java
new file mode 100644 (file)
index 0000000..3bb0a82
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+class ComputingThread extends Thread {
+  private boolean kill = false;
+
+  public ComputingThread(String name) {
+    setName(name);
+  }
+
+  private long fibo(int i) {
+    if (kill) {
+      return i;
+    }
+    if (i == 0) {
+      return 0;
+    }
+    if (i == 1) {
+      return 1;
+    }
+    return fibo(i - 1) + fibo(i - 2);
+  }
+
+  @Override
+  public void run() {
+    for (int i = 2; i < 50; i++) {
+      fibo(i);
+      if (kill) {
+        break;
+      }
+    }
+  }
+
+  public void kill() {
+    this.kill = true;
+  }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImplTest.java
deleted file mode 100644 (file)
index 54c90fb..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2018 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.ce.taskprocessor;
-
-import java.util.Random;
-import org.junit.Rule;
-import org.junit.Test;
-import org.sonar.api.utils.log.LogTester;
-import org.sonar.api.utils.log.LoggerLevel;
-import org.sonar.ce.configuration.CeConfigurationRule;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class EnabledCeWorkerControllerImplTest {
-  private Random random = new Random();
-  /** 1 <= workerCount <= 5 */
-  private int randomWorkerCount = 1 + random.nextInt(5);
-
-  @Rule
-  public CeConfigurationRule ceConfigurationRule = new CeConfigurationRule()
-    .setWorkerCount(randomWorkerCount);
-  @Rule
-  public LogTester logTester = new LogTester();
-
-  private CeWorker ceWorker = mock(CeWorker.class);
-  private EnabledCeWorkerControllerImpl underTest = new EnabledCeWorkerControllerImpl(ceConfigurationRule);
-
-  @Test
-  public void isEnabled_returns_true_if_worker_ordinal_is_less_than_CeConfiguration_workerCount() {
-    int ordinal = randomWorkerCount + Math.min(-1, -random.nextInt(randomWorkerCount));
-    when(ceWorker.getOrdinal()).thenReturn(ordinal);
-
-    assertThat(underTest.isEnabled(ceWorker))
-      .as("For ordinal " + ordinal + " and workerCount " + randomWorkerCount)
-      .isTrue();
-  }
-
-  @Test
-  public void isEnabled_returns_false_if_worker_ordinal_is_equal_to_CeConfiguration_workerCount() {
-    when(ceWorker.getOrdinal()).thenReturn(randomWorkerCount);
-
-    assertThat(underTest.isEnabled(ceWorker)).isFalse();
-  }
-
-  @Test
-  public void isEnabled_returns_true_if_ordinal_is_invalid() {
-    int ordinal = -1 - random.nextInt(3);
-    when(ceWorker.getOrdinal()).thenReturn(ordinal);
-
-    assertThat(underTest.isEnabled(ceWorker))
-      .as("For invalid ordinal " + ordinal + " and workerCount " + randomWorkerCount)
-      .isTrue();
-  }
-
-  @Test
-  public void constructor_writes_no_info_log_if_workerCount_is_1() {
-    ceConfigurationRule.setWorkerCount(1);
-    logTester.clear();
-
-    new EnabledCeWorkerControllerImpl(ceConfigurationRule);
-
-    assertThat(logTester.logs()).isEmpty();
-  }
-
-  @Test
-  public void constructor_writes_info_log_if_workerCount_is_greater_than_1() {
-    int newWorkerCount = randomWorkerCount + 1;
-    ceConfigurationRule.setWorkerCount(newWorkerCount);
-    logTester.clear();
-
-    new EnabledCeWorkerControllerImpl(ceConfigurationRule);
-
-    verifyInfoLog(newWorkerCount);
-  }
-
-  @Test
-  public void workerCount_is_always_reloaded() {
-    when(ceWorker.getOrdinal()).thenReturn(1);
-
-    ceConfigurationRule.setWorkerCount(1);
-    assertThat(underTest.isEnabled(ceWorker)).isFalse();
-
-    ceConfigurationRule.setWorkerCount(2);
-    assertThat(underTest.isEnabled(ceWorker)).isTrue();
-  }
-
-  private void verifyInfoLog(int workerCount) {
-    assertThat(logTester.logs()).hasSize(1);
-    assertThat(logTester.logs(LoggerLevel.INFO))
-      .containsOnly("Compute Engine will use " + workerCount + " concurrent workers to process tasks");
-  }
-}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/SimpleCeTaskInterrupterTest.java
new file mode 100644 (file)
index 0000000..c8f40e9
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class SimpleCeTaskInterrupterTest {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private SimpleCeTaskInterrupter underTest = new SimpleCeTaskInterrupter();
+
+  @Test
+  public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException {
+    String threadName = randomAlphabetic(30);
+    ComputingThread t = new ComputingThread(threadName);
+
+    try {
+      t.start();
+
+      // will not fail
+      underTest.check(t);
+
+      t.interrupt();
+
+      expectedException.expect(CeTaskCanceledException.class);
+      expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+      underTest.check(t);
+    } finally {
+      t.kill();
+      t.join(1_000);
+    }
+  }
+
+  @Test
+  public void onStart_has_no_effect() {
+    CeTask ceTask = mock(CeTask.class);
+
+    underTest.onStart(ceTask);
+
+    verifyZeroInteractions(ceTask);
+  }
+
+  @Test
+  public void onEnd_has_no_effect() {
+    CeTask ceTask = mock(CeTask.class);
+
+    underTest.onEnd(ceTask);
+
+    verifyZeroInteractions(ceTask);
+  }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/TimeoutCeTaskInterrupterTest.java
new file mode 100644 (file)
index 0000000..a49c812
--- /dev/null
@@ -0,0 +1,223 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.Optional;
+import java.util.Random;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.task.CeTask;
+import org.sonar.ce.task.CeTaskCanceledException;
+import org.sonar.ce.task.CeTaskTimeoutException;
+
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TimeoutCeTaskInterrupterTest {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+  @Rule
+  public LogTester logTester = new LogTester();
+
+  private int timeoutInSeconds = 1 + new Random().nextInt(20);
+  private int timeoutInMs = timeoutInSeconds * 1_000;
+  private CeWorkerController ceWorkerController = mock(CeWorkerController.class);
+  private System2 system2 = mock(System2.class);
+  private CeWorker ceWorker = mock(CeWorker.class);
+  private CeTask ceTask = mock(CeTask.class);
+  private TimeoutCeTaskInterrupter underTest = new TimeoutCeTaskInterrupter(timeoutInMs, ceWorkerController, system2);
+
+  @Test
+  public void constructor_fails_with_IAE_if_timeout_is_0() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("threshold must be >= 1");
+
+    new TimeoutCeTaskInterrupter(0, ceWorkerController, system2);
+  }
+
+  @Test
+  public void constructor_fails_with_IAE_if_timeout_is_less_than_0() {
+    long timeout = - (1 + new Random().nextInt(299));
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("threshold must be >= 1");
+
+    new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2);
+  }
+
+  @Test
+  public void constructor_log_timeout_in_ms_at_INFO_level() {
+    int timeout = 1 + new Random().nextInt(9_999);
+
+    new TimeoutCeTaskInterrupter(timeout, ceWorkerController, system2);
+
+    assertThat(logTester.logs()).hasSize(1);
+    assertThat(logTester.logs(LoggerLevel.INFO))
+      .containsExactly("Compute Engine Task timeout enabled: " + timeout + " ms");
+  }
+
+  @Test
+  public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker() {
+    Thread t = newThreadWithRandomName();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'");
+
+    underTest.check(t);
+  }
+
+  @Test
+  public void check_fails_with_ISE_if_thread_is_not_running_a_CeWorker_with_no_current_CeTask() {
+    Thread t = newThreadWithRandomName();
+    mockWorkerOnThread(t, ceWorker);
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Could not find the CeTask being executed in thread '" + t.getName() + "'");
+
+    underTest.check(t);
+  }
+
+  @Test
+  public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_has_not_been_called_on_it() {
+    String taskUuid = randomAlphabetic(15);
+    Thread t = new Thread();
+    mockWorkerOnThread(t, ceWorker);
+    mockWorkerWithTask(ceTask);
+    when(ceTask.getUuid()).thenReturn(taskUuid);
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("No start time recorded for task " + taskUuid);
+
+    underTest.check(t);
+  }
+
+  @Test
+  public void check_fails_with_ISE_if_thread_is_executing_a_CeTask_but_on_start_and_on_end_have_not_been_called_on_it() {
+    String taskUuid = randomAlphabetic(15);
+    Thread t = new Thread();
+    mockWorkerOnThread(t, ceWorker);
+    mockWorkerWithTask(ceTask);
+    when(ceTask.getUuid()).thenReturn(taskUuid);
+    underTest.onStart(this.ceTask);
+    underTest.onEnd(this.ceTask);
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("No start time recorded for task " + taskUuid);
+
+    underTest.check(t);
+  }
+
+  @Test
+  public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted() throws InterruptedException {
+    String threadName = randomAlphabetic(30);
+    ComputingThread t = new ComputingThread(threadName);
+    mockWorkerOnThread(t, ceWorker);
+    mockWorkerWithTask(ceTask);
+    underTest.onStart(ceTask);
+
+    try {
+      t.start();
+
+      // will not fail as thread is not interrupted nor timed out
+      underTest.check(t);
+
+      t.interrupt();
+
+      expectedException.expect(CeTaskCanceledException.class);
+      expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+      underTest.check(t);
+    } finally {
+      t.kill();
+      t.join(1_000);
+    }
+  }
+
+  @Test
+  public void check_throws_CeTaskTimeoutException_if_check_called_later_than_timeout_milliseconds_after_on_start() {
+    Thread thread = newThreadWithRandomName();
+    mockWorkerOnThread(thread, ceWorker);
+    mockWorkerWithTask(ceTask);
+    long now = 3_776_663_999L;
+    when(system2.now()).thenReturn(now);
+    underTest.onStart(ceTask);
+
+    // timeout not passed => no exception thrown
+    int beforeTimeoutOffset = 1 + new Random().nextInt(timeoutInMs - 1);
+    when(system2.now()).thenReturn(now + timeoutInMs - beforeTimeoutOffset);
+    underTest.check(thread);
+
+    int afterTimeoutOffset = new Random().nextInt(7_112);
+    when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset);
+
+    expectedException.expect(CeTaskTimeoutException.class);
+    expectedException.expectMessage("Execution of task timed out after " + (timeoutInMs + afterTimeoutOffset) + " ms");
+
+    underTest.check(thread);
+  }
+
+  @Test
+  public void check_throws_CeTaskCanceledException_if_provided_thread_is_interrupted_even_if_timed_out() throws InterruptedException {
+    String threadName = randomAlphabetic(30);
+    ComputingThread t = new ComputingThread(threadName);
+    mockWorkerOnThread(t, ceWorker);
+    mockWorkerWithTask(ceTask);
+    long now = 3_776_663_999L;
+    when(system2.now()).thenReturn(now);
+    underTest.onStart(ceTask);
+
+    try {
+      t.start();
+      t.interrupt();
+
+      // will not fail as thread is not interrupted nor timed out
+      int afterTimeoutOffset = new Random().nextInt(7_112);
+      when(system2.now()).thenReturn(now + timeoutInMs + afterTimeoutOffset);
+
+      expectedException.expect(CeTaskCanceledException.class);
+      expectedException.expectMessage("CeWorker executing in Thread '" + threadName + "' has been interrupted");
+
+      underTest.check(t);
+    } finally {
+      t.kill();
+      t.join(1_000);
+    }
+  }
+
+  private static Thread newThreadWithRandomName() {
+    String threadName = randomAlphabetic(30);
+    Thread t = new Thread();
+    t.setName(threadName);
+    return t;
+  }
+
+  private void mockWorkerOnThread(Thread t, CeWorker ceWorker) {
+    when(ceWorkerController.getCeWorkerIn(t)).thenReturn(Optional.of(ceWorker));
+  }
+
+  private void mockWorkerWithTask(CeTask ceTask) {
+    when(ceWorker.getCurrentTask()).thenReturn(Optional.of(ceTask));
+  }
+}