]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9715 Implement a delay for finishing task in Compute Engine
authorEric Hartmann <hartmann.eric@gmail.com>
Wed, 30 Aug 2017 15:27:05 +0000 (17:27 +0200)
committerSimon Brandhof <simon.brandhof@sonarsource.com>
Tue, 5 Sep 2017 12:24:13 +0000 (14:24 +0200)
25 files changed:
server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java
server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
server/sonar-server/src/main/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStep.java
server/sonar-server/src/test/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStepTest.java
sonar-application/src/main/assembly/conf/wrapper.conf
tests/plugins/server-plugin/src/main/java/ServerPlugin.java
tests/plugins/server-plugin/src/main/java/ce/CePauseStep.java [new file with mode: 0644]
tests/plugins/server-plugin/src/main/java/ce/PauseMetric.java [new file with mode: 0644]
tests/src/test/java/org/sonarqube/tests/Category5Suite.java
tests/src/test/java/org/sonarqube/tests/LogsTailer.java [new file with mode: 0644]
tests/src/test/java/org/sonarqube/tests/ce/CeShutdownTest.java [new file with mode: 0644]
tests/src/test/java/org/sonarqube/tests/cluster/Cluster.java
tests/src/test/java/org/sonarqube/tests/cluster/ClusterTest.java [new file with mode: 0644]
tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEdition.java [deleted file]
tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java [deleted file]
tests/src/test/java/org/sonarqube/tests/cluster/Node.java [new file with mode: 0644]
tests/src/test/java/org/sonarqube/tests/cluster/NodeConfig.java [new file with mode: 0644]
tests/src/test/java/org/sonarqube/tests/cluster/StartupLogWatcherImpl.java [deleted file]
tests/src/test/java/org/sonarqube/tests/serverSystem/ClusterTest.java [deleted file]

index 129c65ef502bf8a191a00a9d35d52fe212b96590..05815aed2b5f64db97ca0a65895bcb8d1e01e3ab 100644 (file)
@@ -171,8 +171,9 @@ public class CeServer implements Monitored {
         try {
           Thread.sleep(CHECK_FOR_STOP_DELAY);
         } catch (InterruptedException e) {
-          // ignore the interruption itself, check the flag
-          Thread.currentThread().interrupt();
+          // ignore the interruption itself
+          // Do not propagate the isInterrupted flag with : Thread.currentThread().interrupt();
+          // It will break the shutdown of ComputeEngineContainerImpl#stop()
         }
       }
       attemptShutdown();
@@ -180,10 +181,11 @@ public class CeServer implements Monitored {
 
     private void attemptShutdown() {
       try {
-        LOG.info("Compute Engine shutting down...");
+        LOG.info("Compute Engine is stopping...");
         computeEngine.shutdown();
+        LOG.info("Compute Engine is stopped");
       } catch (Throwable e) {
-        LOG.error("Compute Engine shutdown failed", e);
+        LOG.error("Compute Engine failed to stop", e);
       } finally {
         // release thread waiting for CeServer
         stopAwait();
@@ -207,8 +209,9 @@ public class CeServer implements Monitored {
       if (t != null) {
         t.interrupt();
         try {
-          t.join(1000);
+          t.join(1_000);
         } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
           // Ignored
         }
       }
index d89b83febaf2451274a38c1bd6e413fc0fcc5796..ca967e1b59d3af248c4499c5b2af38cd71db7f53 100644 (file)
@@ -53,6 +53,7 @@ import org.sonar.ce.platform.ComputeEngineExtensionInstaller;
 import org.sonar.ce.queue.CeQueueCleaner;
 import org.sonar.ce.queue.PurgeCeActivities;
 import org.sonar.ce.settings.ProjectConfigurationFactory;
+import org.sonar.ce.taskprocessor.CeProcessingScheduler;
 import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
 import org.sonar.ce.user.CeUserSession;
 import org.sonar.core.component.DefaultResourceTypes;
@@ -220,6 +221,11 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {
 
   @Override
   public ComputeEngineContainer stop() {
+    if (level4 != null) {
+      // try to graceful stop in-progress tasks
+      CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class);
+      ceProcessingScheduler.stopScheduling();
+    }
     this.level1.stopComponents();
     return this;
   }
index 831684d2e8c6230c1fdb0af66872b0f627d7d640..f2a327fe720639c83b904799e615f642c385b6d3 100644 (file)
@@ -23,4 +23,5 @@ public interface CeProcessingScheduler {
 
   void startScheduling();
 
+  void stopScheduling();
 }
index 458f527f30785fa35bb13ba1dc6deb016300098c..4792bd27ed4504fcb07043a743b655bd5fef81fc 100644 (file)
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
-import org.picocontainer.Startable;
 import org.sonar.api.utils.log.Logger;
 import org.sonar.api.utils.log.Loggers;
 import org.sonar.ce.configuration.CeConfiguration;
@@ -35,7 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration;
 import static com.google.common.util.concurrent.Futures.addCallback;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
   private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
   private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds
 
@@ -43,12 +42,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
   private final long delayBetweenEnabledTasks;
   private final TimeUnit timeUnit;
   private final ChainingCallback[] chainingCallbacks;
+  private final EnabledCeWorkerController ceWorkerController;
 
   public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
-    CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
+    CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
+    EnabledCeWorkerController ceWorkerController) {
     this.executorService = processingExecutorService;
 
     this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
+    this.ceWorkerController = ceWorkerController;
     this.timeUnit = MILLISECONDS;
 
     int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
@@ -59,11 +61,6 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
     }
   }
 
-  @Override
-  public void start() {
-    // nothing to do at component startup, startScheduling will be called by CeQueueInitializer
-  }
-
   @Override
   public void startScheduling() {
     for (ChainingCallback chainingCallback : chainingCallbacks) {
@@ -72,10 +69,37 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
     }
   }
 
+  /**
+   * This method is stopping all the workers giving them a delay before killing them.
+   */
   @Override
-  public void stop() {
+  public void stopScheduling() {
+    LOG.debug("Stopping compute engine");
+    // Requesting all workers to stop
+    for (ChainingCallback chainingCallback : chainingCallbacks) {
+      chainingCallback.stop(false);
+    }
+
+    // Workers have 40s to gracefully stop processing tasks
+    long until = System.currentTimeMillis() + 40_000L;
+    LOG.info("Waiting for workers to finish in-progress tasks");
+    while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) {
+      try {
+        Thread.sleep(200L);
+      } catch (InterruptedException e) {
+        LOG.debug("Graceful stop period has been interrupted", e);
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    if (ceWorkerController.hasAtLeastOneProcessingWorker()) {
+      LOG.info("Some in-progress tasks did not finish in due time. Tasks will be stopped.");
+    }
+
+    // Interrupting the tasks
     for (ChainingCallback chainingCallback : chainingCallbacks) {
-      chainingCallback.stop();
+      chainingCallback.stop(true);
     }
   }
 
@@ -149,10 +173,10 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
       return keepRunning.get();
     }
 
-    public void stop() {
+    public void stop(boolean interrupt) {
       this.keepRunning.set(false);
       if (workerFuture != null) {
-        workerFuture.cancel(false);
+        workerFuture.cancel(interrupt);
       }
     }
   }
index d52bed437f9a9489340b18ac440a0d88ec5df656..f8e6807cfa5abbd499212d6b9743399c03b6819b 100644 (file)
@@ -79,7 +79,7 @@ public class CeWorkerImpl implements CeWorker {
       return NO_TASK;
     }
 
-    try {
+    try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) {
       executeTask(ceTask.get());
     } catch (Exception e) {
       LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
index 37c6c59c0ef21b19dfd9dfacd51a743e41cccd42..5634a20e45563a011b9d2e48c392d311712f57ca 100644 (file)
@@ -24,6 +24,9 @@ package org.sonar.ce.taskprocessor;
  * task to process.
  */
 public interface EnabledCeWorkerController {
+  interface ProcessingRecorderHook extends AutoCloseable {
+  }
+
   /**
    * Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any.
    */
@@ -33,4 +36,13 @@ public interface EnabledCeWorkerController {
    * Returns {@code true} if the specified {@link CeWorker} is enabled
    */
   boolean isEnabled(CeWorker ceWorker);
+
+  ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+
+  /**
+   * Whether at least one worker is being processed a task or not.
+   * Returns {@code false} when all workers are waiting for tasks
+   * or are being stopped.
+   */
+  boolean hasAtLeastOneProcessingWorker();
 }
index 7bb905a3db53af4c1615c8864e08b46869a4a591..500eb671be9c98adc4443026b2fed0381f84d864 100644 (file)
  */
 package org.sonar.ce.taskprocessor;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.sonar.api.utils.log.Loggers;
 import org.sonar.ce.configuration.CeConfiguration;
 
 public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
+  private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
   private final CeConfiguration ceConfiguration;
   private final AtomicInteger workerCount;
 
+  enum Status {
+    PROCESSING, PAUSED
+  }
+
   public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
     this.ceConfiguration = ceConfiguration;
     this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount());
@@ -46,6 +52,16 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController
     logEnabledWorkerCount();
   }
 
+  @Override
+  public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
+    return new ProcessingRecorderHookImpl(ceWorker);
+  }
+
+  @Override
+  public boolean hasAtLeastOneProcessingWorker() {
+    return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
+  }
+
   /**
    * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
    * {@link CeConfiguration#getWorkerCount()}.
@@ -56,4 +72,18 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController
   public boolean isEnabled(CeWorker ceWorker) {
     return ceWorker.getOrdinal() < workerCount.get();
   }
+
+  private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
+    private final CeWorker ceWorker;
+
+    private ProcessingRecorderHookImpl(CeWorker ceWorker) {
+      this.ceWorker = ceWorker;
+      map.put(this.ceWorker, Status.PROCESSING);
+    }
+
+    @Override
+    public void close() throws Exception {
+      map.put(ceWorker, Status.PAUSED);
+    }
+  }
 }
index 660dcc9a5ed114ab7c0006214a1cae9cdf0912d2..e98fa8269621a0ae0bc385cd50b6b58b73c5e25c 100644 (file)
@@ -67,20 +67,20 @@ import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 public class CeProcessingSchedulerImplTest {
   private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
 
-  @Rule
   // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+  @Rule
   public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
   @Rule
   public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
-  // Required to prevent an infinite loop
   private CeWorker ceWorker = mock(CeWorker.class);
   private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
   private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
   private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
   private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
   private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
+  private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration);
 
-  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
 
   @Test
   public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception {
@@ -164,7 +164,7 @@ public class CeProcessingSchedulerImplTest {
   }
 
   @Test
-  public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
+  public void stopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
     when(ceWorker.call())
       .thenReturn(NO_TASK)
       .thenReturn(TASK_PROCESSED)
@@ -188,7 +188,7 @@ public class CeProcessingSchedulerImplTest {
       }
       // call stop after second delayed polling
       if (i == 1) {
-        underTest.stop();
+        underTest.stopScheduling();
       }
       i++;
     }
@@ -219,7 +219,7 @@ public class CeProcessingSchedulerImplTest {
     when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
 
     CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
-    CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+    CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
     when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
       .thenReturn(listenableScheduledFuture);
 
@@ -454,7 +454,7 @@ public class CeProcessingSchedulerImplTest {
 
       @Override
       public void shutdown() {
-        throw new UnsupportedOperationException("shutdown() not implemented");
+        // Nothing to do
       }
 
       @Override
index 5f21f03de41c2938ad93140d5144524cc94a7578..09737ff467d3c3fd5f604668920f83f72ac51180 100644 (file)
@@ -222,8 +222,8 @@ public class LoadMeasureComputersStep implements ComputationStep {
   private class ValidateOutputMetric implements Predicate<String> {
     @Override
     public boolean apply(@Nonnull String metric) {
-      checkState(!CORE_METRIC_KEYS.contains(metric), "Metric '%s' cannot be used as an output metric as it's a core metric", metric);
-      checkState(pluginMetricKeys.contains(metric), "Metric '%s' cannot be used as an output metric as no plugin declare this metric", metric);
+      checkState(!CORE_METRIC_KEYS.contains(metric), "Metric '%s' cannot be used as an output metric because it's a core metric", metric);
+      checkState(pluginMetricKeys.contains(metric), "Metric '%s' cannot be used as an output metric because no plugins declare this metric", metric);
       return true;
     }
   }
index 293e5468d4c66567b5f2ea0afe6fc0b628634908..b2c9dbe89f754631ce99fea7cc434ff959b6d105 100644 (file)
@@ -127,7 +127,7 @@ public class LoadMeasureComputersStepTest {
   @Test
   public void fail_with_ISE_when_output_metric_is_not_define_by_plugin() throws Exception {
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Metric 'unknown' cannot be used as an output metric as no plugin declare this metric");
+    thrown.expectMessage("Metric 'unknown' cannot be used as an output metric because no plugins declare this metric");
 
     MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NEW_METRIC_4), array("unknown"))};
     ComputationStep underTest = new LoadMeasureComputersStep(holder, array(new TestMetrics()), computers);
@@ -137,7 +137,7 @@ public class LoadMeasureComputersStepTest {
   @Test
   public void fail_with_ISE_when_output_metric_is_a_core_metric() throws Exception {
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Metric 'ncloc' cannot be used as an output metric as it's a core metric");
+    thrown.expectMessage("Metric 'ncloc' cannot be used as an output metric because it's a core metric");
 
     MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NEW_METRIC_4), array(NCLOC_KEY))};
     ComputationStep underTest = new LoadMeasureComputersStep(holder, array(new TestMetrics()), computers);
@@ -172,7 +172,7 @@ public class LoadMeasureComputersStepTest {
   @Test
   public void fail_with_ISE_when_no_metrics_are_defined_by_plugin_but_measure_computer_use_a_new_metric() throws Exception {
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Metric 'metric1' cannot be used as an output metric as no plugin declare this metric");
+    thrown.expectMessage("Metric 'metric1' cannot be used as an output metric because no plugins declare this metric");
 
     MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NCLOC_KEY), array(NEW_METRIC_1))};
     ComputationStep underTest = new LoadMeasureComputersStep(holder, computers);
index 3f37ed2fb46a4c799e52be1f8187b40f94cd69c2..8fb972ab0b11512e46c597f79b22108e0c79ffc6 100644 (file)
@@ -86,4 +86,4 @@ wrapper.ntservice.interactive=false
 wrapper.disable_restarts=TRUE
 wrapper.ping.timeout=0
 wrapper.shutdown.timeout=300
-wrapper.jvm_exit.timeout=15
+wrapper.jvm_exit.timeout=300
index 07aa1f861a2bc0344639cf75824af551e9abe95d..9ab0dd987de487723a3c2a2d14a30ac9e1afebec 100644 (file)
@@ -38,6 +38,8 @@
  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
  */
 
+import ce.CePauseStep;
+import ce.PauseMetric;
 import java.util.Arrays;
 import java.util.List;
 import org.sonar.api.Properties;
@@ -86,6 +88,6 @@ import static org.sonar.api.PropertyType.USER_LOGIN;
 public class ServerPlugin extends SonarPlugin {
   public List getExtensions() {
     return Arrays.asList(
-      StartupCrash.class, TempFolderExtension.class);
+      StartupCrash.class, TempFolderExtension.class, PauseMetric.class, CePauseStep.class);
   }
 }
