*/
import org.sonar.api.Plugin;
+import workerlatch.LatchControllerWorkerMeasureComputer;
+import workerlatch.WorkerLatchMetrics;
public class FakeGovernancePlugin implements Plugin {
// Nothing should be loaded when the plugin is running within by the scanner
if (isRunningInSQ()) {
context.addExtension(FakeWorkerCountProviderImpl.class);
+ context.addExtension(WorkerLatchMetrics.class);
+ context.addExtension(LatchControllerWorkerMeasureComputer.class);
}
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package workerlatch;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import org.sonar.api.ce.measure.Component;
+import org.sonar.api.ce.measure.MeasureComputer;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+
+public class LatchControllerWorkerMeasureComputer implements MeasureComputer {
+ private static final Logger LOG = Loggers.get(LatchControllerWorkerMeasureComputer.class);
+ private static final String WORKER_LATCH_SHARED_MEMORY_FILE_PATH = "fakeGoverance.workerLatch.sharedMemoryFile";
+
+ private static final int WAIT = 200; // ms
+ private static final int MAX_WAIT_ROUND = 5 * 60 * 2; // 2 minutes
+
+ private static final int MAX_SHARED_MEMORY = 1;
+ private static final byte UNLATCHED = (byte) 0x01;
+
+ private final File sharedMemory;
+
+ public LatchControllerWorkerMeasureComputer(Configuration configuration) {
+ this.sharedMemory = configuration.get(WORKER_LATCH_SHARED_MEMORY_FILE_PATH)
+ .map(path -> {
+ File file = new File(path);
+ if (file.exists() && file.isFile()) {
+ return file;
+ }
+ LOG.info("Latch sharedMemory file {} is not a file or does not exist", path);
+ return null;
+ })
+ .orElse(null);
+ }
+
+ private boolean isLatchEnabled() {
+ return sharedMemory != null;
+ }
+
+ @Override
+ public MeasureComputerDefinition define(MeasureComputerDefinitionContext defContext) {
+ return defContext.newDefinitionBuilder()
+ .setOutputMetrics(WorkerLatchMetrics.METRIC_KEY)
+ .build();
+ }
+
+ @Override
+ public void compute(MeasureComputerContext context) {
+ Component component = context.getComponent();
+ if (isLatchEnabled() && component.getType() == Component.Type.PROJECT) {
+ context.addMeasure(WorkerLatchMetrics.METRIC_KEY, waitForUnlatched(component.getKey()));
+ } else {
+ context.addMeasure(WorkerLatchMetrics.METRIC_KEY, false);
+ }
+ }
+
+ private boolean waitForUnlatched(String key) {
+ RandomAccessFile randomAccessFile = null;
+ try {
+ randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
+ MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, MAX_SHARED_MEMORY);
+ int i = 0;
+ boolean unlatched = isUnlatched(mappedByteBuffer);
+ while (!unlatched && i < MAX_WAIT_ROUND) {
+ waitInterruptedly();
+ i++;
+ unlatched = isUnlatched(mappedByteBuffer);
+ }
+ LOG.info("Project {} unlatched={} i={}", key, unlatched, i);
+
+ return true;
+ } catch (IOException e) {
+ LOG.error("Failed to read or write to shared memory", e);
+ return false;
+ } finally {
+ if (randomAccessFile != null) {
+ try {
+ randomAccessFile.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close randomAccessFile", e);
+ }
+ }
+ }
+ }
+
+ private static void waitInterruptedly() {
+ try {
+ Thread.sleep(WAIT);
+ } catch (InterruptedException e) {
+ LOG.error("Wait was interrupted");
+ }
+ }
+
+ private boolean isUnlatched(MappedByteBuffer mappedByteBuffer) {
+ return mappedByteBuffer.get(0) == UNLATCHED;
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package workerlatch;
+
+import java.util.Collections;
+import java.util.List;
+import org.sonar.api.measures.Metric;
+import org.sonar.api.measures.Metrics;
+
+public class WorkerLatchMetrics implements Metrics {
+ static final String METRIC_KEY = "WORKER_LATCH";
+
+ @Override
+ public List<Metric> getMetrics() {
+ return Collections.singletonList(
+ new Metric.Builder(METRIC_KEY, "Worker latch", Metric.ValueType.BOOL)
+ .setHidden(true)
+ .create());
+ }
+}
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
-import org.sonarqube.tests.ce.WorkerCountTest;
+import org.sonarqube.tests.ce.CeWorkersTest;
import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
import org.sonarqube.tests.qualityProfile.BuiltInQualityProfilesNotificationTest;
import org.sonarqube.tests.rule.RuleEsResilienceTest;
RuleEsResilienceTest.class,
UserEsResilienceTest.class,
// ce
- WorkerCountTest.class
+ CeWorkersTest.class
})
public class Category5Suite {
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonarqube.tests.ce;
+
+import com.google.common.collect.ImmutableList;
+import com.sonar.orchestrator.Orchestrator;
+import com.sonar.orchestrator.OrchestratorBuilder;
+import com.sonar.orchestrator.build.SonarScanner;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.sonarqube.ws.WsCe;
+import org.sonarqube.ws.client.WsClient;
+import org.sonarqube.ws.client.ce.ActivityWsRequest;
+import util.ItUtils;
+
+import static com.google.common.collect.ImmutableSet.copyOf;
+import static java.lang.String.valueOf;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.assertj.core.api.Assertions.assertThat;
+import static util.ItUtils.newAdminWsClient;
+import static util.ItUtils.pluginArtifact;
+import static util.ItUtils.xooPlugin;
+
+public class CeWorkersTest {
+ /** 2 <= workerCount <= 5 */
+ private static final int WORKER_COUNT = 2 + new Random().nextInt(4);
+
+ private static final int WAIT = 200; // ms
+ private static final int MAX_WAIT_LOOP = 5 * 10; // 10s
+
+ private static final byte BLOCKING = (byte) 0x00;
+ private static final byte UNLATCHED = (byte) 0x01;
+
+ private static final String STATUS_PENDING = "PENDING";
+ private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
+ private static final String STATUS_SUCCESS = "SUCCESS";
+ private static final String STATUS_FAILED = "FAILED";
+
+ private static final String STATUS_CANCELED = "CANCELED";
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static File sharedMemory;
+ private static Orchestrator orchestrator;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ sharedMemory = temporaryFolder.newFile();
+
+ OrchestratorBuilder builder = Orchestrator.builderEnv()
+ .addPlugin(pluginArtifact("fake-governance-plugin"))
+ .setServerProperty("fakeGovernance.workerCount", valueOf(WORKER_COUNT))
+ .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
+ .addPlugin(xooPlugin());
+ orchestrator = builder.build();
+ orchestrator.start();
+ }
+
+ @AfterClass
+ public static void stop() {
+ if (orchestrator != null) {
+ orchestrator.stop();
+ orchestrator = null;
+ }
+ }
+
+ @Test
+ public void workerCount_is_modified_by_plugin() throws IOException {
+ Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use "))
+ .collect(Collectors.toSet());
+ assertThat(line)
+ .hasSize(1);
+ assertThat(line.iterator().next()).contains(valueOf(WORKER_COUNT));
+
+ assertThat(newAdminWsClient(orchestrator).ce().workerCount())
+ .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
+ .containsOnly(WORKER_COUNT, true);
+ }
+
+ @Test
+ public void number_of_analysis_processed_in_parallel_is_equal_to_number_of_workers() throws IOException {
+ RandomAccessFile randomAccessFile = null;
+ try {
+ randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
+ MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
+ // block any analysis which will run with the fake-governance-plugin
+ mappedByteBuffer.put(0, BLOCKING);
+
+ // start analysis of WORKER_COUNT + 2 projects
+ List<String> projectKeys = IntStream.range(0, WORKER_COUNT + 2).mapToObj(i -> "prj" + i).collect(toList());
+ for (String projectKey : projectKeys) {
+ SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
+ .setProperties("sonar.projectKey", projectKey);
+ orchestrator.executeBuild(sonarRunner, false);
+ }
+
+ List<WsCe.Task> tasksList = waitForWsCallStatus(this::getTasksAllTasks, CeWorkersTest::threeTasksNotPending);
+ boolean expectedState = threeTasksNotPending(tasksList);
+ // let the blocked analyses finish running
+ mappedByteBuffer.put(0, UNLATCHED);
+
+ assertThat(expectedState).as("Couldn't get to the expected CE queue state in time").isTrue();
+ assertThat(tasksList.stream()
+ .filter(CeWorkersTest::pending)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(WORKER_COUNT, projectKeys.size())));
+ assertThat(tasksList.stream()
+ .filter(CeWorkersTest::inProgress)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(0, WORKER_COUNT)));
+
+ waitForWsCallStatus(this::getTasksAllTasks, tasks -> tasks.stream().noneMatch(CeWorkersTest::pending));
+ } finally {
+ if (randomAccessFile != null) {
+ randomAccessFile.close();
+ }
+ }
+ }
+
+ private static boolean threeTasksNotPending(List<WsCe.Task> tasksList) {
+ return tasksList.stream().filter(task -> !pending(task)).count() >= WORKER_COUNT;
+ }
+
+ private static boolean pending(WsCe.Task task) {
+ return WsCe.TaskStatus.PENDING == task.getStatus();
+ }
+
+ private static boolean inProgress(WsCe.Task task) {
+ return WsCe.TaskStatus.IN_PROGRESS == task.getStatus();
+ }
+
+ private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
+ return wsClient.ce().activity(new ActivityWsRequest()
+ .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS, STATUS_SUCCESS, STATUS_FAILED, STATUS_CANCELED)))
+ .getTasksList();
+ }
+
+ private <T> T waitForWsCallStatus(Function<WsClient, T> call, Function<T, Boolean> test) {
+ WsClient wsClient = ItUtils.newAdminWsClient(orchestrator);
+ int i = 0;
+ T returnValue = call.apply(wsClient);
+ boolean expectedState = test.apply(returnValue);
+ while (i < MAX_WAIT_LOOP && !expectedState) {
+ waitInterruptedly();
+ i++;
+ returnValue = call.apply(wsClient);
+ expectedState = test.apply(returnValue);
+ }
+ return returnValue;
+ }
+
+ private static void waitInterruptedly() {
+ try {
+ Thread.sleep(WAIT);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonarqube.tests.ce;
-
-import com.sonar.orchestrator.Orchestrator;
-import com.sonar.orchestrator.OrchestratorBuilder;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.junit.After;
-import org.junit.Test;
-import org.sonarqube.ws.WsCe.WorkerCountResponse;
-
-import static java.lang.Integer.parseInt;
-import static org.assertj.core.api.Assertions.assertThat;
-import static util.ItUtils.newAdminWsClient;
-import static util.ItUtils.pluginArtifact;
-
-public class WorkerCountTest {
-
- private Orchestrator orchestrator;
-
- @After
- public void stop() {
- if (orchestrator != null) {
- orchestrator.stop();
- orchestrator = null;
- }
- }
-
- @Test
- public void workerCount_can_be_controlled_via_plugin() throws IOException {
- String workerCount = "5";
- OrchestratorBuilder builder = Orchestrator.builderEnv()
- .addPlugin(pluginArtifact("fake-governance-plugin"))
- .setServerProperty("fakeGovernance.workerCount", workerCount);
- orchestrator = builder.build();
- orchestrator.start();
-
- Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
- .filter(s -> s.contains("Compute Engine will use "))
- .collect(Collectors.toSet());
- assertThat(line)
- .hasSize(1);
- assertThat(line.iterator().next()).contains(workerCount);
-
- assertThat(newAdminWsClient(orchestrator).ce().workerCount())
- .extracting(WorkerCountResponse::getValue, WorkerCountResponse::getCanSetWorkerCount)
- .containsOnly(parseInt(workerCount), true);
- }
-}