try {
Thread.sleep(CHECK_FOR_STOP_DELAY);
} catch (InterruptedException e) {
- // ignore the interruption itself, check the flag
- Thread.currentThread().interrupt();
+ // ignore the interruption itself
+ // Do not restore the interrupt flag with Thread.currentThread().interrupt():
+ // it would break the shutdown of ComputeEngineContainerImpl#stop()
}
}
attemptShutdown();
private void attemptShutdown() {
try {
- LOG.info("Compute Engine shutting down...");
+ LOG.info("Compute Engine is stopping...");
computeEngine.shutdown();
+ LOG.info("Compute Engine is stopped");
} catch (Throwable e) {
- LOG.error("Compute Engine shutdown failed", e);
+ LOG.error("Compute Engine failed to stop", e);
} finally {
// release thread waiting for CeServer
stopAwait();
if (t != null) {
t.interrupt();
try {
- t.join(1000);
+ t.join(1_000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// Ignored
}
}
import org.sonar.ce.queue.CeQueueCleaner;
import org.sonar.ce.queue.PurgeCeActivities;
import org.sonar.ce.settings.ProjectConfigurationFactory;
+import org.sonar.ce.taskprocessor.CeProcessingScheduler;
import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.ce.user.CeUserSession;
import org.sonar.core.component.DefaultResourceTypes;
@Override
public ComputeEngineContainer stop() {
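+ // level4 may be null if the container did not fully start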
+ if (level4 != null) {
+ // try to gracefully stop in-progress tasks
+ CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class);
+ ceProcessingScheduler.stopScheduling();
+ }
this.level1.stopComponents();
return this;
}
void startScheduling();
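+
+ /**
+ * Requests the workers to stop: in-progress tasks get a grace period to finish
+ * before being interrupted (see CeProcessingSchedulerImpl#stopScheduling()).
+ */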
+ void stopScheduling();
}
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
-import org.picocontainer.Startable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
import static com.google.common.util.concurrent.Futures.addCallback;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds
private final long delayBetweenEnabledTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
+ private final EnabledCeWorkerController ceWorkerController;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
- CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
+ CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
+ EnabledCeWorkerController ceWorkerController) {
this.executorService = processingExecutorService;
this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
+ this.ceWorkerController = ceWorkerController;
this.timeUnit = MILLISECONDS;
int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
}
}
- @Override
- public void start() {
- // nothing to do at component startup, startScheduling will be called by CeQueueInitializer
- }
-
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
}
}
+ /**
+ * Stops all the workers, giving them a grace period to finish their in-progress tasks before interrupting them.
+ */
@Override
- public void stop() {
+ public void stopScheduling() {
+ LOG.debug("Stopping compute engine");
+ // Requesting all workers to stop
+ for (ChainingCallback chainingCallback : chainingCallbacks) {
+ chainingCallback.stop(false);
+ }
+
+ // Workers have 40s to gracefully stop processing tasks
+ long until = System.currentTimeMillis() + 40_000L;
+ LOG.info("Waiting for workers to finish in-progress tasks");
+ while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) {
+ try {
+ Thread.sleep(200L);
+ } catch (InterruptedException e) {
+ LOG.debug("Graceful stop period has been interrupted", e);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ if (ceWorkerController.hasAtLeastOneProcessingWorker()) {
+ LOG.info("Some in-progress tasks did not finish in due time. Tasks will be stopped.");
+ }
+
+ // Interrupting the tasks
for (ChainingCallback chainingCallback : chainingCallbacks) {
- chainingCallback.stop();
+ chainingCallback.stop(true);
}
}
return keepRunning.get();
}
- public void stop() {
+ public void stop(boolean interrupt) {
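+ // when interrupt is true, the worker's Future is cancelled with mayInterruptIfRunning=true,
+ // which interrupts the thread currently executing a task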
this.keepRunning.set(false);
if (workerFuture != null) {
- workerFuture.cancel(false);
+ workerFuture.cancel(interrupt);
}
}
}
return NO_TASK;
}
- try {
+ try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) {
executeTask(ceTask.get());
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
* task to process.
*/
public interface EnabledCeWorkerController {
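+ /**
+ * Hook returned by {@link #registerProcessingFor(CeWorker)}. Closing it records that
+ * the worker has finished processing its current task.
+ */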
+ interface ProcessingRecorderHook extends AutoCloseable {
+ }
+
/**
* Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any.
*/
* Returns {@code true} if the specified {@link CeWorker} is enabled
*/
boolean isEnabled(CeWorker ceWorker);
+
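+ /**
+ * Records that the specified worker has started processing a task. The returned hook
+ * must be closed when the task processing ends (typically with try-with-resources).
+ */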
+ ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+
+ /**
+ * Whether at least one worker is currently processing a task.
+ * Returns {@code false} when all workers are waiting for tasks
+ * or are being stopped.
+ */
+ boolean hasAtLeastOneProcessingWorker();
}
*/
package org.sonar.ce.taskprocessor;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
+ private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
private final CeConfiguration ceConfiguration;
private final AtomicInteger workerCount;
+ enum Status {
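+ // PROCESSING: the worker is currently executing a task
+ // PAUSED: the worker is idle, waiting for the next task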
+ PROCESSING, PAUSED
+ }
+
public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
this.ceConfiguration = ceConfiguration;
this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount());
logEnabledWorkerCount();
}
+ @Override
+ public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
+ return new ProcessingRecorderHookImpl(ceWorker);
+ }
+
+ @Override
+ public boolean hasAtLeastOneProcessingWorker() {
+ return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
+ }
+
/**
* Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
* {@link CeConfiguration#getWorkerCount()}.
public boolean isEnabled(CeWorker ceWorker) {
return ceWorker.getOrdinal() < workerCount.get();
}
+
+ private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
+ private final CeWorker ceWorker;
+
+ private ProcessingRecorderHookImpl(CeWorker ceWorker) {
+ this.ceWorker = ceWorker;
+ map.put(this.ceWorker, Status.PROCESSING);
+ }
+
+ @Override
+ public void close() throws Exception {
+ map.put(ceWorker, Status.PAUSED);
+ }
+ }
}
public class CeProcessingSchedulerImplTest {
private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
- @Rule
// due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+ @Rule
public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
@Rule
public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
- // Required to prevent an infinite loop
private CeWorker ceWorker = mock(CeWorker.class);
private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
+ private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
@Test
public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception {
}
@Test
- public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
+ public void stopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
when(ceWorker.call())
.thenReturn(NO_TASK)
.thenReturn(TASK_PROCESSED)
}
// call stop after second delayed polling
if (i == 1) {
- underTest.stop();
+ underTest.stopScheduling();
}
i++;
}
when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
- CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+ CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
.thenReturn(listenableScheduledFuture);
@Override
public void shutdown() {
- throw new UnsupportedOperationException("shutdown() not implemented");
+ // Nothing to do
}
@Override
private class ValidateOutputMetric implements Predicate<String> {
@Override
public boolean apply(@Nonnull String metric) {
- checkState(!CORE_METRIC_KEYS.contains(metric), "Metric '%s' cannot be used as an output metric as it's a core metric", metric);
- checkState(pluginMetricKeys.contains(metric), "Metric '%s' cannot be used as an output metric as no plugin declare this metric", metric);
+ checkState(!CORE_METRIC_KEYS.contains(metric), "Metric '%s' cannot be used as an output metric because it's a core metric", metric);
+ checkState(pluginMetricKeys.contains(metric), "Metric '%s' cannot be used as an output metric because no plugins declare this metric", metric);
return true;
}
}
@Test
public void fail_with_ISE_when_output_metric_is_not_define_by_plugin() throws Exception {
thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Metric 'unknown' cannot be used as an output metric as no plugin declare this metric");
+ thrown.expectMessage("Metric 'unknown' cannot be used as an output metric because no plugins declare this metric");
MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NEW_METRIC_4), array("unknown"))};
ComputationStep underTest = new LoadMeasureComputersStep(holder, array(new TestMetrics()), computers);
@Test
public void fail_with_ISE_when_output_metric_is_a_core_metric() throws Exception {
thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Metric 'ncloc' cannot be used as an output metric as it's a core metric");
+ thrown.expectMessage("Metric 'ncloc' cannot be used as an output metric because it's a core metric");
MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NEW_METRIC_4), array(NCLOC_KEY))};
ComputationStep underTest = new LoadMeasureComputersStep(holder, array(new TestMetrics()), computers);
@Test
public void fail_with_ISE_when_no_metrics_are_defined_by_plugin_but_measure_computer_use_a_new_metric() throws Exception {
thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Metric 'metric1' cannot be used as an output metric as no plugin declare this metric");
+ thrown.expectMessage("Metric 'metric1' cannot be used as an output metric because no plugins declare this metric");
MeasureComputer[] computers = new MeasureComputer[] {newMeasureComputer(array(NCLOC_KEY), array(NEW_METRIC_1))};
ComputationStep underTest = new LoadMeasureComputersStep(holder, computers);
wrapper.disable_restarts=TRUE
wrapper.ping.timeout=0
wrapper.shutdown.timeout=300
-wrapper.jvm_exit.timeout=15
+wrapper.jvm_exit.timeout=300
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
+import ce.CePauseStep;
+import ce.PauseMetric;
import java.util.Arrays;
import java.util.List;
import org.sonar.api.Properties;
public class ServerPlugin extends SonarPlugin {
public List getExtensions() {
return Arrays.asList(
- StartupCrash.class, TempFolderExtension.class);
+ StartupCrash.class, TempFolderExtension.class, PauseMetric.class, CePauseStep.class);
}
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package ce;
+
+import java.io.File;
+import org.sonar.api.ce.measure.Component;
+import org.sonar.api.ce.measure.MeasureComputer;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+
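+/**
+ * Test-only MeasureComputer: when the server property "sonar.ce.pauseTask.path" points to an
+ * existing file, processing of the report task blocks until that file is deleted. Integration
+ * tests use it to keep a CE task in progress while the Compute Engine is being stopped.
+ */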
+public class CePauseStep implements MeasureComputer {
+
+ private static final Logger LOGGER = Loggers.get(CePauseStep.class);
+
+ @Override
+ public MeasureComputerDefinition define(MeasureComputerDefinitionContext defContext) {
+ return defContext.newDefinitionBuilder()
+ .setInputMetrics("ncloc")
+ .setOutputMetrics(PauseMetric.KEY)
+ .build();
+ }
+
+ @Override
+ public void compute(MeasureComputerContext context) {
+ if (context.getComponent().getType() == Component.Type.PROJECT) {
+ String path = context.getSettings().getString("sonar.ce.pauseTask.path");
+ if (path != null) {
+ waitForFileToBeDeleted(path);
+ }
+ }
+ }
+
+ private static void waitForFileToBeDeleted(String path) {
+ LOGGER.info("CE analysis is paused. Waiting for file to be deleted: " + path);
+ File file = new File(path);
+ try {
+ while (file.exists()) {
+ Thread.sleep(500L);
+ }
+ LOGGER.info("CE analysis is resumed");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("CE analysis has been interrupted");
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package ce;
+
+import java.util.Arrays;
+import java.util.List;
+import org.sonar.api.measures.Metric;
+import org.sonar.api.measures.Metrics;
+
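+/**
+ * Declares the "pause" metric used as output metric by CePauseStep.
+ */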
+public class PauseMetric implements Metrics {
+
+ public static final String KEY = "pause";
+
+ @Override
+ public List<Metric> getMetrics() {
+ return Arrays.asList(new Metric.Builder(KEY, "Pause", Metric.ValueType.INT).create());
+ }
+}
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
+import org.sonarqube.tests.ce.CeShutdownTest;
import org.sonarqube.tests.ce.CeWorkersTest;
import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
import org.sonarqube.tests.qualityProfile.BuiltInQualityProfilesNotificationTest;
import org.sonarqube.tests.rule.RuleEsResilienceTest;
-import org.sonarqube.tests.serverSystem.ClusterTest;
+import org.sonarqube.tests.cluster.ClusterTest;
import org.sonarqube.tests.serverSystem.RestartTest;
import org.sonarqube.tests.serverSystem.ServerSystemRestartingOrchestrator;
import org.sonarqube.tests.settings.ElasticsearchSettingsTest;
TelemetryUploadTest.class,
TelemetryOptOutTest.class,
// ce
+ CeShutdownTest.class,
CeWorkersTest.class,
+
// elasticsearch
ElasticsearchSettingsTest.class
})
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.commons.io.input.Tailer;
+import org.apache.commons.io.input.TailerListenerAdapter;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Watch log files, usually server logs (see Orchestrator.getServer().get*Logs()).
+ * This class avoids loading the full file content in memory.
+ */
+public class LogsTailer implements AutoCloseable {
+
+ private final List<Tailer> tailers;
+ private final LogConsumer logConsumer;
+
+ private LogsTailer(Builder builder) {
+ logConsumer = new LogConsumer(builder.consumers);
+ tailers = builder.files.stream()
+ .map(file -> Tailer.create(file, logConsumer, 500))
+ .collect(Collectors.toList());
+ }
+
+ public Watch watch(String text) {
+ return new Watch(text);
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (Tailer tailer : tailers) {
+ tailer.stop();
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private final List<File> files = new ArrayList<>();
+ private final List<Consumer<String>> consumers = new ArrayList<>();
+
+ public Builder addFile(File file) {
+ this.files.add(file);
+ return this;
+ }
+
+ public Builder addFiles(File file, File... otherFiles) {
+ this.files.add(file);
+ Collections.addAll(this.files, otherFiles);
+ return this;
+ }
+
+ public Builder addConsumer(Consumer<String> consumer) {
+ this.consumers.add(consumer);
+ return this;
+ }
+
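+ /**
+ * Registers a consumer that runs the given action whenever a log line contains the text.
+ */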
+ public Builder doOnFind(String text, Runnable runnable) {
+ return addConsumer(log -> {
+ if (log.contains(text)) {
+ runnable.run();
+ }
+ });
+ }
+
+ public LogsTailer build() {
+ return new LogsTailer(this);
+ }
+ }
+
+ private static class LogConsumer extends TailerListenerAdapter {
+ private final List<Consumer<String>> consumers = Collections.synchronizedList(new ArrayList<>());
+
+ private LogConsumer(List<Consumer<String>> consumers) {
+ this.consumers.addAll(consumers);
+ }
+
+ @Override
+ public void handle(String line) {
+ synchronized (consumers) {
+ for (Consumer<String> consumer : consumers) {
+ try {
+ consumer.accept(line);
+ } catch (Exception e) {
+ // do not prevent other consumers from handling the log
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public void add(Consumer<String> consumer) {
+ this.consumers.add(consumer);
+ }
+
+ public void remove(Consumer<String> consumer) {
+ this.consumers.remove(consumer);
+ }
+ }
+
+ public class Watch implements AutoCloseable {
+ private final String expectedText;
+ private final CountDownLatch foundSignal = new CountDownLatch(1);
+ private String log = null;
+ private final Consumer<String> consumer;
+
+ private Watch(String expectedText) {
+ this.expectedText = requireNonNull(expectedText);
+ this.consumer = log -> {
+ if (log.contains(this.expectedText)) {
+ this.log = log;
+ foundSignal.countDown();
+ }
+ };
+ logConsumer.add(consumer);
+ }
+
+ /**
+ * Blocks until the expected log appears in watched files.
+ */
+ public void waitForLog() throws InterruptedException {
+ foundSignal.await();
+ }
+
+ public Optional<String> getLog() {
+ return Optional.ofNullable(log);
+ }
+
+ @Override
+ public void close() {
+ logConsumer.remove(consumer);
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests.ce;
+
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.build.SonarScanner;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.sonarqube.tests.LogsTailer;
+import org.sonarqube.ws.client.WsClient;
+import org.sonarqube.ws.client.ce.ActivityStatusWsRequest;
+import util.ItUtils;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeShutdownTest {
+
+ @Rule
+ public TestRule safeguard = new DisableOnDebug(Timeout.seconds(600));
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void stopping_CE_waits_for_in_progress_task_to_be_finished() throws Exception {
+ try (ComputeEngine ce = new ComputeEngine()) {
+
+ try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) {
+ ce.triggerTask();
+ watch.waitForLog();
+ assertThat(ce.countInProgressTasks()).isEqualTo(1);
+ }
+
+ // stop does not kill in-progress workers. It waits
+ // for them (at least a few dozen seconds)
+ try (LogsTailer.Watch watch = ce.logs().watch("Waiting for workers to finish in-progress tasks")) {
+ ce.triggerStop();
+ watch.waitForLog();
+ assertThat(ce.countInProgressTasks()).isEqualTo(1);
+ }
+
+ // resume the in-progress task, so that it can
+ // finish successfully
+ try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) {
+ ce.resumeTask();
+ watch.waitForLog();
+ assertThat(ce.isTaskFinished()).isTrue();
+ assertThat(ce.hasErrorLogs()).isFalse();
+ }
+ }
+ }
+
+ @Test
+ @Ignore("TODO make the graceful stop timeout configurable. 40 seconds is too long for a test.")
+ public void stopping_CE_kills_in_progress_tasks_if_too_long_to_gracefully_stop() throws Exception {
+ try (ComputeEngine ce = new ComputeEngine()) {
+
+ try (LogsTailer.Watch watch = ce.logs().watch("CE analysis is paused")) {
+ ce.triggerTask();
+ watch.waitForLog();
+ assertThat(ce.countInProgressTasks()).isEqualTo(1);
+ }
+
+ // stop does not kill in-progress workers. It waits
+ // for them (at least a few dozen seconds)
+ try (LogsTailer.Watch watch = ce.logs().watch("Waiting for workers to finish in-progress tasks")) {
+ ce.triggerStop();
+ watch.waitForLog();
+ assertThat(ce.countInProgressTasks()).isEqualTo(1);
+ }
+
+ // do not resume the task: let the graceful stop period
+ // expire so that the in-progress task is interrupted
+ try (LogsTailer.Watch watch = ce.logs().watch("Process [ce] is stopped")) {
+ watch.waitForLog();
+ assertThat(ce.isTaskFinished()).isTrue();
+ assertThat(ce.hasErrorLogs()).isTrue();
+ }
+ }
+ }
+
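+ /**
+ * Wraps an Orchestrator configured with the test server-plugin (CePauseStep) so that the
+ * in-progress CE task can be paused and resumed through a marker file.
+ */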
+ private class ComputeEngine implements AutoCloseable {
+ private final Orchestrator orchestrator;
+ private final File pauseFile;
+ private final WsClient adminWsClient;
+ private Thread stopper;
+ private final LogsTailer logsTailer;
+
+ ComputeEngine() throws Exception {
+ pauseFile = temp.newFile();
+ FileUtils.touch(pauseFile);
+
+ orchestrator = Orchestrator.builderEnv()
+ .setServerProperty("sonar.ce.pauseTask.path", pauseFile.getAbsolutePath())
+ .addPlugin(ItUtils.xooPlugin())
+ .addPlugin(ItUtils.pluginArtifact("server-plugin"))
+ .build();
+ orchestrator.start();
+ adminWsClient = ItUtils.newAdminWsClient(orchestrator);
+ logsTailer = LogsTailer.builder()
+ .addFile(orchestrator.getServer().getCeLogs())
+ .addFile(orchestrator.getServer().getAppLogs())
+ .build();
+ }
+
+ LogsTailer logs() {
+ return logsTailer;
+ }
+
+ void triggerTask() throws InterruptedException {
+ orchestrator.executeBuild(SonarScanner.create(new File("projects/shared/xoo-sample"), "sonar.projectKey", "foo"), false);
+ }
+
+ void resumeTask() throws Exception {
+ FileUtils.forceDelete(pauseFile);
+ }
+
+ int countInProgressTasks() {
+ return adminWsClient.ce().activityStatus(ActivityStatusWsRequest.newBuilder().build()).getInProgress();
+ }
+
+ boolean isTaskFinished() throws Exception {
+ String ceLogs = FileUtils.readFileToString(orchestrator.getServer().getCeLogs());
+ return ceLogs.contains("Executed task | project=foo | type=REPORT");
+ }
+
+ boolean hasErrorLogs() throws IOException {
+ String ceLogs = FileUtils.readFileToString(orchestrator.getServer().getCeLogs());
+ return ceLogs.contains(" ERROR ");
+ }
+
+ /**
+ * non-blocking stop
+ */
+ void triggerStop() {
+ checkState(stopper == null);
+ stopper = new Thread(orchestrator::stop);
+ stopper.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (stopper != null) {
+ stopper.interrupt();
+ }
+ if (orchestrator != null) {
+ orchestrator.stop();
+ }
+ }
+ }
+}
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-
package org.sonarqube.tests.cluster;
-import com.google.common.net.HostAndPort;
import com.sonar.orchestrator.Orchestrator;
import com.sonar.orchestrator.OrchestratorBuilder;
-import com.sonar.orchestrator.util.NetworkUtils;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Enumeration;
import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import javax.annotation.CheckForNull;
-
-import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
-
-public class Cluster {
-
- protected static final String CLUSTER_ENABLED = "sonar.cluster.enabled";
- protected static final String CLUSTER_NODE_TYPE = "sonar.cluster.node.type";
- protected static final String CLUSTER_SEARCH_HOSTS = "sonar.cluster.search.hosts";
- protected static final String CLUSTER_HOSTS = "sonar.cluster.hosts";
- protected static final String CLUSTER_NODE_PORT = "sonar.cluster.node.port";
- protected static final String CLUSTER_NODE_HOST = "sonar.cluster.node.host";
- protected static final String CLUSTER_NAME = "sonar.cluster.name";
-
- protected static final String SEARCH_HOST = "sonar.search.host";
- protected static final String SEARCH_PORT = "sonar.search.port";
- protected static final String SEARCH_JAVA_OPTS = "sonar.search.javaOpts";
-
- protected static final String WEB_JAVA_OPTS = "sonar.web.javaOpts";
- protected static final String WEB_PORT = "sonar.web.port";
-
- protected static final String CE_JAVA_OPTS = "sonar.ce.javaOpts";
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.slf4j.LoggerFactory;
- public enum NodeType {
- SEARCH("search"), APPLICATION("application");
+import static java.util.stream.Collectors.joining;
- final String value;
+class Cluster implements AutoCloseable {
- NodeType(String value) {
- this.value = value;
- }
+ @Nullable
+ private final String clusterName;
- public String getValue() {
- return value;
- }
+ private final List<Node> nodes = new ArrayList<>();
- public static final EnumSet<NodeType> ALL = EnumSet.allOf(NodeType.class);
+ Cluster(@Nullable String name) {
+ this.clusterName = name;
}
- private final List<Node> nodes;
- private final ForkJoinPool forkJoinPool = new ForkJoinPool(5);
-
- private Cluster(List<Node> nodes) {
- this.nodes = nodes;
- assignPorts();
- completeNodesConfiguration();
- buildOrchestrators();
+ Node startNode(NodeConfig config, Consumer<OrchestratorBuilder> consumer) {
+ Node node = addNode(config, consumer);
+ node.start();
+ return node;
}
- public void start() throws ExecutionException, InterruptedException {
- forkJoinPool.submit(
- () -> nodes.parallelStream().forEach(
- node -> node.getOrchestrator().start()
- )
- ).get();
- }
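+ /**
+ * Creates a node from the given config without starting it. The consumer can customize
+ * the OrchestratorBuilder before it is built.
+ */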
+ Node addNode(NodeConfig config, Consumer<OrchestratorBuilder> consumer) {
+ OrchestratorBuilder builder = newOrchestratorBuilder(config);
- public void stop() throws ExecutionException, InterruptedException {
- forkJoinPool.submit(
- () -> nodes.parallelStream().forEach(
- node -> node.getOrchestrator().stop()
- )
- ).get();
- }
-
- public void stopAll(Predicate<Node> predicate) throws ExecutionException, InterruptedException {
- forkJoinPool.submit(
- () -> nodes.parallelStream()
- .filter(predicate)
- .forEach(n -> n.getOrchestrator().stop())
- ).get();
- }
-
- public void startAll(Predicate<Node> predicate) throws ExecutionException, InterruptedException {
- forkJoinPool.submit(
- () -> nodes.parallelStream()
- .filter(predicate)
- .forEach(n -> n.getOrchestrator().start())
- ).get();
- }
-
- public List<Node> getNodes() {
- return Collections.unmodifiableList(nodes);
- }
-
- private void assignPorts() {
- nodes.stream().forEach(
- node -> {
- node.setHzPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address()));
- if (node.getType() == SEARCH) {
- node.setEsPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address()));
- } else if (node.getType() == APPLICATION) {
- node.setWebPort(NetworkUtils.getNextAvailablePort(getNonloopbackIPv4Address()));
- }
- }
- );
- }
-
- public static InetAddress getNonloopbackIPv4Address() {
- try {
- Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
- for (NetworkInterface networkInterface : Collections.list(nets)) {
- if (!networkInterface.isLoopback() && networkInterface.isUp() && !isBlackListed(networkInterface)) {
- Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- InetAddress inetAddress = inetAddresses.nextElement();
- if (inetAddress instanceof Inet4Address) {
- return inetAddress;
- }
- }
- }
- }
- } catch (SocketException se) {
- throw new RuntimeException("Cannot find a non loopback card required for tests", se);
+ switch (config.getType()) {
+ case SEARCH:
+ builder
+ .setServerProperty("sonar.cluster.node.type", "search")
+ .setServerProperty("sonar.search.host", config.getAddress().getHostAddress())
+ .setServerProperty("sonar.search.port", "" + config.getSearchPort().get())
+ .setServerProperty("sonar.search.javaOpts", "-Xmx64m -Xms64m -XX:+HeapDumpOnOutOfMemoryError");
+ break;
+ case APPLICATION:
+ builder
+ .setServerProperty("sonar.cluster.node.type", "application")
+ .setServerProperty("sonar.web.host", config.getAddress().getHostAddress())
+ .setServerProperty("sonar.web.port", "" + config.getWebPort().get())
+ .setServerProperty("sonar.web.javaOpts", "-Xmx128m -Xms16m -XX:+HeapDumpOnOutOfMemoryError")
+ .setServerProperty("sonar.auth.jwtBase64Hs256Secret", "HrPSavOYLNNrwTY+SOqpChr7OwvbR/zbDLdVXRN0+Eg=")
+ .setServerProperty("sonar.ce.javaOpts", "-Xmx32m -Xms16m -XX:+HeapDumpOnOutOfMemoryError");
+ break;
}
- throw new RuntimeException("Cannot find a non loopback card required for tests");
+ consumer.accept(builder);
+ Orchestrator orchestrator = builder.build();
+ Node node = new Node(config, orchestrator);
+ nodes.add(node);
+ return node;
}
- private static boolean isBlackListed(NetworkInterface networkInterface) {
- return networkInterface.getName().startsWith("docker") ||
- networkInterface.getName().startsWith("vboxnet");
+ Stream<Node> getNodes() {
+ return nodes.stream();
}
- private void completeNodesConfiguration() {
- String inet = getNonloopbackIPv4Address().getHostAddress();
- String clusterHosts = nodes.stream()
- .map(node -> HostAndPort.fromParts(inet, node.getHzPort()).toString())
- .collect(Collectors.joining(","));
- String elasticsearchHosts = nodes.stream()
- .filter(node -> node.getType() == SEARCH)
- .map(node -> HostAndPort.fromParts(inet, node.getEsPort()).toString())
- .collect(Collectors.joining(","));
-
- nodes.forEach(
- node -> {
- node.addProperty(CLUSTER_NODE_HOST, inet);
- node.addProperty(CLUSTER_HOSTS, clusterHosts);
- node.addProperty(CLUSTER_NODE_PORT, Integer.toString(node.getHzPort() == null ? -1 : node.getHzPort()));
- node.addProperty(CLUSTER_SEARCH_HOSTS, elasticsearchHosts);
- node.addProperty(SEARCH_PORT, Integer.toString(node.getEsPort() == null ? -1 : node.getEsPort()));
- node.addProperty(SEARCH_HOST, inet);
- node.addProperty(WEB_PORT, Integer.toString(node.getWebPort() == null ? -1 : node.getWebPort()));
- node.addProperty(CLUSTER_NODE_TYPE, node.getType().getValue());
- }
- );
+ Stream<Node> getAppNodes() {
+ return nodes.stream().filter(n -> n.getConfig().getType() == NodeConfig.NodeType.APPLICATION);
}
- private void buildOrchestrators() {
- nodes.stream().limit(1).forEach(
- node -> buildOrchestrator(node, false)
- );
- nodes.stream().skip(1).forEach(
- node -> buildOrchestrator(node, true)
- );
+ Stream<Node> getSearchNodes() {
+ return nodes.stream().filter(n -> n.getConfig().getType() == NodeConfig.NodeType.SEARCH);
}
- private void buildOrchestrator(Node node, boolean keepDatabase) {
- OrchestratorBuilder builder = Orchestrator.builderEnv()
- .setOrchestratorProperty("orchestrator.keepDatabase", Boolean.toString(keepDatabase))
- .setStartupLogWatcher(new StartupLogWatcherImpl());
-
- node.getProperties().entrySet().stream().forEach(
- e -> builder.setServerProperty((String) e.getKey(), (String) e.getValue())
- );
-
- node.setOrchestrator(builder.build());
+ Node getAppNode(int index) {
+ return getAppNodes().skip(index).findFirst().orElseThrow(IllegalArgumentException::new);
}
- public static Builder builder() {
- return new Builder();
+ Node getSearchNode(int index) {
+ return getSearchNodes().skip(index).findFirst().orElseThrow(IllegalArgumentException::new);
}
- public static class Builder {
- private final List<Node> nodes = new ArrayList<>();
-
- public Cluster build() {
- return new Cluster(nodes);
- }
-
- public Builder addNode(NodeType type, Consumer<Node>... consumers) {
- Node node = new Node(type);
- Arrays.stream(consumers).forEach(c -> c.accept(node));
- nodes.add(node);
- return this;
+ @Override
+ public void close() throws Exception {
+ // nodes are stopped in order of creation
+ for (Node node : nodes) {
+ try {
+ node.stop();
+ } catch (Exception e) {
+ LoggerFactory.getLogger(getClass()).error("Failed to stop node", e);
+ }
}
}
- /**
- * A cluster node
- */
- public static class Node {
- private final NodeType type;
- private Integer webPort;
- private Integer esPort;
- private Integer hzPort;
- private Orchestrator orchestrator = null;
- private Properties properties = new Properties();
-
- public Node(NodeType type) {
- this.type = type;
-
- // Default properties
- properties.setProperty(CLUSTER_ENABLED, "true");
- properties.setProperty(CLUSTER_NAME, "sonarqube");
- properties.setProperty(CE_JAVA_OPTS, "-Xmx256m");
- properties.setProperty(WEB_JAVA_OPTS, "-Xmx256m");
- properties.setProperty(SEARCH_JAVA_OPTS, "-Xmx256m -Xms256m " +
- "-XX:+UseConcMarkSweepGC " +
- "-XX:CMSInitiatingOccupancyFraction=75 " +
- "-XX:+UseCMSInitiatingOccupancyOnly " +
- "-XX:+AlwaysPreTouch " +
- "-server " +
- "-Xss1m " +
- "-Djava.awt.headless=true " +
- "-Dfile.encoding=UTF-8 " +
- "-Djna.nosys=true " +
- "-Djdk.io.permissionsUseCanonicalPath=true " +
- "-Dio.netty.noUnsafe=true " +
- "-Dio.netty.noKeySetOptimization=true " +
- "-Dio.netty.recycler.maxCapacityPerThread=0 " +
- "-Dlog4j.shutdownHookEnabled=false " +
- "-Dlog4j2.disable.jmx=true " +
- "-Dlog4j.skipJansi=true " +
- "-XX:+HeapDumpOnOutOfMemoryError");
- }
-
- public Properties getProperties() {
- return properties;
- }
-
- public Orchestrator getOrchestrator() {
- return orchestrator;
- }
-
- private void setOrchestrator(Orchestrator orchestrator) {
- this.orchestrator = orchestrator;
- }
-
- public NodeType getType() {
- return type;
+ private OrchestratorBuilder newOrchestratorBuilder(NodeConfig node) {
+ OrchestratorBuilder builder = Orchestrator.builderEnv();
+ builder.setOrchestratorProperty("orchestrator.keepDatabase", "true");
+ builder.setServerProperty("sonar.cluster.enabled", "true");
+ builder.setServerProperty("sonar.cluster.node.host", node.getAddress().getHostAddress());
+ builder.setServerProperty("sonar.cluster.node.port", "" + node.getHzPort());
+ builder.setServerProperty("sonar.cluster.hosts", node.getConnectedNodes().stream().map(NodeConfig::getHzHost).collect(joining(",")));
+ builder.setServerProperty("sonar.cluster.search.hosts", node.getSearchNodes().stream().map(NodeConfig::getSearchHost).collect(joining(",")));
+ if (clusterName != null) {
+ builder.setServerProperty("sonar.cluster.name", clusterName);
}
-
- @CheckForNull
- public Integer getWebPort() {
- return webPort;
- }
-
- @CheckForNull
- public Integer getEsPort() {
- return esPort;
- }
-
- @CheckForNull
- public Integer getHzPort() {
- return hzPort;
- }
-
- private void setWebPort(Integer webPort) {
- this.webPort = webPort;
- }
-
- private void setEsPort(Integer esPort) {
- this.esPort = esPort;
- }
-
- private void setHzPort(Integer hzPort) {
- this.hzPort = hzPort;
- }
-
- private void addProperty(String key, String value) {
- properties.setProperty(key, value);
+ if (node.getName().isPresent()) {
+ builder.setServerProperty("sonar.cluster.node.name", node.getName().get());
}
+ builder.setStartupLogWatcher(logLine -> true);
+ return builder;
}
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests.cluster;
+
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.OrchestratorBuilder;
+import com.sonar.orchestrator.db.DefaultDatabase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+import static org.sonarqube.tests.cluster.NodeConfig.newApplicationConfig;
+import static org.sonarqube.tests.cluster.NodeConfig.newSearchConfig;
+
+public class ClusterTest {
+
+ @Rule
+ public TestRule safeguard = new DisableOnDebug(Timeout.seconds(300));
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @BeforeClass
+ public static void initDbSchema() throws Exception {
+ Orchestrator orchestrator = Orchestrator.builderEnv()
+ // enforce (re-)creation of database schema
+ .setOrchestratorProperty("orchestrator.keepDatabase", "false")
+ .build();
+ DefaultDatabase db = new DefaultDatabase(orchestrator.getConfiguration());
+ checkState(!db.getClient().getDialect().equals("h2"), "H2 is not supported in cluster mode");
+ db.start();
+ db.stop();
+ }
+
+ @Test
+ public void test_high_availability_topology() throws Exception {
+ try (Cluster cluster = newCluster(3, 2)) {
+ cluster.getNodes().forEach(Node::start);
+ cluster.getAppNodes().forEach(Node::waitForStatusUp);
+
+ // TODO verify cluster health to be green
+ // TODO verify that ES cluster is green
+
+ cluster.getNodes().forEach(node -> {
+ node.assertThatProcessesAreUp();
+ assertThat(node.anyLogsContain(" ERROR ")).isFalse();
+ assertThat(node.anyLogsContain("MessageException")).isFalse();
+ });
+
+ // verify that there's a single web startup leader
+ Node startupLeader = cluster.getAppNodes()
+ .filter(Node::isStartupLeader)
+ .reduce(singleElement())
+ .get();
+ assertThat(startupLeader.hasStartupLeaderOperations()).isTrue();
+ assertThat(startupLeader.hasCreatedSearchIndices()).isTrue();
+
+ // verify that the second app node is a startup follower
+ Node startupFollower = cluster.getAppNodes()
+ .filter(Node::isStartupFollower)
+ .reduce(singleElement())
+ .get();
+ assertThat(startupFollower.hasStartupLeaderOperations()).isFalse();
+ assertThat(startupFollower.hasCreatedSearchIndices()).isFalse();
+
+ cluster.getAppNodes().forEach(app -> {
+ // compute engine is being started when app node is already in status UP
+ app.waitForCeLogsContain("Compute Engine is operational");
+ assertThat(app.anyLogsContain("Process[ce] is up")).isTrue();
+ });
+ }
+ }
+
+ @Test
+ public void minimal_cluster_is_2_search_and_1_application_nodes() throws Exception {
+ try (Cluster cluster = newCluster(2, 1)) {
+ cluster.getNodes().forEach(Node::start);
+
+ Node app = cluster.getAppNode(0);
+ app.waitForStatusUp();
+ app.waitForCeLogsContain("Compute Engine is operational");
+ assertThat(app.isStartupLeader()).isTrue();
+ assertThat(app.hasStartupLeaderOperations()).isTrue();
+
+ // TODO verify cluster health to be yellow
+ // TODO verify that ES cluster is yellow
+
+ cluster.getNodes().forEach(node -> {
+ assertThat(node.anyLogsContain(" ERROR ")).isFalse();
+ node.assertThatProcessesAreUp();
+ });
+ }
+ }
+
+ @Test
+ public void configuration_of_connection_to_other_nodes_can_be_non_exhaustive() throws Exception {
+ try (Cluster cluster = new Cluster(null)) {
+ NodeConfig searchConfig1 = newSearchConfig();
+ NodeConfig searchConfig2 = newSearchConfig();
+ NodeConfig appConfig = newApplicationConfig();
+
+ // HZ bus: app -> search2 -> search1, which is not recommended at all!
+ searchConfig2.addConnectionToBus(searchConfig1);
+ appConfig.addConnectionToBus(searchConfig2);
+
+ // search1 is not configured to connect to search2
+ // app is not configured to connect to search1
+ // --> not recommended at all!
+ searchConfig2.addConnectionToSearch(searchConfig1);
+ appConfig.addConnectionToSearch(searchConfig2);
+
+ cluster.startNode(searchConfig1, nothing());
+ cluster.startNode(searchConfig2, nothing());
+ Node app = cluster.startNode(appConfig, nothing());
+
+ app.waitForStatusUp();
+ assertThat(app.isStartupLeader()).isTrue();
+ assertThat(app.hasStartupLeaderOperations()).isTrue();
+ // TODO verify cluster health to be yellow
+ // TODO verify that ES cluster is yellow
+
+ // no errors
+ cluster.getNodes().forEach(node -> {
+ assertThat(node.anyLogsContain(" ERROR ")).isFalse();
+ node.assertThatProcessesAreUp();
+ });
+ }
+ }
+
+ @Test
+ public void node_fails_to_join_cluster_if_different_cluster_name() throws Exception {
+ try (Cluster cluster = new Cluster("foo")) {
+ NodeConfig searchConfig1 = newSearchConfig();
+ NodeConfig searchConfig2 = newSearchConfig();
+ NodeConfig.interconnectBus(searchConfig1, searchConfig2);
+ NodeConfig.interconnectSearch(searchConfig1, searchConfig2);
+ cluster.startNode(searchConfig1, nothing());
+ cluster.startNode(searchConfig2, nothing());
+
+ NodeConfig searchConfig3 = newSearchConfig()
+ .addConnectionToSearch(searchConfig1)
+ .addConnectionToBus(searchConfig1, searchConfig2);
+ Node search3 = cluster.addNode(searchConfig3, b -> b
+ .setServerProperty("sonar.cluster.name", "bar")
+ .setStartupLogWatcher(logLine -> logLine.contains("SonarQube is up")));
+ try {
+ search3.start();
+ fail();
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessage("Server startup failure");
+ // TODO how to force process to write into sonar.log, even if sonar.log.console=true ?
+ // assertThat(search3.anyLogsContain("This node has a cluster name [bar], which does not match [foo] from the cluster")).isTrue();
+ }
+ }
+ }
+
+ @Test
+ public void restarting_all_application_nodes_elects_a_new_startup_leader() throws Exception {
+ // no need for 3 search nodes, 2 is enough for the test
+ try (Cluster cluster = newCluster(2, 2)) {
+ cluster.getNodes().forEach(Node::start);
+ cluster.getAppNodes().forEach(Node::waitForStatusUp);
+
+ // stop application nodes only
+ cluster.getAppNodes().forEach(app -> {
+ app.stop();
+ app.cleanUpLogs();
+ // logs are empty, so it is no longer possible to know whether the node was startup leader/follower
+ assertThat(app.isStartupLeader()).isFalse();
+ assertThat(app.isStartupFollower()).isFalse();
+ });
+
+ // restart application nodes
+ cluster.getAppNodes().forEach(Node::start);
+ cluster.getAppNodes().forEach(Node::waitForStatusUp);
+
+ // one app node is elected as startup leader. It does some initialization stuff,
+ // like registration of rules. Search indices already exist and are up-to-date.
+ Node startupLeader = cluster.getAppNodes()
+ .filter(Node::isStartupLeader)
+ .reduce(singleElement())
+ .get();
+ assertThat(startupLeader.hasStartupLeaderOperations()).isTrue();
+ assertThat(startupLeader.hasCreatedSearchIndices()).isFalse();
+
+ Node startupFollower = cluster.getAppNodes()
+ .filter(Node::isStartupFollower)
+ .reduce(singleElement())
+ .get();
+ assertThat(startupFollower.hasStartupLeaderOperations()).isFalse();
+ assertThat(startupFollower.hasCreatedSearchIndices()).isFalse();
+ assertThat(startupFollower).isNotSameAs(startupLeader);
+ }
+ }
+
+ /**
+ * Used to make {@link Node#start()} non-blocking: Orchestrator considers the
+ * node to be up as soon as the first log line is generated.
+ */
+ private static Consumer<OrchestratorBuilder> nothing() {
+ return b -> {
+ };
+ }
+
+ /**
+ * Creates a cluster with the recommended configuration (each node references
+ * the other nodes)
+ */
+ private static Cluster newCluster(int nbOfSearchNodes, int nbOfAppNodes) {
+ Cluster cluster = new Cluster(null);
+
+ List<NodeConfig> configs = new ArrayList<>();
+ IntStream.range(0, nbOfSearchNodes).forEach(i -> configs.add(newSearchConfig()));
+ IntStream.range(0, nbOfAppNodes).forEach(i -> configs.add(newApplicationConfig()));
+ NodeConfig[] configsArray = configs.toArray(new NodeConfig[configs.size()]);
+
+ // a node is connected to all nodes, including itself (see sonar.cluster.hosts)
+ NodeConfig.interconnectBus(configsArray);
+
+ // search nodes are interconnected, and app nodes connect to all search nodes
+ NodeConfig.interconnectSearch(configsArray);
+
+ configs.forEach(c -> cluster.addNode(c, nothing()));
+ return cluster;
+ }
+
+ private static BinaryOperator<Node> singleElement() {
+ return (a, b) -> {
+ throw new IllegalStateException("More than one element");
+ };
+ }
+}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonarqube.tests.cluster;
-
-import java.util.concurrent.ExecutionException;
-
-import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
-
-public class DataCenterEdition {
-
- private final Cluster cluster;
-
- public DataCenterEdition() {
- cluster = Cluster.builder()
- .addNode(SEARCH)
- .addNode(SEARCH)
- .addNode(SEARCH)
- .addNode(APPLICATION)
- .addNode(APPLICATION)
- .build();
- }
-
- public void stop() throws ExecutionException, InterruptedException {
- cluster.stop();
- }
-
- public void start() throws ExecutionException, InterruptedException {
- cluster.start();
- }
-
- public Cluster getCluster() {
- return cluster;
- }
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonarqube.tests.cluster;
-
-import com.sonar.orchestrator.db.Database;
-import com.sonar.orchestrator.db.DatabaseClient;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.DisableOnDebug;
-import org.junit.rules.TestRule;
-import org.junit.rules.Timeout;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.tuple;
-import static org.junit.Assert.fail;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
-import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
-
-public class DataCenterEditionTest {
-
- @Rule
- public TestRule timeout = new DisableOnDebug(Timeout.builder()
- .withLookingForStuckThread(true)
- .withTimeout(5, TimeUnit.MINUTES)
- .build());
-
- @Test
- public void launch() throws ExecutionException, InterruptedException {
- DataCenterEdition dce = new DataCenterEdition();
- Cluster cluster = dce.getCluster();
- dce.start();
- assertThat(cluster.getNodes())
- .extracting(Cluster.Node::getType, n -> isPortBound(false, n.getEsPort()), n -> isPortBound(true, n.getWebPort()))
- .containsExactlyInAnyOrder(
- tuple(SEARCH, true, false),
- tuple(SEARCH, true, false),
- tuple(SEARCH, true, false),
- tuple(APPLICATION, false, true),
- tuple(APPLICATION, false, true)
- );
- dce.stop();
- }
-
- @Test
- public void upgrade_application_nodes_without_stopping_search_nodes_must_work() throws ExecutionException, InterruptedException, SQLException {
- DataCenterEdition dce = new DataCenterEdition();
- Cluster cluster = dce.getCluster();
- dce.start();
-
- // Stop all Application nodes
- cluster.stopAll(n -> n.getType() == APPLICATION);
-
- // Drop the schema
- Database database = cluster.getNodes().get(0).getOrchestrator().getDatabase();
- dropAndCreate(database.getClient());
- assertDatabaseDropped(database);
-
- // Start all Application nodes
- cluster.startAll(n -> n.getType() == APPLICATION);
-
- // We are expecting a new leader to be elected which will recreate the database
- assertDatabaseInitialized(database);
-
- dce.stop();
- }
-
- @Test
- public void using_different_cluster_names_should_fail() throws ExecutionException, InterruptedException, SQLException {
- Cluster cluster = Cluster.builder()
- .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
- .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
- .addNode(SEARCH, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
- .addNode(APPLICATION, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "goodClusterName"))
- .addNode(APPLICATION, n -> n.getProperties().setProperty(Cluster.CLUSTER_NAME, "badClusterName"))
- .build();
- cluster.startAll(n -> "goodClusterName".equals(n.getProperties().getProperty(Cluster.CLUSTER_NAME)));
-
- try {
- cluster.startAll(n -> "badClusterName".equals(n.getProperties().getProperty(Cluster.CLUSTER_NAME)));
- fail("A node with a bad cluster name was able to join the cluster");
- } catch (Exception e) {
- // we expect, that joining the cluster fails
- System.out.println(e);
- }
- }
-
- private void assertDatabaseInitialized(Database database) {
- assertThat(countRowsOfMigration(database)).isGreaterThan(0);
- }
-
- private int countRowsOfMigration(Database database) {
- return database.countSql("select count(*) from schema_migrations");
- }
-
- private void assertDatabaseDropped(Database database) {
- try {
- countRowsOfMigration(database);
- fail("Table 'schema_migrations' has not been dropped");
- } catch (Exception e) {
- // we expect the table to not exist
- }
- }
-
- private static boolean isPortBound(boolean loopback, @Nullable Integer port) {
- if (port == null) {
- return false;
- }
- InetAddress inetAddress = loopback ? InetAddress.getLoopbackAddress() : Cluster.getNonloopbackIPv4Address();
- try (ServerSocket socket = new ServerSocket(port, 50, inetAddress)) {
- throw new IllegalStateException("A port was set explicitly, but was not bound (port="+port+")");
- } catch (IOException e) {
- return true;
- }
- }
-
- private static void dropAndCreate(DatabaseClient databaseClient) throws SQLException {
- try (Connection connection = databaseClient.openRootConnection()) {
- executeDdl(connection, databaseClient.getDropDdl());
- executeDdl(connection, databaseClient.getCreateDdl());
- }
- }
-
- private static void executeDdl(Connection connection, String... ddls) throws SQLException {
- try (Statement stmt = connection.createStatement()) {
- for (String ddl : ddls) {
- stmt.executeUpdate(ddl);
- }
- }
- }
-}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests.cluster;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.sonar.orchestrator.Orchestrator;
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import util.ItUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
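+/**
+ * Wrapper around a single Orchestrator instance that represents one node of the
+ * cluster under test: lifecycle operations (start/stop), log-based assertions and
+ * port checks.
+ */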
+class Node {
+
+ private final NodeConfig config;
+ private final Orchestrator orchestrator;
+
+ Node(NodeConfig config, Orchestrator orchestrator) {
+ this.config = config;
+ this.orchestrator = orchestrator;
+ }
+
+ NodeConfig getConfig() {
+ return config;
+ }
+
+  /**
+   * Non-blocking startup of the node. The method does not wait for
+   * the node to be up because Orchestrator uses a StartupLogWatcher
+   * that returns as soon as the first log line is generated.
+   */
+ void start() {
+ orchestrator.start();
+ }
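+
+  // Illustrative sketch only (the watcher is actually configured where the Orchestrator is
+  // built, outside this class): a watcher that makes start() return immediately could look like
+  //   Orchestrator.builderEnv()
+  //     .setStartupLogWatcher(logLine -> true) // report "started" on the very first log line
+  //     .build();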
+
+ void stop() {
+ orchestrator.stop();
+ }
+
+ void cleanUpLogs() {
+ if (orchestrator.getServer() != null) {
+ FileUtils.deleteQuietly(orchestrator.getServer().getWebLogs());
+ FileUtils.deleteQuietly(orchestrator.getServer().getCeLogs());
+ FileUtils.deleteQuietly(orchestrator.getServer().getEsLogs());
+ FileUtils.deleteQuietly(orchestrator.getServer().getAppLogs());
+ }
+ }
+
+ boolean isStartupLeader() {
+ return webLogsContain("Cluster enabled (startup leader)");
+ }
+
+ boolean isStartupFollower() {
+ return webLogsContain("Cluster enabled (startup follower)");
+ }
+
+ void waitForStatusUp() {
+ waitForStatus("UP");
+ }
+
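+  /**
+   * Blocks until the web service api/system/status returns the expected status,
+   * polling every 500 ms. No timeout is applied here.
+   */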
+ void waitForStatus(String expectedStatus) {
+ String status = null;
+ try {
+ while (!expectedStatus.equals(status)) {
+ if (orchestrator.getServer() != null) {
+ try {
+ Map<String, Object> json = ItUtils.jsonToMap(orchestrator.getServer().newHttpCall("api/system/status").executeUnsafely().getBodyAsString());
+ status = (String) json.get("status");
+ } catch (Exception e) {
+ // ignored
+ }
+ }
+
+ Thread.sleep(500);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ void assertThatProcessesAreUp() {
+ assertThat(arePortsBound()).as(getConfig().getType().toString()).isTrue();
+ switch (config.getType()) {
+ case SEARCH:
+ assertThat(anyLogsContain("Process[es] is up")).isTrue();
+ assertThat(anyLogsContain("Process[web] is up")).isFalse();
+ assertThat(anyLogsContain("Elasticsearch cluster enabled")).isTrue();
+ break;
+ case APPLICATION:
+ assertThat(anyLogsContain("Process[es] is up")).isFalse();
+ assertThat(anyLogsContain("Process[web] is up")).isTrue();
+ assertThat(anyLogsContain("Elasticsearch cluster enabled")).isFalse();
+ break;
+ }
+ }
+
+ void waitForCeLogsContain(String expectedMessage) {
+ boolean found = false;
+ while (!found) {
+ found = orchestrator.getServer() != null && fileContains(orchestrator.getServer().getCeLogs(), expectedMessage);
+ if (!found) {
+ Uninterruptibles.sleepUninterruptibly(1_000, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
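+  /**
+   * Startup tasks such as "Register metrics" and "Register rules" are expected to be
+   * logged only by the startup leader, the node that populates the datastores.
+   */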
+ boolean hasStartupLeaderOperations() throws IOException {
+ if (orchestrator.getServer() == null) {
+ return false;
+ }
+ String logs = FileUtils.readFileToString(orchestrator.getServer().getWebLogs());
+ return logs.contains("Register metrics") &&
+ logs.contains("Register rules");
+ }
+
+ boolean hasCreatedSearchIndices() throws IOException {
+ if (orchestrator.getServer() == null) {
+ return false;
+ }
+ String logs = FileUtils.readFileToString(orchestrator.getServer().getWebLogs());
+ return logs.contains("[o.s.s.e.IndexCreator] Create index");
+ }
+
+ boolean anyLogsContain(String message) {
+ if (orchestrator.getServer() == null) {
+ return false;
+ }
+ return fileContains(orchestrator.getServer().getAppLogs(), message) ||
+ fileContains(orchestrator.getServer().getWebLogs(), message) ||
+ fileContains(orchestrator.getServer().getEsLogs(), message) ||
+ fileContains(orchestrator.getServer().getCeLogs(), message);
+ }
+
+ private boolean webLogsContain(String message) {
+ if (orchestrator.getServer() == null) {
+ return false;
+ }
+ return fileContains(orchestrator.getServer().getWebLogs(), message);
+ }
+
+ private static boolean fileContains(@Nullable File logFile, String message) {
+ try {
+ return logFile != null && logFile.exists() && FileUtils.readFileToString(logFile).contains(message);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private boolean arePortsBound() {
+ return isPortBound(config.getHzPort()) &&
+ config.getSearchPort().map(this::isPortBound).orElse(true) &&
+ config.getWebPort().map(this::isPortBound).orElse(true);
+ }
+
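+  /**
+   * Tries to bind a server socket on the given port of the node's address: if the bind
+   * succeeds the port is free, so the process is considered not up (returns false); an
+   * IOException means something is already listening on the port (returns true).
+   */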
+ private boolean isPortBound(int port) {
+ try (ServerSocket socket = new ServerSocket(port, 50, config.getAddress())) {
+ return false;
+ } catch (IOException e) {
+ return true;
+ }
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests.cluster;
+
+import com.sonar.orchestrator.util.NetworkUtils;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
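+/**
+ * Configuration of one node of the cluster under test: its type (search or application),
+ * the address and ports it binds to, and its connections to the other nodes (cluster bus
+ * and search hosts).
+ */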
+class NodeConfig {
+
+ enum NodeType {
+ SEARCH("search"), APPLICATION("application");
+
+ final String value;
+
+ NodeType(String value) {
+ this.value = value;
+ }
+
+ String getValue() {
+ return value;
+ }
+ }
+
+ private final NodeType type;
+ @Nullable
+ private final String name;
+ private final InetAddress address;
+ private final int hzPort;
+ @Nullable
+ private final Integer searchPort;
+ @Nullable
+ private final Integer webPort;
+ private final List<NodeConfig> connectedNodes = new ArrayList<>();
+ private final List<NodeConfig> searchNodes = new ArrayList<>();
+
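+  /**
+   * Every node gets a port for the cluster bus (Hazelcast). In addition, SEARCH nodes get a
+   * search port and register themselves as their own search node, while APPLICATION nodes get
+   * a web port. Ports are picked with NetworkUtils.getNextAvailablePort on the non-loopback
+   * address.
+   */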
+ private NodeConfig(NodeType type, @Nullable String name) {
+ this.type = type;
+ this.name = name;
+ this.address = getNonLoopbackIpv4Address();
+ this.hzPort = NetworkUtils.getNextAvailablePort(this.address);
+ this.connectedNodes.add(this);
+ switch (type) {
+ case SEARCH:
+ this.searchPort = NetworkUtils.getNextAvailablePort(this.address);
+ this.webPort = null;
+ this.searchNodes.add(this);
+ break;
+ case APPLICATION:
+ this.searchPort = null;
+ this.webPort = NetworkUtils.getNextAvailablePort(this.address);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ NodeType getType() {
+ return type;
+ }
+
+ Optional<String> getName() {
+ return Optional.ofNullable(name);
+ }
+
+ InetAddress getAddress() {
+ return address;
+ }
+
+ int getHzPort() {
+ return hzPort;
+ }
+
+ Optional<Integer> getSearchPort() {
+ return Optional.ofNullable(searchPort);
+ }
+
+ Optional<Integer> getWebPort() {
+ return Optional.ofNullable(webPort);
+ }
+
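+  /**
+   * "host:port" pair built from the node address and the hzPort (cluster bus).
+   */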
+ String getHzHost() {
+ return address.getHostAddress() + ":" + hzPort;
+ }
+
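+  /**
+   * "host:port" of the search endpoint. Only meaningful for SEARCH nodes: searchPort is
+   * null for APPLICATION nodes, so the returned value would end with ":null".
+   */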
+ String getSearchHost() {
+ return address.getHostAddress() + ":" + searchPort;
+ }
+
+ NodeConfig addConnectionToBus(NodeConfig... configs) {
+ connectedNodes.addAll(Arrays.asList(configs));
+ return this;
+ }
+
+ NodeConfig addConnectionToSearch(NodeConfig... configs) {
+ Arrays.stream(configs).forEach(config -> {
+ checkArgument(config.getType() == NodeType.SEARCH);
+ searchNodes.add(config);
+ });
+ return this;
+ }
+
+ List<NodeConfig> getConnectedNodes() {
+ return connectedNodes;
+ }
+
+ List<NodeConfig> getSearchNodes() {
+ return searchNodes;
+ }
+
+ static NodeConfig newApplicationConfig() {
+ return new NodeConfig(NodeType.APPLICATION, null);
+ }
+
+ static NodeConfig newSearchConfig() {
+ return new NodeConfig(NodeType.SEARCH, null);
+ }
+
+ /**
+ * See property sonar.cluster.hosts
+ */
+ static void interconnectBus(NodeConfig... configs) {
+    Arrays.stream(configs).forEach(config -> Arrays.stream(configs)
+      .filter(c -> c != config)
+      .forEach(config::addConnectionToBus));
+ }
+
+ /**
+ * See property sonar.cluster.search.hosts
+ */
+ static void interconnectSearch(NodeConfig... configs) {
+ Arrays.stream(configs).forEach(config -> Arrays.stream(configs)
+ .filter(c -> c.getType() == NodeType.SEARCH)
+ .forEach(config::addConnectionToSearch));
+ }
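+
+  // Illustrative sketch (not part of the fixtures in this file) of how a cluster of three
+  // search nodes and two application nodes could be wired with the helpers above:
+  //   NodeConfig s1 = newSearchConfig(), s2 = newSearchConfig(), s3 = newSearchConfig();
+  //   NodeConfig a1 = newApplicationConfig(), a2 = newApplicationConfig();
+  //   NodeConfig.interconnectBus(s1, s2, s3, a1, a2);    // -> sonar.cluster.hosts
+  //   NodeConfig.interconnectSearch(s1, s2, s3, a1, a2); // -> sonar.cluster.search.hosts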
+
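+  /**
+   * Returns the first IPv4 address of a network interface that is up, is not the loopback
+   * and is not a Docker/VirtualBox virtual interface (see isBlackListed).
+   */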
+ private static InetAddress getNonLoopbackIpv4Address() {
+ try {
+ Enumeration<NetworkInterface> nets = NetworkInterface.getNetworkInterfaces();
+ for (NetworkInterface networkInterface : Collections.list(nets)) {
+ if (!networkInterface.isLoopback() && networkInterface.isUp() && !isBlackListed(networkInterface)) {
+ Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ if (inetAddress instanceof Inet4Address) {
+ return inetAddress;
+ }
+ }
+ }
+ }
+ } catch (SocketException se) {
+      throw new RuntimeException("Cannot find a non-loopback network interface required for tests", se);
+    }
+    throw new RuntimeException("Cannot find a non-loopback network interface required for tests");
+ }
+
+ private static boolean isBlackListed(NetworkInterface networkInterface) {
+ return networkInterface.getName().startsWith("docker") ||
+ networkInterface.getName().startsWith("vboxnet");
+ }
+}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonarqube.tests.cluster;
-
-
-import com.sonar.orchestrator.server.StartupLogWatcher;
-
-public class StartupLogWatcherImpl implements StartupLogWatcher {
- private static final String STARTUP_EXPECTED_MESSAGE = "SonarQube is up";
-
- @Override
- public boolean isStarted(String logLine) {
- return logLine.contains(STARTUP_EXPECTED_MESSAGE);
- }
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonarqube.tests.serverSystem;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.sonar.orchestrator.Orchestrator;
-import com.sonar.orchestrator.server.StartupLogWatcher;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.sonarqube.ws.Issues;
-import org.sonarqube.ws.Settings;
-import org.sonarqube.ws.client.rule.SearchWsRequest;
-import org.sonarqube.ws.client.setting.ValuesRequest;
-import util.ItUtils;
-
-import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
-import static org.assertj.core.api.Assertions.assertThat;
-import static util.ItUtils.newWsClient;
-
-@Ignore("temporarily ignored")
-public class ClusterTest {
-
- private static final String CONF_FILE_PATH = "conf/sonar.properties";
-
- /**
- * SONAR-7899
- */
- @Test
- public void secondary_nodes_do_not_write_to_datastores_at_startup() throws Exception {
- // start "startup leader", which creates and populates datastores
- Orchestrator orchestrator = Orchestrator.builderEnv()
- .setServerProperty("sonar.cluster.enabled", "true")
- .setServerProperty("sonar.cluster.name", "secondary_nodes_do_not_write_to_datastores_at_startup")
- .setServerProperty("sonar.cluster.web.startupLeader", "true")
- .setServerProperty("sonar.log.level", "TRACE")
- .addPlugin(ItUtils.xooPlugin())
- .build();
- orchestrator.start();
-
- expectLog(orchestrator, "Cluster enabled (startup leader)");
- expectWriteOperations(orchestrator, true);
- // verify that datastores are populated by requesting rules
- assertThat(newWsClient(orchestrator).rules().search(new SearchWsRequest()).getTotal()).isGreaterThan(0);
-
- FileUtils.write(orchestrator.getServer().getWebLogs(), "", false);
- updateSonarPropertiesFile(orchestrator, ImmutableMap.of("sonar.cluster.web.startupLeader", "false"));
- orchestrator.restartServer();
-
- expectLog(orchestrator, "Cluster enabled (startup follower)");
- expectWriteOperations(orchestrator, false);
-
- orchestrator.stop();
- }
-
- @Test
- public void start_cluster_of_elasticsearch_and_web_nodes() throws IOException {
- Orchestrator elasticsearch = null;
- Orchestrator web = null;
-
- try {
- ElasticsearchStartupWatcher esWatcher = new ElasticsearchStartupWatcher();
- elasticsearch = Orchestrator.builderEnv()
- .setServerProperty("sonar.cluster.enabled", "true")
- .setServerProperty("sonar.cluster.name", "start_cluster_of_elasticsearch_and_web_nodes")
- .setServerProperty("sonar.cluster.web.disabled", "true")
- .setServerProperty("sonar.cluster.ce.disabled", "true")
- .setStartupLogWatcher(esWatcher)
- .build();
- elasticsearch.start();
- assertThat(esWatcher.port).isGreaterThan(0);
- assertThat(FileUtils.readFileToString(elasticsearch.getServer().getAppLogs())).doesNotContain("Process[web]");
-
- web = Orchestrator.builderEnv()
- .setServerProperty("sonar.cluster.enabled", "true")
- .setServerProperty("sonar.cluster.name", "start_cluster_of_elasticsearch_and_web_nodes")
- .setServerProperty("sonar.cluster.web.startupLeader", "true")
- .setServerProperty("sonar.cluster.search.disabled", "true")
- .setServerProperty("sonar.cluster.search.hosts", "localhost:" + esWatcher.port)
- // no need for compute engine in this test. Disable it for faster test.
- .setServerProperty("sonar.cluster.ce.disabled", "true")
- // override the default watcher provided by Orchestrator
- // which waits for Compute Engine to be up
- .setStartupLogWatcher(log -> log.contains("SonarQube is up"))
- .build();
- web.start();
-
- String coreId = getPropertyValue(web, "sonar.core.id");
- String startTime = getPropertyValue(web, "sonar.core.startTime");
-
- assertThat(FileUtils.readFileToString(web.getServer().getAppLogs())).doesNotContain("Process[es]");
- // call a web service that requires Elasticsearch
- Issues.SearchWsResponse wsResponse = newWsClient(web).issues().search(new org.sonarqube.ws.client.issue.SearchWsRequest());
- assertThat(wsResponse.getIssuesCount()).isEqualTo(0);
-
- web.restartServer();
-
- // sonar core id must not change after restart
- assertThat(getPropertyValue(web, "sonar.core.id")).isEqualTo(coreId);
- // startTime must change at each startup
- assertThat(getPropertyValue(web, "sonar.core.startTime")).isNotEqualTo(startTime);
- } finally {
- if (web != null) {
- web.stop();
- }
- if (elasticsearch != null) {
- elasticsearch.stop();
- }
- }
- }
-
- private static String getPropertyValue(Orchestrator web, String property) {
- Settings.ValuesWsResponse response = ItUtils.newAdminWsClient(web).settings().values(ValuesRequest.builder().setKeys(property).build());
- List<Settings.Setting> settingsList = response.getSettingsList();
- if (settingsList.isEmpty()) {
- return null;
- }
- assertThat(settingsList).hasSize(1);
- return settingsList.iterator().next().getValue();
- }
-
- private static class ElasticsearchStartupWatcher implements StartupLogWatcher {
- private final Pattern pattern = Pattern.compile("Elasticsearch listening on .*:(\\d+)");
- private int port = -1;
-
- @Override
- public boolean isStarted(String log) {
- Matcher matcher = pattern.matcher(log);
- if (matcher.find()) {
- port = Integer.parseInt(matcher.group(1));
- }
- return log.contains("Process[es] is up");
- }
- }
-
- private static void expectLog(Orchestrator orchestrator, String expectedLog) throws IOException {
- File logFile = orchestrator.getServer().getWebLogs();
- try (Stream<String> lines = Files.lines(logFile.toPath())) {
- assertThat(lines.anyMatch(s -> StringUtils.containsIgnoreCase(s, expectedLog))).isTrue();
- }
- }
-
- private static void expectWriteOperations(Orchestrator orchestrator, boolean expected) throws IOException {
- try (Stream<String> lines = Files.lines(orchestrator.getServer().getWebLogs().toPath())) {
- List<String> writeOperations = lines.filter(ClusterTest::isWriteOperation).collect(Collectors.toList());
- if (expected) {
- assertThat(writeOperations).isNotEmpty();
- } else {
- assertThat(writeOperations).as("Unexpected write operations: " + Joiner.on('\n').join(writeOperations)).isEmpty();
-
- }
- }
- }
-
- private static boolean isWriteOperation(String log) {
- return isDbWriteOperation(log) || isEsWriteOperation(log);
- }
-
- private static boolean isDbWriteOperation(String log) {
- return log.contains("web[][sql]") && (containsIgnoreCase(log, "sql=insert") ||
- containsIgnoreCase(log, "sql=update") ||
- containsIgnoreCase(log, "sql=delete") ||
- containsIgnoreCase(log, "sql=create"));
- }
-
- private static boolean isEsWriteOperation(String log) {
- return log.contains("web[][es]") && (containsIgnoreCase(log, "Create index") ||
- containsIgnoreCase(log, "Create type") ||
- containsIgnoreCase(log, "put mapping request") ||
- containsIgnoreCase(log, "refresh request") ||
- containsIgnoreCase(log, "index request"));
- }
-
- private static void updateSonarPropertiesFile(Orchestrator orchestrator, Map<String, String> props) throws IOException {
- Properties propsFile = new Properties();
- try (FileInputStream conf = FileUtils.openInputStream(new File(orchestrator.getServer().getHome(), CONF_FILE_PATH))) {
- propsFile.load(conf);
- propsFile.putAll(props);
- }
- try (FileOutputStream conf = FileUtils.openOutputStream(new File(orchestrator.getServer().getHome(), CONF_FILE_PATH))) {
- propsFile.store(conf, "");
- }
- }
-}