diff --git a/tests/plugins/server-plugin/src/main/java/ce/CePauseStep.java b/tests/plugins/server-plugin/src/main/java/ce/CePauseStep.java
new file mode 100644 (file)
index 0000000..8214ac5
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package ce;
+
+import java.io.File;
+import org.sonar.api.ce.measure.Component;
+import org.sonar.api.ce.measure.MeasureComputer;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+
+public class CePauseStep implements MeasureComputer {
+
+  private static final Logger LOGGER = Loggers.get(CePauseStep.class);
+
+  @Override
+  public MeasureComputerDefinition define(MeasureComputerDefinitionContext defContext) {
+    return defContext.newDefinitionBuilder()
+      .setInputMetrics("ncloc")
+      .setOutputMetrics(PauseMetric.KEY)
+      .build();
+  }
+
+  @Override
+  public void compute(MeasureComputerContext context) {
+    if (context.getComponent().getType() == Component.Type.PROJECT) {
+      String path = context.getSettings().getString("sonar.ce.pauseTask.path");
+      if (path != null) {
+        waitForFileToBeDeleted(path);
+      }
+    }
+  }
+
+  private static void waitForFileToBeDeleted(String path) {
+    LOGGER.info("CE analysis is paused. Waiting for file to be deleted: " + path);
+    File file = new File(path);
+    try {
+      while (file.exists()) {
+        Thread.sleep(500L);
+      }
+      LOGGER.info("CE analysis is resumed");
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("CE analysis has been interrupted");
+    }
+  }
+}
diff --git a/tests/plugins/server-plugin/src/main/java/ce/PauseMetric.java b/tests/plugins/server-plugin/src/main/java/ce/PauseMetric.java
new file mode 100644 (file)
index 0000000..a16f821
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package ce;
+
+import java.util.Arrays;
+import java.util.List;
+import org.sonar.api.measures.Metric;
+import org.sonar.api.measures.Metrics;
+
+public class PauseMetric implements Metrics {
+
+  public static final String KEY = "pause";
+
+  @Override
+  public List<Metric> getMetrics() {
+    return Arrays.asList(new Metric.Builder(KEY, "Pause", Metric.ValueType.INT).create());
+  }
+}
index 55556648e85ba2407d7b2282844ad2d025a5fedf..7bc3331d1d344394638837802723d85337a7368b 100644 (file)
@@ -21,11 +21,12 @@ package org.sonarqube.tests;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
+import org.sonarqube.tests.ce.CeShutdownTest;
 import org.sonarqube.tests.ce.CeWorkersTest;
 import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
 import org.sonarqube.tests.qualityProfile.BuiltInQualityProfilesNotificationTest;
 import org.sonarqube.tests.rule.RuleEsResilienceTest;
-import org.sonarqube.tests.serverSystem.ClusterTest;
+import org.sonarqube.tests.cluster.ClusterTest;
 import org.sonarqube.tests.serverSystem.RestartTest;
 import org.sonarqube.tests.serverSystem.ServerSystemRestartingOrchestrator;
 import org.sonarqube.tests.settings.ElasticsearchSettingsTest;
@@ -63,7 +64,9 @@ import org.sonarqube.tests.user.UserEsResilienceTest;
   TelemetryUploadTest.class,
   TelemetryOptOutTest.class,
   // ce
+  CeShutdownTest.class,
   CeWorkersTest.class,
