aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
authorLukasz Jarocki <lukasz.jarocki@sonarsource.com>2022-12-06 14:44:08 +0100
committersonartech <sonartech@sonarsource.com>2022-12-09 20:03:10 +0000
commit64b25b0613feb16070ada8e02c64761ac0d0f6d2 (patch)
tree5fc6a8685ffa823a7b4b63895c140dad0eb673c9 /server/sonar-ce
parentb7e67fd16dda1f14ce2901310f5bf21f0030960a (diff)
downloadsonarqube-64b25b0613feb16070ada8e02c64761ac0d0f6d2.tar.gz
sonarqube-64b25b0613feb16070ada8e02c64761ac0d0f6d2.zip
SONAR-17699 implemented algorithm for running PRs in parallel
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java19
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java142
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java14
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java386
5 files changed, 545 insertions, 18 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java
index f51b19f6c5f..dba7f93aea0 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java
@@ -22,6 +22,7 @@ package org.sonar.ce;
import org.sonar.ce.monitoring.CeTasksMBeanImpl;
import org.sonar.ce.queue.CeQueueInitializer;
import org.sonar.ce.queue.InternalCeQueueImpl;
+import org.sonar.ce.queue.NextPendingTaskPicker;
import org.sonar.core.platform.Module;
public class CeQueueModule extends Module {
@@ -30,6 +31,7 @@ public class CeQueueModule extends Module {
add(
// queue state
InternalCeQueueImpl.class,
+ NextPendingTaskPicker.class,
// queue monitoring
CeTasksMBeanImpl.class,
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
index 22c1143b2b8..92a076aff6f 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -62,13 +62,15 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
private final DbClient dbClient;
private final CEQueueStatus queueStatus;
private final ComputeEngineStatus computeEngineStatus;
+ private final NextPendingTaskPicker nextPendingTaskPicker;
public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus,
- ComputeEngineStatus computeEngineStatus) {
+ ComputeEngineStatus computeEngineStatus, NextPendingTaskPicker nextPendingTaskPicker) {
super(system2, dbClient, uuidFactory);
this.dbClient = dbClient;
this.queueStatus = queueStatus;
this.computeEngineStatus = computeEngineStatus;
+ this.nextPendingTaskPicker = nextPendingTaskPicker;
}
@Override
@@ -85,8 +87,8 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
dbSession.commit();
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
- Optional<CeQueueDto> opt = findPendingTask(workerUuid, dbSession, ceQueueDao, excludeIndexationJob);
- if (!opt.isPresent()) {
+ Optional<CeQueueDto> opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob);
+ if (opt.isEmpty()) {
return Optional.empty();
}
CeQueueDto taskDto = opt.get();
@@ -102,17 +104,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
}
}
- private static Optional<CeQueueDto> findPendingTask(String workerUuid, DbSession dbSession, CeQueueDao ceQueueDao, boolean excludeIndexationJob) {
- // try to find tasks including indexation job & excluding app/portfolio
- // and if no match, try the opposite
- // when excludeIndexationJob is false, search first excluding indexation jobs and including app/portfolio, then the opposite
- Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, !excludeIndexationJob);
- if (!opt.isPresent()) {
- opt = ceQueueDao.peek(dbSession, workerUuid, !excludeIndexationJob, excludeIndexationJob);
- }
- return opt;
- }
-
@Override
public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java
new file mode 100644
index 00000000000..bc3f631c8a5
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java
@@ -0,0 +1,142 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.ObjectUtils;
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.config.ComputeEngineProperties;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeQueueDao;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskDtoLight;
+import org.sonar.db.ce.PrOrBranchTask;
+
+import static java.util.stream.Collectors.toList;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST;
+
+@ComputeEngineSide
+public class NextPendingTaskPicker {
+ private static final Logger LOG = Loggers.get(NextPendingTaskPicker.class);
+
+ private final Configuration config;
+ private final CeQueueDao ceQueueDao;
+
+ public NextPendingTaskPicker(Configuration config, DbClient dbClient) {
+ this.config = config;
+ this.ceQueueDao = dbClient.ceQueueDao();
+ }
+
+ Optional<CeQueueDto> findPendingTask(String workerUuid, DbSession dbSession, boolean prioritizeAnalysisAndRefresh) {
+ // try to find tasks including indexation job & excluding app/portfolio and if no match, try the opposite
+ // when prioritizeAnalysisAndRefresh is false, search first excluding indexation jobs and including app/portfolio, then the opposite
+ Optional<CeTaskDtoLight> eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, prioritizeAnalysisAndRefresh, !prioritizeAnalysisAndRefresh);
+ Optional<CeTaskDtoLight> eligibleForPeekInParallel = eligibleForPeekInParallel(dbSession);
+
+ if (eligibleForPeek.isPresent() || eligibleForPeekInParallel.isPresent()) {
+ return submitOldest(dbSession, workerUuid, eligibleForPeek.orElse(null), eligibleForPeekInParallel.orElse(null));
+ }
+
+ eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, !prioritizeAnalysisAndRefresh, prioritizeAnalysisAndRefresh);
+ if (eligibleForPeek.isPresent()) {
+ return ceQueueDao.tryToPeek(dbSession, eligibleForPeek.get().getCeTaskUuid(), workerUuid);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * priority is always given to the task that is waiting longer - to avoid starvation
+ */
+ private Optional<CeQueueDto> submitOldest(DbSession session, String workerUuid, @Nullable CeTaskDtoLight eligibleForPeek, @Nullable CeTaskDtoLight eligibleForPeekInParallel) {
+ CeTaskDtoLight oldest = (CeTaskDtoLight) ObjectUtils.min(eligibleForPeek, eligibleForPeekInParallel);
+ Optional<CeQueueDto> ceQueueDto = ceQueueDao.tryToPeek(session, oldest.getCeTaskUuid(), workerUuid);
+ if (!Objects.equals(oldest, eligibleForPeek)) {
+ ceQueueDto.ifPresent(t -> LOG.info("Task [uuid = " + t.getUuid() + "] will be run concurrently with other tasks for the same project"));
+ }
+ return ceQueueDto;
+ }
+
+ Optional<CeTaskDtoLight> eligibleForPeekInParallel(DbSession dbSession) {
+ Optional<Boolean> parallelProjectTasksEnabled = config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED);
+ if (parallelProjectTasksEnabled.isPresent() && Boolean.TRUE.equals(parallelProjectTasksEnabled.get())) {
+ return findPendingConcurrentCandidateTasks(ceQueueDao, dbSession);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Some of the tasks of the same project (mostly PRs) can be assigned and executed on workers at the same time/concurrently.
+ * We look for them in this method.
+ */
+ private static Optional<CeTaskDtoLight> findPendingConcurrentCandidateTasks(CeQueueDao ceQueueDao, DbSession session) {
+ List<PrOrBranchTask> queuedPrOrBranches = filterOldestPerProject(ceQueueDao.selectOldestPendingPrOrBranch(session));
+ List<PrOrBranchTask> inProgressTasks = ceQueueDao.selectInProgressWithCharacteristics(session);
+
+ for (PrOrBranchTask task : queuedPrOrBranches) {
+ if ((Objects.equals(task.getBranchType(), PULL_REQUEST) && canRunPr(task, inProgressTasks))
+ || (Objects.equals(task.getBranchType(), BRANCH_KEY) && canRunBranch(task, inProgressTasks))) {
+ return Optional.of(task);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static List<PrOrBranchTask> filterOldestPerProject(List<PrOrBranchTask> queuedPrOrBranches) {
+ Set<String> mainComponentUuidsSeen = new HashSet<>();
+ return queuedPrOrBranches.stream().filter(t -> mainComponentUuidsSeen.add(t.getMainComponentUuid())).collect(toList());
+ }
+
+ /**
+ * Branches cannot be run concurrently at this moment with other branches. And branches can already be returned in
+ * {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)}. But we need this method because branches can be
+ * assigned to a worker in a situation where the only type of in-progress tasks for a given project are {@link #PULLREQUEST_TYPE}.
+ * <p>
+ * This method returns the longest waiting branch in the queue which can be scheduled concurrently with pull requests.
+ */
+ private static boolean canRunBranch(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
+ String mainComponentUuid = task.getMainComponentUuid();
+ List<PrOrBranchTask> sameComponentTasks = inProgress.stream()
+ .filter(t -> t.getMainComponentUuid().equals(mainComponentUuid))
+ .collect(toList());
+ //we can peek branch analysis task only if all the other in progress tasks for this component uuid are pull requests
+ return sameComponentTasks.stream().map(PrOrBranchTask::getBranchType).allMatch(s -> Objects.equals(s, PULL_REQUEST));
+ }
+
+ /**
+ * Queued pull requests can almost always be assigned to worker unless there is already PR running with the same ID (text_value column)
+ * and for the same project. We look for the one that waits for the longest time.
+ */
+ private static boolean canRunPr(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
+ // return true unless the same PR is already in progress
+ return inProgress.stream()
+ .noneMatch(pr -> pr.getMainComponentUuid().equals(task.getMainComponentUuid()) && Objects.equals(pr.getBranchType(), PULL_REQUEST) &&
+ Objects.equals(pr.getComponentUuid(), (task.getComponentUuid())));
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
index 05b04d9ac2f..c488e4db934 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.sonar.api.config.Configuration;
import org.sonar.api.impl.utils.AlwaysIncreasingSystem2;
import org.sonar.api.utils.System2;
import org.sonar.ce.container.ComputeEngineStatus;
@@ -53,6 +54,7 @@ import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -77,10 +79,14 @@ public class InternalCeQueueImplTest {
private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
private CEQueueStatus queueStatus = new CEQueueStatusImpl(db.getDbClient(), mock(System2.class));
private ComputeEngineStatus computeEngineStatus = mock(ComputeEngineStatus.class);
- private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, computeEngineStatus);
+ private Configuration config = mock(Configuration.class);
+ private NextPendingTaskPicker nextPendingTaskPicker = new NextPendingTaskPicker(config, db.getDbClient());
+ private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus,
+ computeEngineStatus, nextPendingTaskPicker);
@Before
public void setUp() {
+ when(config.getBoolean(any())).thenReturn(Optional.of(false));
when(computeEngineStatus.getStatus()).thenReturn(STARTED);
}
@@ -242,7 +248,7 @@ public class InternalCeQueueImplTest {
db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid());
db.commit();
- InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null);
+ InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null, null);
try {
underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
@@ -259,7 +265,7 @@ public class InternalCeQueueImplTest {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid());
db.commit();
- InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null);
+ InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null);
try {
underTest.remove(task, CeActivityDto.Status.FAILED, null, null);
@@ -276,7 +282,7 @@ public class InternalCeQueueImplTest {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
db.executeUpdateSql("update ce_queue set status = 'PENDING', started_at = 123 where uuid = '" + task.getUuid() + "'");
db.commit();
- InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null);
+ InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null);
underTest.cancelWornOuts();
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java
new file mode 100644
index 00000000000..f39b9fdb3df
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java
@@ -0,0 +1,386 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.impl.utils.AlwaysIncreasingSystem2;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.core.config.ComputeEngineProperties;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskCharacteristicDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.db.component.ComponentDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
+import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST;
+
+public class NextPendingTaskPickerTest {
+
+ private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(1L, 1);
+
+ private final Configuration config = mock(Configuration.class);
+
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private NextPendingTaskPicker underTest;
+
+ @Rule
+ public DbTester db = DbTester.create(alwaysIncreasingSystem2);
+
+ @Before
+ public void before() {
+ underTest = new NextPendingTaskPicker(config, db.getDbClient());
+ when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(true));
+ }
+
+ @Test
+ public void findPendingTask_whenNoTasksPending_returnsEmpty() {
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isEmpty();
+ }
+
+ @Test
+ public void findPendingTask_whenTwoTasksPending_returnsTheOlderOne() {
+ // both the 'eligibleTask' and 'parallelEligibleTask' will point to this one
+ insertPending("1");
+ insertPending("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ }
+
+ @Test
+ public void findPendingTask_whenTwoTasksPendingWithSameCreationDate_returnsLowestUuid() {
+ insertPending("d", c -> c.setCreatedAt(1L).setUpdatedAt(1L));
+ insertPending("c", c -> c.setCreatedAt(1L).setUpdatedAt(1L));
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("c");
+ }
+
+ @Test
+ public void findPendingTask_givenBranchInProgressAndPropertySet_returnQueuedPR() {
+ insertInProgress("1");
+ insertPendingPullRequest("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("2");
+
+ assertThat(logTester.logs()).contains("Task [uuid = " + ceQueueDto.get().getUuid() + "] will be run concurrently with other tasks for the same project");
+ }
+
+ @Test
+ public void findPendingTask_givenBranchInProgressAndPropertyNotSet_dontReturnQueuedPR() {
+ when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(false));
+ insertInProgress("1");
+ insertPendingPullRequest("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isEmpty();
+ }
+
+ @Test
+ public void findPendingTask_given2PRsQueued_returnBothQueuedPR() {
+ insertPendingPullRequest("1");
+ insertPendingPullRequest("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given1MainBranch_2PRsQueued_returnMainBranchAndPRs() {
+ insertPending("1");
+ insertPendingPullRequest("2");
+ insertPendingPullRequest("3");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto3 = underTest.findPendingTask("workerUuid3", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isPresent();
+ assertThat(ceQueueDto3).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(ceQueueDto2.get().getUuid()).isEqualTo("2");
+ assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(ceQueueDto3.get().getUuid()).isEqualTo("3");
+ assertThat(ceQueueDto3.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given1MainBranch_2BranchesQueued_returnOnyMainBranch() {
+ insertPending("1", null);
+ insertPendingBranch("2");
+ insertPendingBranch("3");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isEmpty();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given2BranchesQueued_returnOnlyFirstQueuedBranch() {
+ insertPending("1");
+ insertPendingBranch("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isEmpty();
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given2SamePRsQueued_returnOnlyFirstQueuedPR() {
+ insertPendingPullRequest("1", c -> c.setComponentUuid("pr1"));
+ insertPendingPullRequest("2", c -> c.setComponentUuid("pr1"));
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isEmpty();
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_givenBranchInTheQueueOlderThanPrInTheQueue_dontJumpAheadOfBranch() {
+ // we have branch task in progress. Next branch task needs to wait for this one to finish. We dont allow PRs to jump ahead of this branch
+ insertInProgress("1");
+ insertPending("2");
+ insertPendingPullRequest("3");
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isEmpty();
+ }
+
+ @Test
+ public void findPendingTask_givenDifferentProjectAndPrInTheQueue_dontJumpAheadOfDifferentProject() {
+ // we have branch task in progress.
+ insertInProgress("1");
+ // The PR can run in parallel, but needs to wait for this other project to finish. We dont allow PRs to jump ahead
+ insertPending("2", c -> c.setMainComponentUuid("different project"));
+ insertPendingPullRequest("3");
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("2");
+ }
+
+ @Test
+ public void findPendingTask_givenDifferentProjectAndPrInTheQueue_prCanRunFirst() {
+ // we have branch task in progress.
+ insertInProgress("1");
+ // The PR can run in parallel and is ahead of the other project
+ insertPendingPullRequest("2");
+ insertPending("3", c -> c.setMainComponentUuid("different project"));
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("2");
+ }
+
+ @Test
+ public void findPendingTask_givenFivePrsInProgress_branchCanBeScheduled() {
+ insertInProgressPullRequest("1");
+ insertInProgressPullRequest("2");
+ insertInProgressPullRequest("3");
+ insertInProgressPullRequest("4");
+ insertInProgressPullRequest("5");
+ insertPending("6");
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("6");
+ }
+
+ @Test
+ public void findPendingTask_excludingViewPickUpOrphanBranches() {
+ insertPending("1", dto -> dto
+ .setComponentUuid("1")
+ .setMainComponentUuid("non-existing-uuid")
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
+ .setCreatedAt(100_000L));
+
+ Optional<CeQueueDto> peek = underTest.findPendingTask("1", db.getSession(), false);
+ assertThat(peek).isPresent();
+ assertThat(peek.get().getUuid()).isEqualTo("1");
+ }
+
+ @Test
+ public void exclude_portfolios_computation_when_indexing_issues() {
+ String taskUuid1 = "1", taskUuid2 = "2";
+ String mainComponentUuid = "1";
+ insertBranch(mainComponentUuid);
+ insertPending(taskUuid1, dto -> dto
+ .setComponentUuid(mainComponentUuid)
+ .setMainComponentUuid(mainComponentUuid)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
+ .setCreatedAt(100_000L));
+
+ String view_uuid = "view_uuid";
+ insertView(view_uuid);
+ insertPending(taskUuid2, dto -> dto
+ .setComponentUuid(view_uuid)
+ .setMainComponentUuid(view_uuid)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(100_000L));
+
+ Optional<CeQueueDto> peek = underTest.findPendingTask("1", db.getSession(), false);
+ assertThat(peek).isPresent();
+ assertThat(peek.get().getUuid()).isEqualTo(taskUuid1);
+
+ Optional<CeQueueDto> peek2 = underTest.findPendingTask("1", db.getSession(), false);
+ assertThat(peek2).isPresent();
+ assertThat(peek2.get().getUuid()).isEqualTo(taskUuid2);
+ }
+
+ private CeQueueDto insertPending(String uuid) {
+ return insertPending(uuid, null);
+ }
+
+ private CeQueueDto insertPendingBranch(String uuid) {
+ CeQueueDto queue = insertPending(uuid, null);
+ insertCharacteristics(queue.getUuid(), BRANCH_KEY);
+ return queue;
+ }
+
+ private CeQueueDto insertPendingPullRequest(String uuid) {
+ return insertPendingPullRequest(uuid, null);
+ }
+
+ private CeQueueDto insertPendingPullRequest(String uuid, @Nullable Consumer<CeQueueDto> ceQueueDto) {
+ CeQueueDto queue = insertPending(uuid, ceQueueDto);
+ insertCharacteristics(queue.getUuid(), PULL_REQUEST);
+ return queue;
+ }
+
+ private CeQueueDto insertInProgressPullRequest(String uuid) {
+ CeQueueDto queue = insertInProgress(uuid, null);
+ insertCharacteristics(queue.getUuid(), PULL_REQUEST);
+ return queue;
+ }
+
+ private CeQueueDto insertInProgress(String uuid) {
+ return insertInProgress(uuid, null);
+ }
+
+ private CeQueueDto insertInProgress(String uuid, @Nullable Consumer<CeQueueDto> ceQueueDto) {
+ return insertTask(uuid, IN_PROGRESS, ceQueueDto);
+ }
+
+ private CeQueueDto insertPending(String uuid, @Nullable Consumer<CeQueueDto> ceQueueDto) {
+ return insertTask(uuid, PENDING, ceQueueDto);
+ }
+
+ private CeTaskCharacteristicDto insertCharacteristics(String taskUuid, String branchType) {
+ var ctcDto = new CeTaskCharacteristicDto();
+ ctcDto.setUuid(UUID.randomUUID().toString());
+ ctcDto.setTaskUuid(taskUuid);
+ ctcDto.setKey(branchType);
+ ctcDto.setValue("value");
+ db.getDbClient().ceTaskCharacteristicsDao().insert(db.getSession(), ctcDto);
+ db.getSession().commit();
+ return ctcDto;
+ }
+
+ private CeQueueDto insertTask(String uuid, CeQueueDto.Status status, @Nullable Consumer<CeQueueDto> ceQueueDtoConsumer) {
+ CeQueueDto dto = createCeQueue(uuid, status, ceQueueDtoConsumer);
+ db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
+ db.getSession().commit();
+ return dto;
+ }
+
+ @NotNull
+ private static CeQueueDto createCeQueue(String uuid, CeQueueDto.Status status, @Nullable Consumer<CeQueueDto> ceQueueDtoConsumer) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(uuid);
+ dto.setTaskType(CeTaskTypes.REPORT);
+ dto.setStatus(status);
+ dto.setSubmitterUuid("henri");
+ dto.setComponentUuid(UUID.randomUUID().toString());
+ dto.setMainComponentUuid("1");
+ if (ceQueueDtoConsumer != null) {
+ ceQueueDtoConsumer.accept(dto);
+ }
+ return dto;
+ }
+
+ private void insertView(String view_uuid) {
+ ComponentDto view = new ComponentDto();
+ view.setQualifier("VW");
+ view.setKey(view_uuid + "_key");
+ view.setUuid(view_uuid);
+ view.setPrivate(false);
+ view.setRootUuid(view_uuid);
+ view.setUuidPath("uuid_path");
+ view.setBranchUuid(view_uuid);
+ db.components().insertPortfolioAndSnapshot(view);
+ db.commit();
+ }
+
+ private void insertBranch(String uuid) {
+ ComponentDto branch = new ComponentDto();
+ branch.setQualifier("TRK");
+ branch.setKey(uuid + "_key");
+ branch.setUuid(uuid);
+ branch.setPrivate(false);
+ branch.setRootUuid(uuid);
+ branch.setUuidPath("uuid_path");
+ branch.setBranchUuid(uuid);
+ db.components().insertComponent(branch);
+ db.commit();
+ }
+}