diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2017-07-10 10:49:46 +0200 |
---|---|---|
committer | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2017-07-18 08:51:46 +0200 |
commit | 179311a789bbfb5025f446d3259c9ae0bac2cc95 (patch) | |
tree | f8e283fd0344fe1f29c2450a010952f417c6b613 /tests/src | |
parent | 601fafa4f5e03a83c0fd48a1f88b5ae9fae2e20d (diff) | |
download | sonarqube-179311a789bbfb5025f446d3259c9ae0bac2cc95.tar.gz sonarqube-179311a789bbfb5025f446d3259c9ae0bac2cc95.zip |
SONAR-9525 add IT on dynamic update of CE worker count
Diffstat (limited to 'tests/src')
-rw-r--r-- | tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java | 158 |
1 files changed, 99 insertions, 59 deletions
diff --git a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java index 92a9482d852..747602c0850 100644 --- a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java +++ b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.sonar.orchestrator.Orchestrator; import com.sonar.orchestrator.OrchestratorBuilder; import com.sonar.orchestrator.build.SonarScanner; +import com.sonar.orchestrator.http.HttpMethod; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -35,6 +36,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nullable; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -55,27 +57,21 @@ import static util.ItUtils.pluginArtifact; import static util.ItUtils.xooPlugin; public class CeWorkersTest { - /** 2 <= workerCount <= 5 */ - private static final int WORKER_COUNT = 2 + new Random().nextInt(4); - private static final int WAIT = 200; // ms - private static final int MAX_WAIT_LOOP = 5 * 10; // 10s + private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes private static final byte BLOCKING = (byte) 0x00; private static final byte UNLATCHED = (byte) 0x01; private static final String STATUS_PENDING = "PENDING"; private static final String STATUS_IN_PROGRESS = "IN_PROGRESS"; - private static final String STATUS_SUCCESS = "SUCCESS"; - private static final String STATUS_FAILED = "FAILED"; - - private static final String STATUS_CANCELED = "CANCELED"; @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); private static File sharedMemory; private static Orchestrator orchestrator; + private static WsClient adminWsClient; @BeforeClass public static void setUp() throws Exception { @@ -83,11 +79,12 @@ public class CeWorkersTest { OrchestratorBuilder builder = Orchestrator.builderEnv() .addPlugin(pluginArtifact("fake-governance-plugin")) - .setServerProperty("fakeGovernance.workerCount", valueOf(WORKER_COUNT)) .setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath()) .addPlugin(xooPlugin()); orchestrator = builder.build(); orchestrator.start(); + + adminWsClient = newAdminWsClient(orchestrator); } @AfterClass @@ -99,63 +96,103 @@ public class CeWorkersTest { } @Test - public void workerCount_is_modified_by_plugin() throws IOException { - Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath()) - .filter(s -> s.contains("Compute Engine will use ")) - .collect(Collectors.toSet()); - assertThat(line) - .hasSize(1); - assertThat(line.iterator().next()).contains(valueOf(WORKER_COUNT)); - - assertThat(newAdminWsClient(orchestrator).ce().workerCount()) - .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount) - .containsOnly(WORKER_COUNT, true); - } + public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException { + assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath()) + .filter(s -> s.contains("Compute Engine will use "))) + .isEmpty(); - @Test - public void number_of_analysis_processed_in_parallel_is_equal_to_number_of_workers() throws IOException { RandomAccessFile randomAccessFile = null; try { randomAccessFile = new RandomAccessFile(sharedMemory, "rw"); - MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1); - // block any analysis which will run with the fake-governance-plugin - mappedByteBuffer.put(0, BLOCKING); - - // start analysis of WORKER_COUNT + 2 projects - List<String> projectKeys = IntStream.range(0, WORKER_COUNT + 2).mapToObj(i -> "prj" + i).collect(toList()); - for (String projectKey : projectKeys) { - SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample")) - .setProperties("sonar.projectKey", projectKey); - orchestrator.executeBuild(sonarRunner, false); - } - - List<WsCe.Task> tasksList = waitForWsCallStatus(this::getTasksAllTasks, CeWorkersTest::threeTasksNotPending); - boolean expectedState = threeTasksNotPending(tasksList); - // let the blocked analyses finish running - mappedByteBuffer.put(0, UNLATCHED); - - assertThat(expectedState).as("Couldn't get to the expected CE queue state in time").isTrue(); - assertThat(tasksList.stream() - .filter(CeWorkersTest::pending) - .map(WsCe.Task::getComponentKey) - .collect(toSet())) - .isEqualTo(copyOf(projectKeys.subList(WORKER_COUNT, projectKeys.size()))); - assertThat(tasksList.stream() - .filter(CeWorkersTest::inProgress) - .map(WsCe.Task::getComponentKey) - .collect(toSet())) - .isEqualTo(copyOf(projectKeys.subList(0, WORKER_COUNT))); - - waitForWsCallStatus(this::getTasksAllTasks, tasks -> tasks.stream().noneMatch(CeWorkersTest::pending)); + MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile); + + verifyAnalysesRunInParallel(mappedByteBuffer, 1); + + /* 2 <= newWorkerCount <= 7 */ + int newWorkerCount = 4 + new Random().nextInt(4); + updateWorkerCount(newWorkerCount); + + Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath()) + .filter(s -> s.contains("Compute Engine will use ")) + .collect(Collectors.toSet()); + assertThat(line).hasSize(1); + assertThat(line.iterator().next()).contains(valueOf(newWorkerCount)); + + verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount); + + int lowerWorkerCount = 3; + updateWorkerCount(lowerWorkerCount); + verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount); } finally { - if (randomAccessFile != null) { - randomAccessFile.close(); - } + close(randomAccessFile); + } + } + + private void updateWorkerCount(int newWorkerCount) { + orchestrator.getServer() + .newHttpCall("api/ce/refreshWorkerCount") + .setMethod(HttpMethod.POST) + .setParam("count", valueOf(newWorkerCount)) + .execute(); + } + + private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) { + assertThat(adminWsClient.ce().workerCount()) + .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount) + .containsOnly(workerCount, true); + + blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); + + // start analysis of workerCount + 2 projects + List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList()); + for (String projectKey : projectKeys) { + SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample")) + .setProperties("sonar.projectKey", projectKey); + orchestrator.executeBuild(sonarRunner, false); + } + + List<WsCe.Task> tasksList = waitForWsCallStatus( + this::getTasksAllTasks, + (tasks) -> verifyInProgressTaskCount(tasks, workerCount)); + + assertThat(tasksList.stream() + .filter(CeWorkersTest::pending) + .map(WsCe.Task::getComponentKey) + .collect(toSet())) + .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size()))); + assertThat(tasksList.stream() + .filter(CeWorkersTest::inProgress) + .map(WsCe.Task::getComponentKey) + .collect(toSet())) + .isEqualTo(copyOf(projectKeys.subList(0, workerCount))); + + releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer); + + waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty); + } + + private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException { + return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1); + } + + private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) { + // let the blocked analyses finish running + mappedByteBuffer.put(0, UNLATCHED); + } + + private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) { + // block any analysis which will run with the fake-governance-plugin + mappedByteBuffer.put(0, BLOCKING); + } + + private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException { + if (randomAccessFile != null) { + randomAccessFile.close(); } } - private static boolean threeTasksNotPending(List<WsCe.Task> tasksList) { - return tasksList.stream().filter(task -> !pending(task)).count() >= WORKER_COUNT; + private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) { + return tasksList.stream().filter(CeWorkersTest::inProgress).count() >= workerCount; } private static boolean pending(WsCe.Task task) { @@ -168,7 +205,7 @@ public class CeWorkersTest { private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) { return wsClient.ce().activity(new ActivityWsRequest() - .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS, STATUS_SUCCESS, STATUS_FAILED, STATUS_CANCELED))) + .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS))) .getTasksList(); } @@ -183,6 +220,9 @@ public class CeWorkersTest { returnValue = call.apply(wsClient); expectedState = test.apply(returnValue); } + assertThat(expectedState) + .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue) + .isTrue(); return returnValue; } |