From 64b25b0613feb16070ada8e02c64761ac0d0f6d2 Mon Sep 17 00:00:00 2001 From: Lukasz Jarocki Date: Tue, 6 Dec 2022 14:44:08 +0100 Subject: [PATCH] SONAR-17699 implemented algorithm for running PRs in parallel --- .../org/sonar/ce/queue/CeQueueImplTest.java | 16 +- .../main/java/org/sonar/ce/CeQueueModule.java | 2 + .../sonar/ce/queue/InternalCeQueueImpl.java | 19 +- .../sonar/ce/queue/NextPendingTaskPicker.java | 142 +++++++ .../ce/queue/InternalCeQueueImplTest.java | 14 +- .../ce/queue/NextPendingTaskPickerTest.java | 386 ++++++++++++++++++ .../main/java/org/sonar/db/ce/CeQueueDao.java | 29 +- .../java/org/sonar/db/ce/CeQueueMapper.java | 6 +- .../java/org/sonar/db/ce/CeTaskDtoLight.java | 67 +++ .../java/org/sonar/db/ce/PrOrBranchTask.java | 44 ++ .../org/sonar/db/ce/CeQueueMapper.xml | 59 ++- .../java/org/sonar/db/ce/CeQueueDaoTest.java | 237 +++++------ .../org/sonar/db/ce/CeTaskDtoLightTest.java | 63 +++ .../core/config/ComputeEngineProperties.java | 29 ++ 14 files changed, 938 insertions(+), 175 deletions(-) create mode 100644 server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java create mode 100644 server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java create mode 100644 server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeTaskDtoLight.java create mode 100644 server/sonar-db-dao/src/main/java/org/sonar/db/ce/PrOrBranchTask.java create mode 100644 server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeTaskDtoLightTest.java create mode 100644 sonar-core/src/main/java/org/sonar/core/config/ComputeEngineProperties.java diff --git a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java index 7ea2ba50baf..3aa74acca40 100644 --- a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java +++ b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java @@ -389,8 +389,8 @@ public class CeQueueImplTest { @Test public void fail_to_cancel_if_in_progress() { - submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11))); - CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false).get(); + CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11))); + CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID).get(); assertThatThrownBy(() -> underTest.cancel(db.getSession(), ceQueueDto)) .isInstanceOf(IllegalStateException.class) @@ -403,7 +403,7 @@ public class CeQueueImplTest { CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); - db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false); + db.getDbClient().ceQueueDao().tryToPeek(session, inProgressTask.getUuid(), WORKER_UUID); int canceledCount = underTest.cancelAll(); assertThat(canceledCount).isEqualTo(2); @@ -429,8 +429,8 @@ public class CeQueueImplTest { @Test public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() { - submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); - db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false); + CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); + db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID); // task is in-progress assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); @@ -450,8 +450,8 @@ public class CeQueueImplTest { @Test public void resumeWorkers_resumes_pausing_workers() { - submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); - db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false); + CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); + db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID); // task is in-progress underTest.pauseWorkers(); @@ -473,7 +473,7 @@ public class CeQueueImplTest { @Test public void fail_in_progress_task() { CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); - CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID, false, false).get(); + CeQueueDto queueDto = db.getDbClient().ceQueueDao().tryToPeek(db.getSession(), task.getUuid(), WORKER_UUID).get(); underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java index f51b19f6c5f..dba7f93aea0 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java @@ -22,6 +22,7 @@ package org.sonar.ce; import org.sonar.ce.monitoring.CeTasksMBeanImpl; import org.sonar.ce.queue.CeQueueInitializer; import org.sonar.ce.queue.InternalCeQueueImpl; +import org.sonar.ce.queue.NextPendingTaskPicker; import org.sonar.core.platform.Module; public class CeQueueModule extends Module { @@ -30,6 +31,7 @@ public class CeQueueModule extends Module { add( // queue state InternalCeQueueImpl.class, + NextPendingTaskPicker.class, // queue monitoring CeTasksMBeanImpl.class, diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 22c1143b2b8..92a076aff6f 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -62,13 +62,15 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue private final DbClient dbClient; private final CEQueueStatus queueStatus; private final ComputeEngineStatus computeEngineStatus; + private final NextPendingTaskPicker nextPendingTaskPicker; public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus, - ComputeEngineStatus computeEngineStatus) { + ComputeEngineStatus computeEngineStatus, NextPendingTaskPicker nextPendingTaskPicker) { super(system2, dbClient, uuidFactory); this.dbClient = dbClient; this.queueStatus = queueStatus; this.computeEngineStatus = computeEngineStatus; + this.nextPendingTaskPicker = nextPendingTaskPicker; } @Override @@ -85,8 +87,8 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue dbSession.commit(); LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); } - Optional opt = findPendingTask(workerUuid, dbSession, ceQueueDao, excludeIndexationJob); - if (!opt.isPresent()) { + Optional opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob); + if (opt.isEmpty()) { return Optional.empty(); } CeQueueDto taskDto = opt.get(); @@ -102,17 +104,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } } - private static Optional findPendingTask(String workerUuid, DbSession dbSession, CeQueueDao ceQueueDao, boolean excludeIndexationJob) { - // try to find tasks including indexation job & excluding app/portfolio - // and if no match, try the opposite - // when excludeIndexationJob is false, search first excluding indexation jobs and including app/portfolio, then the opposite - Optional opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, !excludeIndexationJob); - if (!opt.isPresent()) { - opt = ceQueueDao.peek(dbSession, workerUuid, !excludeIndexationJob, excludeIndexationJob); - } - return opt; - } - @Override public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED"); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java new file mode 100644 index 00000000000..bc3f631c8a5 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java @@ -0,0 +1,142 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.commons.lang.ObjectUtils; +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.config.Configuration; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.core.config.ComputeEngineProperties; +import org.sonar.db.DbClient; +import org.sonar.db.DbSession; +import org.sonar.db.ce.CeQueueDao; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskDtoLight; +import org.sonar.db.ce.PrOrBranchTask; + +import static java.util.stream.Collectors.toList; +import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY; +import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST; + +@ComputeEngineSide +public class NextPendingTaskPicker { + private static final Logger LOG = Loggers.get(NextPendingTaskPicker.class); + + private final Configuration config; + private final CeQueueDao ceQueueDao; + + public NextPendingTaskPicker(Configuration config, DbClient dbClient) { + this.config = config; + this.ceQueueDao = dbClient.ceQueueDao(); + } + + Optional findPendingTask(String workerUuid, DbSession dbSession, boolean prioritizeAnalysisAndRefresh) { + // try to find tasks including indexation job & excluding app/portfolio and if no match, try the opposite + // when prioritizeAnalysisAndRefresh is false, search first excluding indexation jobs and including app/portfolio, then the opposite + Optional eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, prioritizeAnalysisAndRefresh, !prioritizeAnalysisAndRefresh); + Optional eligibleForPeekInParallel = eligibleForPeekInParallel(dbSession); + + if (eligibleForPeek.isPresent() || eligibleForPeekInParallel.isPresent()) { + return submitOldest(dbSession, workerUuid, eligibleForPeek.orElse(null), eligibleForPeekInParallel.orElse(null)); + } + + eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, !prioritizeAnalysisAndRefresh, prioritizeAnalysisAndRefresh); + if (eligibleForPeek.isPresent()) { + return ceQueueDao.tryToPeek(dbSession, eligibleForPeek.get().getCeTaskUuid(), workerUuid); + } + return Optional.empty(); + } + + /** + * priority is always given to the task that is waiting longer - to avoid starvation + */ + private Optional submitOldest(DbSession session, String workerUuid, @Nullable CeTaskDtoLight eligibleForPeek, @Nullable CeTaskDtoLight eligibleForPeekInParallel) { + CeTaskDtoLight oldest = (CeTaskDtoLight) ObjectUtils.min(eligibleForPeek, eligibleForPeekInParallel); + Optional ceQueueDto = ceQueueDao.tryToPeek(session, oldest.getCeTaskUuid(), workerUuid); + if (!Objects.equals(oldest, eligibleForPeek)) { + ceQueueDto.ifPresent(t -> LOG.info("Task [uuid = " + t.getUuid() + "] will be run concurrently with other tasks for the same project")); + } + return ceQueueDto; + } + + Optional eligibleForPeekInParallel(DbSession dbSession) { + Optional parallelProjectTasksEnabled = config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED); + if (parallelProjectTasksEnabled.isPresent() && Boolean.TRUE.equals(parallelProjectTasksEnabled.get())) { + return findPendingConcurrentCandidateTasks(ceQueueDao, dbSession); + } + return Optional.empty(); + } + + /** + * Some of the tasks of the same project (mostly PRs) can be assigned and executed on workers at the same time/concurrently. + * We look for them in this method. + */ + private static Optional findPendingConcurrentCandidateTasks(CeQueueDao ceQueueDao, DbSession session) { + List queuedPrOrBranches = filterOldestPerProject(ceQueueDao.selectOldestPendingPrOrBranch(session)); + List inProgressTasks = ceQueueDao.selectInProgressWithCharacteristics(session); + + for (PrOrBranchTask task : queuedPrOrBranches) { + if ((Objects.equals(task.getBranchType(), PULL_REQUEST) && canRunPr(task, inProgressTasks)) + || (Objects.equals(task.getBranchType(), BRANCH_KEY) && canRunBranch(task, inProgressTasks))) { + return Optional.of(task); + } + } + return Optional.empty(); + } + + private static List filterOldestPerProject(List queuedPrOrBranches) { + Set mainComponentUuidsSeen = new HashSet<>(); + return queuedPrOrBranches.stream().filter(t -> mainComponentUuidsSeen.add(t.getMainComponentUuid())).collect(toList()); + } + + /** + * Branches cannot be run concurrently at this moment with other branches. And branches can already be returned in + * {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)}. But we need this method because branches can be + * assigned to a worker in a situation where the only type of in-progress tasks for a given project are {@link #PULLREQUEST_TYPE}. + *

