aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce/src/main/java/org/sonar/ce/queue/NextPendingTaskPicker.java
blob: d104afc1e5a86c69b48363b75853ef118f7a500b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
 * SonarQube
 * Copyright (C) 2009-2025 SonarSource SA
 * mailto:info AT sonarsource DOT com
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */
package org.sonar.ce.queue;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.api.ce.ComputeEngineSide;
import org.sonar.api.config.Configuration;
import org.sonar.core.config.ComputeEngineProperties;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeQueueDao;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeTaskDtoLight;
import org.sonar.db.ce.PrOrBranchTask;

import static org.sonar.core.ce.CeTaskCharacteristics.BRANCH;
import static org.sonar.core.ce.CeTaskCharacteristics.PULL_REQUEST;

/**
 * Selects the next pending Compute Engine queue task that a worker should pick up.
 * <p>
 * Candidates come from {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)} and, when the
 * {@link ComputeEngineProperties#CE_PARALLEL_PROJECT_TASKS_ENABLED} setting is enabled, from the pending pull request/branch
 * tasks that may run concurrently with the in-progress tasks of the same entity.
 */
@ComputeEngineSide
public class NextPendingTaskPicker {
  private static final Logger LOG = LoggerFactory.getLogger(NextPendingTaskPicker.class);

  private final Configuration config;
  private final CeQueueDao ceQueueDao;

  /**
   * @param config configuration used to read {@link ComputeEngineProperties#CE_PARALLEL_PROJECT_TASKS_ENABLED}
   * @param dbClient provides the {@link CeQueueDao} used for every queue lookup
   */
  public NextPendingTaskPicker(Configuration config, DbClient dbClient) {
    this.config = config;
    this.ceQueueDao = dbClient.ceQueueDao();
  }

  /**
   * Finds a pending task and tries to assign it to the given worker.
   *
   * @param workerUuid uuid of the worker the task is peeked for
   * @param dbSession session used for all queries
   * @param prioritizeAnalysisAndRefresh selects which of the two filter combinations of
   *   {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)} is tried first
   * @return the task returned by {@link CeQueueDao}{@code #tryToPeek}, or empty when no candidate was found or the peek
   *   returned nothing
   */
  Optional<CeQueueDto> findPendingTask(String workerUuid, DbSession dbSession, boolean prioritizeAnalysisAndRefresh) {
    // try to find tasks including indexing job & excluding app/portfolio and if no match, try the opposite
    // when prioritizeAnalysisAndRefresh is false, search first excluding indexing jobs and including app/portfolio, then the opposite
    // NOTE(review): the meaning of the two boolean flags is defined by CeQueueDao#selectEligibleForPeek - confirm the wording above
    Optional<CeTaskDtoLight> eligibleForPeek = ceQueueDao.selectEligibleForPeek(dbSession, prioritizeAnalysisAndRefresh, !prioritizeAnalysisAndRefresh);
    Optional<CeTaskDtoLight> eligibleForPeekInParallel = eligibleForPeekInParallel(dbSession);

    if (eligibleForPeek.isPresent() || eligibleForPeekInParallel.isPresent()) {
      return submitOldest(dbSession, workerUuid, eligibleForPeek.orElse(null), eligibleForPeekInParallel.orElse(null));
    }

    // nothing found with the first filter combination: retry with both flags inverted
    return ceQueueDao.selectEligibleForPeek(dbSession, !prioritizeAnalysisAndRefresh, prioritizeAnalysisAndRefresh)
      .flatMap(task -> ceQueueDao.tryToPeek(dbSession, task.getCeTaskUuid(), workerUuid));
  }

  /**
   * Priority is always given to the task that is waiting longer - to avoid starvation.
   * <p>
   * At least one of the two candidates must be non-null (guaranteed by the only caller).
   *
   * @param eligibleForPeek candidate from the regular lookup, may be {@code null}
   * @param eligibleForPeekInParallel candidate from the concurrent lookup, may be {@code null}
   * @return the result of trying to peek the smaller (per {@link ObjectUtils#min}) of the two candidates
   */
  private Optional<CeQueueDto> submitOldest(DbSession session, String workerUuid, @Nullable CeTaskDtoLight eligibleForPeek, @Nullable CeTaskDtoLight eligibleForPeekInParallel) {
    CeTaskDtoLight oldest = ObjectUtils.min(eligibleForPeek, eligibleForPeekInParallel);
    Optional<CeQueueDto> peeked = ceQueueDao.tryToPeek(session, oldest.getCeTaskUuid(), workerUuid);
    // only log when the selected task is not the one from the regular lookup, i.e. it came from the concurrent lookup
    if (!Objects.equals(oldest, eligibleForPeek)) {
      peeked.ifPresent(t -> LOG.info("Task [uuid = {}] will be run concurrently with other tasks for the same project", t.getUuid()));
    }
    return peeked;
  }

