--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package workerCount;
+
+import org.sonar.api.server.ServerSide;
+import org.sonar.api.server.ws.Request;
+import org.sonar.api.server.ws.Response;
+import org.sonar.api.server.ws.WebService;
+import org.sonar.ce.http.CeHttpClient;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.property.PropertyDto;
+import org.sonar.server.ce.ws.CeWsAction;
+
+import static workerCount.FakeWorkerCountProviderImpl.PROPERTY_WORKER_COUNT;
+
+@ServerSide
+public class RefreshWorkerCountAction implements CeWsAction {
+ private static final String PARAM_COUNT = "count";
+
+ private final CeHttpClient ceHttpClient;
+ private final DbClient dbClient;
+
+ public RefreshWorkerCountAction(CeHttpClient ceHttpClient, DbClient dbClient) {
+ this.ceHttpClient = ceHttpClient;
+ this.dbClient = dbClient;
+ }
+
+ @Override
+ public void define(WebService.NewController controller) {
+ controller.createAction("refreshWorkerCount")
+ .setPost(true)
+ .setHandler(this)
+ .createParam(PARAM_COUNT)
+ .setPossibleValues("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")
+ .setRequired(true);
+ }
+
+ @Override
+ public void handle(Request request, Response response) throws Exception {
+ String count = request.getParam(PARAM_COUNT).getValue();
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ dbClient.propertiesDao().saveProperty(new PropertyDto()
+ .setKey(PROPERTY_WORKER_COUNT)
+ .setValue(count));
+ dbSession.commit();
+ }
+ ceHttpClient.refreshCeWorkerCount();
+ }
+}
import com.sonar.orchestrator.Orchestrator;
import com.sonar.orchestrator.OrchestratorBuilder;
import com.sonar.orchestrator.build.SonarScanner;
+import com.sonar.orchestrator.http.HttpMethod;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.annotation.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import static util.ItUtils.xooPlugin;
public class CeWorkersTest {
- /** 2 <= workerCount <= 5 */
- private static final int WORKER_COUNT = 2 + new Random().nextInt(4);
-
private static final int WAIT = 200; // ms
- private static final int MAX_WAIT_LOOP = 5 * 10; // 10s
+ private static final int MAX_WAIT_LOOP = 5 * 60 * 5; // 5 minutes
private static final byte BLOCKING = (byte) 0x00;
private static final byte UNLATCHED = (byte) 0x01;
private static final String STATUS_PENDING = "PENDING";
private static final String STATUS_IN_PROGRESS = "IN_PROGRESS";
- private static final String STATUS_SUCCESS = "SUCCESS";
- private static final String STATUS_FAILED = "FAILED";
-
- private static final String STATUS_CANCELED = "CANCELED";
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
private static File sharedMemory;
private static Orchestrator orchestrator;
+ private static WsClient adminWsClient;
@BeforeClass
public static void setUp() throws Exception {
OrchestratorBuilder builder = Orchestrator.builderEnv()
.addPlugin(pluginArtifact("fake-governance-plugin"))
- .setServerProperty("fakeGovernance.workerCount", valueOf(WORKER_COUNT))
.setServerProperty("fakeGoverance.workerLatch.sharedMemoryFile", sharedMemory.getAbsolutePath())
.addPlugin(xooPlugin());
orchestrator = builder.build();
orchestrator.start();
+
+ adminWsClient = newAdminWsClient(orchestrator);
}
@AfterClass
}
@Test
- public void workerCount_is_modified_by_plugin() throws IOException {
- Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
- .filter(s -> s.contains("Compute Engine will use "))
- .collect(Collectors.toSet());
- assertThat(line)
- .hasSize(1);
- assertThat(line.iterator().next()).contains(valueOf(WORKER_COUNT));
-
- assertThat(newAdminWsClient(orchestrator).ce().workerCount())
- .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
- .containsOnly(WORKER_COUNT, true);
- }
+ public void enabled_worker_count_is_initially_1_and_can_be_changed_dynamically_by_plugin() throws IOException {
+ assertThat(Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use ")))
+ .isEmpty();
- @Test
- public void number_of_analysis_processed_in_parallel_is_equal_to_number_of_workers() throws IOException {
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(sharedMemory, "rw");
- MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
- // block any analysis which will run with the fake-governance-plugin
- mappedByteBuffer.put(0, BLOCKING);
-
- // start analysis of WORKER_COUNT + 2 projects
- List<String> projectKeys = IntStream.range(0, WORKER_COUNT + 2).mapToObj(i -> "prj" + i).collect(toList());
- for (String projectKey : projectKeys) {
- SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
- .setProperties("sonar.projectKey", projectKey);
- orchestrator.executeBuild(sonarRunner, false);
- }
-
- List<WsCe.Task> tasksList = waitForWsCallStatus(this::getTasksAllTasks, CeWorkersTest::threeTasksNotPending);
- boolean expectedState = threeTasksNotPending(tasksList);
- // let the blocked analyses finish running
- mappedByteBuffer.put(0, UNLATCHED);
-
- assertThat(expectedState).as("Couldn't get to the expected CE queue state in time").isTrue();
- assertThat(tasksList.stream()
- .filter(CeWorkersTest::pending)
- .map(WsCe.Task::getComponentKey)
- .collect(toSet()))
- .isEqualTo(copyOf(projectKeys.subList(WORKER_COUNT, projectKeys.size())));
- assertThat(tasksList.stream()
- .filter(CeWorkersTest::inProgress)
- .map(WsCe.Task::getComponentKey)
- .collect(toSet()))
- .isEqualTo(copyOf(projectKeys.subList(0, WORKER_COUNT)));
-
- waitForWsCallStatus(this::getTasksAllTasks, tasks -> tasks.stream().noneMatch(CeWorkersTest::pending));
+ MappedByteBuffer mappedByteBuffer = initMappedByteBuffer(randomAccessFile);
+
+ verifyAnalysesRunInParallel(mappedByteBuffer, 1);
+
+ /* 2 <= newWorkerCount <= 7 */
+ int newWorkerCount = 4 + new Random().nextInt(4);
+ updateWorkerCount(newWorkerCount);
+
+ Set<String> line = Files.lines(orchestrator.getServer().getCeLogs().toPath())
+ .filter(s -> s.contains("Compute Engine will use "))
+ .collect(Collectors.toSet());
+ assertThat(line).hasSize(1);
+ assertThat(line.iterator().next()).contains(valueOf(newWorkerCount));
+
+ verifyAnalysesRunInParallel(mappedByteBuffer, newWorkerCount);
+
+ int lowerWorkerCount = 3;
+ updateWorkerCount(lowerWorkerCount);
+ verifyAnalysesRunInParallel(mappedByteBuffer, lowerWorkerCount);
} finally {
- if (randomAccessFile != null) {
- randomAccessFile.close();
- }
+ close(randomAccessFile);
+ }
+ }
+
+ private void updateWorkerCount(int newWorkerCount) {
+ orchestrator.getServer()
+ .newHttpCall("api/ce/refreshWorkerCount")
+ .setMethod(HttpMethod.POST)
+ .setParam("count", valueOf(newWorkerCount))
+ .execute();
+ }
+
+ private void verifyAnalysesRunInParallel(MappedByteBuffer mappedByteBuffer, int workerCount) {
+ assertThat(adminWsClient.ce().workerCount())
+ .extracting(WsCe.WorkerCountResponse::getValue, WsCe.WorkerCountResponse::getCanSetWorkerCount)
+ .containsOnly(workerCount, true);
+
+ blockAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+ // start analysis of workerCount + 2 projects
+ List<String> projectKeys = IntStream.range(0, workerCount + 2).mapToObj(i -> "prj" + i).collect(toList());
+ for (String projectKey : projectKeys) {
+ SonarScanner sonarRunner = SonarScanner.create(ItUtils.projectDir("shared/xoo-sample"))
+ .setProperties("sonar.projectKey", projectKey);
+ orchestrator.executeBuild(sonarRunner, false);
+ }
+
+ List<WsCe.Task> tasksList = waitForWsCallStatus(
+ this::getTasksAllTasks,
+ (tasks) -> verifyInProgressTaskCount(tasks, workerCount));
+
+ assertThat(tasksList.stream()
+ .filter(CeWorkersTest::pending)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(workerCount, projectKeys.size())));
+ assertThat(tasksList.stream()
+ .filter(CeWorkersTest::inProgress)
+ .map(WsCe.Task::getComponentKey)
+ .collect(toSet()))
+ .isEqualTo(copyOf(projectKeys.subList(0, workerCount)));
+
+ releaseAnyAnalysisWithFakeGovernancePlugin(mappedByteBuffer);
+
+ waitForWsCallStatus(this::getTasksAllTasks, List::isEmpty);
+ }
+
+ private static MappedByteBuffer initMappedByteBuffer(RandomAccessFile randomAccessFile) throws IOException {
+ return randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 1);
+ }
+
+ private void releaseAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+ // let the blocked analyses finish running
+ mappedByteBuffer.put(0, UNLATCHED);
+ }
+
+ private static void blockAnyAnalysisWithFakeGovernancePlugin(MappedByteBuffer mappedByteBuffer) {
+ // block any analysis which will run with the fake-governance-plugin
+ mappedByteBuffer.put(0, BLOCKING);
+ }
+
+ private void close(@Nullable RandomAccessFile randomAccessFile) throws IOException {
+ if (randomAccessFile != null) {
+ randomAccessFile.close();
}
}
- private static boolean threeTasksNotPending(List<WsCe.Task> tasksList) {
- return tasksList.stream().filter(task -> !pending(task)).count() >= WORKER_COUNT;
+ private static boolean verifyInProgressTaskCount(List<WsCe.Task> tasksList, int workerCount) {
+ return tasksList.stream().filter(CeWorkersTest::inProgress).count() >= workerCount;
}
private static boolean pending(WsCe.Task task) {
private List<WsCe.Task> getTasksAllTasks(WsClient wsClient) {
return wsClient.ce().activity(new ActivityWsRequest()
- .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS, STATUS_SUCCESS, STATUS_FAILED, STATUS_CANCELED)))
+ .setStatus(ImmutableList.of(STATUS_PENDING, STATUS_IN_PROGRESS)))
.getTasksList();
}
returnValue = call.apply(wsClient);
expectedState = test.apply(returnValue);
}
+ assertThat(expectedState)
+ .as("Failed to wait for expected queue status. Last call returned:\n%s", returnValue)
+ .isTrue();
return returnValue;
}