+ * This method returns the longest waiting branch in the queue which can be scheduled concurrently with pull requests. + */ + private static boolean canRunBranch(PrOrBranchTask task, List inProgress) { + String mainComponentUuid = task.getMainComponentUuid(); + List sameComponentTasks = inProgress.stream() + .filter(t -> t.getMainComponentUuid().equals(mainComponentUuid)) + .collect(toList()); + //we can peek branch analysis task only if all the other in progress tasks for this component uuid are pull requests + return sameComponentTasks.stream().map(PrOrBranchTask::getBranchType).allMatch(s -> Objects.equals(s, PULL_REQUEST)); + } + + /** + * Queued pull requests can almost always be assigned to worker unless there is already PR running with the same ID (text_value column) + * and for the same project. We look for the one that waits for the longest time. + */ + private static boolean canRunPr(PrOrBranchTask task, List inProgress) { + // return true unless the same PR is already in progress + return inProgress.stream() + .noneMatch(pr -> pr.getMainComponentUuid().equals(task.getMainComponentUuid()) && Objects.equals(pr.getBranchType(), PULL_REQUEST) && + Objects.equals(pr.getComponentUuid(), (task.getComponentUuid()))); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 05b04d9ac2f..c488e4db934 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.sonar.api.config.Configuration; import org.sonar.api.impl.utils.AlwaysIncreasingSystem2; import org.sonar.api.utils.System2; import org.sonar.ce.container.ComputeEngineStatus; @@ -53,6 +54,7 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -77,10 +79,14 @@ public class InternalCeQueueImplTest { private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; private CEQueueStatus queueStatus = new CEQueueStatusImpl(db.getDbClient(), mock(System2.class)); private ComputeEngineStatus computeEngineStatus = mock(ComputeEngineStatus.class); - private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, computeEngineStatus); + private Configuration config = mock(Configuration.class); + private NextPendingTaskPicker nextPendingTaskPicker = new NextPendingTaskPicker(config, db.getDbClient()); + private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, + computeEngineStatus, nextPendingTaskPicker); @Before public void setUp() { + when(config.getBoolean(any())).thenReturn(Optional.of(false)); when(computeEngineStatus.getStatus()).thenReturn(STARTED); } @@ -242,7 +248,7 @@ public class InternalCeQueueImplTest { db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid()); db.commit(); - InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null); + InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null, null); try { underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null); @@ -259,7 +265,7 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid()); db.commit(); - InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null); + InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null); try { underTest.remove(task, CeActivityDto.Status.FAILED, null, null); @@ -276,7 +282,7 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); db.executeUpdateSql("update ce_queue set status = 'PENDING', started_at = 123 where uuid = '" + task.getUuid() + "'"); db.commit(); - InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null); + InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null); underTest.cancelWornOuts(); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java new file mode 100644 index 00000000000..f39b9fdb3df --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/NextPendingTaskPickerTest.java @@ -0,0 +1,386 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.Optional; +import java.util.UUID; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.sonar.api.config.Configuration; +import org.sonar.api.impl.utils.AlwaysIncreasingSystem2; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.LogTester; +import org.sonar.core.config.ComputeEngineProperties; +import org.sonar.db.DbTester; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskCharacteristicDto; +import org.sonar.db.ce.CeTaskTypes; +import org.sonar.db.component.ComponentDto; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; +import static org.sonar.db.ce.CeQueueDto.Status.PENDING; +import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY; +import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST; + +public class NextPendingTaskPickerTest { + + private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(1L, 1); + + private final Configuration config = mock(Configuration.class); + + @Rule + public LogTester logTester = new LogTester(); + + private NextPendingTaskPicker underTest; + + @Rule + public DbTester db = DbTester.create(alwaysIncreasingSystem2); + + @Before + public void before() { + underTest = new NextPendingTaskPicker(config, db.getDbClient()); + when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(true)); + } + + @Test + public void findPendingTask_whenNoTasksPending_returnsEmpty() { + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isEmpty(); + } + + @Test + public void findPendingTask_whenTwoTasksPending_returnsTheOlderOne() { + // both the 'eligibleTask' and 'parallelEligibleTask' will point to this one + insertPending("1"); + insertPending("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + } + + @Test + public void findPendingTask_whenTwoTasksPendingWithSameCreationDate_returnsLowestUuid() { + insertPending("d", c -> c.setCreatedAt(1L).setUpdatedAt(1L)); + insertPending("c", c -> c.setCreatedAt(1L).setUpdatedAt(1L)); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("c"); + } + + @Test + public void findPendingTask_givenBranchInProgressAndPropertySet_returnQueuedPR() { + insertInProgress("1"); + insertPendingPullRequest("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("2"); + + assertThat(logTester.logs()).contains("Task [uuid = " + ceQueueDto.get().getUuid() + "] will be run concurrently with other tasks for the same project"); + } + + @Test + public void findPendingTask_givenBranchInProgressAndPropertyNotSet_dontReturnQueuedPR() { + when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(false)); + insertInProgress("1"); + insertPendingPullRequest("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isEmpty(); + } + + @Test + public void findPendingTask_given2PRsQueued_returnBothQueuedPR() { + insertPendingPullRequest("1"); + insertPendingPullRequest("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given1MainBranch_2PRsQueued_returnMainBranchAndPRs() { + insertPending("1"); + insertPendingPullRequest("2"); + insertPendingPullRequest("3"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + Optional ceQueueDto3 = underTest.findPendingTask("workerUuid3", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isPresent(); + assertThat(ceQueueDto3).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + assertThat(ceQueueDto2.get().getUuid()).isEqualTo("2"); + assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS); + assertThat(ceQueueDto3.get().getUuid()).isEqualTo("3"); + assertThat(ceQueueDto3.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given1MainBranch_2BranchesQueued_returnOnyMainBranch() { + insertPending("1", null); + insertPendingBranch("2"); + insertPendingBranch("3"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isEmpty(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("1"); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given2BranchesQueued_returnOnlyFirstQueuedBranch() { + insertPending("1"); + insertPendingBranch("2"); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isEmpty(); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_given2SamePRsQueued_returnOnlyFirstQueuedPR() { + insertPendingPullRequest("1", c -> c.setComponentUuid("pr1")); + insertPendingPullRequest("2", c -> c.setComponentUuid("pr1")); + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + Optional ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto2).isEmpty(); + assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS); + } + + @Test + public void findPendingTask_givenBranchInTheQueueOlderThanPrInTheQueue_dontJumpAheadOfBranch() { + // we have branch task in progress. Next branch task needs to wait for this one to finish. We dont allow PRs to jump ahead of this branch + insertInProgress("1"); + insertPending("2"); + insertPendingPullRequest("3"); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isEmpty(); + } + + @Test + public void findPendingTask_givenDifferentProjectAndPrInTheQueue_dontJumpAheadOfDifferentProject() { + // we have branch task in progress. + insertInProgress("1"); + // The PR can run in parallel, but needs to wait for this other project to finish. We dont allow PRs to jump ahead + insertPending("2", c -> c.setMainComponentUuid("different project")); + insertPendingPullRequest("3"); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("2"); + } + + @Test + public void findPendingTask_givenDifferentProjectAndPrInTheQueue_prCanRunFirst() { + // we have branch task in progress. + insertInProgress("1"); + // The PR can run in parallel and is ahead of the other project + insertPendingPullRequest("2"); + insertPending("3", c -> c.setMainComponentUuid("different project")); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("2"); + } + + @Test + public void findPendingTask_givenFivePrsInProgress_branchCanBeScheduled() { + insertInProgressPullRequest("1"); + insertInProgressPullRequest("2"); + insertInProgressPullRequest("3"); + insertInProgressPullRequest("4"); + insertInProgressPullRequest("5"); + insertPending("6"); + + Optional ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true); + + assertThat(ceQueueDto).isPresent(); + assertThat(ceQueueDto.get().getUuid()).isEqualTo("6"); + } + + @Test + public void findPendingTask_excludingViewPickUpOrphanBranches() { + insertPending("1", dto -> dto + .setComponentUuid("1") + .setMainComponentUuid("non-existing-uuid") + .setStatus(PENDING) + .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) + .setCreatedAt(100_000L)); + + Optional peek = underTest.findPendingTask("1", db.getSession(), false); + assertThat(peek).isPresent(); + assertThat(peek.get().getUuid()).isEqualTo("1"); + } + + @Test + public void exclude_portfolios_computation_when_indexing_issues() { + String taskUuid1 = "1", taskUuid2 = "2"; + String mainComponentUuid = "1"; + insertBranch(mainComponentUuid); + insertPending(taskUuid1, dto -> dto + .setComponentUuid(mainComponentUuid) + .setMainComponentUuid(mainComponentUuid) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) + .setCreatedAt(100_000L)); + + String view_uuid = "view_uuid"; + insertView(view_uuid); + insertPending(taskUuid2, dto -> dto + .setComponentUuid(view_uuid) + .setMainComponentUuid(view_uuid) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(100_000L)); + + Optional peek = underTest.findPendingTask("1", db.getSession(), false); + assertThat(peek).isPresent(); + assertThat(peek.get().getUuid()).isEqualTo(taskUuid1); + + Optional peek2 = underTest.findPendingTask("1", db.getSession(), false); + assertThat(peek2).isPresent(); + assertThat(peek2.get().getUuid()).isEqualTo(taskUuid2); + } + + private CeQueueDto insertPending(String uuid) { + return insertPending(uuid, null); + } + + private CeQueueDto insertPendingBranch(String uuid) { + CeQueueDto queue = insertPending(uuid, null); + insertCharacteristics(queue.getUuid(), BRANCH_KEY); + return queue; + } + + private CeQueueDto insertPendingPullRequest(String uuid) { + return insertPendingPullRequest(uuid, null); + } + + private CeQueueDto insertPendingPullRequest(String uuid, @Nullable Consumer ceQueueDto) { + CeQueueDto queue = insertPending(uuid, ceQueueDto); + insertCharacteristics(queue.getUuid(), PULL_REQUEST); + return queue; + } + + private CeQueueDto insertInProgressPullRequest(String uuid) { + CeQueueDto queue = insertInProgress(uuid, null); + insertCharacteristics(queue.getUuid(), PULL_REQUEST); + return queue; + } + + private CeQueueDto insertInProgress(String uuid) { + return insertInProgress(uuid, null); + } + + private CeQueueDto insertInProgress(String uuid, @Nullable Consumer ceQueueDto) { + return insertTask(uuid, IN_PROGRESS, ceQueueDto); + } + + private CeQueueDto insertPending(String uuid, @Nullable Consumer ceQueueDto) { + return insertTask(uuid, PENDING, ceQueueDto); + } + + private CeTaskCharacteristicDto insertCharacteristics(String taskUuid, String branchType) { + var ctcDto = new CeTaskCharacteristicDto(); + ctcDto.setUuid(UUID.randomUUID().toString()); + ctcDto.setTaskUuid(taskUuid); + ctcDto.setKey(branchType); + ctcDto.setValue("value"); + db.getDbClient().ceTaskCharacteristicsDao().insert(db.getSession(), ctcDto); + db.getSession().commit(); + return ctcDto; + } + + private CeQueueDto insertTask(String uuid, CeQueueDto.Status status, @Nullable Consumer ceQueueDtoConsumer) { + CeQueueDto dto = createCeQueue(uuid, status, ceQueueDtoConsumer); + db.getDbClient().ceQueueDao().insert(db.getSession(), dto); + db.getSession().commit(); + return dto; + } + + @NotNull + private static CeQueueDto createCeQueue(String uuid, CeQueueDto.Status status, @Nullable Consumer ceQueueDtoConsumer) { + CeQueueDto dto = new CeQueueDto(); + dto.setUuid(uuid); + dto.setTaskType(CeTaskTypes.REPORT); + dto.setStatus(status); + dto.setSubmitterUuid("henri"); + dto.setComponentUuid(UUID.randomUUID().toString()); + dto.setMainComponentUuid("1"); + if (ceQueueDtoConsumer != null) { + ceQueueDtoConsumer.accept(dto); + } + return dto; + } + + private void insertView(String view_uuid) { + ComponentDto view = new ComponentDto(); + view.setQualifier("VW"); + view.setKey(view_uuid + "_key"); + view.setUuid(view_uuid); + view.setPrivate(false); + view.setRootUuid(view_uuid); + view.setUuidPath("uuid_path"); + view.setBranchUuid(view_uuid); + db.components().insertPortfolioAndSnapshot(view); + db.commit(); + } + + private void insertBranch(String uuid) { + ComponentDto branch = new ComponentDto(); + branch.setQualifier("TRK"); + branch.setKey(uuid + "_key"); + branch.setUuid(uuid); + branch.setPrivate(false); + branch.setRootUuid(uuid); + branch.setUuidPath("uuid_path"); + branch.setBranchUuid(uuid); + db.components().insertComponent(branch); + db.commit(); + } +} diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java index 1e86d6b7118..eec48ef4492 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java @@ -134,7 +134,7 @@ public class CeQueueDao implements Dao { } /** - * Update all tasks for the specified worker uuid which are not PENDING to: + * Updates all tasks for the specified worker uuid which are not PENDING to: * STATUS='PENDING', STARTED_AT=NULL, UPDATED_AT={now}. */ public int resetToPendingForWorker(DbSession session, String workerUuid) { @@ -173,17 +173,7 @@ public class CeQueueDao implements Dao { return builder.build(); } - public Optional peek(DbSession session, String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh) { - List eligibles = mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION, excludeIndexationJob, excludeViewRefresh); - if (eligibles.isEmpty()) { - return Optional.empty(); - } - - String eligible = eligibles.get(0); - return tryToPeek(session, eligible, workerUuid); - } - - private Optional tryToPeek(DbSession session, String eligibleTaskUuid, String workerUuid) { + public Optional tryToPeek(DbSession session, String eligibleTaskUuid, String workerUuid) { long now = system2.now(); int touchedRows = mapper(session).updateIf(eligibleTaskUuid, new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, now, now), @@ -204,4 +194,19 @@ public class CeQueueDao implements Dao { private static CeQueueMapper mapper(DbSession session) { return session.getMapper(CeQueueMapper.class); } + + /** + * Only returns tasks for projects that currently have no other tasks running + */ + public Optional selectEligibleForPeek(DbSession session, boolean excludeIndexationJob, boolean excludeView) { + return mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION, excludeIndexationJob, excludeView); + } + + public List selectOldestPendingPrOrBranch(DbSession session) { + return mapper(session).selectOldestPendingPrOrBranch(); + } + + public List selectInProgressWithCharacteristics(DbSession session) { + return mapper(session).selectInProgressWithCharacteristics(); + } } diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java index 84ec718269a..3738d6fc132 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java @@ -20,6 +20,7 @@ package org.sonar.db.ce; import java.util.List; +import java.util.Optional; import javax.annotation.CheckForNull; import javax.annotation.Nullable; import org.apache.ibatis.annotations.Param; @@ -36,7 +37,7 @@ public interface CeQueueMapper { int countByQuery(@Param("query") CeTaskQuery query); - List selectEligibleForPeek(@Param("pagination") Pagination pagination, + Optional selectEligibleForPeek(@Param("pagination") Pagination pagination, @Param("excludeIndexationJob") boolean excludeIndexationJob, @Param("excludeViewRefresh") boolean excludeViewRefresh); @@ -48,6 +49,8 @@ public interface CeQueueMapper { */ List selectPending(); + List selectInProgressWithCharacteristics(); + /** * Select all pending tasks which have already been started. */ @@ -87,4 +90,5 @@ public interface CeQueueMapper { boolean hasAnyIssueSyncTaskPendingOrInProgress(); + List selectOldestPendingPrOrBranch(); } diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeTaskDtoLight.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeTaskDtoLight.java new file mode 100644 index 00000000000..c121ed5ee57 --- /dev/null +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeTaskDtoLight.java @@ -0,0 +1,67 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.db.ce; + +import java.util.Comparator; +import java.util.Objects; + +public class CeTaskDtoLight implements Comparable { + + private String ceTaskUuid; + private long createdAt; + + public void setCeTaskUuid(String ceTaskUuid) { + this.ceTaskUuid = ceTaskUuid; + } + + public void setCreatedAt(long createdAt) { + this.createdAt = createdAt; + } + + public long getCreatedAt() { + return createdAt; + } + + public String getCeTaskUuid() { + return ceTaskUuid; + } + + @Override + public int compareTo(CeTaskDtoLight o) { + return Comparator.comparingLong(CeTaskDtoLight::getCreatedAt).thenComparing(CeTaskDtoLight::getCeTaskUuid).compare(this, o); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CeTaskDtoLight that = (CeTaskDtoLight) o; + return createdAt == that.createdAt && Objects.equals(ceTaskUuid, that.ceTaskUuid); + } + + @Override + public int hashCode() { + return Objects.hash(ceTaskUuid, createdAt); + } +} diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/PrOrBranchTask.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/PrOrBranchTask.java new file mode 100644 index 00000000000..86c37662e50 --- /dev/null +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/PrOrBranchTask.java @@ -0,0 +1,44 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.db.ce; + +public class PrOrBranchTask extends CeTaskDtoLight { + private String mainComponentUuid; + private String taskType; + private String branchType; + private String componentUuid; + + public String getMainComponentUuid() { + return mainComponentUuid; + } + + public String getBranchType() { + return branchType; + } + + public String getComponentUuid() { + return componentUuid; + } + + public String getTaskType() { + return taskType; + } + +} diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml index 8f39246a624..9aebba149fb 100644 --- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml +++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml @@ -139,15 +139,15 @@ - + select cq.uuid as ceTaskUuid, cq.created_at as createdAt limit #{pagination.pageSize,jdbcType=INTEGER} offset #{pagination.offset,jdbcType=INTEGER} - + select query.uuid as ceTaskUuid, query.created_at as createdAt from ( select row_number() over() as number, @@ -158,12 +158,12 @@ order by number asc - + select taskuuid as ceTaskUuid, createdat as createdAt from ( + select rownum as rn, t."uuid" as taskuuid, t."created_at" as createdat from ( select - + ) t ) t @@ -340,4 +340,47 @@ from dual + + + + + + + + cq.uuid as ceTaskUuid, + cq.main_component_uuid as mainComponentUuid, + cq.component_uuid as componentUuid, + cq.created_at as createdAt, + cq.task_type as taskType, + coalesce(ctc.kee, 'branch') as branchType + from + ce_queue cq + left join ce_task_characteristics ctc on cq.uuid = ctc.task_uuid and (ctc.kee = 'branch' or ctc.kee = 'pullRequest') + where + cq.status = 'PENDING' + and cq.task_type = 'REPORT' + order by + cq.created_at, cq.uuid + + + + diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java index 2eb980d3913..b7d473c9e86 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java @@ -39,7 +39,6 @@ import org.sonar.api.impl.utils.AlwaysIncreasingSystem2; import org.sonar.api.impl.utils.TestSystem2; import org.sonar.api.utils.System2; import org.sonar.db.DbTester; -import org.sonar.db.component.ComponentDto; import static com.google.common.collect.Lists.newArrayList; import static java.util.Collections.singletonList; @@ -52,6 +51,8 @@ import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; import static org.sonar.db.ce.CeQueueDto.Status.PENDING; import static org.sonar.db.ce.CeQueueTesting.newCeQueueDto; import static org.sonar.db.ce.CeQueueTesting.reset; +import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY; +import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST; public class CeQueueDaoTest { private static final long INIT_TIME = 1_450_000_000_000L; @@ -59,6 +60,7 @@ public class CeQueueDaoTest { private static final String TASK_UUID_2 = "TASK_2"; private static final String MAIN_COMPONENT_UUID_1 = "PROJECT_1"; private static final String MAIN_COMPONENT_UUID_2 = "PROJECT_2"; + private static final String COMPONENT_UUID_1 = "BRANCH_1"; private static final String TASK_UUID_3 = "TASK_3"; private static final String SELECT_QUEUE_UUID_AND_STATUS_QUERY = "select uuid,status from ce_queue"; private static final String SUBMITTER_LOGIN = "submitter uuid"; @@ -391,69 +393,6 @@ public class CeQueueDaoTest { assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid); } - @Test - public void peek_none_if_no_pendings() { - assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false)).isNotPresent(); - - // not pending, but in progress - makeInProgress(WORKER_UUID_1, 2_232_222L, insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1)); - assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false)).isNotPresent(); - } - - @Test - public void peek_oldest_pending() { - insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1); - system2.setNow(INIT_TIME + 3_000_000); - insertPending(TASK_UUID_2, MAIN_COMPONENT_UUID_2); - - assertThat(db.countRowsOfTable("ce_queue")).isEqualTo(2); - verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING); - - // peek first one - Optional peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false); - assertThat(peek).isPresent(); - assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1); - assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS); - assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1); - verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING); - - // peek second one - peek = underTest.peek(db.getSession(), WORKER_UUID_2, false, false); - assertThat(peek).isPresent(); - assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2); - assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS); - assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2); - verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS); - - // no more pendings - assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false)).isNotPresent(); - } - - @Test - public void do_not_peek_multiple_tasks_on_same_main_component_at_the_same_time() { - // two pending tasks on the same project - insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1); - system2.setNow(INIT_TIME + 3_000_000); - insertPending(TASK_UUID_2, MAIN_COMPONENT_UUID_1); - - Optional peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false); - assertThat(peek).isPresent(); - assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1); - assertThat(peek.get().getMainComponentUuid()).isEqualTo(MAIN_COMPONENT_UUID_1); - assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1); - verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING); - - // do not peek second task as long as the first one is in progress - peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false); - assertThat(peek).isEmpty(); - - // first one is finished - underTest.deleteByUuid(db.getSession(), TASK_UUID_1); - peek = underTest.peek(db.getSession(), WORKER_UUID_2, false, false); - assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2); - assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2); - } - @Test public void select_by_query() { // task status not in query @@ -632,97 +571,134 @@ public class CeQueueDaoTest { } @Test - public void exclude_portfolios_computation_when_indexing_issues() { - insertBranch(MAIN_COMPONENT_UUID_1); + public void hasAnyIssueSyncTaskPendingOrInProgress_PENDING() { + assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse(); + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(MAIN_COMPONENT_UUID_1) - .setMainComponentUuid(MAIN_COMPONENT_UUID_1) .setStatus(PENDING) .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) .setCreatedAt(100_000L)); - String view_uuid = "view_uuid"; - insertView(view_uuid); - insertPending(newCeQueueDto(TASK_UUID_2) - .setComponentUuid(view_uuid) - .setMainComponentUuid(view_uuid) - .setStatus(PENDING) - .setTaskType(CeTaskTypes.REPORT) - .setCreatedAt(100_000L)); - - Optional peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, true); - assertThat(peek).isPresent(); - assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1); - - Optional peek2 = underTest.peek(db.getSession(), WORKER_UUID_1, false, false); - assertThat(peek2).isPresent(); - assertThat(peek2.get().getUuid()).isEqualTo(TASK_UUID_2); + assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue(); } @Test - public void excluding_view_pick_up_orphan_branches() { + public void hasAnyIssueSyncTaskPendingOrInProgress_IN_PROGRESS() { + assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse(); + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(MAIN_COMPONENT_UUID_1) - .setMainComponentUuid("non-existing-uuid") - .setStatus(PENDING) + .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) .setCreatedAt(100_000L)); - Optional peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, true); - assertThat(peek).isPresent(); - assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1); + assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue(); } @Test - public void hasAnyIssueSyncTaskPendingOrInProgress_PENDING() { - assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse(); + public void selectOldestPendingPrOrBranch_returns_oldest_100_pr_or_branch_tasks() { + for (int i = 1; i < 110; i++) { + insertPending(newCeQueueDto("task" + i) + .setComponentUuid(MAIN_COMPONENT_UUID_1).setStatus(PENDING).setTaskType(CeTaskTypes.REPORT).setCreatedAt(i)); + } + for (int i = 1; i < 10; i++) { + insertPending(newCeQueueDto("progress" + i) + .setComponentUuid(MAIN_COMPONENT_UUID_1).setStatus(IN_PROGRESS).setTaskType(CeTaskTypes.REPORT).setCreatedAt(i)); + insertPending(newCeQueueDto("sync" + i) + .setComponentUuid(MAIN_COMPONENT_UUID_1).setStatus(PENDING).setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC).setCreatedAt(i)); + } + + List prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession()); + assertThat(prOrBranchTasks).hasSize(100) + .allMatch(t -> t.getCeTaskUuid().startsWith("task"), "uuid starts with task") + .allMatch(t -> t.getCreatedAt() <= 100, "creation date older or equal than 100"); + } + + @Test + public void selectOldestPendingPrOrBranch_returns_branch_branch_type_if_no_characteristics() { + insertPending(newCeQueueDto(TASK_UUID_1) + .setComponentUuid(COMPONENT_UUID_1) + .setMainComponentUuid(MAIN_COMPONENT_UUID_1) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(123L)); + List prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession()); + assertThat(prOrBranchTasks).hasSize(1); + assertThat(prOrBranchTasks.get(0)) + .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType) + .containsExactly(BRANCH_KEY, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT); + } + + @Test + public void selectOldestPendingPrOrBranch_returns_branch_branch_type_if_unrelated_characteristics() { insertPending(newCeQueueDto(TASK_UUID_1) - .setComponentUuid(MAIN_COMPONENT_UUID_1) + .setComponentUuid(COMPONENT_UUID_1) + .setMainComponentUuid(MAIN_COMPONENT_UUID_1) .setStatus(PENDING) - .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) - .setCreatedAt(100_000L)); + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(123L)); + List prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession()); + insertCharacteristic(BRANCH_KEY, "123", "c1", TASK_UUID_1); - assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue(); + assertThat(prOrBranchTasks).hasSize(1); + assertThat(prOrBranchTasks.get(0)) + .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType) + .containsExactly(BRANCH_KEY, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT); } @Test - public void hasAnyIssueSyncTaskPendingOrInProgress_IN_PROGRESS() { - assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse(); + public void selectOldestPendingPrOrBranch_returns_all_fields() { + insertPending(newCeQueueDto(TASK_UUID_1) + .setComponentUuid(COMPONENT_UUID_1) + .setMainComponentUuid(MAIN_COMPONENT_UUID_1) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(123L)); + insertCharacteristic(PULL_REQUEST, "1", "c1", TASK_UUID_1); + + List prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession()); + + assertThat(prOrBranchTasks).hasSize(1); + assertThat(prOrBranchTasks.get(0)) + .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType) + .containsExactly(PULL_REQUEST, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT); + } + @Test + public void selectInProgressWithCharacteristics_returns_all_fields() { insertPending(newCeQueueDto(TASK_UUID_1) - .setComponentUuid(MAIN_COMPONENT_UUID_1) + .setComponentUuid(COMPONENT_UUID_1) + .setMainComponentUuid(MAIN_COMPONENT_UUID_1) .setStatus(IN_PROGRESS) - .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC) - .setCreatedAt(100_000L)); + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(123L)); + insertCharacteristic(PULL_REQUEST, "1", "c1", TASK_UUID_1); - assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue(); - } + List prOrBranchTasks = underTest.selectInProgressWithCharacteristics(db.getSession()); - private void insertView(String view_uuid) { - ComponentDto view = new ComponentDto(); - view.setQualifier("VW"); - view.setKey(view_uuid + "_key"); - view.setUuid(view_uuid); - view.setPrivate(false); - view.setRootUuid(view_uuid); - view.setUuidPath("uuid_path"); - view.setBranchUuid(view_uuid); - db.components().insertPortfolioAndSnapshot(view); - db.commit(); + assertThat(prOrBranchTasks).hasSize(1); + assertThat(prOrBranchTasks.get(0)) + .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType) + .containsExactly(PULL_REQUEST, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT); } - private void insertBranch(String uuid) { - ComponentDto branch = new ComponentDto(); - branch.setQualifier("TRK"); - branch.setKey(uuid + "_key"); - branch.setUuid(uuid); - branch.setPrivate(false); - branch.setRootUuid(uuid); - branch.setUuidPath("uuid_path"); - branch.setBranchUuid(uuid); - db.components().insertComponent(branch); - db.commit(); + @Test + public void selectInProgressWithCharacteristics_returns_branch_branch_type_if_no_characteristics() { + insertPending(newCeQueueDto(TASK_UUID_1) + .setComponentUuid(COMPONENT_UUID_1) + .setMainComponentUuid(MAIN_COMPONENT_UUID_1) + .setStatus(IN_PROGRESS) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(123L)); + + List prOrBranchTasks = underTest.selectInProgressWithCharacteristics(db.getSession()); + + assertThat(prOrBranchTasks).hasSize(1); + assertThat(prOrBranchTasks.get(0)) + .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType) + .containsExactly(BRANCH_KEY, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT); } private void insertPending(CeQueueDto dto) { @@ -769,6 +745,15 @@ public class CeQueueDaoTest { return underTest.selectByUuid(db.getSession(), uuid).get(); } + private void insertCharacteristic(String key, String value, String uuid, String taskUuid) { + CeTaskCharacteristicDto dto1 = new CeTaskCharacteristicDto() + .setKey(key) + .setValue(value) + .setUuid(uuid) + .setTaskUuid(taskUuid); + db.getDbClient().ceTaskCharacteristicsDao().insert(db.getSession(), dto1); + } + private static Iterable> upperizeKeys(List> select) { return select.stream().map(new Function, Map>() { @Nullable @@ -783,10 +768,6 @@ public class CeQueueDaoTest { }).collect(Collectors.toList()); } - private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2) { - verifyCeQueueStatuses(new String[] {taskUuid1, taskUuid2}, new CeQueueDto.Status[] {taskStatus1, taskStatus2}); - } - private void verifyCeQueueStatuses(String[] taskUuids, CeQueueDto.Status[] statuses) { Map[] rows = new Map[taskUuids.length]; for (int i = 0; i < taskUuids.length; i++) { diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeTaskDtoLightTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeTaskDtoLightTest.java new file mode 100644 index 00000000000..1691738deaf --- /dev/null +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeTaskDtoLightTest.java @@ -0,0 +1,63 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.db.ce; + +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CeTaskDtoLightTest { + private final CeTaskDtoLight task1 = new CeTaskDtoLight(); + private final CeTaskDtoLight task2 = new CeTaskDtoLight(); + private final CeTaskDtoLight task3 = new CeTaskDtoLight(); + private final CeTaskDtoLight task4 = new CeTaskDtoLight(); + + @Before + public void setUp() { + task1.setCeTaskUuid("id1"); + task1.setCreatedAt(1); + task2.setCeTaskUuid("id1"); + task2.setCreatedAt(1); + task3.setCeTaskUuid("id2"); + task3.setCreatedAt(1); + task4.setCeTaskUuid("id1"); + task4.setCreatedAt(2); + } + + @Test + public void equals_is_based_on_created_date_and_uuid() { + assertThat(task1).isEqualTo(task2); + assertThat(task1).isNotEqualTo(task3); + assertThat(task1).isNotEqualTo(task4); + } + + @Test + public void hashCode_is_based_on_created_date_and_uuid() { + assertThat(task1).hasSameHashCodeAs(task2); + } + + @Test + public void compareTo_is_based_on_created_date_and_uuid() { + assertThat(task1).isEqualByComparingTo(task2); + assertThat(task1).isLessThan(task3); + assertThat(task1).isLessThan(task4); + } +} diff --git a/sonar-core/src/main/java/org/sonar/core/config/ComputeEngineProperties.java b/sonar-core/src/main/java/org/sonar/core/config/ComputeEngineProperties.java new file mode 100644 index 00000000000..a8b09aae19c --- /dev/null +++ b/sonar-core/src/main/java/org/sonar/core/config/ComputeEngineProperties.java @@ -0,0 +1,29 @@ +/* + * SonarQube + * Copyright (C) 2009-2022 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.core.config; + +public class ComputeEngineProperties { + + private ComputeEngineProperties() { + } + + public static final String CE_PARALLEL_PROJECT_TASKS_ENABLED = "sonar.ce.parallelProjectTasks"; + +} -- 2.39.5