diff options
24 files changed, 1228 insertions, 726 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java b/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java index 129c65ef502..05815aed2b5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java @@ -171,8 +171,9 @@ public class CeServer implements Monitored { try { Thread.sleep(CHECK_FOR_STOP_DELAY); } catch (InterruptedException e) { - // ignore the interruption itself, check the flag - Thread.currentThread().interrupt(); + // ignore the interruption itself + // Do not propagate the isInterrupted flag with : Thread.currentThread().interrupt(); + // It will break the shutdown of ComputeEngineContainerImpl#stop() } } attemptShutdown(); @@ -180,10 +181,11 @@ public class CeServer implements Monitored { private void attemptShutdown() { try { - LOG.info("Compute Engine shutting down..."); + LOG.info("Compute Engine is stopping..."); computeEngine.shutdown(); + LOG.info("Compute Engine is stopped"); } catch (Throwable e) { - LOG.error("Compute Engine shutdown failed", e); + LOG.error("Compute Engine failed to stop", e); } finally { // release thread waiting for CeServer stopAwait(); @@ -207,8 +209,9 @@ public class CeServer implements Monitored { if (t != null) { t.interrupt(); try { - t.join(1000); + t.join(1_000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Ignored } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java index d89b83febaf..ca967e1b59d 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java @@ -53,6 +53,7 @@ import org.sonar.ce.platform.ComputeEngineExtensionInstaller; import org.sonar.ce.queue.CeQueueCleaner; import org.sonar.ce.queue.PurgeCeActivities; import org.sonar.ce.settings.ProjectConfigurationFactory; +import org.sonar.ce.taskprocessor.CeProcessingScheduler; import org.sonar.ce.taskprocessor.CeTaskProcessorModule; import org.sonar.ce.user.CeUserSession; import org.sonar.core.component.DefaultResourceTypes; @@ -220,6 +221,11 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { @Override public ComputeEngineContainer stop() { + if (level4 != null) { + // try to graceful stop in-progress tasks + CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class); + ceProcessingScheduler.stopScheduling(); + } this.level1.stopComponents(); return this; } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java index 831684d2e8c..f2a327fe720 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java @@ -23,4 +23,5 @@ public interface CeProcessingScheduler { void startScheduling(); + void stopScheduling(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 458f527f307..4792bd27ed4 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.CheckForNull; import javax.annotation.Nullable; -import org.picocontainer.Startable; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.configuration.CeConfiguration; @@ -35,7 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration; import static com.google.common.util.concurrent.Futures.addCallback; import static java.util.concurrent.TimeUnit.MILLISECONDS; -public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable { +public class CeProcessingSchedulerImpl implements CeProcessingScheduler { private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds @@ -43,12 +42,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab private final long delayBetweenEnabledTasks; private final TimeUnit timeUnit; private final ChainingCallback[] chainingCallbacks; + private final EnabledCeWorkerController ceWorkerController; public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration, - CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) { + CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory, + EnabledCeWorkerController ceWorkerController) { this.executorService = processingExecutorService; this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay(); + this.ceWorkerController = ceWorkerController; this.timeUnit = MILLISECONDS; int threadWorkerCount = ceConfiguration.getWorkerMaxCount(); @@ -60,11 +62,6 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab } @Override - public void start() { - // nothing to do at component startup, startScheduling will be called by CeQueueInitializer - } - - @Override public void startScheduling() { for (ChainingCallback chainingCallback : chainingCallbacks) { ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit); @@ -72,10 +69,37 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab } } + /** + * This method is stopping all the workers giving them a delay before killing them. + */ @Override - public void stop() { + public void stopScheduling() { + LOG.debug("Stopping compute engine"); + // Requesting all workers to stop + for (ChainingCallback chainingCallback : chainingCallbacks) { + chainingCallback.stop(false); + } + + // Workers have 40s to gracefully stop processing tasks + long until = System.currentTimeMillis() + 40_000L; + LOG.info("Waiting for workers to finish in-progress tasks"); + while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) { + try { + Thread.sleep(200L); + } catch (InterruptedException e) { + LOG.debug("Graceful stop period has been interrupted", e); + Thread.currentThread().interrupt(); + break; + } + } + + if (ceWorkerController.hasAtLeastOneProcessingWorker()) { + LOG.info("Some in-progress tasks did not finish in due time. Tasks will be stopped."); + } + + // Interrupting the tasks for (ChainingCallback chainingCallback : chainingCallbacks) { - chainingCallback.stop(); + chainingCallback.stop(true); } } @@ -149,10 +173,10 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab return keepRunning.get(); } - public void stop() { + public void stop(boolean interrupt) { this.keepRunning.set(false); if (workerFuture != null) { - workerFuture.cancel(false); + workerFuture.cancel(interrupt); } } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index d52bed437f9..f8e6807cfa5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -79,7 +79,7 @@ public class CeWorkerImpl implements CeWorker { return NO_TASK; } - try { + try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) { executeTask(ceTask.get()); } catch (Exception e) { LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java index 37c6c59c0ef..5634a20e455 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java @@ -24,6 +24,9 @@ package org.sonar.ce.taskprocessor; * task to process. */ public interface EnabledCeWorkerController { + interface ProcessingRecorderHook extends AutoCloseable { + } + /** * Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any. */ @@ -33,4 +36,13 @@ public interface EnabledCeWorkerController { * Returns {@code true} if the specified {@link CeWorker} is enabled */ boolean isEnabled(CeWorker ceWorker); + + ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker); + + /** + * Whether at least one worker is being processed a task or not. + * Returns {@code false} when all workers are waiting for tasks + * or are being stopped. + */ + boolean hasAtLeastOneProcessingWorker(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java index 7bb905a3db5..500eb671be9 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java @@ -19,14 +19,20 @@ */ package org.sonar.ce.taskprocessor; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.configuration.CeConfiguration; public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController { + private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>(); private final CeConfiguration ceConfiguration; private final AtomicInteger workerCount; + enum Status { + PROCESSING, PAUSED + } + public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) { this.ceConfiguration = ceConfiguration; this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount()); @@ -46,6 +52,16 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController logEnabledWorkerCount(); } + @Override + public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) { + return new ProcessingRecorderHookImpl(ceWorker); + } + + @Override + public boolean hasAtLeastOneProcessingWorker() { + return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING); + } + /** * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than * {@link CeConfiguration#getWorkerCount()}. @@ -56,4 +72,18 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController public boolean isEnabled(CeWorker ceWorker) { return ceWorker.getOrdinal() < workerCount.get(); } + + private class ProcessingRecorderHookImpl implements ProcessingRecorderHook { + private final CeWorker ceWorker; + + private ProcessingRecorderHookImpl(CeWorker ceWorker) { + this.ceWorker = ceWorker; + map.put(this.ceWorker, Status.PROCESSING); + } + + @Override + public void close() throws Exception { + map.put(ceWorker, Status.PAUSED); + } + } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index 660dcc9a5ed..e98fa826962 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -67,20 +67,20 @@ import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeProcessingSchedulerImplTest { private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling"); - @Rule // due to risks of infinite chaining of tasks/futures, a timeout is required for safety + @Rule public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60)); @Rule public CeConfigurationRule ceConfiguration = new CeConfigurationRule(); - // Required to prevent an infinite loop private CeWorker ceWorker = mock(CeWorker.class); private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker); private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS); private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS); private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker); + private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration); - private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); + private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController); @Test public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception { @@ -164,7 +164,7 @@ public class CeProcessingSchedulerImplTest { } @Test - public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { + public void stopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { when(ceWorker.call()) .thenReturn(NO_TASK) .thenReturn(TASK_PROCESSED) @@ -188,7 +188,7 @@ public class CeProcessingSchedulerImplTest { } // call stop after second delayed polling if (i == 1) { - underTest.stop(); + underTest.stopScheduling(); } i++; } @@ -219,7 +219,7 @@ public class CeProcessingSchedulerImplTest { when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture); CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers)); - CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); + CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController); when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS)) .thenReturn(listenableScheduledFuture); @@ -454,7 +454,7 @@ public class CeProcessingSchedulerImplTest { @Override public void shutdown() { - throw new UnsupportedOperationException("shutdown() not implemented"); + // Nothing to do } @Override diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStep.java b/server/sonar-server/src/main/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStep.java index 5f21f03de41..09737ff467d 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStep.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStep.java @@ -222,8 +222,8 @@ public class LoadMeasureComputersStep implements ComputationStep { private class ValidateOutputMetric implements Predicate<String> { @Override public boolean apply(@Nonnull String metric) { - checkState(!CORE_METRIC_KEYS.contains(metric), "Metric '%s' cannot be used as an output metric as it's a core metric", metric); - checkState(pluginMetricKeys.contains(metric), "Metric '%s' cannot be used as an output metric as no plugin declare this metric", metric); + checkState(!CORE_METRIC_KEYS.contains(metric), "Metric '%s' cannot be used as an output metric because it's a core metric", metric); + checkState(pluginMetricKeys.contains(metric), "Metric '%s' cannot be used as an output metric because no plugins declare this metric", metric); return true; } } diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStepTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStepTest.java index 293e5468d4c..b2c9dbe89f7 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStepTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/task/projectanalysis/step/LoadMeasureComputersStepTest.java @@ -127,7 +127,7 @@ public class LoadMeasureComputersStepTest { @Test public void fail_with_ISE_when_output_metric_is_not_define_by_plugin() throws Exception { thrown.expect(IllegalStateException.class); - thrown.expectMessage("Metric 'unknown' cannot be used as an output metric as no plugin declare this metric"); + thrown.expectMessage("Metric 'unknown' cannot be used as an output metric because no plugins declare this metric"); MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NEW_METRIC_4), array("unknown"))}; ComputationStep underTest = new LoadMeasureComputersStep(holder, array(new TestMetrics()), computers); @@ -137,7 +137,7 @@ public class LoadMeasureComputersStepTest { @Test public void fail_with_ISE_when_output_metric_is_a_core_metric() throws Exception { thrown.expect(IllegalStateException.class); - thrown.expectMessage("Metric 'ncloc' cannot be used as an output metric as it's a core metric"); + thrown.expectMessage("Metric 'ncloc' cannot be used as an output metric because it's a core metric"); MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NEW_METRIC_4), array(NCLOC_KEY))}; ComputationStep underTest = new LoadMeasureComputersStep(holder, array(new TestMetrics()), computers); @@ -172,7 +172,7 @@ public class LoadMeasureComputersStepTest { @Test public void fail_with_ISE_when_no_metrics_are_defined_by_plugin_but_measure_computer_use_a_new_metric() throws Exception { thrown.expect(IllegalStateException.class); - thrown.expectMessage("Metric 'metric1' cannot be used as an output metric as no plugin declare this metric"); + thrown.expectMessage("Metric 'metric1' cannot be used as an output metric because no plugins declare this metric"); MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NCLOC_KEY), array(NEW_METRIC_1))}; ComputationStep underTest = new LoadMeasureComputersStep(holder, computers); diff --git a/sonar-application/src/main/assembly/conf/wrapper.conf b/sonar-application/src/main/assembly/conf/wrapper.conf index 3f37ed2fb46..8fb972ab0b1 100644 --- a/sonar-application/src/main/assembly/conf/wrapper.conf +++ b/sonar-application/src/main/assembly/conf/wrapper.conf @@ -86,4 +86,4 @@ wrapper.ntservice.interactive=false wrapper.disable_restarts=TRUE wrapper.ping.timeout=0 wrapper.shutdown.timeout=300 -wrapper.jvm_exit.timeout=15 +wrapper.jvm_exit.timeout=300 diff --git a/tests/plugins/server-plugin/src/main/java/ServerPlugin.java b/tests/plugins/server-plugin/src/main/java/ServerPlugin.java index 07aa1f861a2..9ab0dd987de 100644 --- a/tests/plugins/server-plugin/src/main/java/ServerPlugin.java +++ b/tests/plugins/server-plugin/src/main/java/ServerPlugin.java @@ -38,6 +38,8 @@ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +import ce.CePauseStep; +import ce.PauseMetric; import java.util.Arrays; import java.util.List; import org.sonar.api.Properties; @@ -86,6 +88,6 @@ import static org.sonar.api.PropertyType.USER_LOGIN; public class ServerPlugin extends SonarPlugin { public List getExtensions() { return Arrays.asList( - StartupCrash.class, TempFolderExtension.class); + StartupCrash.class, TempFolderExtension.class, PauseMetric.class, CePauseStep.class); } } diff --git a/tests/plugins/server-plugin/src/main/java/ce/CePauseStep.java b/tests/plugins/server-plugin/src/main/java/ce/CePauseStep.java new file mode 100644 index 00000000000..8214ac56d31 --- /dev/null +++ b/tests/plugins/server-plugin/src/main/java/ce/CePauseStep.java @@ -0,0 +1,63 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package ce; + +import java.io.File; +import org.sonar.api.ce.measure.Component; +import org.sonar.api.ce.measure.MeasureComputer; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; + +public class CePauseStep implements MeasureComputer { + + private static final Logger LOGGER = Loggers.get(CePauseStep.class); + + @Override + public MeasureComputerDefinition define(MeasureComputerDefinitionContext defContext) { + return defContext.newDefinitionBuilder() + .setInputMetrics("ncloc") + .setOutputMetrics(PauseMetric.KEY) + .build(); + } + + @Override + public void compute(MeasureComputerContext context) { + if (context.getComponent().getType() == Component.Type.PROJECT) { + String path = context.getSettings().getString("sonar.ce.pauseTask.path"); + if (path != null) { + waitForFileToBeDeleted(path); + } + } + } + + private static void waitForFileToBeDeleted(String path) { + LOGGER.info("CE analysis is paused. Waiting for file to be deleted: " + path); + File file = new File(path); + try { + while (file.exists()) { + Thread.sleep(500L); + } + LOGGER.info("CE analysis is resumed"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("CE analysis has been interrupted"); + } + } +} diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/StartupLogWatcherImpl.java b/tests/plugins/server-plugin/src/main/java/ce/PauseMetric.java index 170ddc16872..a16f821e4e4 100644 --- a/tests/src/test/java/org/sonarqube/tests/cluster/StartupLogWatcherImpl.java +++ b/tests/plugins/server-plugin/src/main/java/ce/PauseMetric.java @@ -17,17 +17,19 @@ * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +package ce; -package org.sonarqube.tests.cluster; +import java.util.Arrays; +import java.util.List; +import org.sonar.api.measures.Metric; +import org.sonar.api.measures.Metrics; +public class PauseMetric implements Metrics { -import com.sonar.orchestrator.server.StartupLogWatcher; - -public class StartupLogWatcherImpl implements StartupLogWatcher { - private static final String STARTUP_EXPECTED_MESSAGE = "SonarQube is up"; + public static final String KEY = "pause"; @Override - public boolean isStarted(String logLine) { - return logLine.contains(STARTUP_EXPECTED_MESSAGE); + public List<Metric> getMetrics() { + return Arrays.asList(new Metric.Builder(KEY, "Pause", Metric.ValueType.INT).create()); } } diff --git a/tests/src/test/java/org/sonarqube/tests/Category5Suite.java b/tests/src/test/java/org/sonarqube/tests/Category5Suite.java index 55556648e85..7bc3331d1d3 100644 --- a/tests/src/test/java/org/sonarqube/tests/Category5Suite.java +++ b/tests/src/test/java/org/sonarqube/tests/Category5Suite.java @@ -21,11 +21,12 @@ package org.sonarqube.tests; import org.junit.runner.RunWith; import org.junit.runners.Suite; +import org.sonarqube.tests.ce.CeShutdownTest; import org.sonarqube.tests.ce.CeWorkersTest; import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest; import org.sonarqube.tests.qualityProfile.BuiltInQualityProfilesNotificationTest; import org.sonarqube.tests.rule.RuleEsResilienceTest; -import org.sonarqube.tests.serverSystem.ClusterTest; +import org.sonarqube.tests.cluster.ClusterTest; import org.sonarqube.tests.serverSystem.RestartTest; import org.sonarqube.tests.serverSystem.ServerSystemRestartingOrchestrator; import org.sonarqube.tests.settings.ElasticsearchSettingsTest; @@ -63,7 +64,9 @@ import org.sonarqube.tests.user.UserEsResilienceTest; TelemetryUploadTest.class, TelemetryOptOutTest.class, // ce + CeShutdownTest.class, CeWorkersTest.class, + // elasticsearch ElasticsearchSettingsTest.class }) diff --git a/tests/src/test/java/org/sonarqube/tests/LogsTailer.java b/tests/src/test/java/org/sonarqube/tests/LogsTailer.java new file mode 100644 index 00000000000..06f4d7d189a --- /dev/null +++ b/tests/src/test/java/org/sonarqube/tests/LogsTailer.java @@ -0,0 +1,162 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonarqube.tests; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.io.input.Tailer; +import org.apache.commons.io.input.TailerListenerAdapter; + +import static java.util.Objects.requireNonNull; + +/** + * Watch log files, usually server logs (see Orchestrator.getServer().get*Logs()). + * This class allows to not load the full content in memory. + */ +public class LogsTailer implements AutoCloseable { + + private final List<Tailer> tailers; + private final LogConsumer logConsumer; + + private LogsTailer(Builder builder) { + logConsumer = new LogConsumer(builder.consumers); + tailers = builder.files.stream() + .map(file -> Tailer.create(file, logConsumer, 500)) + .collect(Collectors.toList()); + } + + public Watch watch(String text) { + return new Watch(text); + } + + @Override + public void close() throws Exception { + for (Tailer tailer : tailers) { + tailer.stop(); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final List<File> files = new ArrayList<>(); + private final List<Consumer<String>> consumers = new ArrayList<>(); + + public Builder addFile(File file) { + this.files.add(file); + return this; + } + + public Builder addFiles(File file, File... otherFiles) { + this.files.add(file); + Collections.addAll(this.files, otherFiles); + return this; + } + + public Builder addConsumer(Consumer<String> consumer) { + this.consumers.add(consumer); + return this; + } + + public Builder doOnFind(String text, Runnable runnable) { + return addConsumer(log -> { + if (log.contains(text)) { + runnable.run(); + } + }); + } + + public LogsTailer build() { + return new LogsTailer(this); + } + } + + private static class LogConsumer extends TailerListenerAdapter { + private final List<Consumer<String>> consumers = Collections.synchronizedList(new ArrayList<>()); + + private LogConsumer(List<Consumer<String>> consumers) { + this.consumers.addAll(consumers); + } + + @Override + public void handle(String line) { + synchronized (consumers) { + for (Consumer<String> consumer : consumers) { + try { + consumer.accept(line); + } catch (Exception e) { + // do not prevent other consumers to handle the log + e.printStackTrace(); + } + } + } + } + + public void add(Consumer<String> consumer) { + this.consumers.add(consumer); + } + + public void remove(Consumer<String> consumer) { + this.consumers.remove(consumer); + } + } + + public class Watch implements AutoCloseable { + private final String expectedText; + private final CountDownLatch foundSignal = new CountDownLatch(1); + private String log = null; + private final Consumer<String> consumer; + + private Watch(String expectedText) { + this.expectedText = requireNonNull(expectedText); + this.consumer = log -> { + if (log.contains(this.expectedText)) { + this.log = log; + foundSignal.countDown(); + } + }; + logConsumer.add(consumer); + } + + /** + * Blocks until the expected log appears in watched files. + */ + public void waitForLog() throws InterruptedException { + foundSignal.await(); + } + + public Optional<String> getLog() { + return Optional.ofNullable(log); + } + + @Override + public void close() { + logConsumer.remove(consumer); + } + } +} diff --git a/tests/src/test/java/org/sonarqube/tests/ce/CeShutdownTest.java b/tests/src/test/java/org/sonarqube/tests/ce/CeShutdownTest.java new file mode 100644 index 00000000000..29c237ca7ca --- /dev/null +++ b/tests/src/test/java/org/sonarqube/tests/ce/CeShutdownTest.java @@ -0,0 +1,176 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonarqube.tests.ce; + +import com.sonar.orchestrator.Orchestrator; +import com.sonar.orchestrator.build.SonarScanner; +import java.io.File; +import java.io.IOException; +import org.apache.commons.io.FileUtils; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.sonarqube.tests.LogsTailer; +import org.sonarqube.ws.client.WsClient; +import org.sonarqube.ws.client.ce.ActivityStatusWsRequest; +import util.ItUtils; + +import static com.google.common.base.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; + +public class CeShutdownTest { + + @Rule + public TestRule safeguard = new DisableOnDebug(Timeout.seconds(600)); + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void stopping_CE_waits_for_in_progress_task_to_be_finished() throws Exception { + try (ComputeEngine ce = new ComputeEngine()) { + + try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) { + ce.triggerTask(); + watch.waitForLog(); + assertThat(ce.countInProgressTasks()).isEqualTo(1); + } + + // stop does not kill in-progress workers. It waits + // for them (at least a few dozens of seconds) + try (LogsTailer.Watch watch = ce.logs().watch("Waiting for workers to finish in-progress tasks")) { + ce.triggerStop(); + watch.waitForLog(); + assertThat(ce.countInProgressTasks()).isEqualTo(1); + } + + // resume the in-progress task, so that it can + // finish successfully + try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) { + ce.resumeTask(); + watch.waitForLog(); + assertThat(ce.isTaskFinished()).isTrue(); + assertThat(ce.hasErrorLogs()).isFalse(); + } + } + } + + @Test + @Ignore("TODO make the graceful stop timeout configurable. 40 seconds is too long for a test.") + public void stopping_CE_kills_in_progress_tasks_if_too_long_to_gracefully_stop() throws Exception { + try (ComputeEngine ce = new ComputeEngine()) { + + try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) { + ce.triggerTask(); + watch.waitForLog(); + assertThat(ce.countInProgressTasks()).isEqualTo(1); + } + + // stop does not kill in-progress workers. It waits + // for them (at least a few dozens of seconds) + try (LogsTailer.Watch watch = ce.logs().watch("Waiting for workers to finish in-progress tasks")) { + ce.triggerStop(); + watch.waitForLog(); + assertThat(ce.countInProgressTasks()).isEqualTo(1); + } + + // resume the in-progress task, so that it can + // finish successfully + try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) { + watch.waitForLog(); + assertThat(ce.isTaskFinished()).isTrue(); + assertThat(ce.hasErrorLogs()).isTrue(); + } + } + } + + private class ComputeEngine implements AutoCloseable { + private final Orchestrator orchestrator; + private final File pauseFile; + private final WsClient adminWsClient; + private Thread stopper; + private final LogsTailer logsTailer; + + ComputeEngine() throws Exception { + pauseFile = temp.newFile(); + FileUtils.touch(pauseFile); + + orchestrator = Orchestrator.builderEnv() + .setServerProperty("sonar.ce.pauseTask.path", pauseFile.getAbsolutePath()) + .addPlugin(ItUtils.xooPlugin()) + .addPlugin(ItUtils.pluginArtifact("server-plugin")) + .build(); + orchestrator.start(); + adminWsClient = ItUtils.newAdminWsClient(orchestrator); + logsTailer = LogsTailer.builder() + .addFile(orchestrator.getServer().getCeLogs()) + .addFile(orchestrator.getServer().getAppLogs()) + .build(); + } + + LogsTailer logs() { + return logsTailer; + } + + void triggerTask() throws InterruptedException { + orchestrator.executeBuild(SonarScanner.create(new File("projects/shared/xoo-sample"), "sonar.projectKey", "foo"), false); + } + + void resumeTask() throws Exception { + FileUtils.forceDelete(pauseFile); + } + + int countInProgressTasks() { + return adminWsClient.ce().activityStatus(ActivityStatusWsRequest.newBuilder().build()).getInProgress(); + } + + boolean isTaskFinished() throws Exception { + String ceLogs = FileUtils.readFileToString(orchestrator.getServer().getCeLogs()); + return ceLogs.contains("Executed task | project=foo | type=REPORT"); + } + + boolean hasErrorLogs() throws IOException { + String ceLogs = FileUtils.readFileToString(orchestrator.getServer().getCeLogs()); + return ceLogs.contains(" ERROR "); + } + + /** + * non-blocking stop + */ + void triggerStop() { + checkState(stopper == null); + stopper = new Thread(orchestrator::stop); + stopper.start(); + } + + @Override + public void close() throws Exception { + if (stopper != null) { + stopper.interrupt(); + } + if (orchestrator != null) { + orchestrator.stop(); + } + } + } +} diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/Cluster.java b/tests/src/test/java/org/sonarqube/tests/cluster/Cluster.java index 6c8416cbc8d..9340ce53d8a 100644 --- a/tests/src/test/java/org/sonarqube/tests/cluster/Cluster.java +++ b/tests/src/test/java/org/sonarqube/tests/cluster/Cluster.java @@ -17,301 +17,111 @@ * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ - package org.sonarqube.tests.cluster; -import com.google.common.net.HostAndPort; import com.sonar.orchestrator.Orchestrator; import com.sonar.orchestrator.OrchestratorBuilder; -import com.sonar.orchestrator.util.NetworkUtils; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Enumeration; import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import javax.annotation.CheckForNull; - -import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION; -import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH; - -public class Cluster { - - protected static final String CLUSTER_ENABLED = "sonar.cluster.enabled"; - protected static final String CLUSTER_NODE_TYPE = "sonar.cluster.node.type"; - protected static final String CLUSTER_SEARCH_HOSTS = "sonar.cluster.search.hosts"; - protected static final String CLUSTER_HOSTS = "sonar.cluster.hosts"; - protected static final String CLUSTER_NODE_PORT = "sonar.cluster.node.port"; - protected static final String CLUSTER_NODE_HOST = "sonar.cluster.node.host"; - protected static final String CLUSTER_NAME = "sonar.cluster.name"; - - protected static final String SEARCH_HOST = "sonar.search.host"; - protected static final String SEARCH_PORT = "sonar.search.port"; - protected static final String SEARCH_JAVA_OPTS = "sonar.search.javaOpts"; - - protected static final String WEB_JAVA_OPTS = "sonar.web.javaOpts"; - protected static final String WEB_PORT = "sonar.web.port"; - - protected static final String CE_JAVA_OPTS = "sonar.ce.javaOpts"; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.slf4j.LoggerFactory; - public enum NodeType { - SEARCH("search"), APPLICATION("application"); +import static java.util.stream.Collectors.joining; - final String value; +class Cluster implements AutoCloseable { - NodeType(String value) { - this.value = value; - } + @Nullable + private final String clusterName; - public String getValue() { - return value; - } + private final List<Node> nodes = new ArrayList<>(); - public static final EnumSet<NodeType> ALL = EnumSet.allOf(NodeType.class); + Cluster(@Nullable String name) { + this.clusterName = name; } - private final List<Node> nodes; - private final ForkJoinPool forkJoinPool = new ForkJoinPool(5); - - private Cluster(List<Node> nodes) { - this.nodes = nodes; - assignPorts(); - completeNodesConfiguration(); - buildOrchestrators(); + Node startNode(NodeConfig config, Consumer<OrchestratorBuilder> consumer) { + Node node = addNode(config, consumer); + node.start(); + return node; } - public void start() throws ExecutionException, InterruptedException { - forkJoinPool.submit( - () -> nodes.parallelStream().forEach( - node -> node.getOrchestrator().start() - ) - ).get(); - } + Node addNode(NodeConfig config, Consumer<OrchestratorBuilder> consumer) { + OrchestratorBuilder builder = newOrchestratorBuilder(config); - public void stop() throws ExecutionException, InterruptedException { - forkJoinPool.submit( - () -> nodes.parallelStream().forEach( - node -> node.getOrchestrator().stop() - ) - ).get(); - } - - public void stopAll(Predicate<Node> predicate) throws ExecutionException, InterruptedException { - forkJoinPool.submit( - () -> nodes.parallelStream() - .filter(predicate) - .forEach(n -> n.getOrchestrator().stop()) - ).get(); - } - - public void startAll(Predicate<Node> predicate) throws ExecutionException, InterruptedException { - forkJoinPool.submit( - () -> nodes.parallelStream() - .filter(predicate) - .forEach(n -> n.getOrchestrator().start()) - ).get(); - } - - public List<Node> getNodes() { - return Collections.unmodifiableList(nodes); - } - - private void assignPorts() { - nodes.stream().forEach( - node -> { - node.setHzPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address())); - if (node.getType() == SEARCH) { - node.setEsPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address())); - } else if (node.getType() == APPLICATION) { - node.setWebPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address())); - } - } - ); - } - - public static InetAddress getNonloopbackIPv4Address() { - try { - Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); - for (NetworkInterface networkInterface : Collections.list(nets)) { - if (!networkInterface.isLoopback() && networkInterface.isUp() && !isBlackListed(networkInterface)) { - Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses(); - while (inetAddresses.hasMoreElements()) { - InetAddress inetAddress = inetAddresses.nextElement(); - if (inetAddress instanceof Inet4Address) { - return inetAddress; - } - } - } - } - } catch (SocketException se) { - throw new RuntimeException("Cannot find a non loopback card required for tests", se); + switch (config.getType()) { + case SEARCH: + builder + .setServerProperty("sonar.cluster.node.type", "search") + .setServerProperty("sonar.search.host", config.getAddress().getHostAddress()) + .setServerProperty("sonar.search.port", "" + config.getSearchPort().get()) + .setServerProperty("sonar.search.javaOpts", "-Xmx64m -Xms64m -XX:+HeapDumpOnOutOfMemoryError"); + break; + case APPLICATION: + builder + .setServerProperty("sonar.cluster.node.type", "application") + .setServerProperty("sonar.web.host", config.getAddress().getHostAddress()) + .setServerProperty("sonar.web.port", "" + config.getWebPort().get()) + .setServerProperty("sonar.web.javaOpts", "-Xmx128m -Xms16m -XX:+HeapDumpOnOutOfMemoryError") + .setServerProperty("sonar.auth.jwtBase64Hs256Secret", "HrPSavOYLNNrwTY+SOqpChr7OwvbR/zbDLdVXRN0+Eg=") + .setServerProperty("sonar.ce.javaOpts", "-Xmx32m -Xms16m -XX:+HeapDumpOnOutOfMemoryError"); + break; } - throw new RuntimeException("Cannot find a non loopback card required for tests"); + consumer.accept(builder); + Orchestrator orchestrator = builder.build(); + Node node = new Node(config, orchestrator); + nodes.add(node); + return node; } - private static boolean isBlackListed(NetworkInterface networkInterface) { - return networkInterface.getName().startsWith("docker") || - networkInterface.getName().startsWith("vboxnet"); + Stream<Node> getNodes() { + return nodes.stream(); } - private void completeNodesConfiguration() { - String inet = getNonloopbackIPv4Address().getHostAddress(); - String clusterHosts = nodes.stream() - .map(node -> HostAndPort.fromParts(inet, node.getHzPort()).toString()) - .collect(Collectors.joining(",")); - String elasticsearchHosts = nodes.stream() - .filter(node -> node.getType() == SEARCH) - .map(node -> HostAndPort.fromParts(inet, node.getEsPort()).toString()) - .collect(Collectors.joining(",")); - - nodes.forEach( - node -> { - node.addProperty(CLUSTER_NODE_HOST, inet); - node.addProperty(CLUSTER_HOSTS, clusterHosts); - node.addProperty(CLUSTER_NODE_PORT, Integer.toString(node.getHzPort() == null ? -1 : node.getHzPort())); - node.addProperty(CLUSTER_SEARCH_HOSTS, elasticsearchHosts); - node.addProperty(SEARCH_PORT, Integer.toString(node.getEsPort() == null ? -1 : node.getEsPort())); - node.addProperty(SEARCH_HOST, inet); - node.addProperty(WEB_PORT, Integer.toString(node.getWebPort() == null ? -1 : node.getWebPort())); - node.addProperty(CLUSTER_NODE_TYPE, node.getType().getValue()); - } - ); + Stream<Node> getAppNodes() { + return nodes.stream().filter(n -> n.getConfig().getType() == NodeConfig.NodeType.APPLICATION); } - private void buildOrchestrators() { - nodes.stream().limit(1).forEach( - node -> buildOrchestrator(node, false) - ); - nodes.stream().skip(1).forEach( - node -> buildOrchestrator(node, true) - ); + Stream<Node> getSearchNodes() { + return nodes.stream().filter(n -> n.getConfig().getType() == NodeConfig.NodeType.SEARCH); } - private void buildOrchestrator(Node node, boolean keepDatabase) { - OrchestratorBuilder builder = Orchestrator.builderEnv() - .setOrchestratorProperty("orchestrator.keepDatabase", Boolean.toString(keepDatabase)) - .setStartupLogWatcher(new StartupLogWatcherImpl()); - - node.getProperties().entrySet().stream().forEach( - e -> builder.setServerProperty((String) e.getKey(), (String) e.getValue()) - ); - - node.setOrchestrator(builder.build()); + Node getAppNode(int index) { + return getAppNodes().skip(index).findFirst().orElseThrow(IllegalArgumentException::new); } - public static Builder builder() { - return new Builder(); + Node getSearchNode(int index) { + return getSearchNodes().skip(index).findFirst().orElseThrow(IllegalArgumentException::new); } - public static class Builder { - private final List<Node> nodes = new ArrayList<>(); - - public Cluster build() { - return new Cluster(nodes); - } - - public Builder addNode(NodeType type, Consumer<Node>... consumers) { - Node node = new Node(type); - Arrays.stream(consumers).forEach(c -> c.accept(node)); - nodes.add(node); - return this; + @Override + public void close() throws Exception { + // nodes are stopped in order of creation + for (Node node : nodes) { + try { + node.stop(); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("Fail to stop node", e); + } } } - /** - * A cluster node - */ - public static class Node { - private final NodeType type; - private Integer webPort; - private Integer esPort; - private Integer hzPort; - private Orchestrator orchestrator = null; - private Properties properties = new Properties(); - - public Node(NodeType type) { - this.type = type; - - // Default properties - properties.setProperty(CLUSTER_ENABLED, "true"); - properties.setProperty(CLUSTER_NAME, "sonarqube"); - properties.setProperty(CE_JAVA_OPTS, "-Xmx256m"); - properties.setProperty(WEB_JAVA_OPTS, "-Xmx256m"); - properties.setProperty(SEARCH_JAVA_OPTS, "-Xmx256m -Xms256m " + - "-XX:+UseConcMarkSweepGC " + - "-XX:CMSInitiatingOccupancyFraction=75 " + - "-XX:+UseCMSInitiatingOccupancyOnly " + - "-XX:+AlwaysPreTouch " + - "-server " + - "-Xss1m " + - "-Djava.awt.headless=true " + - "-Dfile.encoding=UTF-8 " + - "-Djna.nosys=true " + - "-Djdk.io.permissionsUseCanonicalPath=true " + - "-Dio.netty.noUnsafe=true " + - "-Dio.netty.noKeySetOptimization=true " + - "-Dio.netty.recycler.maxCapacityPerThread=0 " + - "-Dlog4j.shutdownHookEnabled=false " + - "-Dlog4j2.disable.jmx=true " + - "-Dlog4j.skipJansi=true " + - "-XX:+HeapDumpOnOutOfMemoryError"); - } - - public Properties getProperties() { - return properties; - } - - public Orchestrator getOrchestrator() { - return orchestrator; - } - - private void setOrchestrator(Orchestrator orchestrator) { - this.orchestrator = orchestrator; - } - - public NodeType getType() { - return type; + private OrchestratorBuilder newOrchestratorBuilder(NodeConfig node) { + OrchestratorBuilder builder = Orchestrator.builderEnv(); + builder.setOrchestratorProperty("orchestrator.keepDatabase", "true"); + builder.setServerProperty("sonar.cluster.enabled", "true"); + builder.setServerProperty("sonar.cluster.node.host", node.getAddress().getHostAddress()); + builder.setServerProperty("sonar.cluster.node.port", "" + node.getHzPort()); + builder.setServerProperty("sonar.cluster.hosts", node.getConnectedNodes().stream().map(NodeConfig::getHzHost).collect(joining(","))); + builder.setServerProperty("sonar.cluster.search.hosts", node.getSearchNodes().stream().map(NodeConfig::getSearchHost).collect(joining(","))); + if (clusterName != null) { + builder.setServerProperty("sonar.cluster.name", clusterName); } - - @CheckForNull - public Integer getWebPort() { - return webPort; - } - - @CheckForNull - public Integer getEsPort() { - return esPort; - } - - @CheckForNull - public Integer getHzPort() { - return hzPort; - } - - private void setWebPort(Integer webPort) { - this.webPort = webPort; - } - - private void setEsPort(Integer esPort) { - this.esPort = esPort; - } - - private void setHzPort(Integer hzPort) { - this.hzPort = hzPort; - } - - private void addProperty(String key, String value) { - properties.setProperty(key, value); + if (node.getName().isPresent()) { + builder.setServerProperty("sonar.cluster.node.name", node.getName().get()); } + builder.setStartupLogWatcher(logLine -> true); + return builder; } } diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/ClusterTest.java b/tests/src/test/java/org/sonarqube/tests/cluster/ClusterTest.java new file mode 100644 index 00000000000..0cf07e3fda5 --- /dev/null +++ b/tests/src/test/java/org/sonarqube/tests/cluster/ClusterTest.java @@ -0,0 +1,261 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonarqube.tests.cluster; + +import com.sonar.orchestrator.Orchestrator; +import com.sonar.orchestrator.OrchestratorBuilder; +import com.sonar.orchestrator.db.DefaultDatabase; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; + +import static com.google.common.base.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; +import static org.sonarqube.tests.cluster.NodeConfig.newApplicationConfig; +import static org.sonarqube.tests.cluster.NodeConfig.newSearchConfig; + +public class ClusterTest { + + @Rule + public TestRule safeguard = new DisableOnDebug(Timeout.seconds(300)); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @BeforeClass + public static void initDbSchema() throws Exception { + Orchestrator orchestrator = Orchestrator.builderEnv() + // enforce (re-)creation of database schema + .setOrchestratorProperty("orchestrator.keepDatabase", "false") + .build(); + DefaultDatabase db = new DefaultDatabase(orchestrator.getConfiguration()); + checkState(!db.getClient().getDialect().equals("h2"), "H2 is not supported in cluster mode"); + db.start(); + db.stop(); + } + + @Test + public void test_high_availability_topology() throws Exception { + try (Cluster cluster = newCluster(3, 2)) { + cluster.getNodes().forEach(Node::start); + cluster.getAppNodes().forEach(Node::waitForStatusUp); + + // TODO verify cluster health to be green + // TODO verify that ES cluster is green + + cluster.getNodes().forEach(node -> { + node.assertThatProcessesAreUp(); + assertThat(node.anyLogsContain(" ERROR ")).isFalse(); + assertThat(node.anyLogsContain("MessageException")).isFalse(); + }); + + // verify that there's a single web startup leader + Node startupLeader = cluster.getAppNodes() + .filter(Node::isStartupLeader) + .reduce(singleElement()) + .get(); + assertThat(startupLeader.hasStartupLeaderOperations()).isTrue(); + assertThat(startupLeader.hasCreatedSearchIndices()).isTrue(); + + // verify that the second app node is a startup follower + Node startupFollower = cluster.getAppNodes() + .filter(Node::isStartupFollower) + .reduce(singleElement()) + .get(); + assertThat(startupFollower.hasStartupLeaderOperations()).isFalse(); + assertThat(startupFollower.hasCreatedSearchIndices()).isFalse(); + + cluster.getAppNodes().forEach(app -> { + // compute engine is being started when app node is already in status UP + app.waitForCeLogsContain("Compute Engine is operational"); + assertThat(app.anyLogsContain("Process[ce] is up")).isTrue(); + }); + } + } + + @Test + public void minimal_cluster_is_2_search_and_1_application_nodes() throws Exception { + try (Cluster cluster = newCluster(2, 1)) { + cluster.getNodes().forEach(Node::start); + + Node app = cluster.getAppNode(0); + app.waitForStatusUp(); + app.waitForCeLogsContain("Compute Engine is operational"); + assertThat(app.isStartupLeader()).isTrue(); + assertThat(app.hasStartupLeaderOperations()).isTrue(); + + // TODO verify cluster health to be yellow + // TODO verify that ES cluster is yellow + + cluster.getNodes().forEach(node -> { + assertThat(node.anyLogsContain(" ERROR ")).isFalse(); + node.assertThatProcessesAreUp(); + }); + } + } + + @Test + public void configuration_of_connection_to_other_nodes_can_be_non_exhaustive() throws Exception { + try (Cluster cluster = new Cluster(null)) { + NodeConfig searchConfig1 = newSearchConfig(); + NodeConfig searchConfig2 = newSearchConfig(); + NodeConfig appConfig = newApplicationConfig(); + + // HZ bus : app -> search 2 -> search1, which is not recommended at all !!! + searchConfig2.addConnectionToBus(searchConfig1); + appConfig.addConnectionToBus(searchConfig2); + + // search1 is not configured to connect search2 + // app is not configured to connect to search 1 + // --> not recommended at all !!! + searchConfig2.addConnectionToSearch(searchConfig1); + appConfig.addConnectionToSearch(searchConfig2); + + cluster.startNode(searchConfig1, nothing()); + cluster.startNode(searchConfig2, nothing()); + Node app = cluster.startNode(appConfig, nothing()); + + app.waitForStatusUp(); + assertThat(app.isStartupLeader()).isTrue(); + assertThat(app.hasStartupLeaderOperations()).isTrue(); + // TODO verify cluster health to be yellow + // TODO verify that ES cluster is yellow + + // no errors + cluster.getNodes().forEach(node -> { + assertThat(node.anyLogsContain(" ERROR ")).isFalse(); + node.assertThatProcessesAreUp(); + }); + } + } + + @Test + public void node_fails_to_join_cluster_if_different_cluster_name() throws Exception { + try (Cluster cluster = new Cluster("foo")) { + NodeConfig searchConfig1 = newSearchConfig(); + NodeConfig searchConfig2 = newSearchConfig(); + NodeConfig.interconnectBus(searchConfig1, searchConfig2); + NodeConfig.interconnectSearch(searchConfig1, searchConfig2); + cluster.startNode(searchConfig1, nothing()); + cluster.startNode(searchConfig2, nothing()); + + NodeConfig searchConfig3 = newSearchConfig() + .addConnectionToSearch(searchConfig1) + .addConnectionToBus(searchConfig1, searchConfig2); + Node search3 = cluster.addNode(searchConfig3, b -> b + .setServerProperty("sonar.cluster.name", "bar") + .setStartupLogWatcher(logLine -> logLine.contains("SonarQube is up"))); + try { + search3.start(); + fail(); + } catch (IllegalStateException e) { + assertThat(e).hasMessage("Server startup failure"); + // TODO how to force process to write into sonar.log, even if sonar.log.console=true ? + // assertThat(search3.anyLogsContain("This node has a cluster name [bar], which does not match [foo] from the cluster")).isTrue(); + } + } + } + + @Test + public void restarting_all_application_nodes_elects_a_new_startup_leader() throws Exception { + // no need for 3 search nodes, 2 is enough for the test + try (Cluster cluster = newCluster(2, 2)) { + cluster.getNodes().forEach(Node::start); + cluster.getAppNodes().forEach(Node::waitForStatusUp); + + // stop application nodes only + cluster.getAppNodes().forEach(app -> { + app.stop(); + app.cleanUpLogs(); + // logs are empty, no more possible to know if node was startup leader/follower + assertThat(app.isStartupLeader()).isFalse(); + assertThat(app.isStartupFollower()).isFalse(); + }); + + // restart application nodes + cluster.getAppNodes().forEach(Node::start); + cluster.getAppNodes().forEach(Node::waitForStatusUp); + + // one app node is elected as startup leader. It does some initialization stuff, + // like registration of rules. Search indices already exist and are up-to-date. + Node startupLeader = cluster.getAppNodes() + .filter(Node::isStartupLeader) + .reduce(singleElement()) + .get(); + assertThat(startupLeader.hasStartupLeaderOperations()).isTrue(); + assertThat(startupLeader.hasCreatedSearchIndices()).isFalse(); + + Node startupFollower = cluster.getAppNodes() + .filter(Node::isStartupFollower) + .reduce(singleElement()) + .get(); + assertThat(startupFollower.hasStartupLeaderOperations()).isFalse(); + assertThat(startupFollower.hasCreatedSearchIndices()).isFalse(); + assertThat(startupFollower).isNotSameAs(startupLeader); + } + } + + /** + * Used to have non-blocking {@link Node#start()}. Orchestrator considers + * node to be up as soon as the first log is generated. + */ + private static Consumer<OrchestratorBuilder> nothing() { + return b -> { + }; + } + + /** + * Configure a cluster with recommended configuration (each node has references + * to other nodes) + */ + private static Cluster newCluster(int nbOfSearchNodes, int nbOfAppNodes) { + Cluster cluster = new Cluster(null); + + List<NodeConfig> configs = new ArrayList<>(); + IntStream.range(0, nbOfSearchNodes).forEach(i -> configs.add(newSearchConfig())); + IntStream.range(0, nbOfAppNodes).forEach(i -> configs.add(newApplicationConfig())); + NodeConfig[] configsArray = configs.toArray(new NodeConfig[configs.size()]); + + // a node is connected to all nodes, including itself (see sonar.cluster.hosts) + NodeConfig.interconnectBus(configsArray); + + // search nodes are interconnected, and app nodes connect to all search nodes + NodeConfig.interconnectSearch(configsArray); + + configs.forEach(c -> cluster.addNode(c, nothing())); + return cluster; + } + + private static BinaryOperator<Node> singleElement() { + return (a, b) -> { + throw new IllegalStateException("More than one element"); + }; + } +} diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEdition.java b/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEdition.java deleted file mode 100644 index b431bcf90bb..00000000000 --- a/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEdition.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * SonarQube - * Copyright (C) 2009-2017 SonarSource SA - * mailto:info AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package org.sonarqube.tests.cluster; - -import java.util.concurrent.ExecutionException; - -import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION; -import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH; - -public class DataCenterEdition { - - private final Cluster cluster; - - public DataCenterEdition() { - cluster = Cluster.builder() - .addNode(SEARCH) - .addNode(SEARCH) - .addNode(SEARCH) - .addNode(APPLICATION) - .addNode(APPLICATION) - .build(); - } - - public void stop() throws ExecutionException, InterruptedException { - cluster.stop(); - } - - public void start() throws ExecutionException, InterruptedException { - cluster.start(); - } - - public Cluster getCluster() { - return cluster; - } -} diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java b/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java deleted file mode 100644 index 52f7930e434..00000000000 --- a/tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * SonarQube - * Copyright (C) 2009-2017 SonarSource SA - * mailto:info AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package org.sonarqube.tests.cluster; - -import com.sonar.orchestrator.db.Database; -import com.sonar.orchestrator.db.DatabaseClient; -import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.DisableOnDebug; -import org.junit.rules.TestRule; -import org.junit.rules.Timeout; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.tuple; -import static org.junit.Assert.fail; -import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION; -import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH; - -public class DataCenterEditionTest { - - @Rule - public TestRule timeout = new DisableOnDebug(Timeout.builder() - .withLookingForStuckThread(true) - .withTimeout(5, TimeUnit.MINUTES) - .build()); - - @Test - public void launch() throws ExecutionException, InterruptedException { - DataCenterEdition dce = new DataCenterEdition(); - Cluster cluster = dce.getCluster(); - dce.start(); - assertThat(cluster.getNodes()) - .extracting(Cluster.Node::getType, n -> isPortBound(false, n.getEsPort()), n -> isPortBound(true, n.getWebPort())) - .containsExactlyInAnyOrder( - tuple(SEARCH, true, false), - tuple(SEARCH, true, false), - tuple(SEARCH, true, false), - tuple(APPLICATION, false, true), - tuple(APPLICATION, false, true) - ); - dce.stop(); - } - - @Test - public void upgrade_application_nodes_without_stopping_search_nodes_must_work() throws ExecutionException, InterruptedException, SQLException { - DataCenterEdition dce = new DataCenterEdition(); - Cluster cluster = dce.getCluster(); - dce.start(); - - // Stop all Application nodes - cluster.stopAll(n -> n.getType() == APPLICATION); - - // Drop the schema - Database database = cluster.getNodes().get(0).getOrchestrator().getDatabase(); - dropAndCreate(database.getClient()); - assertDatabaseDropped(database); - - // Start all Application nodes - cluster.startAll(n -> n.getType() == APPLICATION); - - // We are expecting a new leader to be elected which will recreate the database - assertDatabaseInitialized(database); - - dce.stop(); - } - - @Test - public void using_different_cluster_names_should_fail() throws ExecutionException, InterruptedException, SQLException { - Cluster cluster = Cluster.builder() - .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName")) - .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName")) - .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName")) - .addNode(APPLICATION, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName")) - .addNode(APPLICATION, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "badClusterName")) - .build(); - cluster.startAll(n -> "goodClusterName".equals(n.getProperties().getProperty(Cluster.CLUSTER_NAME))); - - try { - cluster.startAll(n -> "badClusterName".equals(n.getProperties().getProperty(Cluster.CLUSTER_NAME))); - fail("A node with a bad cluster name was able to join the cluster"); - } catch (Exception e) { - // we expect, that joining the cluster fails - System.out.println(e); - } - } - - private void assertDatabaseInitialized(Database database) { - assertThat(countRowsOfMigration(database)).isGreaterThan(0); - } - - private int countRowsOfMigration(Database database) { - return database.countSql("select count(*) from schema_migrations"); - } - - private void assertDatabaseDropped(Database database) { - try { - countRowsOfMigration(database); - fail("Table 'schema_migrations' has not been dropped"); - } catch (Exception e) { - // we expect the table to not exist - } - } - - private static boolean isPortBound(boolean loopback, @Nullable Integer port) { - if (port == null) { - return false; - } - InetAddress inetAddress = loopback ? InetAddress.getLoopbackAddress() : Cluster.getNonloopbackIPv4Address(); - try (ServerSocket socket = new ServerSocket(port, 50, inetAddress)) { - throw new IllegalStateException("A port was set explicitly, but was not bound (port="+port+")"); - } catch (IOException e) { - return true; - } - } - - private static void dropAndCreate(DatabaseClient databaseClient) throws SQLException { - try (Connection connection = databaseClient.openRootConnection()) { - executeDdl(connection, databaseClient.getDropDdl()); - executeDdl(connection, databaseClient.getCreateDdl()); - } - } - - private static void executeDdl(Connection connection, String... ddls) throws SQLException { - try (Statement stmt = connection.createStatement()) { - for (String ddl : ddls) { - stmt.executeUpdate(ddl); - } - } - } -} diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/Node.java b/tests/src/test/java/org/sonarqube/tests/cluster/Node.java new file mode 100644 index 00000000000..03dd4daf456 --- /dev/null +++ b/tests/src/test/java/org/sonarqube/tests/cluster/Node.java @@ -0,0 +1,185 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonarqube.tests.cluster; + +import com.google.common.util.concurrent.Uninterruptibles; +import com.sonar.orchestrator.Orchestrator; +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import util.ItUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +class Node { + + private final NodeConfig config; + private final Orchestrator orchestrator; + + Node(NodeConfig config, Orchestrator orchestrator) { + this.config = config; + this.orchestrator = orchestrator; + } + + NodeConfig getConfig() { + return config; + } + + /** + * Non-blocking startup of node. The method does not wait for + * node to be started because Orchestrator uses a StartupLogWatcher + * that returns as soon as a log is generated. + */ + void start() { + orchestrator.start(); + } + + void stop() { + orchestrator.stop(); + } + + void cleanUpLogs() { + if (orchestrator.getServer() != null) { + FileUtils.deleteQuietly(orchestrator.getServer().getWebLogs()); + FileUtils.deleteQuietly(orchestrator.getServer().getCeLogs()); + FileUtils.deleteQuietly(orchestrator.getServer().getEsLogs()); + FileUtils.deleteQuietly(orchestrator.getServer().getAppLogs()); + } + } + + boolean isStartupLeader() { + return webLogsContain("Cluster enabled (startup leader)"); + } + + boolean isStartupFollower() { + return webLogsContain("Cluster enabled (startup follower)"); + } + + void waitForStatusUp() { + waitForStatus("UP"); + } + + void waitForStatus(String expectedStatus) { + String status = null; + try { + while (!expectedStatus.equals(status)) { + if (orchestrator.getServer() != null) { + try { + Map<String, Object> json = ItUtils.jsonToMap(orchestrator.getServer().newHttpCall("api/system/status").executeUnsafely().getBodyAsString()); + status = (String) json.get("status"); + } catch (Exception e) { + // ignored + } + } + + Thread.sleep(500); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + void assertThatProcessesAreUp() { + assertThat(arePortsBound()).as(getConfig().getType().toString()).isTrue(); + switch (config.getType()) { + case SEARCH: + assertThat(anyLogsContain("Process[es] is up")).isTrue(); + assertThat(anyLogsContain("Process[web] is up")).isFalse(); + assertThat(anyLogsContain("Elasticsearch cluster enabled")).isTrue(); + break; + case APPLICATION: + assertThat(anyLogsContain("Process[es] is up")).isFalse(); + assertThat(anyLogsContain("Process[web] is up")).isTrue(); + assertThat(anyLogsContain("Elasticsearch cluster enabled")).isFalse(); + break; + } + } + + void waitForCeLogsContain(String expectedMessage) { + boolean found = false; + while (!found) { + found = orchestrator.getServer() != null && fileContains(orchestrator.getServer().getCeLogs(), expectedMessage); + if (!found) { + Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS); + } + } + } + + boolean hasStartupLeaderOperations() throws IOException { + if (orchestrator.getServer() == null) { + return false; + } + String logs = FileUtils.readFileToString(orchestrator.getServer().getWebLogs()); + return logs.contains("Register metrics") && + logs.contains("Register rules"); + } + + boolean hasCreatedSearchIndices() throws IOException { + if (orchestrator.getServer() == null) { + return false; + } + String logs = FileUtils.readFileToString(orchestrator.getServer().getWebLogs()); + return logs.contains("[o.s.s.e.IndexCreator] Create index"); + } + + boolean anyLogsContain(String message) { + if (orchestrator.getServer() == null) { + return false; + } + return fileContains(orchestrator.getServer().getAppLogs(), message) || + fileContains(orchestrator.getServer().getWebLogs(), message) || + fileContains(orchestrator.getServer().getEsLogs(), message) || + fileContains(orchestrator.getServer().getCeLogs(), message); + } + + private boolean webLogsContain(String message) { + if (orchestrator.getServer() == null) { + return false; + } + return fileContains(orchestrator.getServer().getWebLogs(), message); + } + + private static boolean fileContains(@Nullable File logFile, String message) { + try { + return logFile != null && logFile.exists() && FileUtils.readFileToString(logFile).contains(message); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private boolean arePortsBound() { + return isPortBound(config.getHzPort()) && + config.getSearchPort().map(this::isPortBound).orElse(true) && + config.getWebPort().map(this::isPortBound).orElse(true); + } + + private boolean isPortBound(int port) { + try (ServerSocket socket = new ServerSocket(port, 50, config.getAddress())) { + return false; + } catch (IOException e) { + return true; + } + } + +} diff --git a/tests/src/test/java/org/sonarqube/tests/cluster/NodeConfig.java b/tests/src/test/java/org/sonarqube/tests/cluster/NodeConfig.java new file mode 100644 index 00000000000..df3d2ed7dcf --- /dev/null +++ b/tests/src/test/java/org/sonarqube/tests/cluster/NodeConfig.java @@ -0,0 +1,187 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonarqube.tests.cluster; + +import com.sonar.orchestrator.util.NetworkUtils; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; + +import static com.google.common.base.Preconditions.checkArgument; + +class NodeConfig { + + enum NodeType { + SEARCH("search"), APPLICATION("application"); + + final String value; + + NodeType(String value) { + this.value = value; + } + + String getValue() { + return value; + } + } + + private final NodeType type; + @Nullable + private final String name; + private final InetAddress address; + private final int hzPort; + @Nullable + private final Integer searchPort; + @Nullable + private final Integer webPort; + private final List<NodeConfig> connectedNodes = new ArrayList<>(); + private final List<NodeConfig> searchNodes = new ArrayList<>(); + + private NodeConfig(NodeType type, @Nullable String name) { + this.type = type; + this.name = name; + this.address = getNonLoopbackIpv4Address(); + this.hzPort = NetworkUtils.getNextAvailablePort(this.address); + this.connectedNodes.add(this); + switch (type) { + case SEARCH: + this.searchPort = NetworkUtils.getNextAvailablePort(this.address); + this.webPort = null; + this.searchNodes.add(this); + break; + case APPLICATION: + this.searchPort = null; + this.webPort = NetworkUtils.getNextAvailablePort(this.address); + break; + default: + throw new IllegalArgumentException(); + } + } + + NodeType getType() { + return type; + } + + Optional<String> getName() { + return Optional.ofNullable(name); + } + + InetAddress getAddress() { + return address; + } + + int getHzPort() { + return hzPort; + } + + Optional<Integer> getSearchPort() { + return Optional.ofNullable(searchPort); + } + + Optional<Integer> getWebPort() { + return Optional.ofNullable(webPort); + } + + String getHzHost() { + return address.getHostAddress() + ":" + hzPort; + } + + String getSearchHost() { + return address.getHostAddress() + ":" + searchPort; + } + + NodeConfig addConnectionToBus(NodeConfig... configs) { + connectedNodes.addAll(Arrays.asList(configs)); + return this; + } + + NodeConfig addConnectionToSearch(NodeConfig... configs) { + Arrays.stream(configs).forEach(config -> { + checkArgument(config.getType() == NodeType.SEARCH); + searchNodes.add(config); + }); + return this; + } + + List<NodeConfig> getConnectedNodes() { + return connectedNodes; + } + + List<NodeConfig> getSearchNodes() { + return searchNodes; + } + + static NodeConfig newApplicationConfig() { + return new NodeConfig(NodeType.APPLICATION, null); + } + + static NodeConfig newSearchConfig() { + return new NodeConfig(NodeType.SEARCH, null); + } + + /** + * See property sonar.cluster.hosts + */ + static void interconnectBus(NodeConfig... configs) { + Arrays.stream(configs).forEach(config -> Arrays.stream(configs).filter(c -> c != config).forEach(config::addConnectionToBus)); + } + + /** + * See property sonar.cluster.search.hosts + */ + static void interconnectSearch(NodeConfig... configs) { + Arrays.stream(configs).forEach(config -> Arrays.stream(configs) + .filter(c -> c.getType() == NodeType.SEARCH) + .forEach(config::addConnectionToSearch)); + } + + private static InetAddress getNonLoopbackIpv4Address() { + try { + Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces(); + for (NetworkInterface networkInterface : Collections.list(nets)) { + if (!networkInterface.isLoopback() && networkInterface.isUp() && !isBlackListed(networkInterface)) { + Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses(); + while (inetAddresses.hasMoreElements()) { + InetAddress inetAddress = inetAddresses.nextElement(); + if (inetAddress instanceof Inet4Address) { + return inetAddress; + } + } + } + } + } catch (SocketException se) { + throw new RuntimeException("Cannot find a non loopback card required for tests", se); + } + throw new RuntimeException("Cannot find a non loopback card required for tests"); + } + + private static boolean isBlackListed(NetworkInterface networkInterface) { + return networkInterface.getName().startsWith("docker") || + networkInterface.getName().startsWith("vboxnet"); + } +} diff --git a/tests/src/test/java/org/sonarqube/tests/serverSystem/ClusterTest.java b/tests/src/test/java/org/sonarqube/tests/serverSystem/ClusterTest.java deleted file mode 100644 index ff110377844..00000000000 --- a/tests/src/test/java/org/sonarqube/tests/serverSystem/ClusterTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * SonarQube - * Copyright (C) 2009-2017 SonarSource SA - * mailto:info AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonarqube.tests.serverSystem; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; -import com.sonar.orchestrator.Orchestrator; -import com.sonar.orchestrator.server.StartupLogWatcher; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.junit.Ignore; -import org.junit.Test; -import org.sonarqube.ws.Issues; -import org.sonarqube.ws.Settings; -import org.sonarqube.ws.client.rule.SearchWsRequest; -import org.sonarqube.ws.client.setting.ValuesRequest; -import util.ItUtils; - -import static org.apache.commons.lang3.StringUtils.containsIgnoreCase; -import static org.assertj.core.api.Assertions.assertThat; -import static util.ItUtils.newWsClient; - -@Ignore("temporarily ignored") -public class ClusterTest { - - private static final String CONF_FILE_PATH = "conf/sonar.properties"; - - /** - * SONAR-7899 - */ - @Test - public void secondary_nodes_do_not_write_to_datastores_at_startup() throws Exception { - // start "startup leader", which creates and populates datastores - Orchestrator orchestrator = Orchestrator.builderEnv() - .setServerProperty("sonar.cluster.enabled", "true") - .setServerProperty("sonar.cluster.name", "secondary_nodes_do_not_write_to_datastores_at_startup") - .setServerProperty("sonar.cluster.web.startupLeader", "true") - .setServerProperty("sonar.log.level", "TRACE") - .addPlugin(ItUtils.xooPlugin()) - .build(); - orchestrator.start(); - - expectLog(orchestrator, "Cluster enabled (startup leader)"); - expectWriteOperations(orchestrator, true); - // verify that datastores are populated by requesting rules - assertThat(newWsClient(orchestrator).rules().search(new SearchWsRequest()).getTotal()).isGreaterThan(0); - - FileUtils.write(orchestrator.getServer().getWebLogs(), "", false); - updateSonarPropertiesFile(orchestrator, ImmutableMap.of("sonar.cluster.web.startupLeader", "false")); - orchestrator.restartServer(); - - expectLog(orchestrator, "Cluster enabled (startup follower)"); - expectWriteOperations(orchestrator, false); - - orchestrator.stop(); - } - - @Test - public void start_cluster_of_elasticsearch_and_web_nodes() throws IOException { - Orchestrator elasticsearch = null; - Orchestrator web = null; - - try { - ElasticsearchStartupWatcher esWatcher = new ElasticsearchStartupWatcher(); - elasticsearch = Orchestrator.builderEnv() - .setServerProperty("sonar.cluster.enabled", "true") - .setServerProperty("sonar.cluster.name", "start_cluster_of_elasticsearch_and_web_nodes") - .setServerProperty("sonar.cluster.web.disabled", "true") - .setServerProperty("sonar.cluster.ce.disabled", "true") - .setStartupLogWatcher(esWatcher) - .build(); - elasticsearch.start(); - assertThat(esWatcher.port).isGreaterThan(0); - assertThat(FileUtils.readFileToString(elasticsearch.getServer().getAppLogs())).doesNotContain("Process[web]"); - - web = Orchestrator.builderEnv() - .setServerProperty("sonar.cluster.enabled", "true") - .setServerProperty("sonar.cluster.name", "start_cluster_of_elasticsearch_and_web_nodes") - .setServerProperty("sonar.cluster.web.startupLeader", "true") - .setServerProperty("sonar.cluster.search.disabled", "true") - .setServerProperty("sonar.cluster.search.hosts", "localhost:" + esWatcher.port) - // no need for compute engine in this test. Disable it for faster test. - .setServerProperty("sonar.cluster.ce.disabled", "true") - // override the default watcher provided by Orchestrator - // which waits for Compute Engine to be up - .setStartupLogWatcher(log -> log.contains("SonarQube is up")) - .build(); - web.start(); - - String coreId = getPropertyValue(web, "sonar.core.id"); - String startTime = getPropertyValue(web, "sonar.core.startTime"); - - assertThat(FileUtils.readFileToString(web.getServer().getAppLogs())).doesNotContain("Process[es]"); - // call a web service that requires Elasticsearch - Issues.SearchWsResponse wsResponse = newWsClient(web).issues().search(new org.sonarqube.ws.client.issue.SearchWsRequest()); - assertThat(wsResponse.getIssuesCount()).isEqualTo(0); - - web.restartServer(); - - // sonar core id must not change after restart - assertThat(getPropertyValue(web, "sonar.core.id")).isEqualTo(coreId); - // startTime must change at each startup - assertThat(getPropertyValue(web, "sonar.core.startTime")).isNotEqualTo(startTime); - } finally { - if (web != null) { - web.stop(); - } - if (elasticsearch != null) { - elasticsearch.stop(); - } - } - } - - private static String getPropertyValue(Orchestrator web, String property) { - Settings.ValuesWsResponse response = ItUtils.newAdminWsClient(web).settings().values(ValuesRequest.builder().setKeys(property).build()); - List<Settings.Setting> settingsList = response.getSettingsList(); - if (settingsList.isEmpty()) { - return null; - } - assertThat(settingsList).hasSize(1); - return settingsList.iterator().next().getValue(); - } - - private static class ElasticsearchStartupWatcher implements StartupLogWatcher { - private final Pattern pattern = Pattern.compile("Elasticsearch listening on .*:(\\d+)"); - private int port = -1; - - @Override - public boolean isStarted(String log) { - Matcher matcher = pattern.matcher(log); - if (matcher.find()) { - port = Integer.parseInt(matcher.group(1)); - } - return log.contains("Process[es] is up"); - } - } - - private static void expectLog(Orchestrator orchestrator, String expectedLog) throws IOException { - File logFile = orchestrator.getServer().getWebLogs(); - try (Stream<String> lines = Files.lines(logFile.toPath())) { - assertThat(lines.anyMatch(s -> StringUtils.containsIgnoreCase(s, expectedLog))).isTrue(); - } - } - - private static void expectWriteOperations(Orchestrator orchestrator, boolean expected) throws IOException { - try (Stream<String> lines = Files.lines(orchestrator.getServer().getWebLogs().toPath())) { - List<String> writeOperations = lines.filter(ClusterTest::isWriteOperation).collect(Collectors.toList()); - if (expected) { - assertThat(writeOperations).isNotEmpty(); - } else { - assertThat(writeOperations).as("Unexpected write operations: " + Joiner.on('\n').join(writeOperations)).isEmpty(); - - } - } - } - - private static boolean isWriteOperation(String log) { - return isDbWriteOperation(log) || isEsWriteOperation(log); - } - - private static boolean isDbWriteOperation(String log) { - return log.contains("web[][sql]") && (containsIgnoreCase(log, "sql=insert") || - containsIgnoreCase(log, "sql=update") || - containsIgnoreCase(log, "sql=delete") || - containsIgnoreCase(log, "sql=create")); - } - - private static boolean isEsWriteOperation(String log) { - return log.contains("web[][es]") && (containsIgnoreCase(log, "Create index") || - containsIgnoreCase(log, "Create type") || - containsIgnoreCase(log, "put mapping request") || - containsIgnoreCase(log, "refresh request") || - containsIgnoreCase(log, "index request")); - } - - private static void updateSonarPropertiesFile(Orchestrator orchestrator, Map<String, String> props) throws IOException { - Properties propsFile = new Properties(); - try (FileInputStream conf = FileUtils.openInputStream(new File(orchestrator.getServer().getHome(), CONF_FILE_PATH))) { - propsFile.load(conf); - propsFile.putAll(props); - } - try (FileOutputStream conf = FileUtils.openOutputStream(new File(orchestrator.getServer().getHome(), CONF_FILE_PATH))) { - propsFile.store(conf, ""); - } - } -} |