@Test
public void fail_to_cancel_if_in_progress() {
- submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11)));
- CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false).get();
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11)));
+ CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID).get();
assertThatThrownBy(() -> underTest.cancel(db.getSession(), ceQueueDto))
.isInstanceOf(IllegalStateException.class)
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
- db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
+ db.getDbClient().ceQueueDao().tryToPeek(session, inProgressTask.getUuid(), WORKER_UUID);
int canceledCount = underTest.cancelAll();
assertThat(canceledCount).isEqualTo(2);
@Test
public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
- submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
- db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
+ db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID);
// task is in-progress
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
@Test
public void resumeWorkers_resumes_pausing_workers() {
- submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
- db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
+ db.getDbClient().ceQueueDao().tryToPeek(session, task.getUuid(), WORKER_UUID);
// task is in-progress
underTest.pauseWorkers();
@Test
public void fail_in_progress_task() {
CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
- CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID, false, false).get();
+ CeQueueDto queueDto = db.getDbClient().ceQueueDao().tryToPeek(db.getSession(), task.getUuid(), WORKER_UUID).get();
underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
import org.sonar.ce.monitoring.CeTasksMBeanImpl;
import org.sonar.ce.queue.CeQueueInitializer;
import org.sonar.ce.queue.InternalCeQueueImpl;
+import org.sonar.ce.queue.NextPendingTaskPicker;
import org.sonar.core.platform.Module;
public class CeQueueModule extends Module {
add(
// queue state
InternalCeQueueImpl.class,
+ NextPendingTaskPicker.class,
// queue monitoring
CeTasksMBeanImpl.class,
private final DbClient dbClient;
private final CEQueueStatus queueStatus;
private final ComputeEngineStatus computeEngineStatus;
+ private final NextPendingTaskPicker nextPendingTaskPicker;
public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus,
- ComputeEngineStatus computeEngineStatus) {
+ ComputeEngineStatus computeEngineStatus, NextPendingTaskPicker nextPendingTaskPicker) {
super(system2, dbClient, uuidFactory);
this.dbClient = dbClient;
this.queueStatus = queueStatus;
this.computeEngineStatus = computeEngineStatus;
+ this.nextPendingTaskPicker = nextPendingTaskPicker;
}
@Override
dbSession.commit();
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
- Optional<CeQueueDto> opt = findPendingTask(workerUuid, dbSession, ceQueueDao, excludeIndexationJob);
- if (!opt.isPresent()) {
+ Optional<CeQueueDto> opt = nextPendingTaskPicker.findPendingTask(workerUuid, dbSession, excludeIndexationJob);
+ if (opt.isEmpty()) {
return Optional.empty();
}
CeQueueDto taskDto = opt.get();
}
}
- private static Optional<CeQueueDto> findPendingTask(String workerUuid, DbSession dbSession, CeQueueDao ceQueueDao, boolean excludeIndexationJob) {
- // try to find tasks including indexation job & excluding app/portfolio
- // and if no match, try the opposite
- // when excludeIndexationJob is false, search first excluding indexation jobs and including app/portfolio, then the opposite
- Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, !excludeIndexationJob);
- if (!opt.isPresent()) {
- opt = ceQueueDao.peek(dbSession, workerUuid, !excludeIndexationJob, excludeIndexationJob);
- }
- return opt;
- }
-
@Override
public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.ObjectUtils;
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.config.ComputeEngineProperties;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeQueueDao;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskDtoLight;
+import org.sonar.db.ce.PrOrBranchTask;
+
+import static java.util.stream.Collectors.toList;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST;
+
+@ComputeEngineSide
+public class NextPendingTaskPicker {
+ private static final Logger LOG = Loggers.get(NextPendingTaskPicker.class);
+
+ private final Configuration config;
+ private final CeQueueDao ceQueueDao;
+
+ public NextPendingTaskPicker(Configuration config, DbClient dbClient) {
+ this.config = config;
+ this.ceQueueDao = dbClient.ceQueueDao();
+ }
+
+ Optional<CeQueueDto> findPendingTask(String workerUuid, DbSession dbSession, boolean prioritizeAnalysisAndRefresh) {
+ // try to find tasks including indexation job & excluding app/portfolio and if no match, try the opposite
+ // when prioritizeAnalysisAndRefresh is false, search first excluding indexation jobs and including app/portfolio, then the opposite
+ Optional<CeTaskDtoLight> eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, prioritizeAnalysisAndRefresh, !prioritizeAnalysisAndRefresh);
+ Optional<CeTaskDtoLight> eligibleForPeekInParallel = eligibleForPeekInParallel(dbSession);
+
+ if (eligibleForPeek.isPresent() || eligibleForPeekInParallel.isPresent()) {
+ return submitOldest(dbSession, workerUuid, eligibleForPeek.orElse(null), eligibleForPeekInParallel.orElse(null));
+ }
+
+ eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, !prioritizeAnalysisAndRefresh, prioritizeAnalysisAndRefresh);
+ if (eligibleForPeek.isPresent()) {
+ return ceQueueDao.tryToPeek(dbSession, eligibleForPeek.get().getCeTaskUuid(), workerUuid);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * priority is always given to the task that is waiting longer - to avoid starvation
+ */
+ private Optional<CeQueueDto> submitOldest(DbSession session, String workerUuid, @Nullable CeTaskDtoLight eligibleForPeek, @Nullable CeTaskDtoLight eligibleForPeekInParallel) {
+ CeTaskDtoLight oldest = (CeTaskDtoLight) ObjectUtils.min(eligibleForPeek, eligibleForPeekInParallel);
+ Optional<CeQueueDto> ceQueueDto = ceQueueDao.tryToPeek(session, oldest.getCeTaskUuid(), workerUuid);
+ if (!Objects.equals(oldest, eligibleForPeek)) {
+ ceQueueDto.ifPresent(t -> LOG.info("Task [uuid = " + t.getUuid() + "] will be run concurrently with other tasks for the same project"));
+ }
+ return ceQueueDto;
+ }
+
+ Optional<CeTaskDtoLight> eligibleForPeekInParallel(DbSession dbSession) {
+ Optional<Boolean> parallelProjectTasksEnabled = config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED);
+ if (parallelProjectTasksEnabled.isPresent() && Boolean.TRUE.equals(parallelProjectTasksEnabled.get())) {
+ return findPendingConcurrentCandidateTasks(ceQueueDao, dbSession);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Some of the tasks of the same project (mostly PRs) can be assigned and executed on workers at the same time/concurrently.
+ * We look for them in this method.
+ */
+ private static Optional<CeTaskDtoLight> findPendingConcurrentCandidateTasks(CeQueueDao ceQueueDao, DbSession session) {
+ List<PrOrBranchTask> queuedPrOrBranches = filterOldestPerProject(ceQueueDao.selectOldestPendingPrOrBranch(session));
+ List<PrOrBranchTask> inProgressTasks = ceQueueDao.selectInProgressWithCharacteristics(session);
+
+ for (PrOrBranchTask task : queuedPrOrBranches) {
+ if ((Objects.equals(task.getBranchType(), PULL_REQUEST) && canRunPr(task, inProgressTasks))
+ || (Objects.equals(task.getBranchType(), BRANCH_KEY) && canRunBranch(task, inProgressTasks))) {
+ return Optional.of(task);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static List<PrOrBranchTask> filterOldestPerProject(List<PrOrBranchTask> queuedPrOrBranches) {
+ Set<String> mainComponentUuidsSeen = new HashSet<>();
+ return queuedPrOrBranches.stream().filter(t -> mainComponentUuidsSeen.add(t.getMainComponentUuid())).collect(toList());
+ }
+
+ /**
+ * Branches cannot be run concurrently at this moment with other branches. And branches can already be returned in
+ * {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)}. But we need this method because branches can be
+ * assigned to a worker in a situation where the only type of in-progress tasks for a given project are {@link #PULLREQUEST_TYPE}.
+ * <p>
+ * This method returns the longest waiting branch in the queue which can be scheduled concurrently with pull requests.
+ */
+ private static boolean canRunBranch(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
+ String mainComponentUuid = task.getMainComponentUuid();
+ List<PrOrBranchTask> sameComponentTasks = inProgress.stream()
+ .filter(t -> t.getMainComponentUuid().equals(mainComponentUuid))
+ .collect(toList());
+ //we can peek branch analysis task only if all the other in progress tasks for this component uuid are pull requests
+ return sameComponentTasks.stream().map(PrOrBranchTask::getBranchType).allMatch(s -> Objects.equals(s, PULL_REQUEST));
+ }
+
+ /**
+ * Queued pull requests can almost always be assigned to worker unless there is already PR running with the same ID (text_value column)
+ * and for the same project. We look for the one that waits for the longest time.
+ */
+ private static boolean canRunPr(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
+ // return true unless the same PR is already in progress
+ return inProgress.stream()
+ .noneMatch(pr -> pr.getMainComponentUuid().equals(task.getMainComponentUuid()) && Objects.equals(pr.getBranchType(), PULL_REQUEST) &&
+ Objects.equals(pr.getComponentUuid(), (task.getComponentUuid())));
+ }
+}
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.sonar.api.config.Configuration;
import org.sonar.api.impl.utils.AlwaysIncreasingSystem2;
import org.sonar.api.utils.System2;
import org.sonar.ce.container.ComputeEngineStatus;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
private CEQueueStatus queueStatus = new CEQueueStatusImpl(db.getDbClient(), mock(System2.class));
private ComputeEngineStatus computeEngineStatus = mock(ComputeEngineStatus.class);
- private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus, computeEngineStatus);
+ private Configuration config = mock(Configuration.class);
+ private NextPendingTaskPicker nextPendingTaskPicker = new NextPendingTaskPicker(config, db.getDbClient());
+ private InternalCeQueue underTest = new InternalCeQueueImpl(system2, db.getDbClient(), uuidFactory, queueStatus,
+ computeEngineStatus, nextPendingTaskPicker);
@Before
public void setUp() {
+ when(config.getBoolean(any())).thenReturn(Optional.of(false));
when(computeEngineStatus.getStatus()).thenReturn(STARTED);
}
db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid());
db.commit();
- InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null);
+ InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatus, null, null);
try {
underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
db.getDbClient().ceQueueDao().deleteByUuid(db.getSession(), task.getUuid());
db.commit();
- InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null);
+ InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null);
try {
underTest.remove(task, CeActivityDto.Status.FAILED, null, null);
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
db.executeUpdateSql("update ce_queue set status = 'PENDING', started_at = 123 where uuid = '" + task.getUuid() + "'");
db.commit();
- InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null);
+ InternalCeQueueImpl underTest = new InternalCeQueueImpl(system2, db.getDbClient(), null, queueStatusMock, null, null);
underTest.cancelWornOuts();
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.config.Configuration;
+import org.sonar.api.impl.utils.AlwaysIncreasingSystem2;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.core.config.ComputeEngineProperties;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskCharacteristicDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.db.component.ComponentDto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
+import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST;
+
+public class NextPendingTaskPickerTest {
+
+ private final System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(1L, 1);
+
+ private final Configuration config = mock(Configuration.class);
+
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private NextPendingTaskPicker underTest;
+
+ @Rule
+ public DbTester db = DbTester.create(alwaysIncreasingSystem2);
+
+ @Before
+ public void before() {
+ underTest = new NextPendingTaskPicker(config, db.getDbClient());
+ when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(true));
+ }
+
+ @Test
+ public void findPendingTask_whenNoTasksPending_returnsEmpty() {
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isEmpty();
+ }
+
+ @Test
+ public void findPendingTask_whenTwoTasksPending_returnsTheOlderOne() {
+ // both the 'eligibleTask' and 'parallelEligibleTask' will point to this one
+ insertPending("1");
+ insertPending("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ }
+
+ @Test
+ public void findPendingTask_whenTwoTasksPendingWithSameCreationDate_returnsLowestUuid() {
+ insertPending("d", c -> c.setCreatedAt(1L).setUpdatedAt(1L));
+ insertPending("c", c -> c.setCreatedAt(1L).setUpdatedAt(1L));
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("c");
+ }
+
+ @Test
+ public void findPendingTask_givenBranchInProgressAndPropertySet_returnQueuedPR() {
+ insertInProgress("1");
+ insertPendingPullRequest("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("2");
+
+ assertThat(logTester.logs()).contains("Task [uuid = " + ceQueueDto.get().getUuid() + "] will be run concurrently with other tasks for the same project");
+ }
+
+ @Test
+ public void findPendingTask_givenBranchInProgressAndPropertyNotSet_dontReturnQueuedPR() {
+ when(config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED)).thenReturn(Optional.of(false));
+ insertInProgress("1");
+ insertPendingPullRequest("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isEmpty();
+ }
+
+ @Test
+ public void findPendingTask_given2PRsQueued_returnBothQueuedPR() {
+ insertPendingPullRequest("1");
+ insertPendingPullRequest("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given1MainBranch_2PRsQueued_returnMainBranchAndPRs() {
+ insertPending("1");
+ insertPendingPullRequest("2");
+ insertPendingPullRequest("3");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto3 = underTest.findPendingTask("workerUuid3", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isPresent();
+ assertThat(ceQueueDto3).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(ceQueueDto2.get().getUuid()).isEqualTo("2");
+ assertThat(ceQueueDto2.get().getStatus()).isEqualTo(IN_PROGRESS);
+ assertThat(ceQueueDto3.get().getUuid()).isEqualTo("3");
+ assertThat(ceQueueDto3.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given1MainBranch_2BranchesQueued_returnOnyMainBranch() {
+ insertPending("1", null);
+ insertPendingBranch("2");
+ insertPendingBranch("3");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isEmpty();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("1");
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given2BranchesQueued_returnOnlyFirstQueuedBranch() {
+ insertPending("1");
+ insertPendingBranch("2");
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isEmpty();
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_given2SamePRsQueued_returnOnlyFirstQueuedPR() {
+ insertPendingPullRequest("1", c -> c.setComponentUuid("pr1"));
+ insertPendingPullRequest("2", c -> c.setComponentUuid("pr1"));
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+ Optional<CeQueueDto> ceQueueDto2 = underTest.findPendingTask("workerUuid2", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto2).isEmpty();
+ assertThat(ceQueueDto.get().getStatus()).isEqualTo(IN_PROGRESS);
+ }
+
+ @Test
+ public void findPendingTask_givenBranchInTheQueueOlderThanPrInTheQueue_dontJumpAheadOfBranch() {
+ // we have branch task in progress. Next branch task needs to wait for this one to finish. We dont allow PRs to jump ahead of this branch
+ insertInProgress("1");
+ insertPending("2");
+ insertPendingPullRequest("3");
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isEmpty();
+ }
+
+ @Test
+ public void findPendingTask_givenDifferentProjectAndPrInTheQueue_dontJumpAheadOfDifferentProject() {
+ // we have branch task in progress.
+ insertInProgress("1");
+ // The PR can run in parallel, but needs to wait for this other project to finish. We dont allow PRs to jump ahead
+ insertPending("2", c -> c.setMainComponentUuid("different project"));
+ insertPendingPullRequest("3");
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("2");
+ }
+
+ @Test
+ public void findPendingTask_givenDifferentProjectAndPrInTheQueue_prCanRunFirst() {
+ // we have branch task in progress.
+ insertInProgress("1");
+ // The PR can run in parallel and is ahead of the other project
+ insertPendingPullRequest("2");
+ insertPending("3", c -> c.setMainComponentUuid("different project"));
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("2");
+ }
+
+ @Test
+ public void findPendingTask_givenFivePrsInProgress_branchCanBeScheduled() {
+ insertInProgressPullRequest("1");
+ insertInProgressPullRequest("2");
+ insertInProgressPullRequest("3");
+ insertInProgressPullRequest("4");
+ insertInProgressPullRequest("5");
+ insertPending("6");
+
+ Optional<CeQueueDto> ceQueueDto = underTest.findPendingTask("workerUuid", db.getSession(), true);
+
+ assertThat(ceQueueDto).isPresent();
+ assertThat(ceQueueDto.get().getUuid()).isEqualTo("6");
+ }
+
+ @Test
+ public void findPendingTask_excludingViewPickUpOrphanBranches() {
+ insertPending("1", dto -> dto
+ .setComponentUuid("1")
+ .setMainComponentUuid("non-existing-uuid")
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
+ .setCreatedAt(100_000L));
+
+ Optional<CeQueueDto> peek = underTest.findPendingTask("1", db.getSession(), false);
+ assertThat(peek).isPresent();
+ assertThat(peek.get().getUuid()).isEqualTo("1");
+ }
+
+ @Test
+ public void exclude_portfolios_computation_when_indexing_issues() {
+ String taskUuid1 = "1", taskUuid2 = "2";
+ String mainComponentUuid = "1";
+ insertBranch(mainComponentUuid);
+ insertPending(taskUuid1, dto -> dto
+ .setComponentUuid(mainComponentUuid)
+ .setMainComponentUuid(mainComponentUuid)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
+ .setCreatedAt(100_000L));
+
+ String view_uuid = "view_uuid";
+ insertView(view_uuid);
+ insertPending(taskUuid2, dto -> dto
+ .setComponentUuid(view_uuid)
+ .setMainComponentUuid(view_uuid)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(100_000L));
+
+ Optional<CeQueueDto> peek = underTest.findPendingTask("1", db.getSession(), false);
+ assertThat(peek).isPresent();
+ assertThat(peek.get().getUuid()).isEqualTo(taskUuid1);
+
+ Optional<CeQueueDto> peek2 = underTest.findPendingTask("1", db.getSession(), false);
+ assertThat(peek2).isPresent();
+ assertThat(peek2.get().getUuid()).isEqualTo(taskUuid2);
+ }
+
+ private CeQueueDto insertPending(String uuid) {
+ return insertPending(uuid, null);
+ }
+
+ private CeQueueDto insertPendingBranch(String uuid) {
+ CeQueueDto queue = insertPending(uuid, null);
+ insertCharacteristics(queue.getUuid(), BRANCH_KEY);
+ return queue;
+ }
+
+ private CeQueueDto insertPendingPullRequest(String uuid) {
+ return insertPendingPullRequest(uuid, null);
+ }
+
+ private CeQueueDto insertPendingPullRequest(String uuid, @Nullable Consumer<CeQueueDto> ceQueueDto) {
+ CeQueueDto queue = insertPending(uuid, ceQueueDto);
+ insertCharacteristics(queue.getUuid(), PULL_REQUEST);
+ return queue;
+ }
+
+ private CeQueueDto insertInProgressPullRequest(String uuid) {
+ CeQueueDto queue = insertInProgress(uuid, null);
+ insertCharacteristics(queue.getUuid(), PULL_REQUEST);
+ return queue;
+ }
+
+ private CeQueueDto insertInProgress(String uuid) {
+ return insertInProgress(uuid, null);
+ }
+
+ private CeQueueDto insertInProgress(String uuid, @Nullable Consumer<CeQueueDto> ceQueueDto) {
+ return insertTask(uuid, IN_PROGRESS, ceQueueDto);
+ }
+
+ private CeQueueDto insertPending(String uuid, @Nullable Consumer<CeQueueDto> ceQueueDto) {
+ return insertTask(uuid, PENDING, ceQueueDto);
+ }
+
+ private CeTaskCharacteristicDto insertCharacteristics(String taskUuid, String branchType) {
+ var ctcDto = new CeTaskCharacteristicDto();
+ ctcDto.setUuid(UUID.randomUUID().toString());
+ ctcDto.setTaskUuid(taskUuid);
+ ctcDto.setKey(branchType);
+ ctcDto.setValue("value");
+ db.getDbClient().ceTaskCharacteristicsDao().insert(db.getSession(), ctcDto);
+ db.getSession().commit();
+ return ctcDto;
+ }
+
+ private CeQueueDto insertTask(String uuid, CeQueueDto.Status status, @Nullable Consumer<CeQueueDto> ceQueueDtoConsumer) {
+ CeQueueDto dto = createCeQueue(uuid, status, ceQueueDtoConsumer);
+ db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
+ db.getSession().commit();
+ return dto;
+ }
+
+ @NotNull
+ private static CeQueueDto createCeQueue(String uuid, CeQueueDto.Status status, @Nullable Consumer<CeQueueDto> ceQueueDtoConsumer) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(uuid);
+ dto.setTaskType(CeTaskTypes.REPORT);
+ dto.setStatus(status);
+ dto.setSubmitterUuid("henri");
+ dto.setComponentUuid(UUID.randomUUID().toString());
+ dto.setMainComponentUuid("1");
+ if (ceQueueDtoConsumer != null) {
+ ceQueueDtoConsumer.accept(dto);
+ }
+ return dto;
+ }
+
+ private void insertView(String view_uuid) {
+ ComponentDto view = new ComponentDto();
+ view.setQualifier("VW");
+ view.setKey(view_uuid + "_key");
+ view.setUuid(view_uuid);
+ view.setPrivate(false);
+ view.setRootUuid(view_uuid);
+ view.setUuidPath("uuid_path");
+ view.setBranchUuid(view_uuid);
+ db.components().insertPortfolioAndSnapshot(view);
+ db.commit();
+ }
+
+ private void insertBranch(String uuid) {
+ ComponentDto branch = new ComponentDto();
+ branch.setQualifier("TRK");
+ branch.setKey(uuid + "_key");
+ branch.setUuid(uuid);
+ branch.setPrivate(false);
+ branch.setRootUuid(uuid);
+ branch.setUuidPath("uuid_path");
+ branch.setBranchUuid(uuid);
+ db.components().insertComponent(branch);
+ db.commit();
+ }
+}
}
/**
- * Update all tasks for the specified worker uuid which are not PENDING to:
+ * Updates all tasks for the specified worker uuid which are not PENDING to:
* STATUS='PENDING', STARTED_AT=NULL, UPDATED_AT={now}.
*/
public int resetToPendingForWorker(DbSession session, String workerUuid) {
return builder.build();
}
- public Optional<CeQueueDto> peek(DbSession session, String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh) {
- List<String> eligibles = mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION, excludeIndexationJob, excludeViewRefresh);
- if (eligibles.isEmpty()) {
- return Optional.empty();
- }
-
- String eligible = eligibles.get(0);
- return tryToPeek(session, eligible, workerUuid);
- }
-
- private Optional<CeQueueDto> tryToPeek(DbSession session, String eligibleTaskUuid, String workerUuid) {
+ public Optional<CeQueueDto> tryToPeek(DbSession session, String eligibleTaskUuid, String workerUuid) {
long now = system2.now();
int touchedRows = mapper(session).updateIf(eligibleTaskUuid,
new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, now, now),
private static CeQueueMapper mapper(DbSession session) {
return session.getMapper(CeQueueMapper.class);
}
+
+ /**
+ * Only returns tasks for projects that currently have no other tasks running
+ */
+ public Optional<CeTaskDtoLight> selectEligibleForPeek(DbSession session, boolean excludeIndexationJob, boolean excludeView) {
+ return mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION, excludeIndexationJob, excludeView);
+ }
+
+ public List<PrOrBranchTask> selectOldestPendingPrOrBranch(DbSession session) {
+ return mapper(session).selectOldestPendingPrOrBranch();
+ }
+
+ public List<PrOrBranchTask> selectInProgressWithCharacteristics(DbSession session) {
+ return mapper(session).selectInProgressWithCharacteristics();
+ }
}
package org.sonar.db.ce;
import java.util.List;
+import java.util.Optional;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.ibatis.annotations.Param;
int countByQuery(@Param("query") CeTaskQuery query);
- List<String> selectEligibleForPeek(@Param("pagination") Pagination pagination,
+ Optional<CeTaskDtoLight> selectEligibleForPeek(@Param("pagination") Pagination pagination,
@Param("excludeIndexationJob") boolean excludeIndexationJob,
@Param("excludeViewRefresh") boolean excludeViewRefresh);
*/
List<CeQueueDto> selectPending();
+ List<PrOrBranchTask> selectInProgressWithCharacteristics();
+
/**
* Select all pending tasks which have already been started.
*/
boolean hasAnyIssueSyncTaskPendingOrInProgress();
+ List<PrOrBranchTask> selectOldestPendingPrOrBranch();
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.db.ce;
+
+import java.util.Comparator;
+import java.util.Objects;
+
+public class CeTaskDtoLight implements Comparable<CeTaskDtoLight> {
+
+ private String ceTaskUuid;
+ private long createdAt;
+
+ public void setCeTaskUuid(String ceTaskUuid) {
+ this.ceTaskUuid = ceTaskUuid;
+ }
+
+ public void setCreatedAt(long createdAt) {
+ this.createdAt = createdAt;
+ }
+
+ public long getCreatedAt() {
+ return createdAt;
+ }
+
+ public String getCeTaskUuid() {
+ return ceTaskUuid;
+ }
+
+ @Override
+ public int compareTo(CeTaskDtoLight o) {
+ return Comparator.comparingLong(CeTaskDtoLight::getCreatedAt).thenComparing(CeTaskDtoLight::getCeTaskUuid).compare(this, o);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CeTaskDtoLight that = (CeTaskDtoLight) o;
+ return createdAt == that.createdAt && Objects.equals(ceTaskUuid, that.ceTaskUuid);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ceTaskUuid, createdAt);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.db.ce;
+
+public class PrOrBranchTask extends CeTaskDtoLight {
+ private String mainComponentUuid;
+ private String taskType;
+ private String branchType;
+ private String componentUuid;
+
+ public String getMainComponentUuid() {
+ return mainComponentUuid;
+ }
+
+ public String getBranchType() {
+ return branchType;
+ }
+
+ public String getComponentUuid() {
+ return componentUuid;
+ }
+
+ public String getTaskType() {
+ return taskType;
+ }
+
+}
</where>
</sql>
- <select id="selectEligibleForPeek" parameterType="map" resultType="String">
- select cq.uuid
+ <select id="selectEligibleForPeek" parameterType="map" resultType="org.sonar.db.ce.CeTaskDtoLight">
+ select cq.uuid as ceTaskUuid, cq.created_at as createdAt
<include refid="sqlSelectEligibleForPeek"/>
<include refid="orderBySelectEligibleForPeek"/>
limit #{pagination.pageSize,jdbcType=INTEGER} offset #{pagination.offset,jdbcType=INTEGER}
</select>
- <select id="selectEligibleForPeek" parameterType="map" resultType="String" databaseId="mssql">
- select query.uuid from (
+ <select id="selectEligibleForPeek" parameterType="map" resultType="org.sonar.db.ce.CeTaskDtoLight" databaseId="mssql">
+ select query.uuid as ceTaskUuid, query.created_at as createdAt from (
select
row_number() over(<include refid="orderBySelectEligibleForPeek"/>) as number,
<include refid="columnsSelectEligibleForPeek"/>
order by number asc
</select>
- <select id="selectEligibleForPeek" parameterType="map" resultType="String" databaseId="oracle">
- select taskuuid from (
- select rownum as rn, t."uuid" as taskuuid from (
+ <select id="selectEligibleForPeek" parameterType="map" resultType="org.sonar.db.ce.CeTaskDtoLight" databaseId="oracle">
+ select taskuuid as ceTaskUuid, createdat as createdAt from (
+ select rownum as rn, t."uuid" as taskuuid, t."created_at" as createdat from (
select
<include refid="columnsSelectEligibleForPeek"/>
- <include refid="sqlSelectEligibleForPeek" />
+ <include refid="sqlSelectEligibleForPeek"/>
<include refid="orderBySelectEligibleForPeek"/>
) t
) t
from dual
</select>
+ <select id="selectOldestPendingPrOrBranch" resultType="org.sonar.db.ce.PrOrBranchTask">
+ select <include refid="oldestPendingPrOrBranch"/> limit 100
+ </select>
+
+ <select id="selectOldestPendingPrOrBranch" resultType="org.sonar.db.ce.PrOrBranchTask" databaseId="mssql">
+ select top (100) <include refid="oldestPendingPrOrBranch"/>
+ </select>
+
+ <select id="selectOldestPendingPrOrBranch" resultType="org.sonar.db.ce.PrOrBranchTask" databaseId="oracle">
+ select * from (select <include refid="oldestPendingPrOrBranch"/>) where rownum <= 100
+ </select>
+
+ <sql id="oldestPendingPrOrBranch">
+ cq.uuid as ceTaskUuid,
+ cq.main_component_uuid as mainComponentUuid,
+ cq.component_uuid as componentUuid,
+ cq.created_at as createdAt,
+ cq.task_type as taskType,
+ coalesce(ctc.kee, 'branch') as branchType
+ from
+ ce_queue cq
+ left join ce_task_characteristics ctc on cq.uuid = ctc.task_uuid and (ctc.kee = 'branch' or ctc.kee = 'pullRequest')
+ where
+ cq.status = 'PENDING'
+ and cq.task_type = 'REPORT'
+ order by
+ cq.created_at, cq.uuid
+ </sql>
+
+ <select id="selectInProgressWithCharacteristics" resultType="org.sonar.db.ce.PrOrBranchTask">
+ select
+ cq.uuid as ceTaskUuid,
+ cq.main_component_uuid as mainComponentUuid,
+ cq.created_at as createdAt,
+ coalesce(ctc.kee, 'branch') as branchType,
+ cq.task_type as taskType,
+ cq.component_uuid as componentUuid
+ from
+ ce_queue cq left join ce_task_characteristics ctc on cq.uuid = ctc.task_uuid and (ctc.kee = 'branch' or ctc.kee = 'pullRequest')
+ where
+ cq.status = 'IN_PROGRESS'
+ </select>
+
</mapper>
import org.sonar.api.impl.utils.TestSystem2;
import org.sonar.api.utils.System2;
import org.sonar.db.DbTester;
-import org.sonar.db.component.ComponentDto;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.singletonList;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
import static org.sonar.db.ce.CeQueueTesting.newCeQueueDto;
import static org.sonar.db.ce.CeQueueTesting.reset;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.BRANCH_KEY;
+import static org.sonar.db.ce.CeTaskCharacteristicDto.PULL_REQUEST;
public class CeQueueDaoTest {
private static final long INIT_TIME = 1_450_000_000_000L;
private static final String TASK_UUID_2 = "TASK_2";
private static final String MAIN_COMPONENT_UUID_1 = "PROJECT_1";
private static final String MAIN_COMPONENT_UUID_2 = "PROJECT_2";
+ private static final String COMPONENT_UUID_1 = "BRANCH_1";
private static final String TASK_UUID_3 = "TASK_3";
private static final String SELECT_QUEUE_UUID_AND_STATUS_QUERY = "select uuid,status from ce_queue";
private static final String SUBMITTER_LOGIN = "submitter uuid";
assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid);
}
- @Test
- public void peek_none_if_no_pendings() {
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false)).isNotPresent();
-
- // not pending, but in progress
- makeInProgress(WORKER_UUID_1, 2_232_222L, insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1));
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false)).isNotPresent();
- }
-
- @Test
- public void peek_oldest_pending() {
- insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1);
- system2.setNow(INIT_TIME + 3_000_000);
- insertPending(TASK_UUID_2, MAIN_COMPONENT_UUID_2);
-
- assertThat(db.countRowsOfTable("ce_queue")).isEqualTo(2);
- verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
-
- // peek first one
- Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
- assertThat(peek).isPresent();
- assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
- assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
- assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
- verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
-
- // peek second one
- peek = underTest.peek(db.getSession(), WORKER_UUID_2, false, false);
- assertThat(peek).isPresent();
- assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
- assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
- assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
- verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);
-
- // no more pendings
- assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false)).isNotPresent();
- }
-
- @Test
- public void do_not_peek_multiple_tasks_on_same_main_component_at_the_same_time() {
- // two pending tasks on the same project
- insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1);
- system2.setNow(INIT_TIME + 3_000_000);
- insertPending(TASK_UUID_2, MAIN_COMPONENT_UUID_1);
-
- Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
- assertThat(peek).isPresent();
- assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
- assertThat(peek.get().getMainComponentUuid()).isEqualTo(MAIN_COMPONENT_UUID_1);
- assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
- verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
-
- // do not peek second task as long as the first one is in progress
- peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
- assertThat(peek).isEmpty();
-
- // first one is finished
- underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
- peek = underTest.peek(db.getSession(), WORKER_UUID_2, false, false);
- assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
- assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
- }
-
@Test
public void select_by_query() {
// task status not in query
}
@Test
- public void exclude_portfolios_computation_when_indexing_issues() {
- insertBranch(MAIN_COMPONENT_UUID_1);
+ public void hasAnyIssueSyncTaskPendingOrInProgress_PENDING() {
+ assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse();
+
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(MAIN_COMPONENT_UUID_1)
- .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
.setCreatedAt(100_000L));
- String view_uuid = "view_uuid";
- insertView(view_uuid);
- insertPending(newCeQueueDto(TASK_UUID_2)
- .setComponentUuid(view_uuid)
- .setMainComponentUuid(view_uuid)
- .setStatus(PENDING)
- .setTaskType(CeTaskTypes.REPORT)
- .setCreatedAt(100_000L));
-
- Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, true);
- assertThat(peek).isPresent();
- assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
-
- Optional<CeQueueDto> peek2 = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
- assertThat(peek2).isPresent();
- assertThat(peek2.get().getUuid()).isEqualTo(TASK_UUID_2);
+ assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue();
}
@Test
- public void excluding_view_pick_up_orphan_branches() {
+ public void hasAnyIssueSyncTaskPendingOrInProgress_IN_PROGRESS() {
+ assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse();
+
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(MAIN_COMPONENT_UUID_1)
- .setMainComponentUuid("non-existing-uuid")
- .setStatus(PENDING)
+ .setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
.setCreatedAt(100_000L));
- Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, true);
- assertThat(peek).isPresent();
- assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
+ assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue();
}
@Test
- public void hasAnyIssueSyncTaskPendingOrInProgress_PENDING() {
- assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse();
+ public void selectOldestPendingPrOrBranch_returns_oldest_100_pr_or_branch_tasks() {
+ for (int i = 1; i < 110; i++) {
+ insertPending(newCeQueueDto("task" + i)
+ .setComponentUuid(MAIN_COMPONENT_UUID_1).setStatus(PENDING).setTaskType(CeTaskTypes.REPORT).setCreatedAt(i));
+ }
+ for (int i = 1; i < 10; i++) {
+ insertPending(newCeQueueDto("progress" + i)
+ .setComponentUuid(MAIN_COMPONENT_UUID_1).setStatus(IN_PROGRESS).setTaskType(CeTaskTypes.REPORT).setCreatedAt(i));
+ insertPending(newCeQueueDto("sync" + i)
+ .setComponentUuid(MAIN_COMPONENT_UUID_1).setStatus(PENDING).setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC).setCreatedAt(i));
+ }
+
+ List<PrOrBranchTask> prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession());
+ assertThat(prOrBranchTasks).hasSize(100)
+ .allMatch(t -> t.getCeTaskUuid().startsWith("task"), "uuid starts with task")
+ .allMatch(t -> t.getCreatedAt() <= 100, "creation date older or equal than 100");
+ }
+
+ @Test
+ public void selectOldestPendingPrOrBranch_returns_branch_branch_type_if_no_characteristics() {
+ insertPending(newCeQueueDto(TASK_UUID_1)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(123L));
+ List<PrOrBranchTask> prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession());
+ assertThat(prOrBranchTasks).hasSize(1);
+ assertThat(prOrBranchTasks.get(0))
+ .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType)
+ .containsExactly(BRANCH_KEY, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT);
+ }
+
+ @Test
+ public void selectOldestPendingPrOrBranch_returns_branch_branch_type_if_unrelated_characteristics() {
insertPending(newCeQueueDto(TASK_UUID_1)
- .setComponentUuid(MAIN_COMPONENT_UUID_1)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
.setStatus(PENDING)
- .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
- .setCreatedAt(100_000L));
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(123L));
+ List<PrOrBranchTask> prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession());
+ insertCharacteristic(BRANCH_KEY, "123", "c1", TASK_UUID_1);
- assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue();
+ assertThat(prOrBranchTasks).hasSize(1);
+ assertThat(prOrBranchTasks.get(0))
+ .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType)
+ .containsExactly(BRANCH_KEY, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT);
}
@Test
- public void hasAnyIssueSyncTaskPendingOrInProgress_IN_PROGRESS() {
- assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isFalse();
+ public void selectOldestPendingPrOrBranch_returns_all_fields() {
+ insertPending(newCeQueueDto(TASK_UUID_1)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(123L));
+ insertCharacteristic(PULL_REQUEST, "1", "c1", TASK_UUID_1);
+
+ List<PrOrBranchTask> prOrBranchTasks = underTest.selectOldestPendingPrOrBranch(db.getSession());
+
+ assertThat(prOrBranchTasks).hasSize(1);
+ assertThat(prOrBranchTasks.get(0))
+ .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType)
+ .containsExactly(PULL_REQUEST, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT);
+ }
+ @Test
+ public void selectInProgressWithCharacteristics_returns_all_fields() {
insertPending(newCeQueueDto(TASK_UUID_1)
- .setComponentUuid(MAIN_COMPONENT_UUID_1)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
- .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
- .setCreatedAt(100_000L));
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(123L));
+ insertCharacteristic(PULL_REQUEST, "1", "c1", TASK_UUID_1);
- assertThat(underTest.hasAnyIssueSyncTaskPendingOrInProgress(db.getSession())).isTrue();
- }
+ List<PrOrBranchTask> prOrBranchTasks = underTest.selectInProgressWithCharacteristics(db.getSession());
- private void insertView(String view_uuid) {
- ComponentDto view = new ComponentDto();
- view.setQualifier("VW");
- view.setKey(view_uuid + "_key");
- view.setUuid(view_uuid);
- view.setPrivate(false);
- view.setRootUuid(view_uuid);
- view.setUuidPath("uuid_path");
- view.setBranchUuid(view_uuid);
- db.components().insertPortfolioAndSnapshot(view);
- db.commit();
+ assertThat(prOrBranchTasks).hasSize(1);
+ assertThat(prOrBranchTasks.get(0))
+ .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType)
+ .containsExactly(PULL_REQUEST, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT);
}
- private void insertBranch(String uuid) {
- ComponentDto branch = new ComponentDto();
- branch.setQualifier("TRK");
- branch.setKey(uuid + "_key");
- branch.setUuid(uuid);
- branch.setPrivate(false);
- branch.setRootUuid(uuid);
- branch.setUuidPath("uuid_path");
- branch.setBranchUuid(uuid);
- db.components().insertComponent(branch);
- db.commit();
+ @Test
+ public void selectInProgressWithCharacteristics_returns_branch_branch_type_if_no_characteristics() {
+ insertPending(newCeQueueDto(TASK_UUID_1)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
+ .setStatus(IN_PROGRESS)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(123L));
+
+ List<PrOrBranchTask> prOrBranchTasks = underTest.selectInProgressWithCharacteristics(db.getSession());
+
+ assertThat(prOrBranchTasks).hasSize(1);
+ assertThat(prOrBranchTasks.get(0))
+ .extracting(PrOrBranchTask::getBranchType, PrOrBranchTask::getComponentUuid, PrOrBranchTask::getMainComponentUuid, PrOrBranchTask::getTaskType)
+ .containsExactly(BRANCH_KEY, COMPONENT_UUID_1, MAIN_COMPONENT_UUID_1, CeTaskTypes.REPORT);
}
private void insertPending(CeQueueDto dto) {
return underTest.selectByUuid(db.getSession(), uuid).get();
}
+ private void insertCharacteristic(String key, String value, String uuid, String taskUuid) {
+ CeTaskCharacteristicDto dto1 = new CeTaskCharacteristicDto()
+ .setKey(key)
+ .setValue(value)
+ .setUuid(uuid)
+ .setTaskUuid(taskUuid);
+ db.getDbClient().ceTaskCharacteristicsDao().insert(db.getSession(), dto1);
+ }
+
private static Iterable<Map<String, Object>> upperizeKeys(List<Map<String, Object>> select) {
return select.stream().map(new Function<Map<String, Object>, Map<String, Object>>() {
@Nullable
}).collect(Collectors.toList());
}
- private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2) {
- verifyCeQueueStatuses(new String[] {taskUuid1, taskUuid2}, new CeQueueDto.Status[] {taskStatus1, taskStatus2});
- }
-
private void verifyCeQueueStatuses(String[] taskUuids, CeQueueDto.Status[] statuses) {
Map<String, Object>[] rows = new Map[taskUuids.length];
for (int i = 0; i < taskUuids.length; i++) {
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.db.ce;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CeTaskDtoLightTest {
+ private final CeTaskDtoLight task1 = new CeTaskDtoLight();
+ private final CeTaskDtoLight task2 = new CeTaskDtoLight();
+ private final CeTaskDtoLight task3 = new CeTaskDtoLight();
+ private final CeTaskDtoLight task4 = new CeTaskDtoLight();
+
+ @Before
+ public void setUp() {
+ task1.setCeTaskUuid("id1");
+ task1.setCreatedAt(1);
+ task2.setCeTaskUuid("id1");
+ task2.setCreatedAt(1);
+ task3.setCeTaskUuid("id2");
+ task3.setCreatedAt(1);
+ task4.setCeTaskUuid("id1");
+ task4.setCreatedAt(2);
+ }
+
+ @Test
+ public void equals_is_based_on_created_date_and_uuid() {
+ assertThat(task1).isEqualTo(task2);
+ assertThat(task1).isNotEqualTo(task3);
+ assertThat(task1).isNotEqualTo(task4);
+ }
+
+ @Test
+ public void hashCode_is_based_on_created_date_and_uuid() {
+ assertThat(task1).hasSameHashCodeAs(task2);
+ }
+
+ @Test
+ public void compareTo_is_based_on_created_date_and_uuid() {
+ assertThat(task1).isEqualByComparingTo(task2);
+ assertThat(task1).isLessThan(task3);
+ assertThat(task1).isLessThan(task4);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2022 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.core.config;
+
+public class ComputeEngineProperties {
+
+ private ComputeEngineProperties() {
+ }
+
+ public static final String CE_PARALLEL_PROJECT_TASKS_ENABLED = "sonar.ce.parallelProjectTasks";
+
+}