You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

AsyncIssueIndexingImpl.java 7.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. /*
  2. * SonarQube
  3. * Copyright (C) 2009-2024 SonarSource SA
  4. * mailto:info AT sonarsource DOT com
  5. *
  6. * This program is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 3 of the License, or (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with this program; if not, write to the Free Software Foundation,
  18. * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  19. */
  20. package org.sonar.server.issue.index;
  21. import java.util.ArrayList;
  22. import java.util.Comparator;
  23. import java.util.HashMap;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Set;
  27. import java.util.function.Function;
  28. import java.util.stream.Collectors;
  29. import java.util.stream.Stream;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. import org.sonar.ce.queue.CeQueue;
  33. import org.sonar.ce.queue.CeTaskSubmit;
  34. import org.sonar.core.ce.CeTaskCharacteristics;
  35. import org.sonar.db.DbClient;
  36. import org.sonar.db.DbSession;
  37. import org.sonar.db.ce.CeActivityDto;
  38. import org.sonar.db.ce.CeQueueDto;
  39. import org.sonar.db.component.BranchDto;
  40. import org.sonar.db.component.BranchType;
  41. import org.sonar.db.component.SnapshotDto;
  42. import static java.util.stream.Collectors.toCollection;
  43. import static org.sonar.core.ce.CeTaskCharacteristics.BRANCH_TYPE;
  44. import static org.sonar.core.ce.CeTaskCharacteristics.PULL_REQUEST;
  45. import static org.sonar.db.ce.CeTaskTypes.BRANCH_ISSUE_SYNC;
  46. public class AsyncIssueIndexingImpl implements AsyncIssueIndexing {
  47. private static final Logger LOG = LoggerFactory.getLogger(AsyncIssueIndexingImpl.class);
  48. private final CeQueue ceQueue;
  49. private final DbClient dbClient;
  50. public AsyncIssueIndexingImpl(CeQueue ceQueue, DbClient dbClient) {
  51. this.ceQueue = ceQueue;
  52. this.dbClient = dbClient;
  53. }
  54. @Override
  55. public void triggerOnIndexCreation() {
  56. try (DbSession dbSession = dbClient.openSession(false)) {
  57. // remove already existing indexing task, if any
  58. removeExistingIndexationTasks(dbSession);
  59. dbClient.branchDao().updateAllNeedIssueSync(dbSession);
  60. List<BranchDto> branchInNeedOfIssueSync = dbClient.branchDao().selectBranchNeedingIssueSync(dbSession);
  61. LOG.info("{} branch found in need of issue sync.", branchInNeedOfIssueSync.size());
  62. if (branchInNeedOfIssueSync.isEmpty()) {
  63. return;
  64. }
  65. List<String> projectUuids = branchInNeedOfIssueSync.stream().map(BranchDto::getProjectUuid).distinct().collect(toCollection(ArrayList<String>::new));
  66. LOG.info("{} projects found in need of issue sync.", projectUuids.size());
  67. sortProjectUuids(dbSession, projectUuids);
  68. Map<String, List<BranchDto>> branchesByProject = branchInNeedOfIssueSync.stream()
  69. .collect(Collectors.groupingBy(BranchDto::getProjectUuid));
  70. List<CeTaskSubmit> tasks = new ArrayList<>();
  71. for (String projectUuid : projectUuids) {
  72. List<BranchDto> branches = branchesByProject.get(projectUuid);
  73. for (BranchDto branch : branches) {
  74. tasks.add(buildTaskSubmit(branch));
  75. }
  76. }
  77. ceQueue.massSubmit(tasks);
  78. dbSession.commit();
  79. }
  80. }
  81. @Override
  82. public void triggerForProject(String projectUuid) {
  83. try (DbSession dbSession = dbClient.openSession(false)) {
  84. // remove already existing indexing task, if any
  85. removeExistingIndexationTasksForProject(dbSession, projectUuid);
  86. dbClient.branchDao().updateAllNeedIssueSyncForProject(dbSession, projectUuid);
  87. List<BranchDto> branchInNeedOfIssueSync = dbClient.branchDao().selectBranchNeedingIssueSyncForProject(dbSession, projectUuid);
  88. LOG.info("{} branch(es) found in need of issue sync for project.", branchInNeedOfIssueSync.size());
  89. List<CeTaskSubmit> tasks = new ArrayList<>();
  90. for (BranchDto branch : branchInNeedOfIssueSync) {
  91. tasks.add(buildTaskSubmit(branch));
  92. }
  93. ceQueue.massSubmit(tasks);
  94. dbSession.commit();
  95. }
  96. }
  97. private void sortProjectUuids(DbSession dbSession, List<String> projectUuids) {
  98. Map<String, SnapshotDto> snapshotByProjectUuid = dbClient.snapshotDao()
  99. .selectLastAnalysesByRootComponentUuids(dbSession, projectUuids).stream()
  100. .collect(Collectors.toMap(SnapshotDto::getRootComponentUuid, Function.identity()));
  101. projectUuids.sort(compareBySnapshot(snapshotByProjectUuid));
  102. }
  103. static Comparator<String> compareBySnapshot(Map<String, SnapshotDto> snapshotByProjectUuid) {
  104. return (uuid1, uuid2) -> {
  105. SnapshotDto snapshot1 = snapshotByProjectUuid.get(uuid1);
  106. SnapshotDto snapshot2 = snapshotByProjectUuid.get(uuid2);
  107. if (snapshot1 == null && snapshot2 == null) {
  108. return 0;
  109. }
  110. if (snapshot1 == null) {
  111. return 1;
  112. }
  113. if (snapshot2 == null) {
  114. return -1;
  115. }
  116. return snapshot2.getCreatedAt().compareTo(snapshot1.getCreatedAt());
  117. };
  118. }
  119. private void removeExistingIndexationTasks(DbSession dbSession) {
  120. Set<String> ceQueueUuids = dbClient.ceQueueDao().selectAllInAscOrder(dbSession)
  121. .stream().filter(p -> p.getTaskType().equals(BRANCH_ISSUE_SYNC))
  122. .map(CeQueueDto::getUuid).collect(Collectors.toSet());
  123. Set<String> ceActivityUuids = dbClient.ceActivityDao().selectByTaskType(dbSession, BRANCH_ISSUE_SYNC)
  124. .stream().map(CeActivityDto::getUuid).collect(Collectors.toSet());
  125. removeIndexationTasks(dbSession, ceQueueUuids, ceActivityUuids);
  126. }
  127. private void removeExistingIndexationTasksForProject(DbSession dbSession, String projectUuid) {
  128. Set<String> ceQueueUuidsForProject = dbClient.ceQueueDao().selectByEntityUuid(dbSession, projectUuid)
  129. .stream().filter(p -> p.getTaskType().equals(BRANCH_ISSUE_SYNC))
  130. .map(CeQueueDto::getUuid).collect(Collectors.toSet());
  131. Set<String> ceActivityUuidsForProject = dbClient.ceActivityDao().selectByTaskType(dbSession, BRANCH_ISSUE_SYNC)
  132. .stream()
  133. .filter(ceActivityDto -> projectUuid.equals(ceActivityDto.getEntityUuid()))
  134. .map(CeActivityDto::getUuid).collect(Collectors.toSet());
  135. removeIndexationTasks(dbSession, ceQueueUuidsForProject, ceActivityUuidsForProject);
  136. }
  137. private void removeIndexationTasks(DbSession dbSession, Set<String> ceQueueUuids, Set<String> ceActivityUuids) {
  138. LOG.atInfo().setMessage("{} pending indexing task found to be deleted...")
  139. .addArgument(ceQueueUuids.size())
  140. .log();
  141. for (String uuid : ceQueueUuids) {
  142. dbClient.ceQueueDao().deleteByUuid(dbSession, uuid);
  143. }
  144. LOG.atInfo().setMessage("{} completed indexing task found to be deleted...")
  145. .addArgument(ceQueueUuids.size())
  146. .log();
  147. dbClient.ceActivityDao().deleteByUuids(dbSession, ceActivityUuids);
  148. LOG.info("Indexing task deletion complete.");
  149. LOG.info("Deleting tasks characteristics...");
  150. Set<String> tasksUuid = Stream.concat(ceQueueUuids.stream(), ceActivityUuids.stream()).collect(Collectors.toSet());
  151. dbClient.ceTaskCharacteristicsDao().deleteByTaskUuids(dbSession, tasksUuid);
  152. LOG.info("Tasks characteristics deletion complete.");
  153. dbSession.commit();
  154. }
  155. private CeTaskSubmit buildTaskSubmit(BranchDto branch) {
  156. Map<String, String> characteristics = new HashMap<>();
  157. characteristics.put(branch.getBranchType() == BranchType.BRANCH ? CeTaskCharacteristics.BRANCH : PULL_REQUEST, branch.getKey());
  158. characteristics.put(BRANCH_TYPE, branch.getBranchType().name());
  159. return ceQueue.prepareSubmit()
  160. .setType(BRANCH_ISSUE_SYNC)
  161. .setComponent(new CeTaskSubmit.Component(branch.getUuid(), branch.getProjectUuid()))
  162. .setCharacteristics(characteristics).build();
  163. }
  164. }