  /**
   * Looks for a pending task that can run concurrently with the in-progress tasks of the same entity.
   *
   * @return empty when {@link ComputeEngineProperties#CE_PARALLEL_PROJECT_TASKS_ENABLED} is absent or {@code false},
   *   otherwise the first concurrent candidate found, if any
   */
  Optional<CeTaskDtoLight> eligibleForPeekInParallel(DbSession dbSession) {
    boolean parallelProjectTasksEnabled = config.getBoolean(ComputeEngineProperties.CE_PARALLEL_PROJECT_TASKS_ENABLED).orElse(false);
    if (!parallelProjectTasksEnabled) {
      return Optional.empty();
    }
    return findPendingConcurrentCandidateTasks(ceQueueDao, dbSession);
  }

  /**
   * Some of the tasks of the same project (mostly PRs) can be assigned and executed on workers at the same time/concurrently.
   * We look for them in this method.
   *
   * @return the first queued pull request or branch task (one candidate per entity) that is allowed to run given the
   *   tasks currently in progress, or empty when there is none
   */
  private static Optional<CeTaskDtoLight> findPendingConcurrentCandidateTasks(CeQueueDao ceQueueDao, DbSession session) {
    List<PrOrBranchTask> queuedPrOrBranches = filterOldestPerProject(ceQueueDao.selectOldestPendingPrOrBranch(session));
    List<PrOrBranchTask> inProgressTasks = ceQueueDao.selectInProgressWithCharacteristics(session);

    for (PrOrBranchTask candidate : queuedPrOrBranches) {
      if (canRunConcurrently(candidate, inProgressTasks)) {
        return Optional.of(candidate);
      }
    }
    return Optional.empty();
  }

  /**
   * Dispatches to {@link #canRunPr} or {@link #canRunBranch} depending on the branch type of the task. Tasks of any other
   * type are never eligible.
   */
  private static boolean canRunConcurrently(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
    if (Objects.equals(task.getBranchType(), PULL_REQUEST) && canRunPr(task, inProgress)) {
      return true;
    }
    return Objects.equals(task.getBranchType(), BRANCH) && canRunBranch(task, inProgress);
  }

  /**
   * Keeps only the first task encountered for each entity uuid, preserving the order of the input list.
   * NOTE(review): "oldest" relies on the input being ordered oldest first - confirm against
   * {@link CeQueueDao}{@code #selectOldestPendingPrOrBranch}.
   */
  private static List<PrOrBranchTask> filterOldestPerProject(List<PrOrBranchTask> queuedPrOrBranches) {
    Set<String> entityUuidsSeen = new HashSet<>();
    // Set#add returns false for an entity uuid that was already seen; the stream is sequential, so the first occurrence wins
    return queuedPrOrBranches.stream()
      .filter(t -> entityUuidsSeen.add(t.getEntityUuid()))
      .toList();
  }

  /**
   * Branches cannot be run concurrently at this moment with other branches. And branches can already be returned in
   * {@link CeQueueDao#selectEligibleForPeek(org.sonar.db.DbSession, boolean, boolean)}. But we need this method because branches can be
   * assigned to a worker in a situation where the only type of in-progress tasks for a given project are pull requests.
   *
   * @return {@code true} when every in-progress task sharing the entity uuid of {@code task} is a pull request, which
   *   includes the case where no task is in progress for that entity
   */
  private static boolean canRunBranch(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
    String entityUuid = task.getEntityUuid();
    // we can peek branch analysis task only if all the other in progress tasks for this entity uuid are pull requests
    return inProgress.stream()
      .filter(t -> Objects.equals(t.getEntityUuid(), entityUuid))
      .allMatch(t -> Objects.equals(t.getBranchType(), PULL_REQUEST));
  }

  /**
   * Queued pull requests can almost always be assigned to a worker, unless the same pull request is already running.
   *
   * @return {@code false} when an in-progress pull request task has both the same entity uuid and the same component
   *   uuid as {@code task}, {@code true} otherwise
   */
  private static boolean canRunPr(PrOrBranchTask task, List<PrOrBranchTask> inProgress) {
    // return true unless the same PR is already in progress
    return inProgress.stream()
      .noneMatch(pr -> Objects.equals(pr.getEntityUuid(), task.getEntityUuid())
        && Objects.equals(pr.getBranchType(), PULL_REQUEST)
        && Objects.equals(pr.getComponentUuid(), task.getComponentUuid()));
  }
}