From 64b25b0613feb16070ada8e02c64761ac0d0f6d2 Mon Sep 17 00:00:00 2001 From: Lukasz Jarocki Date: Tue, 6 Dec 2022 14:44:08 +0100 Subject: SONAR-17699 implemented algorithm for running PRs in parallel --- .../src/main/java/org/sonar/ce/CeQueueModule.java | 2 + .../org/sonar/ce/queue/InternalCeQueueImpl.java | 19 +- .../org/sonar/ce/queue/NextPendingTaskPicker.java | 142 ++++++++ .../sonar/ce/queue/InternalCeQueueImplTest.java | 14 +- .../sonar/ce/queue/NextPendingTaskPickerTest.java | 386 +++++++++++++++++++++ 5 files changed, 545 insertions(+), 18 deletions(-) create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java (limited to 'server/sonar-ce') diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java index f51b19f6c5f..dba7f93aea0 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java @@ -22,6 +22,7 @@ package org.sonar.ce; import org.sonar.ce.monitoring.CeTasksMBeanImpl; import org.sonar.ce.queue.CeQueueInitializer; import org.sonar.ce.queue.InternalCeQueueImpl; +import org.sonar.ce.queue.NextPendingTaskPicker; import org.sonar.core.platform.Module; public class CeQueueModule extends Module { @@ -30,6 +31,7 @@ public class CeQueueModule extends Module { add( // queue state InternalCeQueueImpl.class, + NextPendingTaskPicker.class, // queue monitoring CeTasksMBeanImpl.class, diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 22c1143b2b8..92a076aff6f 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -62,13 +62,15 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue private final DbClient dbClient; private final CEQueueStatus queueStatus; private final ComputeEngineStatus computeEngineStatus; + private final NextPendingTaskPicker nextPendingTaskPicker; public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus, - ComputeEngineStatus computeEngineStatus) { + ComputeEngineStatus computeEngineStatus, NextPendingTaskPicker nextPendingTaskPicker) { super(system2, dbClient, uuidFactory); this.dbClient = dbClient; this.queueStatus = queueStatus; this.computeEngineStatus = computeEngineStatus; + this.nextPendingTaskPicker = nextPendingTaskPicker; } @Override @@ -85,8 +87,8 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue dbSession.commit(); LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); } - Optional opt = findPendingTask(workerUuid, dbSession, ceQueueDao, excludeIndexationJob); - if (!opt.isPresent()) { + Optional opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob); + if (opt.isEmpty()) { return Optional.empty(); } CeQueueDto taskDto = opt.get(); @@ -102,17 +104,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } } - private static Optional findPendingTask(String workerUuid, DbSession dbSession, CeQueueDao ceQueueDao, boolean excludeIndexationJob) { - // try to find tasks including indexation job & excluding app/portfolio - // and if no match, try the opposite - // when excludeIndexationJob is false, search first excluding indexation jobs and including app/portfolio, then the opposite - Optional opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, !excludeIndexationJob); - if (!opt.isPresent()) { - opt = ceQueueDao.peek(dbSession, workerUuid, !excludeIndexationJob, excludeIndexationJob); - } - return opt; - } - @Override public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED"); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java new file mode 100644 index 00000000000..bc3f631c8a5 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java @@ -0,0 +1,142 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.commons.lang.ObjectUtils; +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.config.Configuration; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.core.config.ComputeEngineProperties; +import org.sonar.db.DbClient; +import org.sonar.db.DbSession; +import org.sonar.db.ce.CeQueueDao; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskDtoLight; +import org.sonar.db.ce.PrOrBranchTask; + +import static java.util.stream.Collectors.toList; +import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY; +import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST; + +@ComputeEngineSide +public class NextPendingTaskPicker { + private static final Logger LOG = Loggers.get(NextPendingTaskPicker.class); + + private final Configuration config; + private final CeQueueDao ceQueueDao; + + public NextPendingTaskPicker(Configuration config, DbClient dbClient) { + this.config = config; + this.ceQueueDao = dbClient.ceQueueDao(); + } + + Optional findPendingTask(String workerUuid, DbSession dbSession, boolean prioritizeAnalysisAndRefresh) { + // try to find tasks including indexation job & excluding app/portfolio and if no match, try the opposite + // when prioritizeAnalysisAndRefresh is false, search first excluding indexation jobs and including app/portfolio, then the opposite + Optional eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, prioritizeAnalysisAndRefresh, !prioritizeAnalysisAndRefresh); + Optional eligibleForPeekInParallel = eligibleForPeekInParallel(dbSession); + + if (eligibleForPeek.isPresent() || eligibleForPeekInParallel.isPresent()) { + return submitOldest(dbSession, workerUuid, eligibleForPeek.orElse(null), eligibleForPeekInParallel.orElse(null)); + } + + eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, !prioritizeAnalysisAndRefresh, prioritizeAnalysisAndRefresh); + if (eligibleForPeek.isPresent()) { + return ceQueueDao.tryToPeek(dbSession, eligibleForPeek.get().getCeTaskUuid(), workerUuid); + } + return Optional.empty(); + } + + /** + * priority is always given to the task that is waiting longer - to avoid starvation + */ + private Optional submitOldest(DbSession session, String workerUuid, @Nullable CeTaskDtoLight eligibleForPeek, @Nullable CeTaskDtoLight eligibleForPeekInParallel) { + CeTaskDtoLight oldest = (CeTaskDtoLight) ObjectUtils.min(eligibleForPeek, eligibleForPeekInParallel); + Optional ceQueueDto = ceQueueDao.tryToPeek(session, oldest.getCeTaskUuid(), workerUuid); + if (!Objects.equals(oldest, eligibleForPeek)) { + ceQueueDto.ifPresent(t -> LOG.info("Task [uuid = " + t.getUuid() + "] will be run concurrently with other tasks for the same project")); + } + return ceQueueDto; + } + + Optional eligibleForPeekInParallel(DbSession dbSession) { + Optional parallelProjectTasksEnabled = config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED); + if (parallelProjectTasksEnabled.isPresent() && Boolean.TRUE.equals(parallelProjectTasksEnabled.get())) { + return findPendingConcurrentCandidateTasks(ceQueueDao, dbSession); + } + return Optional.empty(); + } + + /** + * Some of the tasks of the same project (mostly PRs) can be assigned and executed on workers at the same time/concurrently. + * We look for them in this method. + */ + private static Optional findPendingConcurrentCandidateTasks(CeQueueDao ceQueueDao, DbSession session) { + List queuedPrOrBranches = filterOldestPerProject(ceQueueDao.selectOldestPendingPrOrBranch(session)); + List inProgressTasks = ceQueueDao.selectInProgressWithCharacteristics(session); + + for (PrOrBranchTask task : queuedPrOrBranches) { + if ((Objects.equals(task.getBranchType(), PULL_REQUEST) && canRunPr(task, inProgressTasks)) + || (Objects.equals(task.getBranchType(), BRANCH_KEY) && canRunBranch(task, inProgressTasks))) { + return Optional.of(task); + } + } + return Optional.empty(); + } + + private static List filterOldestPerProject(List queuedPrOrBranches) { + Set mainComponentUuidsSeen = new HashSet<>(); + return queuedPrOrBranches.stream().filter(t -> mainComponentUuidsSeen.add(t.getMainComponentUuid())).collect(toList()); + } + + /** + * Branches cannot be run concurrently at this moment with other branches. And branches can already be returned in + * {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)}. But we need this method because branches can be + * assigned to a worker in a situation where the only type of in-progress tasks for a given project are {@link #PULLREQUEST_TYPE}. + *

+ * This method returns the longest waiting branch in the queue which can be scheduled concurrently with pull requests. + */ + private static boolean canRunBranch(PrOrBranchTask task, List inProgress) { + String mainComponentUuid = task.getMainComponentUuid(); + List sameComponentTasks = inProgress.stream() + .filter(t -> t.getMainComponentUuid().equals(mainComponentUuid)) + .collect(toList()); + //we can peek branch analysis task only if all the other in progress tasks for this component uuid are pull requests + return sameComponentTasks.stream().map(PrOrBranchTask::getBranchType).allMatch(s -> Objects.equals(s, PULL_REQUEST)); + } + + /** + * Queued pull requests can almost always be assigned to worker unless there is already PR running with the same ID (text_value column) + * and for the same project. We look for the one that waits for the longest time. + */ + private static boolean canRunPr(PrOrBranchTask task, List inProgress) { + // return true unless the same PR is already in progress + return inProgress.stream() + .noneMatch(pr -> pr.getMainComponentUuid().equals(task.getMainComponentUuid()) && Objects.equals(pr.getBranchType(), PULL_REQUEST) && + Objects.equals(pr.getComponentUuid(), (task.getComponentUuid()))); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 05b04d9ac2f..c488e4db934 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.sonar.api.config.Configuration; import org.sonar.api.impl.utils.AlwaysIncreasingSystem2; import org.sonar.api.utils.System2; import org.sonar.ce.container.ComputeEngineStatus; @@ -53,6 +54,7 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -77,10 +79,14 @@ public class InternalCeQueueImplTest { private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; private CEQueueStatus queueStatus = new CEQueueStatusImpl(db.getDbClient(), mock(System2.class)); private ComputeEngineStatus computeEngineStatus = mock(ComputeEngineStatus.class); - private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, computeEngineStatus); + private Configuration config = mock(Configuration.class); + private NextPendingTaskPicker nextPendingTaskPicker = new NextPendingTaskPicker(config, db.getDbClient()); + private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, + computeEngineStatus, nextPendingTaskPicker); @Before public void setUp() { + when(config.getBoolean(any())).thenReturn(Optional.of(false)); when(computeEngineStatus.getStatus()).thenReturn(STARTED); } @@ -242,7 +248,7 @@ public class InternalCeQueueImplTest { db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid()); db.commit(); - InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null); + InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null, null); try { underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null); @@ -259,7 +265,7 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid()); db.commit(); - InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null); + InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null); try { underTest.remove(task, CeActivityDto.Status.FAILED, null, null); @@ -276,7 +282,7 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); db.executeUpdateSql("update ce_queue set status = 'PENDING', started_at = 123 where uuid = '" + task.getUuid() + "'"); db.commit(); - InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null); + InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null); underTest.cancelWornOuts(); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java new file mode 100644 index 00000000000..f39b9fdb3df --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java @@ -0,0 +1,386 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.sonar.api.config.Configuration; +import org.sonar.api.impl.utils.AlwaysIncreasingSystem2; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.LogTester; +import org.sonar.core.config.ComputeEngineProperties; +import org.sonar.db.DbTester; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskCharacteristicDto; +import org.sonar.db.ce.CeTaskTypes; +import org.sonar.db.component.ComponentDto; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; +import static org.sonar.db.ce.CeQueueDto.Status.PENDING; +import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY; +import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST; + +public class NextPendingTaskPickerTest { + + private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(1L, 1); + + private final Configuration config = mock(Configuration.class); + + @Rule + public LogTester logTester = new LogTester(); + + private NextPendingTaskPicker underTest; + + @Rule + public DbTester db = DbTester.create(alwaysIncreasingSystem2); + + @Before + public void before() { + underTest = new NextPendingTaskPicker(config, db.getDbClient()); + when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(true)); + } + + @Test + public void findPendingTask_whenNoTasksPending_returnsEmpty() { + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isEmpty(); + } + + @Test + public void findPendingTask_whenTwoTasksPending_returnsTheOlderOne() { + // both the 'eligibleTask' and 'parallelEligibleTask' will point to this one + insertPending("1"); + insertPending("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + } + + @Test + public void findPendingTask_whenTwoTasksPendingWithSameCreationDate_returnsLowestUuid() { + insertPending("d", c -> c.setCreatedAt(1L).setUpdatedAt(1L)); + insertPending("c", c -> c.setCreatedAt(1L).setUpdatedAt(1L)); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("c"); + } + + @Test + public void findPendingTask_givenBranchInProgressAndPropertySet_returnQueuedPR() { + insertInProgress("1"); + insertPendingPullRequest("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("2"); + + assertThat(logTester.logs()).contains("Task [uuid = " + ceQueueDto.get().getUuid() + "] will be run concurrently with other tasks for the same project"); + } + + @Test + public void findPendingTask_givenBranchInProgressAndPropertyNotSet_dontReturnQueuedPR() { + when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(false)); + insertInProgress("1"); + insertPendingPullRequest("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isEmpty(); + } + + @Test + public void findPendingTask_given2PRsQueued_returnBothQueuedPR() { + insertPendingPullRequest("1"); + insertPendingPullRequest("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given1MainBranch_2PRsQueued_returnMainBranchAndPRs() { + insertPending("1"); + insertPendingPullRequest("2"); + insertPendingPullRequest("3"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + Optional ceQueueDto3 = underTest.findPendingTask("workerUuid3", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isPresent(); + assertThat(ceQueueDto3).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + assertThat(ceQueueDto2.get().getUuid()).isEqualTo("2"); + assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS); + assertThat(ceQueueDto3.get().getUuid()).isEqualTo("3"); + assertThat(ceQueueDto3.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given1MainBranch_2BranchesQueued_returnOnyMainBranch() { + insertPending("1", null); + insertPendingBranch("2"); + insertPendingBranch("3"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isEmpty(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given2BranchesQueued_returnOnlyFirstQueuedBranch() { + insertPending("1"); + insertPendingBranch("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isEmpty(); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given2SamePRsQueued_returnOnlyFirstQueuedPR() { + insertPendingPullRequest("1", c -> c.setComponentUuid("pr1")); + insertPendingPullRequest("2", c -> c.setComponentUuid("pr1")); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isEmpty(); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_givenBranchInTheQueueOlderThanPrInTheQueue_dontJumpAheadOfBranch() { + // we have branch task in progress. Next branch task needs to wait for this one to finish. We dont allow PRs to jump ahead of this branch + insertInProgress("1"); + insertPending("2"); + insertPendingPullRequest("3"); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isEmpty(); + } + + @Test + public void findPendingTask_givenDifferentProjectAndPrInTheQueue_dontJumpAheadOfDifferentProject() { + // we have branch task in progress. + insertInProgress("1"); + // The PR can run in parallel, but needs to wait for this other project to finish. We dont allow PRs to jump ahead + insertPending("2", c -> c.setMainComponentUuid("different project")); + insertPendingPullRequest("3"); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("2"); + } + + @Test + public void findPendingTask_givenDifferentProjectAndPrInTheQueue_prCanRunFirst() { + // we have branch task in progress. + insertInProgress("1"); + // The PR can run in parallel and is ahead of the other project + insertPendingPullRequest("2"); + insertPending("3", c -> c.setMainComponentUuid("different project")); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("2"); + } + + @Test + public void findPendingTask_givenFivePrsInProgress_branchCanBeScheduled() { + insertInProgressPullRequest("1"); + insertInProgressPullRequest("2"); + insertInProgressPullRequest("3"); + insertInProgressPullRequest("4"); + insertInProgressPullRequest("5"); + insertPending("6"); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("6"); + } + + @Test + public void findPendingTask_excludingViewPickUpOrphanBranches() { + insertPending("1", dto -> dto + .setComponentUuid("1") + .setMainComponentUuid("non-existing-uuid") + .setStatus(PENDING) + .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) + .setCreatedAt(100_000L)); + + Optional peek = underTest.findPendingTask("1", db.getSession(), false); + assertThat(peek).isPresent(); + assertThat(peek.get().getUuid()).isEqualTo("1"); + } + + @Test + public void exclude_portfolios_computation_when_indexing_issues() { + String taskUuid1 = "1", taskUuid2 = "2"; + String mainComponentUuid = "1"; + insertBranch(mainComponentUuid); + insertPending(taskUuid1, dto -> dto + .setComponentUuid(mainComponentUuid) + .setMainComponentUuid(mainComponentUuid) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) + .setCreatedAt(100_000L)); + + String view_uuid = "view_uuid"; + insertView(view_uuid); + insertPending(taskUuid2, dto -> dto + .setComponentUuid(view_uuid) + .setMainComponentUuid(view_uuid) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(100_000L)); + + Optional peek = underTest.findPendingTask("1", db.getSession(), false); + assertThat(peek).isPresent(); + assertThat(peek.get().getUuid()).isEqualTo(taskUuid1); + + Optional peek2 = underTest.findPendingTask("1", db.getSession(), false); + assertThat(peek2).isPresent(); + assertThat(peek2.get().getUuid()).isEqualTo(taskUuid2); + } + + private CeQueueDto insertPending(String uuid) { + return insertPending(uuid, null); + } + + private CeQueueDto insertPendingBranch(String uuid) { + CeQueueDto queue = insertPending(uuid, null); + insertCharacteristics(queue.getUuid(), BRANCH_KEY); + return queue; + } + + private CeQueueDto insertPendingPullRequest(String uuid) { + return insertPendingPullRequest(uuid, null); + } + + private CeQueueDto insertPendingPullRequest(String uuid, @Nullable Consumer ceQueueDto) { + CeQueueDto queue = insertPending(uuid, ceQueueDto); + insertCharacteristics(queue.getUuid(), PULL_REQUEST); + return queue; + } + + private CeQueueDto insertInProgressPullRequest(String uuid) { + CeQueueDto queue = insertInProgress(uuid, null); + insertCharacteristics(queue.getUuid(), PULL_REQUEST); + return queue; + } + + private CeQueueDto insertInProgress(String uuid) { + return insertInProgress(uuid, null); + } + + private CeQueueDto insertInProgress(String uuid, @Nullable Consumer ceQueueDto) { + return insertTask(uuid, IN_PROGRESS, ceQueueDto); + } + + private CeQueueDto insertPending(String uuid, @Nullable Consumer ceQueueDto) { + return insertTask(uuid, PENDING, ceQueueDto); + } + + private CeTaskCharacteristicDto insertCharacteristics(String taskUuid, String branchType) { + var ctcDto = new CeTaskCharacteristicDto(); + ctcDto.setUuid(UUID.randomUUID().toString()); + ctcDto.setTaskUuid(taskUuid); + ctcDto.setKey(branchType); + ctcDto.setValue("value"); + db.getDbClient().ceTaskCharacteristicsDao().insert(db.getSession(), ctcDto); + db.getSession().commit(); + return ctcDto; + } + + private CeQueueDto insertTask(String uuid, CeQueueDto.Status status, @Nullable Consumer ceQueueDtoConsumer) { + CeQueueDto dto = createCeQueue(uuid, status, ceQueueDtoConsumer); + db.getDbClient().ceQueueDao().insert(db.getSession(), dto); + db.getSession().commit(); + return dto; + } + + @NotNull + private static CeQueueDto createCeQueue(String uuid, CeQueueDto.Status status, @Nullable Consumer ceQueueDtoConsumer) { + CeQueueDto dto = new CeQueueDto(); + dto.setUuid(uuid); + dto.setTaskType(CeTaskTypes.REPORT); + dto.setStatus(status); + dto.setSubmitterUuid("henri"); + dto.setComponentUuid(UUID.randomUUID().toString()); + dto.setMainComponentUuid("1"); + if (ceQueueDtoConsumer != null) { + ceQueueDtoConsumer.accept(dto); + } + return dto; + } + + private void insertView(String view_uuid) { + ComponentDto view = new ComponentDto(); + view.setQualifier("VW"); + view.setKey(view_uuid + "_key"); + view.setUuid(view_uuid); + view.setPrivate(false); + view.setRootUuid(view_uuid); + view.setUuidPath("uuid_path"); + view.setBranchUuid(view_uuid); + db.components().insertPortfolioAndSnapshot(view); + db.commit(); + } + + private void insertBranch(String uuid) { + ComponentDto branch = new ComponentDto(); + branch.setQualifier("TRK"); + branch.setKey(uuid + "_key"); + branch.setUuid(uuid); + branch.setPrivate(false); + branch.setRootUuid(uuid); + branch.setUuidPath("uuid_path"); + branch.setBranchUuid(uuid); + db.components().insertComponent(branch); + db.commit(); + } +} -- cgit v1.2.3