aboutsummaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2018-03-01 10:08:31 +0100
committerSonarTech <sonartech@sonarsource.com>2018-03-23 20:20:52 +0100
commiteb87d2d89414c2d621515c8b44a91e94ee653660 (patch)
tree2f284381917683f2979725a9912a6de2a2b48ac6 /server
parent18b9b6a719434f206c65771fb6cbabd7df3090f3 (diff)
downloadsonarqube-eb87d2d89414c2d621515c8b44a91e94ee653660.tar.gz
sonarqube-eb87d2d89414c2d621515c8b44a91e94ee653660.zip
GOV-322 support not creating task in CeQueue if pending task exists
Diffstat (limited to 'server')
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java27
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java4
-rw-r--r--server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java34
-rw-r--r--server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml15
-rw-r--r--server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java32
-rw-r--r--server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java28
-rw-r--r--server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java164
-rw-r--r--server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java244
8 files changed, 468 insertions, 80 deletions
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
index 30823b1253c..42260c55ae9 100644
--- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
@@ -19,7 +19,9 @@
*/
package org.sonar.db.ce;
+import com.google.common.collect.ImmutableMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
@@ -30,6 +32,7 @@ import org.sonar.db.DbSession;
import org.sonar.db.Pagination;
import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
import static org.sonar.db.DatabaseUtils.executeLargeUpdates;
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
@@ -92,8 +95,7 @@ public class CeQueueDao implements Dao {
} else {
// executeLargeUpdates won't call the SQL command if knownWorkerUUIDs is empty
executeLargeUpdates(knownWorkerUUIDs,
- uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now())
- );
+ uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now()));
}
}
@@ -135,6 +137,27 @@ public class CeQueueDao implements Dao {
return mapper(dbSession).countByStatusAndComponentUuid(status, componentUuid);
}
+ /**
+ * Counts entries in the queue with the specified status for each specified component uuid.
+ *
+ * The returned map doesn't contain any entry for component uuid for which there is no entry in the queue (ie.
+ * all entries have a value >= 0).
+ */
+ public Map<String, Integer> countByStatusAndComponentUuids(DbSession dbSession, CeQueueDto.Status status, Set<String> componentUuids) {
+ if (componentUuids.isEmpty()) {
+ return emptyMap();
+ }
+
+ ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
+ executeLargeUpdates(
+ componentUuids,
+ uuids -> {
+ List<QueueCount> i = mapper(dbSession).countByStatusAndComponentUuids(status, componentUuids);
+ i.forEach(o -> builder.put(o.getComponentUuid(), o.getTotal()));
+ });
+ return builder.build();
+ }
+
public Optional<CeQueueDto> peek(DbSession session, String workerUuid, int maxExecutionCount) {
List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(maxExecutionCount, ONE_RESULT_PAGINATION);
if (eligibles.isEmpty()) {
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
index 9858fcb6e29..0d36f3e31a6 100644
--- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
@@ -20,6 +20,7 @@
package org.sonar.db.ce;
import java.util.List;
+import java.util.Set;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.ibatis.annotations.Param;
@@ -56,9 +57,10 @@ public interface CeQueueMapper {
*/
void resetAllInProgressTasks(@Param("updatedAt") long updatedAt);
-
int countByStatusAndComponentUuid(@Param("status") CeQueueDto.Status status, @Nullable @Param("componentUuid") String componentUuid);
+ List<QueueCount> countByStatusAndComponentUuids(@Param("status") CeQueueDto.Status status, @Param("componentUuids") Set<String> componentUuids);
+
void insert(CeQueueDto dto);
void resetAllToPendingStatus(@Param("updatedAt") long updatedAt);
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java
new file mode 100644
index 00000000000..5b948c38df9
--- /dev/null
+++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java
@@ -0,0 +1,34 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.db.ce;
+
+public class QueueCount {
+ // set by reflection by MyBatis
+ private String componentUuid;
+ private int total;
+
+ public String getComponentUuid() {
+ return componentUuid;
+ }
+
+ public int getTotal() {
+ return total;
+ }
+}
diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
index 8259043c4e0..f10b38becf1 100644
--- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
+++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
@@ -49,6 +49,21 @@
</if>
</select>
+ <select id="countByStatusAndComponentUuids" resultType="org.sonar.db.ce.QueueCount">
+ select
+ component_uuid as componentUuid,
+ count(1) as total
+ from
+ ce_queue
+ where
+ status=#{status,jdbcType=VARCHAR}
+ and component_uuid in
+ <foreach collection="componentUuids" open="(" close=")" item="cUuid" separator=",">
+ #{cUuid,jdbcType=VARCHAR}
+ </foreach>
+ group by component_uuid
+ </select>
+
<select id="countAll" resultType="int">
select
count(1)
diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
index 1a6c1b7d9d1..d200ea6eea8 100644
--- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
+++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
@@ -42,6 +42,7 @@ import static com.google.common.collect.FluentIterable.from;
import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
@@ -281,7 +282,6 @@ public class CeQueueDaoTest {
verifyUnchangedByResetToPendingForWorker(o4);
}
-
@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() {
long startedAt = 2_099_888L;
@@ -597,6 +597,36 @@ public class CeQueueDaoTest {
assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2);
}
+ @Test
+ public void count_by_status_and_component_uuids() {
+ // task retrieved in the queue
+ insert(newCeQueueDto(TASK_UUID_1)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setStatus(IN_PROGRESS)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(100_000L));
+ // on component uuid 2, not returned
+ insert(newCeQueueDto(TASK_UUID_2)
+ .setComponentUuid(COMPONENT_UUID_2)
+ .setStatus(IN_PROGRESS)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(100_000L));
+ // pending status, not returned
+ insert(newCeQueueDto(TASK_UUID_3)
+ .setComponentUuid(COMPONENT_UUID_1)
+ .setStatus(PENDING)
+ .setTaskType(CeTaskTypes.REPORT)
+ .setCreatedAt(100_000L));
+
+ assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), IN_PROGRESS, ImmutableSet.of())).isEmpty();
+ assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), IN_PROGRESS, ImmutableSet.of("non existing component uuid"))).isEmpty();
+ assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), IN_PROGRESS, ImmutableSet.of(COMPONENT_UUID_1, COMPONENT_UUID_2)))
+ .containsOnly(entry(COMPONENT_UUID_1, 1), entry(COMPONENT_UUID_2, 1));
+ assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), PENDING, ImmutableSet.of(COMPONENT_UUID_1, COMPONENT_UUID_2)))
+ .containsOnly(entry(COMPONENT_UUID_1, 1));
+ assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2);
+ }
+
private void insert(CeQueueDto dto) {
underTest.insert(db.getSession(), dto);
db.commit();
diff --git a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java
index 7faa2ae4524..943c8785161 100644
--- a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java
+++ b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java
@@ -22,6 +22,7 @@ package org.sonar.ce.queue;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeQueueDto;
@@ -34,7 +35,7 @@ import org.sonar.db.ce.CeQueueDto;
*/
public interface CeQueue {
/**
- * Build an instance of {@link CeTaskSubmit} required for {@link #submit(CeTaskSubmit)}. It allows
+ * Build an instance of {@link CeTaskSubmit} required for {@link #submit(CeTaskSubmit, SubmitOption...)}. It allows
* to enforce that task ids are generated by the queue. It's used also for having access
* to the id before submitting the task to the queue.
*/
@@ -43,22 +44,35 @@ public interface CeQueue {
/**
* Submits a task to the queue. The task is processed asynchronously.
* <p>
- * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}.
- * </p>
+ * Convenience method for calling {@link #submit(CeTaskSubmit, SubmitOption...)} without any {@link SubmitOption}
+ * and which does not returning an {@link Optional}.
+ * <p>
+ * This method is equivalent to calling {@link #massSubmit(Collection, SubmitOption...)} with a singleton list and no
+ * option.
*
* @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()})
*/
CeTask submit(CeTaskSubmit submission);
/**
+ * Submits a task to the queue. The task is processed asynchronously.
+ * <p>
+ * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}.
+ *
+ * @return empty if {@code options} contains {@link SubmitOption#UNIQUE_QUEUE_PER_COMPONENT UNIQUE_QUEUE_PER_COMPONENT}
+ * and there's already a queued task, otherwise the created task.
+ */
+ Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options);
+
+ /**
* Submits multiple tasks to the queue at once. All tasks are processed asynchronously.
* <p>
- * This method will perform significantly better that calling {@link #submit(CeTaskSubmit)} in a loop.
+ * This method will perform significantly better that calling {@link #submit(CeTaskSubmit, SubmitOption...)} in a loop.
* </p>
*
* @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()})
*/
- List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions);
+ List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options);
/**
* Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked
@@ -82,4 +96,8 @@ public interface CeQueue {
boolean isSubmitPaused();
+ enum SubmitOption {
+ UNIQUE_QUEUE_PER_COMPONENT
+ }
+
}
diff --git a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
index 9a9bd967afa..4576f2a2101 100644
--- a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
+++ b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
@@ -19,24 +19,24 @@
*/
package org.sonar.ce.queue;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.collect.FluentIterable.from;
-import static java.util.Collections.singleton;
-import static java.util.Objects.requireNonNull;
-
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
import org.sonar.api.ce.ComputeEngineSide;
import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.stream.MoreCollectors;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeActivityDto;
@@ -44,8 +44,13 @@ import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.component.ComponentDto;
import org.sonar.server.organization.DefaultOrganizationProvider;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableMap;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.FluentIterable.from;
+import static java.util.Collections.singleton;
+import static java.util.Objects.requireNonNull;
+import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT;
+import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
+import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
@ComputeEngineSide
public class CeQueueImpl implements CeQueue {
@@ -70,61 +75,123 @@ public class CeQueueImpl implements CeQueue {
@Override
public CeTask submit(CeTaskSubmit submission) {
- checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
+ return submit(submission, EnumSet.noneOf(SubmitOption.class)).get();
+ }
+
+ @Override
+ public java.util.Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options) {
+ return submit(submission, toSet(options));
+ }
+ private java.util.Optional<CeTask> submit(CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) {
+ checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
try (DbSession dbSession = dbClient.openSession(false)) {
- CeQueueDto dto = new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient).apply(submission);
+ if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT)
+ && submission.getComponentUuid() != null
+ && dbClient.ceQueueDao().countByStatusAndComponentUuid(dbSession, PENDING, submission.getComponentUuid()) > 0) {
+ return java.util.Optional.empty();
+ }
+ CeQueueDto dto = addToQueueInDb(dbSession, submission);
CeTask task = loadTask(dbSession, dto);
dbSession.commit();
- return task;
+ return java.util.Optional.of(task);
}
}
@Override
- public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions) {
+ public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options) {
checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
if (submissions.isEmpty()) {
return Collections.emptyList();
}
try (DbSession dbSession = dbClient.openSession(true)) {
- List<CeQueueDto> ceQueueDtos = from(submissions)
- .transform(new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient))
- .toList();
+ List<CeQueueDto> ceQueueDtos = submissions.stream()
+ .filter(filterBySubmitOptions(options, submissions, dbSession))
+ .map(submission -> addToQueueInDb(dbSession, submission))
+ .collect(Collectors.toList());
List<CeTask> tasks = loadTasks(dbSession, ceQueueDtos);
dbSession.commit();
return tasks;
}
}
+ private Predicate<CeTaskSubmit> filterBySubmitOptions(SubmitOption[] options, Collection<CeTaskSubmit> submissions, DbSession dbSession) {
+ EnumSet<SubmitOption> submitOptions = toSet(options);
+
+ if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT)) {
+ Set<String> componentUuids = submissions.stream()
+ .map(CeTaskSubmit::getComponentUuid)
+ .filter(Objects::nonNull)
+ .collect(MoreCollectors.toSet(submissions.size()));
+ if (componentUuids.isEmpty()) {
+ return t -> true;
+ }
+ return new NoPendingTaskFilter(dbSession, componentUuids);
+ }
+
+ return t -> true;
+ }
+
+ private class NoPendingTaskFilter implements Predicate<CeTaskSubmit> {
+ private final Map<String, Integer> queuedItemsByComponentUuid;
+
+ private NoPendingTaskFilter(DbSession dbSession, Set<String> componentUuids) {
+ queuedItemsByComponentUuid = dbClient.ceQueueDao().countByStatusAndComponentUuids(dbSession, PENDING, componentUuids);
+ }
+
+ @Override
+ public boolean test(CeTaskSubmit ceTaskSubmit) {
+ String componentUuid = ceTaskSubmit.getComponentUuid();
+ return componentUuid == null || queuedItemsByComponentUuid.getOrDefault(componentUuid, 0) == 0;
+ }
+ }
+
+ private static EnumSet<SubmitOption> toSet(SubmitOption[] options) {
+ return Arrays.stream(options).collect(toEnumSet(SubmitOption.class));
+ }
+
+ private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(submission.getUuid());
+ dto.setTaskType(submission.getType());
+ dto.setComponentUuid(submission.getComponentUuid());
+ dto.setStatus(PENDING);
+ dto.setSubmitterLogin(submission.getSubmitterLogin());
+ dto.setStartedAt(null);
+ dbClient.ceQueueDao().insert(dbSession, dto);
+ return dto;
+ }
+
protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
- if (dto.getComponentUuid() == null) {
+ String componentUuid = dto.getComponentUuid();
+ if (componentUuid == null) {
return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto);
}
- com.google.common.base.Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, dto.getComponentUuid());
+ Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, componentUuid);
if (componentDto.isPresent()) {
- return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), ImmutableMap.of(dto.getComponentUuid(), componentDto.get())).apply(dto);
+ return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), ImmutableMap.of(componentUuid, componentDto.get())).apply(dto);
}
return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto);
}
private List<CeTask> loadTasks(DbSession dbSession, List<CeQueueDto> dtos) {
- Set<String> componentUuids = from(dtos)
- .transform(CeQueueDtoToComponentUuid.INSTANCE)
- .filter(notNull())
- .toSet();
+ Set<String> componentUuids = dtos.stream()
+ .map(CeQueueDto::getComponentUuid)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao()
.selectByUuids(dbSession, componentUuids))
- .uniqueIndex(ComponentDto::uuid);
+ .uniqueIndex(ComponentDto::uuid);
- return from(dtos)
- .transform(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid))
- .toList();
+ return dtos.stream()
+ .map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply)
+ .collect(MoreCollectors.toList(dtos.size()));
}
@Override
public void cancel(DbSession dbSession, CeQueueDto ceQueueDto) {
- checkState(CeQueueDto.Status.PENDING.equals(ceQueueDto.getStatus()), "Task is in progress and can't be canceled [uuid=%s]", ceQueueDto.getUuid());
+ checkState(PENDING.equals(ceQueueDto.getStatus()), "Task is in progress and can't be canceled [uuid=%s]", ceQueueDto.getUuid());
cancelImpl(dbSession, ceQueueDto);
}
@@ -178,11 +245,11 @@ public class CeQueueImpl implements CeQueue {
private final String defaultOrganizationUuid;
private final Map<String, ComponentDto> componentDtoByUuid;
- public CeQueueDtoToCeTask(String defaultOrganizationUuid) {
+ private CeQueueDtoToCeTask(String defaultOrganizationUuid) {
this(defaultOrganizationUuid, Collections.emptyMap());
}
- public CeQueueDtoToCeTask(String defaultOrganizationUuid, Map<String, ComponentDto> componentDtoByUuid) {
+ private CeQueueDtoToCeTask(String defaultOrganizationUuid, Map<String, ComponentDto> componentDtoByUuid) {
this.defaultOrganizationUuid = requireNonNull(defaultOrganizationUuid, "defaultOrganizationUuid can't be null");
this.componentDtoByUuid = componentDtoByUuid;
}
@@ -212,37 +279,4 @@ public class CeQueueImpl implements CeQueue {
}
}
- private static class CeTaskSubmitToInsertedCeQueueDto implements Function<CeTaskSubmit, CeQueueDto> {
- private final DbSession dbSession;
- private final DbClient dbClient;
-
- public CeTaskSubmitToInsertedCeQueueDto(DbSession dbSession, DbClient dbClient) {
- this.dbSession = dbSession;
- this.dbClient = dbClient;
- }
-
- @Override
- @Nonnull
- public CeQueueDto apply(@Nonnull CeTaskSubmit submission) {
- CeQueueDto dto = new CeQueueDto();
- dto.setUuid(submission.getUuid());
- dto.setTaskType(submission.getType());
- dto.setComponentUuid(submission.getComponentUuid());
- dto.setStatus(CeQueueDto.Status.PENDING);
- dto.setSubmitterLogin(submission.getSubmitterLogin());
- dto.setStartedAt(null);
- dbClient.ceQueueDao().insert(dbSession, dto);
- return dto;
- }
- }
-
- private enum CeQueueDtoToComponentUuid implements Function<CeQueueDto, String> {
- INSTANCE;
-
- @Override
- @Nullable
- public String apply(@Nonnull CeQueueDto input) {
- return input.getComponentUuid();
- }
- }
}
diff --git a/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
index 961db97a11c..5ed3ab13ae1 100644
--- a/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
+++ b/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
@@ -19,21 +19,19 @@
*/
package org.sonar.ce.queue;
-import static java.util.Arrays.asList;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.Matchers.startsWith;
-
import java.util.List;
import java.util.Optional;
-
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.annotation.Nullable;
-
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.sonar.api.utils.System2;
import org.sonar.api.utils.internal.TestSystem2;
import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.UuidFactoryFast;
import org.sonar.core.util.UuidFactoryImpl;
import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
@@ -45,6 +43,13 @@ import org.sonar.db.component.ComponentTesting;
import org.sonar.server.organization.DefaultOrganizationProvider;
import org.sonar.server.organization.TestDefaultOrganizationProvider;
+import static com.google.common.collect.ImmutableList.of;
+import static java.util.Arrays.asList;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT;
+
public class CeQueueImplTest {
private static final String WORKER_UUID = "workerUuid";
@@ -94,6 +99,96 @@ public class CeQueueImplTest {
}
@Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
+ CeQueueDto dto = insertPendingInQueue(null);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isNotEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), task.get().getUuid());
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() {
+ String componentUuid = randomAlphabetic(5);
+ String otherComponentUuid = randomAlphabetic(6);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(otherComponentUuid);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isNotEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), task.get().getUuid());
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid());
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(uuids);
+ }
+
+ @Test
+ public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), task.getUuid());
+ }
+
+ @Test
+ public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .hasSize(uuids.length + 1)
+ .contains(uuids)
+ .contains(task.getUuid());
+ }
+
+ @Test
public void submit_fails_with_ISE_if_paused() {
underTest.pauseSubmit();
@@ -131,6 +226,132 @@ public class CeQueueImplTest {
}
@Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
+ CeQueueDto dto = insertPendingInQueue(null);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() {
+ String componentUuid = randomAlphabetic(5);
+ String otherComponentUuid = randomAlphabetic(6);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(otherComponentUuid);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(uuids);
+ }
+
+ @Test
+ public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .hasSize(uuids.length + 1)
+ .contains(uuids)
+ .contains(tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_tasks_depending_on_whether_there_is_pending_task_for_component() {
+ String componentUuid1 = randomAlphabetic(5);
+ String componentUuid2 = randomAlphabetic(6);
+ String componentUuid3 = randomAlphabetic(7);
+ String componentUuid4 = randomAlphabetic(8);
+ String componentUuid5 = randomAlphabetic(9);
+ CeTaskSubmit taskSubmit1 = createTaskSubmit("with_one_pending", componentUuid1, null);
+ CeQueueDto dto1 = insertPendingInQueue(componentUuid1);
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("no_pending", componentUuid2, null);
+ CeTaskSubmit taskSubmit3 = createTaskSubmit("with_many_pending", componentUuid3, null);
+ String[] uuids3 = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid3))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+ CeTaskSubmit taskSubmit4 = createTaskSubmit("no_pending_2", componentUuid4, null);
+ CeTaskSubmit taskSubmit5 = createTaskSubmit("with_pending_2", componentUuid5, null);
+ CeQueueDto dto5 = insertPendingInQueue(componentUuid5);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit1, taskSubmit2, taskSubmit3, taskSubmit4, taskSubmit5), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks)
+ .hasSize(2)
+ .extracting(CeTask::getComponentUuid)
+ .containsOnly(componentUuid2, componentUuid4);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .hasSize(1 + uuids3.length + 1 + tasks.size())
+ .contains(dto1.getUuid())
+ .contains(uuids3)
+ .contains(dto5.getUuid())
+ .containsAll(tasks.stream().map(CeTask::getUuid).collect(Collectors.toList()));
+ }
+
+ @Test
public void cancel_pending() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
@@ -230,4 +451,15 @@ public class CeQueueImplTest {
session.commit();
return componentDto;
}
+
+ private CeQueueDto insertPendingInQueue(@Nullable String componentUuid) {
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(UuidFactoryFast.getInstance().create())
+ .setTaskType("some type")
+ .setComponentUuid(componentUuid)
+ .setStatus(CeQueueDto.Status.PENDING);
+ db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
+ db.commit();
+ return dto;
+ }
}