aboutsummaryrefslogtreecommitdiffstats
path: root/tests/src
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-07-10 10:49:46 +0200
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-07-18 08:51:46 +0200
commit179311a789bbfb5025f446d3259c9ae0bac2cc95 (patch)
treef8e283fd0344fe1f29c2450a010952f417c6b613 /tests/src
parent601fafa4f5e03a83c0fd48a1f88b5ae9fae2e20d (diff)
downloadsonarqube-179311a789bbfb5025f446d3259c9ae0bac2cc95.tar.gz
sonarqube-179311a789bbfb5025f446d3259c9ae0bac2cc95.zip
SONAR-9525 add IT on dynamic update of CE worker count
Diffstat (limited to 'tests/src')
-rw-r--r--tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java158
1 files changed, 99 insertions, 59 deletions
diff --git a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
index 92a9482d852..747602c0850 100644
--- a/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
+++ b/tests/src/test/java/org/sonarqube/tests/ce/CeWorkersTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.sonar.orchestrator.Orchestrator;
import com.sonar.orchestrator.OrchestratorBuilder;
import com.sonar.orchestrator.build.SonarScanner;
+import com.sonar.orchestrator.http.HttpMethod;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -35,6 +36,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.annotation.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -55,27 +57,21 @@ import static util.ItUtils.pluginArtifact;
import static util.ItUtils.xooPlugin;
public class CeWorkersTest {
- /** 2 <= workerCount <= 5 */
- private static final int WORKER_COUNT = 2 + new Random().nextInt(4);
-
private static final int WAIT = 200; // ms
- private static final int MAX_WAIT_LOOP = 5 * 10; // 10s
+ private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes
private static final byte BLOCKING = (byte) 0x00;
private static final byte UNLATCHED = (byte) 0x01;
private static final String STATUS_PENDING = "PENDING";
private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
- private static final String STATUS_SUCCESS = "SUCCESS";
- private static final String STATUS_FAILED = "FAILED";
-
- private static final String STATUS_CANCELED = "CANCELED";
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static File sharedMemory;
private static Orchestrator orchestrator;
+ private static WsClient adminWsClient;
@BeforeClass
public static void setUp() throws Exception {
@@ -83,11 +79,12 @@ public class CeWorkersTest {
OrchestratorBuilder builder = Orchestrator.builderEnv()
.addPlugin(pluginArtifact("fake-governance-plugin"))
- .setServerProperty("fakeGovernance.workerCount", valueOf(WORKER_COUNT))
.setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
.addPlugin(xooPlugin());
orchestrator = builder.build();
orchestrator.start();
+
+ adminWsClient = newAdminWsClient(orchestrator);
}
@AfterClass
@@ -99,63 +96,103 @@ public class CeWorkersTest {
}
@Test
- public void workerCount_is_modified_by_plugin() throws IOException {
- Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
- .filter(s -> s.contains("Compute Engine will use "))
- .collect(Collectors.toSet());
- assertThat(line)
- .hasSize(1);
- assertThat(line.iterator().next()).contains(valueOf(WORKER_COUNT));
-
- assertThat(newAdminWsClient(orchestrator).ce().workerCount())
- .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
- .containsOnly(WORKER_COUNT, true);
- }
+ public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException {
+ assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use ")))
+ .isEmpty();
- @Test
- public void number_of_analysis_processed_in_parallel_is_equal_to_number_of_workers() throws IOException {
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
- MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
- // block any analysis which will run with the fake-governance-plugin
- mappedByteBuffer.put(0, BLOCKING);
-
- // start analysis of WORKER_COUNT + 2 projects
- List<String> projectKeys = IntStream.range(0, WORKER_COUNT + 2).mapToObj(i -> "prj" + i).collect(toList());
- for (String projectKey : projectKeys) {
- SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
- .setProperties("sonar.projectKey", projectKey);
- orchestrator.executeBuild(sonarRunner, false);
- }
-
- List<WsCe.Task> tasksList = waitForWsCallStatus(this::getTasksAllTasks, CeWorkersTest::threeTasksNotPending);
- boolean expectedState = threeTasksNotPending(tasksList);
- // let the blocked analyses finish running
- mappedByteBuffer.put(0, UNLATCHED);
-
- assertThat(expectedState).as("Couldn't get to the expected CE queue state in time").isTrue();
- assertThat(tasksList.stream()
- .filter(CeWorkersTest::pending)
- .map(WsCe.Task::getComponentKey)
- .collect(toSet()))
- .isEqualTo(copyOf(projectKeys.subList(WORKER_COUNT, projectKeys.size())));
- assertThat(tasksList.stream()
- .filter(CeWorkersTest::inProgress)
- .map(WsCe.Task::getComponentKey)
- .collect(toSet()))
- .isEqualTo(copyOf(projectKeys.subList(0, WORKER_COUNT)));
-
- waitForWsCallStatus(this::getTasksAllTasks, tasks -> tasks.stream().noneMatch(CeWorkersTest::pending));
+ MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
+
+ verifyAnalysesRunInParallel(mappedByteBuffer, 1);
+
+ /* 2 <= newWorkerCount <= 7 */
+ int newWorkerCount = 4 + new Random().nextInt(4);
+ updateWorkerCount(newWorkerCount);
+
+ Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use "))
+ .collect(Collectors.toSet());
+ assertThat(line).hasSize(1);
+ assertThat(line.iterator().next()).contains(valueOf(newWorkerCount));
+
+ verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount);
+
+ int lowerWorkerCount = 3;
+ updateWorkerCount(lowerWorkerCount);
+ verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount);
} finally {
- if (randomAccessFile != null) {
- randomAccessFile.close();
- }
+ close(randomAccessFile);
+ }
+ }
+
+ private void updateWorkerCount(int newWorkerCount) {
+ orchestrator.getServer()
+ .newHttpCall("api/ce/refreshWorkerCount")
+ .setMethod(HttpMethod.POST)
+ .setParam("count", valueOf(newWorkerCount))
+ .execute();
+ }
+
+ private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) {
+ assertThat(adminWsClient.ce().workerCount())
+ .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
+ .containsOnly(workerCount, true);
+
+ blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+ // start analysis of workerCount + 2 projects
+ List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList());
+ for (String projectKey : projectKeys) {
+ SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
+ .setProperties("sonar.projectKey", projectKey);
+ orchestrator.executeBuild(sonarRunner, false);
+ }
+
+ List<WsCe.Task> tasksList = waitForWsCallStatus(
+ this::getTasksAllTasks,
+ (tasks) -> verifyInProgressTaskCount(tasks, workerCount));
+
+ assertThat(tasksList.stream()
+ .filter(CeWorkersTest::pending)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size())));
+ assertThat(tasksList.stream()
+ .filter(CeWorkersTest::inProgress)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(0, workerCount)));
+
+ releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+ waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty);
+ }
+
+ private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException {
+ return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
+ }
+
+ private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+ // let the blocked analyses finish running
+ mappedByteBuffer.put(0, UNLATCHED);
+ }
+
+ private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+ // block any analysis which will run with the fake-governance-plugin
+ mappedByteBuffer.put(0, BLOCKING);
+ }
+
+ private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException {
+ if (randomAccessFile != null) {
+ randomAccessFile.close();
}
}
- private static boolean threeTasksNotPending(List<WsCe.Task> tasksList) {
- return tasksList.stream().filter(task -> !pending(task)).count() >= WORKER_COUNT;
+ private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) {
+ return tasksList.stream().filter(CeWorkersTest::inProgress).count() >= workerCount;
}
private static boolean pending(WsCe.Task task) {
@@ -168,7 +205,7 @@ public class CeWorkersTest {
private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
return wsClient.ce().activity(new ActivityWsRequest()
- .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS, STATUS_SUCCESS, STATUS_FAILED, STATUS_CANCELED)))
+ .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS)))
.getTasksList();
}
@@ -183,6 +220,9 @@ public class CeWorkersTest {
returnValue = call.apply(wsClient);
expectedState = test.apply(returnValue);
}
+ assertThat(expectedState)
+ .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue)
+ .isTrue();
return returnValue;
}