+
   // elasticsearch
   ElasticsearchSettingsTest.class
 })
diff --git a/tests/src/test/java/org/sonarqube/tests/LogsTailer.java b/tests/src/test/java/org/sonarqube/tests/LogsTailer.java
new file mode 100644 (file)
index 0000000..06f4d7d
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonarqube.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.commons.io.input.Tailer;
+import org.apache.commons.io.input.TailerListenerAdapter;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Watch log files, usually server logs (see Orchestrator.getServer().get*Logs()).
+ * This class allows to not load the full content in memory.
+ */
+public class LogsTailer implements AutoCloseable {
+
+  private final List<Tailer> tailers;
+  private final LogConsumer logConsumer;
+
+  private LogsTailer(Builder builder) {
+    logConsumer = new LogConsumer(builder.consumers);
+    tailers = builder.files.stream()
+      .map(file -> Tailer.create(file, logConsumer, 500))
+      .collect(Collectors.toList());
+  }
+
+  public Watch watch(String text) {
+    return new Watch(text);
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Tailer tailer : tailers) {
+      tailer.stop();
+    }
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private final List<File> files = new ArrayList<>();
+    private final List<Consumer<String>> consumers = new ArrayList<>();
+
+    public Builder addFile(File file) {
+      this.files.add(file);
+      return this;
+    }
+
+    public Builder addFiles(File file, File... otherFiles) {
+      this.files.add(file);
+      Collections.addAll(this.files, otherFiles);
+      return this;
+    }
+
+    public Builder addConsumer(Consumer<String> consumer) {
+      this.consumers.add(consumer);
+      return this;
+    }
+
+    public Builder doOnFind(String text, Runnable runnable) {
+      return addConsumer(log -> {
+        if (log.contains(text)) {
+          runnable.run();
+        }
+      });
+    }
+
+    public LogsTailer build() {
+      return new LogsTailer(this);
+    }
+  }
+
+  private static class LogConsumer extends TailerListenerAdapter {
+    private final List<Consumer<String>> consumers = Collections.synchronizedList(new ArrayList<>());
+
+    private LogConsumer(List<Consumer<String>> consumers) {
+      this.consumers.addAll(consumers);
+    }
+
+    @Override
+    public void handle(String line) {
+      synchronized (consumers) {
+        for (Consumer<String> consumer : consumers) {
+          try {
+            consumer.accept(line);
+          } catch (Exception e) {
+            // do not prevent other consumers to handle the log
+            e.printStackTrace();
+          }
+        }
+      }
+    }
+
+    public void add(Consumer<String> consumer) {
+      this.consumers.add(consumer);
+    }
+
+    public void remove(Consumer<String> consumer) {
+      this.consumers.remove(consumer);
+    }
+  }
+
+  public class Watch implements AutoCloseable {
+    private final String expectedText;
+    private final CountDownLatch foundSignal = new CountDownLatch(1);
+    private String log = null;
+    private final Consumer<String> consumer;
+
+    private Watch(String expectedText) {
+      this.expectedText = requireNonNull(expectedText);
+      this.consumer = log -> {
+        if (log.contains(this.expectedText)) {
+          this.log = log;
+          foundSignal.countDown();
+        }
+      };
+      logConsumer.add(consumer);
+    }
+
+    /**
+     * Blocks until the expected log appears in watched files.
+     */
+    public void waitForLog() throws InterruptedException {
+      foundSignal.await();
+    }
+
+    public Optional<String> getLog() {
+      return Optional.ofNullable(log);
+    }
+
+    @Override
+    public void close() {
+      logConsumer.remove(consumer);
+    }
+  }
+}
diff --git a/tests/src/test/java/org/sonarqube/tests/ce/CeShutdownTest.java b/tests/src/test/java/org/sonarqube/tests/ce/CeShutdownTest.java
new file mode 100644 (file)
index 0000000..29c237c
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonarqube.tests.ce;
+
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.build.SonarScanner;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.sonarqube.tests.LogsTailer;
+import org.sonarqube.ws.client.WsClient;
+import org.sonarqube.ws.client.ce.ActivityStatusWsRequest;
+import util.ItUtils;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeShutdownTest {
+
+  @Rule
+  public TestRule safeguard = new DisableOnDebug(Timeout.seconds(600));
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void stopping_CE_waits_for_in_progress_task_to_be_finished() throws Exception {
+    try (ComputeEngine ce = new ComputeEngine()) {
+
+      try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) {
+        ce.triggerTask();
+        watch.waitForLog();
+        assertThat(ce.countInProgressTasks()).isEqualTo(1);
+      }
+
+      // stop does not kill in-progress workers. It waits
+      // for them (at least a few dozens of seconds)
+      try (LogsTailer.Watch watch = ce.logs().watch("Waiting for workers to finish in-progress tasks")) {
+        ce.triggerStop();
+        watch.waitForLog();
+        assertThat(ce.countInProgressTasks()).isEqualTo(1);
+      }
+
+      // resume the in-progress task, so that it can
+      // finish successfully
+      try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) {
+        ce.resumeTask();
+        watch.waitForLog();
+        assertThat(ce.isTaskFinished()).isTrue();
+        assertThat(ce.hasErrorLogs()).isFalse();
+      }
+    }
+  }
+
+  @Test
+  @Ignore("TODO make the graceful stop timeout configurable. 40 seconds is too long for a test.")
+  public void stopping_CE_kills_in_progress_tasks_if_too_long_to_gracefully_stop() throws Exception {
+    try (ComputeEngine ce = new ComputeEngine()) {
+
+      try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) {
+        ce.triggerTask();
+        watch.waitForLog();
+        assertThat(ce.countInProgressTasks()).isEqualTo(1);
+      }
+
+      // stop does not kill in-progress workers. It waits
+      // for them (at least a few dozens of seconds)
+      try (LogsTailer.Watch watch = ce.logs().watch("Waiting for workers to finish in-progress tasks")) {
+        ce.triggerStop();
+        watch.waitForLog();
+        assertThat(ce.countInProgressTasks()).isEqualTo(1);
+      }
+
+      // resume the in-progress task, so that it can
+      // finish successfully
+      try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) {
+        watch.waitForLog();
+        assertThat(ce.isTaskFinished()).isTrue();
+        assertThat(ce.hasErrorLogs()).isTrue();
+      }
+    }
+  }
+
+  private class ComputeEngine implements AutoCloseable {
+    private final Orchestrator orchestrator;
+    private final File pauseFile;
+    private final WsClient adminWsClient;
+    private Thread stopper;
+    private final LogsTailer logsTailer;
+
+    ComputeEngine() throws Exception {
+      pauseFile = temp.newFile();
+      FileUtils.touch(pauseFile);
+
+      orchestrator = Orchestrator.builderEnv()
+        .setServerProperty("sonar.ce.pauseTask.path", pauseFile.getAbsolutePath())
+        .addPlugin(ItUtils.xooPlugin())
+        .addPlugin(ItUtils.pluginArtifact("server-plugin"))
+        .build();
+      orchestrator.start();
+      adminWsClient = ItUtils.newAdminWsClient(orchestrator);
+      logsTailer = LogsTailer.builder()
+        .addFile(orchestrator.getServer().getCeLogs())
+        .addFile(orchestrator.getServer().getAppLogs())
+        .build();
+    }
+
+    LogsTailer logs() {
+      return logsTailer;
+    }
+
+    void triggerTask() throws InterruptedException {
+      orchestrator.executeBuild(SonarScanner.create(new File("projects/shared/xoo-sample"), "sonar.projectKey", "foo"), false);
+    }
+
+    void resumeTask() throws Exception {
+      FileUtils.forceDelete(pauseFile);
+    }
+
+    int countInProgressTasks() {
+      return adminWsClient.ce().activityStatus(ActivityStatusWsRequest.newBuilder().build()).getInProgress();
+    }
+
+    boolean isTaskFinished() throws Exception {
+      String ceLogs = FileUtils.readFileToString(orchestrator.getServer().getCeLogs());
+      return ceLogs.contains("Executed task | project=foo | type=REPORT");
+    }
+
+    boolean hasErrorLogs() throws IOException {
+      String ceLogs = FileUtils.readFileToString(orchestrator.getServer().getCeLogs());
+      return ceLogs.contains(" ERROR ");
+    }
+
+    /**
+     * non-blocking stop
+     */
+    void triggerStop() {
+      checkState(stopper == null);
+      stopper = new Thread(orchestrator::stop);
+      stopper.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+      if (stopper != null) {
+        stopper.interrupt();
+      }
+      if (orchestrator != null) {
+        orchestrator.stop();
+      }
+    }
+  }
+}
index 6c8416cbc8dd4af1bdf50d2ac60cf5c690664caa..9340ce53d8a529c4a7e6cde65067e2a31bd5b55e 100644 (file)
  * along with this program; if not, write to the Free Software Foundation,
  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
  */
-
 package org.sonarqube.tests.cluster;
 
-import com.google.common.net.HostAndPort;
 import com.sonar.orchestrator.Orchestrator;
 import com.sonar.orchestrator.OrchestratorBuilder;
-import com.sonar.orchestrator.util.NetworkUtils;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Enumeration;
 import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import javax.annotation.CheckForNull;
-
-import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
-
-public class Cluster {
-
-  protected static final String CLUSTER_ENABLED = "sonar.cluster.enabled";
-  protected static final String CLUSTER_NODE_TYPE = "sonar.cluster.node.type";
-  protected static final String CLUSTER_SEARCH_HOSTS = "sonar.cluster.search.hosts";
-  protected static final String CLUSTER_HOSTS = "sonar.cluster.hosts";
-  protected static final String CLUSTER_NODE_PORT = "sonar.cluster.node.port";
-  protected static final String CLUSTER_NODE_HOST = "sonar.cluster.node.host";
-  protected static final String CLUSTER_NAME = "sonar.cluster.name";
-
-  protected static final String SEARCH_HOST = "sonar.search.host";
-  protected static final String SEARCH_PORT = "sonar.search.port";
-  protected static final String SEARCH_JAVA_OPTS = "sonar.search.javaOpts";
-
-  protected static final String WEB_JAVA_OPTS = "sonar.web.javaOpts";
-  protected static final String WEB_PORT = "sonar.web.port";
-
-  protected static final String CE_JAVA_OPTS = "sonar.ce.javaOpts";
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.slf4j.LoggerFactory;
 
-  public enum NodeType {
-    SEARCH("search"), APPLICATION("application");
+import static java.util.stream.Collectors.joining;
 
-    final String value;
+class Cluster implements AutoCloseable {
 
-    NodeType(String value) {
-      this.value = value;
-    }
+  @Nullable
+  private final String clusterName;
 
-    public String getValue() {
-      return value;
-    }
+  private final List<Node> nodes = new ArrayList<>();
 
-    public static final EnumSet<NodeType> ALL = EnumSet.allOf(NodeType.class);
+  Cluster(@Nullable String name) {
+    this.clusterName = name;
   }
 
-  private final List<Node> nodes;
-  private final ForkJoinPool forkJoinPool = new ForkJoinPool(5);
-
-  private Cluster(List<Node> nodes) {
-    this.nodes = nodes;
-    assignPorts();
-    completeNodesConfiguration();
-    buildOrchestrators();
+  Node startNode(NodeConfig config, Consumer<OrchestratorBuilder> consumer) {
+    Node node = addNode(config, consumer);
+    node.start();
+    return node;
   }
 
-  public void start() throws ExecutionException, InterruptedException {
-    forkJoinPool.submit(
-      () -> nodes.parallelStream().forEach(
-        node -> node.getOrchestrator().start()
-      )
-    ).get();
-  }
+  Node addNode(NodeConfig config, Consumer<OrchestratorBuilder> consumer) {
+    OrchestratorBuilder builder = newOrchestratorBuilder(config);
 
-  public void stop() throws ExecutionException, InterruptedException {
-    forkJoinPool.submit(
-      () -> nodes.parallelStream().forEach(
-        node -> node.getOrchestrator().stop()
-      )
-    ).get();
-  }
-
-  public void stopAll(Predicate<Node> predicate) throws ExecutionException, InterruptedException {
-    forkJoinPool.submit(
-      () -> nodes.parallelStream()
-      .filter(predicate)
-      .forEach(n -> n.getOrchestrator().stop())
-    ).get();
-  }
-
-  public void startAll(Predicate<Node> predicate) throws ExecutionException, InterruptedException {
-    forkJoinPool.submit(
-      () -> nodes.parallelStream()
-      .filter(predicate)
-      .forEach(n -> n.getOrchestrator().start())
-    ).get();
-  }
-
-  public List<Node> getNodes() {
-    return Collections.unmodifiableList(nodes);
-  }
-
-  private void assignPorts() {
-    nodes.stream().forEach(
-      node -> {
-        node.setHzPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address()));
-        if (node.getType() == SEARCH) {
-          node.setEsPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address()));
-        } else if (node.getType() == APPLICATION) {
-          node.setWebPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address()));
-        }
-      }
-    );
-  }
-
-  public static InetAddress getNonloopbackIPv4Address()  {
-    try {
-      Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
-      for (NetworkInterface networkInterface : Collections.list(nets)) {
-        if (!networkInterface.isLoopback() && networkInterface.isUp() && !isBlackListed(networkInterface)) {
-          Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
-          while (inetAddresses.hasMoreElements()) {
-            InetAddress inetAddress = inetAddresses.nextElement();
-            if (inetAddress instanceof Inet4Address) {
-              return inetAddress;
-            }
-          }
-        }
-      }
-    } catch (SocketException se) {
-      throw new RuntimeException("Cannot find a non loopback card required for tests", se);
+    switch (config.getType()) {
+      case SEARCH:
+        builder
+          .setServerProperty("sonar.cluster.node.type", "search")
+          .setServerProperty("sonar.search.host", config.getAddress().getHostAddress())
+          .setServerProperty("sonar.search.port", "" + config.getSearchPort().get())
+          .setServerProperty("sonar.search.javaOpts", "-Xmx64m -Xms64m -XX:+HeapDumpOnOutOfMemoryError");
+        break;
+      case APPLICATION:
+        builder
+          .setServerProperty("sonar.cluster.node.type", "application")
+          .setServerProperty("sonar.web.host", config.getAddress().getHostAddress())
+          .setServerProperty("sonar.web.port", "" + config.getWebPort().get())
+          .setServerProperty("sonar.web.javaOpts", "-Xmx128m -Xms16m -XX:+HeapDumpOnOutOfMemoryError")
+          .setServerProperty("sonar.auth.jwtBase64Hs256Secret", "HrPSavOYLNNrwTY+SOqpChr7OwvbR/zbDLdVXRN0+Eg=")
+          .setServerProperty("sonar.ce.javaOpts", "-Xmx32m -Xms16m -XX:+HeapDumpOnOutOfMemoryError");
+        break;
     }
-    throw new RuntimeException("Cannot find a non loopback card required for tests");
+    consumer.accept(builder);
+    Orchestrator orchestrator = builder.build();
+    Node node = new Node(config, orchestrator);
+    nodes.add(node);
+    return node;
   }
 
-  private static boolean isBlackListed(NetworkInterface networkInterface) {
-    return networkInterface.getName().startsWith("docker") ||
-      networkInterface.getName().startsWith("vboxnet");
+  Stream<Node> getNodes() {
+    return nodes.stream();
   }
 
-  private void completeNodesConfiguration() {
-    String inet = getNonloopbackIPv4Address().getHostAddress();
-    String clusterHosts = nodes.stream()
-      .map(node -> HostAndPort.fromParts(inet, node.getHzPort()).toString())
-      .collect(Collectors.joining(","));
-    String elasticsearchHosts = nodes.stream()
-      .filter(node -> node.getType() == SEARCH)
-      .map(node -> HostAndPort.fromParts(inet, node.getEsPort()).toString())
-      .collect(Collectors.joining(","));
-
-    nodes.forEach(
-      node -> {
-        node.addProperty(CLUSTER_NODE_HOST, inet);
-        node.addProperty(CLUSTER_HOSTS, clusterHosts);
-        node.addProperty(CLUSTER_NODE_PORT, Integer.toString(node.getHzPort() == null ? -1 : node.getHzPort()));
-        node.addProperty(CLUSTER_SEARCH_HOSTS, elasticsearchHosts);
-        node.addProperty(SEARCH_PORT, Integer.toString(node.getEsPort() == null ? -1 : node.getEsPort()));
-        node.addProperty(SEARCH_HOST, inet);
-        node.addProperty(WEB_PORT, Integer.toString(node.getWebPort() == null ? -1 : node.getWebPort()));
-        node.addProperty(CLUSTER_NODE_TYPE, node.getType().getValue());
-      }
-    );
+  Stream<Node> getAppNodes() {
+    return nodes.stream().filter(n -> n.getConfig().getType() == NodeConfig.NodeType.APPLICATION);
   }
 
-  private void buildOrchestrators() {
-    nodes.stream().limit(1).forEach(
-      node -> buildOrchestrator(node, false)
-    );
-    nodes.stream().skip(1).forEach(
-      node -> buildOrchestrator(node, true)
-    );
+  Stream<Node> getSearchNodes() {
+    return nodes.stream().filter(n -> n.getConfig().getType() == NodeConfig.NodeType.SEARCH);
   }
 
-  private void buildOrchestrator(Node node, boolean keepDatabase) {
-    OrchestratorBuilder builder = Orchestrator.builderEnv()
-      .setOrchestratorProperty("orchestrator.keepDatabase", Boolean.toString(keepDatabase))
-      .setStartupLogWatcher(new StartupLogWatcherImpl());
-
-    node.getProperties().entrySet().stream().forEach(
-      e -> builder.setServerProperty((String) e.getKey(), (String) e.getValue())
-    );
-
-    node.setOrchestrator(builder.build());
+  Node getAppNode(int index) {
+    return getAppNodes().skip(index).findFirst().orElseThrow(IllegalArgumentException::new);
   }
 
-  public static Builder builder() {
-    return new Builder();
+  Node getSearchNode(int index) {
+    return getSearchNodes().skip(index).findFirst().orElseThrow(IllegalArgumentException::new);
   }
 
-  public static class Builder  {
-    private final List<Node> nodes = new ArrayList<>();
-
-    public Cluster build() {
-      return new Cluster(nodes);
-    }
-
-    public Builder addNode(NodeType type, Consumer<Node>... consumers) {
-      Node node = new Node(type);
-      Arrays.stream(consumers).forEach(c -> c.accept(node));
-      nodes.add(node);
-      return this;
+  @Override
+  public void close() throws Exception {
+    // nodes are stopped in order of creation
+    for (Node node : nodes) {
+      try {
+        node.stop();
+      } catch (Exception e) {
+        LoggerFactory.getLogger(getClass()).error("Fail to stop node", e);
+      }
     }
   }
 
-  /**
-   * A cluster node
-   */
-  public static class Node {
-    private final NodeType type;
-    private Integer webPort;
-    private Integer esPort;
-    private Integer hzPort;
-    private Orchestrator orchestrator = null;
-    private Properties properties = new Properties();
-
-    public Node(NodeType type) {
-      this.type = type;
-
-      // Default properties
-      properties.setProperty(CLUSTER_ENABLED, "true");
-      properties.setProperty(CLUSTER_NAME, "sonarqube");
-      properties.setProperty(CE_JAVA_OPTS, "-Xmx256m");
-      properties.setProperty(WEB_JAVA_OPTS, "-Xmx256m");
-      properties.setProperty(SEARCH_JAVA_OPTS,  "-Xmx256m -Xms256m " +
-          "-XX:+UseConcMarkSweepGC " +
-          "-XX:CMSInitiatingOccupancyFraction=75 " +
-          "-XX:+UseCMSInitiatingOccupancyOnly " +
-          "-XX:+AlwaysPreTouch " +
-          "-server " +
-          "-Xss1m " +
-          "-Djava.awt.headless=true " +
-          "-Dfile.encoding=UTF-8 " +
-          "-Djna.nosys=true " +
-          "-Djdk.io.permissionsUseCanonicalPath=true " +
-          "-Dio.netty.noUnsafe=true " +
-          "-Dio.netty.noKeySetOptimization=true " +
-          "-Dio.netty.recycler.maxCapacityPerThread=0 " +
-          "-Dlog4j.shutdownHookEnabled=false " +
-          "-Dlog4j2.disable.jmx=true " +
-          "-Dlog4j.skipJansi=true " +
-          "-XX:+HeapDumpOnOutOfMemoryError");
-    }
-
-    public Properties getProperties() {
-      return properties;
-    }
-
-    public Orchestrator getOrchestrator() {
-      return orchestrator;
-    }
-
-    private void setOrchestrator(Orchestrator orchestrator) {
-      this.orchestrator = orchestrator;
-    }
-
-    public NodeType getType() {
-      return type;
+  private OrchestratorBuilder newOrchestratorBuilder(NodeConfig node) {
+    OrchestratorBuilder builder = Orchestrator.builderEnv();
+    builder.setOrchestratorProperty("orchestrator.keepDatabase", "true");
+    builder.setServerProperty("sonar.cluster.enabled", "true");
+    builder.setServerProperty("sonar.cluster.node.host", node.getAddress().getHostAddress());
+    builder.setServerProperty("sonar.cluster.node.port", "" + node.getHzPort());
+    builder.setServerProperty("sonar.cluster.hosts", node.getConnectedNodes().stream().map(NodeConfig::getHzHost).collect(joining(",")));
+    builder.setServerProperty("sonar.cluster.search.hosts", node.getSearchNodes().stream().map(NodeConfig::getSearchHost).collect(joining(",")));
+    if (clusterName != null) {
+      builder.setServerProperty("sonar.cluster.name", clusterName);
     }
-
-    @CheckForNull
-    public Integer getWebPort() {
-      return webPort;
-    }
-
-    @CheckForNull
-    public Integer getEsPort() {
-      return esPort;
-    }
-
-    @CheckForNull
-    public Integer getHzPort() {
-      return hzPort;
-    }
-
-    private void setWebPort(Integer webPort) {
-      this.webPort = webPort;
-    }
-
-    private void setEsPort(Integer esPort) {
-      this.esPort = esPort;
-    }
-
-    private void setHzPort(Integer hzPort) {
-      this.hzPort = hzPort;
-    }
-
-    private void addProperty(String key, String value) {
-      properties.setProperty(key, value);
+    if (node.getName().isPresent()) {
+      builder.setServerProperty("sonar.cluster.node.name", node.getName().get());
     }
+    builder.setStartupLogWatcher(logLine -> true);
+    return builder;
   }
 }
diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/ClusterTest.java b/tests/src/test/java/org/sonarqube/tests/cluster/ClusterTest.java
new file mode 100644 (file)
index 0000000..0cf07e3
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonarqube.tests.cluster;
+
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.OrchestratorBuilder;
+import com.sonar.orchestrator.db.DefaultDatabase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+import static org.sonarqube.tests.cluster.NodeConfig.newApplicationConfig;
+import static org.sonarqube.tests.cluster.NodeConfig.newSearchConfig;
+
+public class ClusterTest {
+
+  @Rule
+  public TestRule safeguard = new DisableOnDebug(Timeout.seconds(300));
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @BeforeClass
+  public static void initDbSchema() throws Exception {
+    Orchestrator orchestrator = Orchestrator.builderEnv()
+      // enforce (re-)creation of database schema
+      .setOrchestratorProperty("orchestrator.keepDatabase", "false")
+      .build();
+    DefaultDatabase db = new DefaultDatabase(orchestrator.getConfiguration());
+    checkState(!db.getClient().getDialect().equals("h2"), "H2 is not supported in cluster mode");
+    db.start();
+    db.stop();
+  }
+
+  @Test
+  public void test_high_availability_topology() throws Exception {
+    try (Cluster cluster = newCluster(3, 2)) {
+      cluster.getNodes().forEach(Node::start);
+      cluster.getAppNodes().forEach(Node::waitForStatusUp);
+
+      // TODO verify cluster health to be green
+      // TODO verify that ES cluster is green
+
+      cluster.getNodes().forEach(node -> {
+        node.assertThatProcessesAreUp();
+        assertThat(node.anyLogsContain(" ERROR ")).isFalse();
+        assertThat(node.anyLogsContain("MessageException")).isFalse();
+      });
+
+      // verify that there's a single web startup leader
+      Node startupLeader = cluster.getAppNodes()
+        .filter(Node::isStartupLeader)
+        .reduce(singleElement())
+        .get();
+      assertThat(startupLeader.hasStartupLeaderOperations()).isTrue();
+      assertThat(startupLeader.hasCreatedSearchIndices()).isTrue();
+
+      // verify that the second app node is a startup follower
+      Node startupFollower = cluster.getAppNodes()
+        .filter(Node::isStartupFollower)
+        .reduce(singleElement())
+        .get();
+      assertThat(startupFollower.hasStartupLeaderOperations()).isFalse();
+      assertThat(startupFollower.hasCreatedSearchIndices()).isFalse();
+
+      cluster.getAppNodes().forEach(app -> {
+        // compute engine is being started when app node is already in status UP
+        app.waitForCeLogsContain("Compute Engine is operational");
+        assertThat(app.anyLogsContain("Process[ce] is up")).isTrue();
+      });
+    }
+  }
+
+  @Test
+  public void minimal_cluster_is_2_search_and_1_application_nodes() throws Exception {
+    try (Cluster cluster = newCluster(2, 1)) {
+      cluster.getNodes().forEach(Node::start);
+
+      Node app = cluster.getAppNode(0);
+      app.waitForStatusUp();
+      app.waitForCeLogsContain("Compute Engine is operational");
+      assertThat(app.isStartupLeader()).isTrue();
+      assertThat(app.hasStartupLeaderOperations()).isTrue();
+
+      // TODO verify cluster health to be yellow
+      // TODO verify that ES cluster is yellow
+
+      cluster.getNodes().forEach(node -> {
+        assertThat(node.anyLogsContain(" ERROR ")).isFalse();
+        node.assertThatProcessesAreUp();
+      });
+    }
+  }
+
+  @Test
+  public void configuration_of_connection_to_other_nodes_can_be_non_exhaustive() throws Exception {
+    try (Cluster cluster = new Cluster(null)) {
+      NodeConfig searchConfig1 = newSearchConfig();
+      NodeConfig searchConfig2 = newSearchConfig();
+      NodeConfig appConfig = newApplicationConfig();
+
+      // HZ bus : app -> search 2 -> search1, which is not recommended at all !!!
+      searchConfig2.addConnectionToBus(searchConfig1);
+      appConfig.addConnectionToBus(searchConfig2);
+
+      // search1 is not configured to connect search2
+      // app is not configured to connect to search 1
+      // --> not recommended at all !!!
+      searchConfig2.addConnectionToSearch(searchConfig1);
+      appConfig.addConnectionToSearch(searchConfig2);
+
+      cluster.startNode(searchConfig1, nothing());
+      cluster.startNode(searchConfig2, nothing());
+      Node app = cluster.startNode(appConfig, nothing());
+
+      app.waitForStatusUp();
+      assertThat(app.isStartupLeader()).isTrue();
+      assertThat(app.hasStartupLeaderOperations()).isTrue();
+      // TODO verify cluster health to be yellow
+      // TODO verify that ES cluster is yellow
+
+      // no errors
+      cluster.getNodes().forEach(node -> {
+        assertThat(node.anyLogsContain(" ERROR ")).isFalse();
+        node.assertThatProcessesAreUp();
+      });
+    }
+  }
+
+  @Test
+  public void node_fails_to_join_cluster_if_different_cluster_name() throws Exception {
+    try (Cluster cluster = new Cluster("foo")) {
+      NodeConfig searchConfig1 = newSearchConfig();
+      NodeConfig searchConfig2 = newSearchConfig();
+      NodeConfig.interconnectBus(searchConfig1, searchConfig2);
+      NodeConfig.interconnectSearch(searchConfig1, searchConfig2);
+      cluster.startNode(searchConfig1, nothing());
+      cluster.startNode(searchConfig2, nothing());
+
+      NodeConfig searchConfig3 = newSearchConfig()
+        .addConnectionToSearch(searchConfig1)
+        .addConnectionToBus(searchConfig1, searchConfig2);
+      Node search3 = cluster.addNode(searchConfig3, b -> b
+        .setServerProperty("sonar.cluster.name", "bar")
+        .setStartupLogWatcher(logLine -> logLine.contains("SonarQube is up")));
+      try {
+        search3.start();
+        fail();
+      } catch (IllegalStateException e) {
+        assertThat(e).hasMessage("Server startup failure");
+        // TODO how to force process to write into sonar.log, even if sonar.log.console=true ?
+        // assertThat(search3.anyLogsContain("This node has a cluster name [bar], which does not match [foo] from the cluster")).isTrue();
+      }
+    }
+  }
+
+  @Test
+  public void restarting_all_application_nodes_elects_a_new_startup_leader() throws Exception {
+    // no need for 3 search nodes, 2 is enough for the test
+    try (Cluster cluster = newCluster(2, 2)) {
+      cluster.getNodes().forEach(Node::start);
+      cluster.getAppNodes().forEach(Node::waitForStatusUp);
+
+      // stop application nodes only
+      cluster.getAppNodes().forEach(app -> {
+        app.stop();
+        app.cleanUpLogs();
+        // logs are empty, no more possible to know if node was startup leader/follower
+        assertThat(app.isStartupLeader()).isFalse();
+        assertThat(app.isStartupFollower()).isFalse();
+      });
+
+      // restart application nodes
+      cluster.getAppNodes().forEach(Node::start);
+      cluster.getAppNodes().forEach(Node::waitForStatusUp);
+
+      // one app node is elected as startup leader. It does some initialization stuff,
+      // like registration of rules. Search indices already exist and are up-to-date.
+      Node startupLeader = cluster.getAppNodes()
+        .filter(Node::isStartupLeader)
+        .reduce(singleElement())
+        .get();
+      assertThat(startupLeader.hasStartupLeaderOperations()).isTrue();
+      assertThat(startupLeader.hasCreatedSearchIndices()).isFalse();
+
+      Node startupFollower = cluster.getAppNodes()
+        .filter(Node::isStartupFollower)
+        .reduce(singleElement())
+        .get();
+      assertThat(startupFollower.hasStartupLeaderOperations()).isFalse();
+      assertThat(startupFollower.hasCreatedSearchIndices()).isFalse();
+      assertThat(startupFollower).isNotSameAs(startupLeader);
+    }
+  }
+
+  /**
+   * Used to have non-blocking {@link Node#start()}. Orchestrator considers
+   * node to be up as soon as the first log is generated.
+   */
+  private static Consumer<OrchestratorBuilder> nothing() {
+    return b -> {
+    };
+  }
+
+  /**
+   * Configure a cluster with recommended configuration (each node has references
+   * to other nodes)
+   */
+  private static Cluster newCluster(int nbOfSearchNodes, int nbOfAppNodes) {
+    Cluster cluster = new Cluster(null);
+
+    List<NodeConfig> configs = new ArrayList<>();
+    IntStream.range(0, nbOfSearchNodes).forEach(i -> configs.add(newSearchConfig()));
+    IntStream.range(0, nbOfAppNodes).forEach(i -> configs.add(newApplicationConfig()));
+    NodeConfig[] configsArray = configs.toArray(new NodeConfig[configs.size()]);
+
+    // a node is connected to all nodes, including itself (see sonar.cluster.hosts)
+    NodeConfig.interconnectBus(configsArray);
+
+    // search nodes are interconnected, and app nodes connect to all search nodes
+    NodeConfig.interconnectSearch(configsArray);
+
+    configs.forEach(c -> cluster.addNode(c, nothing()));
+    return cluster;
+  }
+
+  private static BinaryOperator<Node> singleElement() {
+    return (a, b) -> {
+      throw new IllegalStateException("More than one element");
+    };
+  }
+}
diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEdition.java b/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEdition.java
deleted file mode 100644 (file)
index b431bcf..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package org.sonarqube.tests.cluster;
-
-import java.util.concurrent.ExecutionException;
-
-import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
-
-public class DataCenterEdition {
-
-  private final Cluster cluster;
-
-  public DataCenterEdition() {
-    cluster = Cluster.builder()
-      .addNode(SEARCH)
-      .addNode(SEARCH)
-      .addNode(SEARCH)
-      .addNode(APPLICATION)
-      .addNode(APPLICATION)
-      .build();
-  }
-
-  public void stop() throws ExecutionException, InterruptedException {
-    cluster.stop();
-  }
-
-  public void start() throws ExecutionException, InterruptedException {
-    cluster.start();
-  }
-
-  public Cluster getCluster() {
-    return cluster;
-  }
-}
diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java b/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java
deleted file mode 100644 (file)
index 52f7930..0000000
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package org.sonarqube.tests.cluster;
-
-import com.sonar.orchestrator.db.Database;
-import com.sonar.orchestrator.db.DatabaseClient;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.DisableOnDebug;
-import org.junit.rules.TestRule;
-import org.junit.rules.Timeout;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.tuple;
-import static org.junit.Assert.fail;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
-
-public class DataCenterEditionTest {
-
-  @Rule
-  public TestRule timeout = new DisableOnDebug(Timeout.builder()
-    .withLookingForStuckThread(true)
-    .withTimeout(5, TimeUnit.MINUTES)
-    .build());
-
-  @Test
-  public void launch() throws ExecutionException, InterruptedException {
-    DataCenterEdition dce = new DataCenterEdition();
-    Cluster cluster = dce.getCluster();
-    dce.start();
-    assertThat(cluster.getNodes())
-      .extracting(Cluster.Node::getType, n -> isPortBound(false, n.getEsPort()), n -> isPortBound(true, n.getWebPort()))
-      .containsExactlyInAnyOrder(
-        tuple(SEARCH, true, false),
-        tuple(SEARCH, true, false),
-        tuple(SEARCH, true, false),
-        tuple(APPLICATION, false, true),
-        tuple(APPLICATION, false, true)
-      );
-    dce.stop();
-  }
-
-  @Test
-  public void upgrade_application_nodes_without_stopping_search_nodes_must_work() throws ExecutionException, InterruptedException, SQLException {
-    DataCenterEdition dce = new DataCenterEdition();
-    Cluster cluster = dce.getCluster();
-    dce.start();
-
-    // Stop all Application nodes
-    cluster.stopAll(n -> n.getType() == APPLICATION);
-
-    // Drop the schema
-    Database database = cluster.getNodes().get(0).getOrchestrator().getDatabase();
-    dropAndCreate(database.getClient());
-    assertDatabaseDropped(database);
-
-    // Start all Application nodes
-    cluster.startAll(n -> n.getType() == APPLICATION);
-
-    // We are expecting a new leader to be elected which will recreate the database
-    assertDatabaseInitialized(database);
-
-    dce.stop();
-  }
-
-  @Test
-  public void using_different_cluster_names_should_fail() throws ExecutionException, InterruptedException, SQLException {
-    Cluster cluster = Cluster.builder()
-      .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
-      .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
-      .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
-      .addNode(APPLICATION, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
-      .addNode(APPLICATION, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "badClusterName"))
-      .build();
-    cluster.startAll(n -> "goodClusterName".equals(n.getProperties().getProperty(Cluster.CLUSTER_NAME)));
-
-    try {
-      cluster.startAll(n -> "badClusterName".equals(n.getProperties().getProperty(Cluster.CLUSTER_NAME)));
-      fail("A node with a bad cluster name was able to join the cluster");
-    } catch (Exception e) {
-      // we expect, that joining the cluster fails
-      System.out.println(e);
-    }
-  }
-
-  private void assertDatabaseInitialized(Database database) {
-    assertThat(countRowsOfMigration(database)).isGreaterThan(0);
-  }
-
-  private int countRowsOfMigration(Database database) {
-    return database.countSql("select count(*) from schema_migrations");
-  }
-
-  private void assertDatabaseDropped(Database database) {
-    try {
-      countRowsOfMigration(database);
-      fail("Table 'schema_migrations' has not been dropped");
-    } catch (Exception e) {
-      // we expect the table to not exist
-    }
-  }
-
-  private static boolean isPortBound(boolean loopback, @Nullable Integer port) {
-    if (port == null) {
-      return false;
-    }
-    InetAddress inetAddress = loopback ? InetAddress.getLoopbackAddress() : Cluster.getNonloopbackIPv4Address();
-    try (ServerSocket socket = new ServerSocket(port, 50, inetAddress)) {
-      throw new IllegalStateException("A port was set explicitly, but was not bound (port="+port+")");
-    } catch (IOException e) {
-      return true;
-    }
-  }
-
-  private static void dropAndCreate(DatabaseClient databaseClient) throws SQLException {
-    try (Connection connection = databaseClient.openRootConnection()) {
-      executeDdl(connection, databaseClient.getDropDdl());
-      executeDdl(connection, databaseClient.getCreateDdl());
-    }
-  }
-
-  private static void executeDdl(Connection connection, String... ddls) throws SQLException {
-    try (Statement stmt = connection.createStatement()) {
-      for (String ddl : ddls) {
-        stmt.executeUpdate(ddl);
-      }
-    }
-  }
-}
diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/Node.java b/tests/src/test/java/org/sonarqube/tests/cluster/Node.java
new file mode 100644 (file)
index 0000000..03dd4da
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonarqube.tests.cluster;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.sonar.orchestrator.Orchestrator;
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import util.ItUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class Node {
+
+  private final NodeConfig config;
+  private final Orchestrator orchestrator;
+
+  Node(NodeConfig config, Orchestrator orchestrator) {
+    this.config = config;
+    this.orchestrator = orchestrator;
+  }
+
+  NodeConfig getConfig() {
+    return config;
+  }
+
+  /**
+   * Non-blocking startup of node. The method does not wait for
+   * node to be started because Orchestrator uses a StartupLogWatcher
+   * that returns as soon as a log is generated.
+   */
+  void start() {
+    orchestrator.start();
+  }
+
+  void stop() {
+    orchestrator.stop();
+  }
+
+  void cleanUpLogs() {
+    if (orchestrator.getServer() != null) {
+      FileUtils.deleteQuietly(orchestrator.getServer().getWebLogs());
+      FileUtils.deleteQuietly(orchestrator.getServer().getCeLogs());
+      FileUtils.deleteQuietly(orchestrator.getServer().getEsLogs());
+      FileUtils.deleteQuietly(orchestrator.getServer().getAppLogs());
+    }
+  }
+
+  boolean isStartupLeader() {
+    return webLogsContain("Cluster enabled (startup leader)");
+  }
+
+  boolean isStartupFollower() {
+    return webLogsContain("Cluster enabled (startup follower)");
+  }
+
+  void waitForStatusUp() {
+    waitForStatus("UP");
+  }
+
+  void waitForStatus(String expectedStatus) {
+    String status = null;
+    try {
+      while (!expectedStatus.equals(status)) {
+        if (orchestrator.getServer() != null) {
+          try {
+            Map<String, Object> json = ItUtils.jsonToMap(orchestrator.getServer().newHttpCall("api/system/status").executeUnsafely().getBodyAsString());
+            status = (String) json.get("status");
+          } catch (Exception e) {
+            // ignored
+          }
+        }
+
+        Thread.sleep(500);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  void assertThatProcessesAreUp() {
+    assertThat(arePortsBound()).as(getConfig().getType().toString()).isTrue();
+    switch (config.getType()) {
+      case SEARCH:
+        assertThat(anyLogsContain("Process[es] is up")).isTrue();
+        assertThat(anyLogsContain("Process[web] is up")).isFalse();
+        assertThat(anyLogsContain("Elasticsearch cluster enabled")).isTrue();
+        break;
+      case APPLICATION:
+        assertThat(anyLogsContain("Process[es] is up")).isFalse();
+        assertThat(anyLogsContain("Process[web] is up")).isTrue();
+        assertThat(anyLogsContain("Elasticsearch cluster enabled")).isFalse();
+        break;
+    }
+  }
+
+  void waitForCeLogsContain(String expectedMessage) {
+    boolean found = false;
+    while (!found) {
+      found = orchestrator.getServer() != null && fileContains(orchestrator.getServer().getCeLogs(), expectedMessage);
+      if (!found) {
+        Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+
+  boolean hasStartupLeaderOperations() throws IOException {
+    if (orchestrator.getServer() == null) {
+      return false;
+    }
+    String logs = FileUtils.readFileToString(orchestrator.getServer().getWebLogs());
+    return logs.contains("Register metrics") &&
+      logs.contains("Register rules");
+  }
+
+  boolean hasCreatedSearchIndices() throws IOException {
+    if (orchestrator.getServer() == null) {
+      return false;
+    }
+    String logs = FileUtils.readFileToString(orchestrator.getServer().getWebLogs());
+    return logs.contains("[o.s.s.e.IndexCreator] Create index");
+  }
+
+  boolean anyLogsContain(String message) {
+    if (orchestrator.getServer() == null) {
+      return false;
+    }
+    return fileContains(orchestrator.getServer().getAppLogs(), message) ||
+      fileContains(orchestrator.getServer().getWebLogs(), message) ||
+      fileContains(orchestrator.getServer().getEsLogs(), message) ||
+      fileContains(orchestrator.getServer().getCeLogs(), message);
+  }
+
+  private boolean webLogsContain(String message) {
+    if (orchestrator.getServer() == null) {
+      return false;
+    }
+    return fileContains(orchestrator.getServer().getWebLogs(), message);
+  }
+
+  private static boolean fileContains(@Nullable File logFile, String message) {
+    try {
+      return logFile != null && logFile.exists() && FileUtils.readFileToString(logFile).contains(message);
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private boolean arePortsBound() {
+    return isPortBound(config.getHzPort()) &&
+      config.getSearchPort().map(this::isPortBound).orElse(true) &&
+      config.getWebPort().map(this::isPortBound).orElse(true);
+  }
+
+  private boolean isPortBound(int port) {
+    try (ServerSocket socket = new ServerSocket(port, 50, config.getAddress())) {
+      return false;
+    } catch (IOException e) {
+      return true;
+    }
+  }
+
+}
diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/NodeConfig.java b/tests/src/test/java/org/sonarqube/tests/cluster/NodeConfig.java
new file mode 100644 (file)
index 0000000..df3d2ed
--- /dev/null
@@ -0,0 +1,187 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonarqube.tests.cluster;
+
+import com.sonar.orchestrator.util.NetworkUtils;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+class NodeConfig {
+
+  enum NodeType {
+    SEARCH("search"), APPLICATION("application");
+
+    final String value;
+
+    NodeType(String value) {
+      this.value = value;
+    }
+
+    String getValue() {
+      return value;
+    }
+  }
+
+  private final NodeType type;
+  @Nullable
+  private final String name;
+  private final InetAddress address;
+  private final int hzPort;
+  @Nullable
+  private final Integer searchPort;
+  @Nullable
+  private final Integer webPort;
+  private final List<NodeConfig> connectedNodes = new ArrayList<>();
+  private final List<NodeConfig> searchNodes = new ArrayList<>();
+
+  private NodeConfig(NodeType type, @Nullable String name) {
+    this.type = type;
+    this.name = name;
+    this.address = getNonLoopbackIpv4Address();
+    this.hzPort = NetworkUtils.getNextAvailablePort(this.address);
+    this.connectedNodes.add(this);
+    switch (type) {
+      case SEARCH:
+        this.searchPort = NetworkUtils.getNextAvailablePort(this.address);
+        this.webPort = null;
+        this.searchNodes.add(this);
+        break;
+      case APPLICATION:
+        this.searchPort = null;
+        this.webPort = NetworkUtils.getNextAvailablePort(this.address);
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  NodeType getType() {
+    return type;
+  }
+
+  Optional<String> getName() {
+    return Optional.ofNullable(name);
+  }
+
+  InetAddress getAddress() {
+    return address;
+  }
+
+  int getHzPort() {
+    return hzPort;
+  }
+
+  Optional<Integer> getSearchPort() {
+    return Optional.ofNullable(searchPort);
+  }
+
+  Optional<Integer> getWebPort() {
+    return Optional.ofNullable(webPort);
+  }
+
+  String getHzHost() {
+    return address.getHostAddress() + ":" + hzPort;
+  }
+
+  String getSearchHost() {
+    return address.getHostAddress() + ":" + searchPort;
+  }
+
+  NodeConfig addConnectionToBus(NodeConfig... configs) {
+    connectedNodes.addAll(Arrays.asList(configs));
+    return this;
+  }
+
+  NodeConfig addConnectionToSearch(NodeConfig... configs) {
+    Arrays.stream(configs).forEach(config -> {
+      checkArgument(config.getType() == NodeType.SEARCH);
+      searchNodes.add(config);
+    });
+    return this;
+  }
+
+  List<NodeConfig> getConnectedNodes() {
+    return connectedNodes;
+  }
+
+  List<NodeConfig> getSearchNodes() {
+    return searchNodes;
+  }
+
+  static NodeConfig newApplicationConfig() {
+    return new NodeConfig(NodeType.APPLICATION, null);
+  }
+
+  static NodeConfig newSearchConfig() {
+    return new NodeConfig(NodeType.SEARCH, null);
+  }
+
+  /**
+   * See property sonar.cluster.hosts
+   */
+  static void interconnectBus(NodeConfig... configs) {
+    Arrays.stream(configs).forEach(config -> Arrays.stream(configs).filter(c -> c != config).forEach(config::addConnectionToBus));
+  }
+
+  /**
+   * See property sonar.cluster.search.hosts
+   */
+  static void interconnectSearch(NodeConfig... configs) {
+    Arrays.stream(configs).forEach(config -> Arrays.stream(configs)
+      .filter(c -> c.getType() == NodeType.SEARCH)
+      .forEach(config::addConnectionToSearch));
+  }
+
+  private static InetAddress getNonLoopbackIpv4Address() {
+    try {
+      Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
+      for (NetworkInterface networkInterface : Collections.list(nets)) {
+        if (!networkInterface.isLoopback() && networkInterface.isUp() && !isBlackListed(networkInterface)) {
+          Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+          while (inetAddresses.hasMoreElements()) {
+            InetAddress inetAddress = inetAddresses.nextElement();
+            if (inetAddress instanceof Inet4Address) {
+              return inetAddress;
+            }
+          }
+        }
+      }
+    } catch (SocketException se) {
+      throw new RuntimeException("Cannot find a non loopback card required for tests", se);
+    }
+    throw new RuntimeException("Cannot find a non loopback card required for tests");
+  }
+
+  private static boolean isBlackListed(NetworkInterface networkInterface) {
+    return networkInterface.getName().startsWith("docker") ||
+      networkInterface.getName().startsWith("vboxnet");
+  }
+}
diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/StartupLogWatcherImpl.java b/tests/src/test/java/org/sonarqube/tests/cluster/StartupLogWatcherImpl.java
deleted file mode 100644 (file)
index 170ddc1..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package org.sonarqube.tests.cluster;
-
-
-import com.sonar.orchestrator.server.StartupLogWatcher;
-
-public class StartupLogWatcherImpl implements StartupLogWatcher {
-  private static final String STARTUP_EXPECTED_MESSAGE = "SonarQube is up";
-
-  @Override
-  public boolean isStarted(String logLine) {
-    return logLine.contains(STARTUP_EXPECTED_MESSAGE);
-  }
-}
diff --git a/tests/src/test/java/org/sonarqube/tests/serverSystem/ClusterTest.java b/tests/src/test/java/org/sonarqube/tests/serverSystem/ClusterTest.java
deleted file mode 100644 (file)
index ff11037..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonarqube.tests.serverSystem;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.sonar.orchestrator.Orchestrator;
-import com.sonar.orchestrator.server.StartupLogWatcher;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.sonarqube.ws.Issues;
-import org.sonarqube.ws.Settings;
-import org.sonarqube.ws.client.rule.SearchWsRequest;
-import org.sonarqube.ws.client.setting.ValuesRequest;
-import util.ItUtils;
-
-import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
-import static org.assertj.core.api.Assertions.assertThat;
-import static util.ItUtils.newWsClient;
-
-@Ignore("temporarily ignored")
-public class ClusterTest {
-
-  private static final String CONF_FILE_PATH = "conf/sonar.properties";
-
-  /**
-   * SONAR-7899
-   */
-  @Test
-  public void secondary_nodes_do_not_write_to_datastores_at_startup() throws Exception {
-    // start "startup leader", which creates and populates datastores
-    Orchestrator orchestrator = Orchestrator.builderEnv()
-      .setServerProperty("sonar.cluster.enabled", "true")
-      .setServerProperty("sonar.cluster.name", "secondary_nodes_do_not_write_to_datastores_at_startup")
-      .setServerProperty("sonar.cluster.web.startupLeader", "true")
-      .setServerProperty("sonar.log.level", "TRACE")
-      .addPlugin(ItUtils.xooPlugin())
-      .build();
-    orchestrator.start();
-
-    expectLog(orchestrator, "Cluster enabled (startup leader)");
-    expectWriteOperations(orchestrator, true);
-    // verify that datastores are populated by requesting rules
-    assertThat(newWsClient(orchestrator).rules().search(new SearchWsRequest()).getTotal()).isGreaterThan(0);
-
-    FileUtils.write(orchestrator.getServer().getWebLogs(), "", false);
-    updateSonarPropertiesFile(orchestrator, ImmutableMap.of("sonar.cluster.web.startupLeader", "false"));
-    orchestrator.restartServer();
-
-    expectLog(orchestrator, "Cluster enabled (startup follower)");
-    expectWriteOperations(orchestrator, false);
-
-    orchestrator.stop();
-  }
-
-  @Test
-  public void start_cluster_of_elasticsearch_and_web_nodes() throws IOException {
-    Orchestrator elasticsearch = null;
-    Orchestrator web = null;
-
-    try {
-      ElasticsearchStartupWatcher esWatcher = new ElasticsearchStartupWatcher();
-      elasticsearch = Orchestrator.builderEnv()
-        .setServerProperty("sonar.cluster.enabled", "true")
-        .setServerProperty("sonar.cluster.name", "start_cluster_of_elasticsearch_and_web_nodes")
-        .setServerProperty("sonar.cluster.web.disabled", "true")
-        .setServerProperty("sonar.cluster.ce.disabled", "true")
-        .setStartupLogWatcher(esWatcher)
-        .build();
-      elasticsearch.start();
-      assertThat(esWatcher.port).isGreaterThan(0);
-      assertThat(FileUtils.readFileToString(elasticsearch.getServer().getAppLogs())).doesNotContain("Process[web]");
-
-      web = Orchestrator.builderEnv()
-        .setServerProperty("sonar.cluster.enabled", "true")
-        .setServerProperty("sonar.cluster.name", "start_cluster_of_elasticsearch_and_web_nodes")
-        .setServerProperty("sonar.cluster.web.startupLeader", "true")
-        .setServerProperty("sonar.cluster.search.disabled", "true")
-        .setServerProperty("sonar.cluster.search.hosts", "localhost:" + esWatcher.port)
-        // no need for compute engine in this test. Disable it for faster test.
-        .setServerProperty("sonar.cluster.ce.disabled", "true")
-        // override the default watcher provided by Orchestrator
-        // which waits for Compute Engine to be up
-        .setStartupLogWatcher(log -> log.contains("SonarQube is up"))
-        .build();
-      web.start();
-
-      String coreId = getPropertyValue(web, "sonar.core.id");
-      String startTime = getPropertyValue(web, "sonar.core.startTime");
-
-      assertThat(FileUtils.readFileToString(web.getServer().getAppLogs())).doesNotContain("Process[es]");
-      // call a web service that requires Elasticsearch
-      Issues.SearchWsResponse wsResponse = newWsClient(web).issues().search(new org.sonarqube.ws.client.issue.SearchWsRequest());
-      assertThat(wsResponse.getIssuesCount()).isEqualTo(0);
-
-      web.restartServer();
-
-      // sonar core id must not change after restart
-      assertThat(getPropertyValue(web, "sonar.core.id")).isEqualTo(coreId);
-      // startTime must change at each startup
-      assertThat(getPropertyValue(web, "sonar.core.startTime")).isNotEqualTo(startTime);
-    } finally {
-      if (web != null) {
-        web.stop();
-      }
-      if (elasticsearch != null) {
-        elasticsearch.stop();
-      }
-    }
-  }
-
-  private static String getPropertyValue(Orchestrator web, String property) {
-    Settings.ValuesWsResponse response = ItUtils.newAdminWsClient(web).settings().values(ValuesRequest.builder().setKeys(property).build());
-    List<Settings.Setting> settingsList = response.getSettingsList();
-    if (settingsList.isEmpty()) {
-      return null;
-    }
-    assertThat(settingsList).hasSize(1);
-    return settingsList.iterator().next().getValue();
-  }
-
-  private static class ElasticsearchStartupWatcher implements StartupLogWatcher {
-    private final Pattern pattern = Pattern.compile("Elasticsearch listening on .*:(\\d+)");
-    private int port = -1;
-
-    @Override
-    public boolean isStarted(String log) {
-      Matcher matcher = pattern.matcher(log);
-      if (matcher.find()) {
-        port = Integer.parseInt(matcher.group(1));
-      }
-      return log.contains("Process[es] is up");
-    }
-  }
-
-  private static void expectLog(Orchestrator orchestrator, String expectedLog) throws IOException {
-    File logFile = orchestrator.getServer().getWebLogs();
-    try (Stream<String> lines = Files.lines(logFile.toPath())) {
-      assertThat(lines.anyMatch(s -> StringUtils.containsIgnoreCase(s, expectedLog))).isTrue();
-    }
-  }
-
-  private static void expectWriteOperations(Orchestrator orchestrator, boolean expected) throws IOException {
-    try (Stream<String> lines = Files.lines(orchestrator.getServer().getWebLogs().toPath())) {
-      List<String> writeOperations = lines.filter(ClusterTest::isWriteOperation).collect(Collectors.toList());
-      if (expected) {
-        assertThat(writeOperations).isNotEmpty();
-      } else {
-        assertThat(writeOperations).as("Unexpected write operations: " + Joiner.on('\n').join(writeOperations)).isEmpty();
-
-      }
-    }
-  }
-
-  private static boolean isWriteOperation(String log) {
-    return isDbWriteOperation(log) || isEsWriteOperation(log);
-  }
-
-  private static boolean isDbWriteOperation(String log) {
-    return log.contains("web[][sql]") && (containsIgnoreCase(log, "sql=insert") ||
-      containsIgnoreCase(log, "sql=update") ||
-      containsIgnoreCase(log, "sql=delete") ||
-      containsIgnoreCase(log, "sql=create"));
-  }
-
-  private static boolean isEsWriteOperation(String log) {
-    return log.contains("web[][es]") && (containsIgnoreCase(log, "Create index") ||
-      containsIgnoreCase(log, "Create type") ||
-      containsIgnoreCase(log, "put mapping request") ||
-      containsIgnoreCase(log, "refresh request") ||
-      containsIgnoreCase(log, "index request"));
-  }
-
-  private static void updateSonarPropertiesFile(Orchestrator orchestrator, Map<String, String> props) throws IOException {
-    Properties propsFile = new Properties();
-    try (FileInputStream conf = FileUtils.openInputStream(new File(orchestrator.getServer().getHome(), CONF_FILE_PATH))) {
-      propsFile.load(conf);
-      propsFile.putAll(props);
-    }
-    try (FileOutputStream conf = FileUtils.openOutputStream(new File(orchestrator.getServer().getHome(), CONF_FILE_PATH))) {
-      propsFile.store(conf, "");
-    }
-  }
-}