diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2018-03-01 10:08:31 +0100 |
---|---|---|
committer | SonarTech <sonartech@sonarsource.com> | 2018-03-23 20:20:52 +0100 |
commit | eb87d2d89414c2d621515c8b44a91e94ee653660 (patch) | |
tree | 2f284381917683f2979725a9912a6de2a2b48ac6 /server | |
parent | 18b9b6a719434f206c65771fb6cbabd7df3090f3 (diff) | |
download | sonarqube-eb87d2d89414c2d621515c8b44a91e94ee653660.tar.gz sonarqube-eb87d2d89414c2d621515c8b44a91e94ee653660.zip |
GOV-322 support not creating task in CeQueue if pending task exists
Diffstat (limited to 'server')
8 files changed, 468 insertions, 80 deletions
diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java index 30823b1253c..42260c55ae9 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java @@ -19,7 +19,9 @@ */ package org.sonar.db.ce; +import com.google.common.collect.ImmutableMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; @@ -30,6 +32,7 @@ import org.sonar.db.DbSession; import org.sonar.db.Pagination; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static org.sonar.db.DatabaseUtils.executeLargeUpdates; import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; import static org.sonar.db.ce.CeQueueDto.Status.PENDING; @@ -92,8 +95,7 @@ public class CeQueueDao implements Dao { } else { // executeLargeUpdates won't call the SQL command if knownWorkerUUIDs is empty executeLargeUpdates(knownWorkerUUIDs, - uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now()) - ); + uuids -> mapper(dbSession).resetTasksWithUnknownWorkerUUIDs(uuids, system2.now())); } } @@ -135,6 +137,27 @@ public class CeQueueDao implements Dao { return mapper(dbSession).countByStatusAndComponentUuid(status, componentUuid); } + /** + * Counts entries in the queue with the specified status for each specified component uuid. + * + * The returned map doesn't contain any entry for component uuid for which there is no entry in the queue (ie. + * all entries have a value >= 0). + */ + public Map<String, Integer> countByStatusAndComponentUuids(DbSession dbSession, CeQueueDto.Status status, Set<String> componentUuids) { + if (componentUuids.isEmpty()) { + return emptyMap(); + } + + ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder(); + executeLargeUpdates( + componentUuids, + uuids -> { + List<QueueCount> i = mapper(dbSession).countByStatusAndComponentUuids(status, componentUuids); + i.forEach(o -> builder.put(o.getComponentUuid(), o.getTotal())); + }); + return builder.build(); + } + public Optional<CeQueueDto> peek(DbSession session, String workerUuid, int maxExecutionCount) { List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(maxExecutionCount, ONE_RESULT_PAGINATION); if (eligibles.isEmpty()) { diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java index 9858fcb6e29..0d36f3e31a6 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java @@ -20,6 +20,7 @@ package org.sonar.db.ce; import java.util.List; +import java.util.Set; import javax.annotation.CheckForNull; import javax.annotation.Nullable; import org.apache.ibatis.annotations.Param; @@ -56,9 +57,10 @@ public interface CeQueueMapper { */ void resetAllInProgressTasks(@Param("updatedAt") long updatedAt); - int countByStatusAndComponentUuid(@Param("status") CeQueueDto.Status status, @Nullable @Param("componentUuid") String componentUuid); + List<QueueCount> countByStatusAndComponentUuids(@Param("status") CeQueueDto.Status status, @Param("componentUuids") Set<String> componentUuids); + void insert(CeQueueDto dto); void resetAllToPendingStatus(@Param("updatedAt") long updatedAt); diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java new file mode 100644 index 00000000000..5b948c38df9 --- /dev/null +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/QueueCount.java @@ -0,0 +1,34 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.db.ce; + +public class QueueCount { + // set by reflection by MyBatis + private String componentUuid; + private int total; + + public String getComponentUuid() { + return componentUuid; + } + + public int getTotal() { + return total; + } +} diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml index 8259043c4e0..f10b38becf1 100644 --- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml +++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml @@ -49,6 +49,21 @@ </if> </select> + <select id="countByStatusAndComponentUuids" resultType="org.sonar.db.ce.QueueCount"> + select + component_uuid as componentUuid, + count(1) as total + from + ce_queue + where + status=#{status,jdbcType=VARCHAR} + and component_uuid in + <foreach collection="componentUuids" open="(" close=")" item="cUuid" separator=","> + #{cUuid,jdbcType=VARCHAR} + </foreach> + group by component_uuid + </select> + <select id="countAll" resultType="int"> select count(1) diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java index 1a6c1b7d9d1..d200ea6eea8 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java @@ -42,6 +42,7 @@ import static com.google.common.collect.FluentIterable.from; import static com.google.common.collect.Lists.newArrayList; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; @@ -281,7 +282,6 @@ public class CeQueueDaoTest { verifyUnchangedByResetToPendingForWorker(o4); } - @Test public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() { long startedAt = 2_099_888L; @@ -597,6 +597,36 @@ public class CeQueueDaoTest { assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2); } + @Test + public void count_by_status_and_component_uuids() { + // task retrieved in the queue + insert(newCeQueueDto(TASK_UUID_1) + .setComponentUuid(COMPONENT_UUID_1) + .setStatus(IN_PROGRESS) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(100_000L)); + // on component uuid 2, not returned + insert(newCeQueueDto(TASK_UUID_2) + .setComponentUuid(COMPONENT_UUID_2) + .setStatus(IN_PROGRESS) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(100_000L)); + // pending status, not returned + insert(newCeQueueDto(TASK_UUID_3) + .setComponentUuid(COMPONENT_UUID_1) + .setStatus(PENDING) + .setTaskType(CeTaskTypes.REPORT) + .setCreatedAt(100_000L)); + + assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), IN_PROGRESS, ImmutableSet.of())).isEmpty(); + assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), IN_PROGRESS, ImmutableSet.of("non existing component uuid"))).isEmpty(); + assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), IN_PROGRESS, ImmutableSet.of(COMPONENT_UUID_1, COMPONENT_UUID_2))) + .containsOnly(entry(COMPONENT_UUID_1, 1), entry(COMPONENT_UUID_2, 1)); + assertThat(underTest.countByStatusAndComponentUuids(db.getSession(), PENDING, ImmutableSet.of(COMPONENT_UUID_1, COMPONENT_UUID_2))) + .containsOnly(entry(COMPONENT_UUID_1, 1)); + assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2); + } + private void insert(CeQueueDto dto) { underTest.insert(db.getSession(), dto); db.commit(); diff --git a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java index 7faa2ae4524..943c8785161 100644 --- a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java @@ -22,6 +22,7 @@ package org.sonar.ce.queue; import java.util.Collection; import java.util.List; +import java.util.Optional; import org.sonar.db.DbSession; import org.sonar.db.ce.CeQueueDto; @@ -34,7 +35,7 @@ import org.sonar.db.ce.CeQueueDto; */ public interface CeQueue { /** - * Build an instance of {@link CeTaskSubmit} required for {@link #submit(CeTaskSubmit)}. It allows + * Build an instance of {@link CeTaskSubmit} required for {@link #submit(CeTaskSubmit, SubmitOption...)}. It allows * to enforce that task ids are generated by the queue. It's used also for having access * to the id before submitting the task to the queue. */ @@ -43,22 +44,35 @@ public interface CeQueue { /** * Submits a task to the queue. The task is processed asynchronously. * <p> - * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}. - * </p> + * Convenience method for calling {@link #submit(CeTaskSubmit, SubmitOption...)} without any {@link SubmitOption} + * and which does not returning an {@link Optional}. + * <p> + * This method is equivalent to calling {@link #massSubmit(Collection, SubmitOption...)} with a singleton list and no + * option. * * @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()}) */ CeTask submit(CeTaskSubmit submission); /** + * Submits a task to the queue. The task is processed asynchronously. + * <p> + * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}. + * + * @return empty if {@code options} contains {@link SubmitOption#UNIQUE_QUEUE_PER_COMPONENT UNIQUE_QUEUE_PER_COMPONENT} + * and there's already a queued task, otherwise the created task. + */ + Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options); + + /** * Submits multiple tasks to the queue at once. All tasks are processed asynchronously. * <p> - * This method will perform significantly better that calling {@link #submit(CeTaskSubmit)} in a loop. + * This method will perform significantly better that calling {@link #submit(CeTaskSubmit, SubmitOption...)} in a loop. * </p> * * @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()}) */ - List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions); + List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options); /** * Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked @@ -82,4 +96,8 @@ public interface CeQueue { boolean isSubmitPaused(); + enum SubmitOption { + UNIQUE_QUEUE_PER_COMPONENT + } + } diff --git a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java index 9a9bd967afa..4576f2a2101 100644 --- a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java @@ -19,24 +19,24 @@ */ package org.sonar.ce.queue; -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Predicates.notNull; -import static com.google.common.collect.FluentIterable.from; -import static java.util.Collections.singleton; -import static java.util.Objects.requireNonNull; - +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import org.sonar.api.ce.ComputeEngineSide; import org.sonar.core.util.UuidFactory; +import org.sonar.core.util.stream.MoreCollectors; import org.sonar.db.DbClient; import org.sonar.db.DbSession; import org.sonar.db.ce.CeActivityDto; @@ -44,8 +44,13 @@ import org.sonar.db.ce.CeQueueDto; import org.sonar.db.component.ComponentDto; import org.sonar.server.organization.DefaultOrganizationProvider; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableMap; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.FluentIterable.from; +import static java.util.Collections.singleton; +import static java.util.Objects.requireNonNull; +import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT; +import static org.sonar.core.util.stream.MoreCollectors.toEnumSet; +import static org.sonar.db.ce.CeQueueDto.Status.PENDING; @ComputeEngineSide public class CeQueueImpl implements CeQueue { @@ -70,61 +75,123 @@ public class CeQueueImpl implements CeQueue { @Override public CeTask submit(CeTaskSubmit submission) { - checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks"); + return submit(submission, EnumSet.noneOf(SubmitOption.class)).get(); + } + + @Override + public java.util.Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options) { + return submit(submission, toSet(options)); + } + private java.util.Optional<CeTask> submit(CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) { + checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks"); try (DbSession dbSession = dbClient.openSession(false)) { - CeQueueDto dto = new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient).apply(submission); + if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT) + && submission.getComponentUuid() != null + && dbClient.ceQueueDao().countByStatusAndComponentUuid(dbSession, PENDING, submission.getComponentUuid()) > 0) { + return java.util.Optional.empty(); + } + CeQueueDto dto = addToQueueInDb(dbSession, submission); CeTask task = loadTask(dbSession, dto); dbSession.commit(); - return task; + return java.util.Optional.of(task); } } @Override - public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions) { + public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options) { checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks"); if (submissions.isEmpty()) { return Collections.emptyList(); } try (DbSession dbSession = dbClient.openSession(true)) { - List<CeQueueDto> ceQueueDtos = from(submissions) - .transform(new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient)) - .toList(); + List<CeQueueDto> ceQueueDtos = submissions.stream() + .filter(filterBySubmitOptions(options, submissions, dbSession)) + .map(submission -> addToQueueInDb(dbSession, submission)) + .collect(Collectors.toList()); List<CeTask> tasks = loadTasks(dbSession, ceQueueDtos); dbSession.commit(); return tasks; } } + private Predicate<CeTaskSubmit> filterBySubmitOptions(SubmitOption[] options, Collection<CeTaskSubmit> submissions, DbSession dbSession) { + EnumSet<SubmitOption> submitOptions = toSet(options); + + if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT)) { + Set<String> componentUuids = submissions.stream() + .map(CeTaskSubmit::getComponentUuid) + .filter(Objects::nonNull) + .collect(MoreCollectors.toSet(submissions.size())); + if (componentUuids.isEmpty()) { + return t -> true; + } + return new NoPendingTaskFilter(dbSession, componentUuids); + } + + return t -> true; + } + + private class NoPendingTaskFilter implements Predicate<CeTaskSubmit> { + private final Map<String, Integer> queuedItemsByComponentUuid; + + private NoPendingTaskFilter(DbSession dbSession, Set<String> componentUuids) { + queuedItemsByComponentUuid = dbClient.ceQueueDao().countByStatusAndComponentUuids(dbSession, PENDING, componentUuids); + } + + @Override + public boolean test(CeTaskSubmit ceTaskSubmit) { + String componentUuid = ceTaskSubmit.getComponentUuid(); + return componentUuid == null || queuedItemsByComponentUuid.getOrDefault(componentUuid, 0) == 0; + } + } + + private static EnumSet<SubmitOption> toSet(SubmitOption[] options) { + return Arrays.stream(options).collect(toEnumSet(SubmitOption.class)); + } + + private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) { + CeQueueDto dto = new CeQueueDto(); + dto.setUuid(submission.getUuid()); + dto.setTaskType(submission.getType()); + dto.setComponentUuid(submission.getComponentUuid()); + dto.setStatus(PENDING); + dto.setSubmitterLogin(submission.getSubmitterLogin()); + dto.setStartedAt(null); + dbClient.ceQueueDao().insert(dbSession, dto); + return dto; + } + protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) { - if (dto.getComponentUuid() == null) { + String componentUuid = dto.getComponentUuid(); + if (componentUuid == null) { return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto); } - com.google.common.base.Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, dto.getComponentUuid()); + Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, componentUuid); if (componentDto.isPresent()) { - return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), ImmutableMap.of(dto.getComponentUuid(), componentDto.get())).apply(dto); + return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), ImmutableMap.of(componentUuid, componentDto.get())).apply(dto); } return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto); } private List<CeTask> loadTasks(DbSession dbSession, List<CeQueueDto> dtos) { - Set<String> componentUuids = from(dtos) - .transform(CeQueueDtoToComponentUuid.INSTANCE) - .filter(notNull()) - .toSet(); + Set<String> componentUuids = dtos.stream() + .map(CeQueueDto::getComponentUuid) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao() .selectByUuids(dbSession, componentUuids)) - .uniqueIndex(ComponentDto::uuid); + .uniqueIndex(ComponentDto::uuid); - return from(dtos) - .transform(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)) - .toList(); + return dtos.stream() + .map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply) + .collect(MoreCollectors.toList(dtos.size())); } @Override public void cancel(DbSession dbSession, CeQueueDto ceQueueDto) { - checkState(CeQueueDto.Status.PENDING.equals(ceQueueDto.getStatus()), "Task is in progress and can't be canceled [uuid=%s]", ceQueueDto.getUuid()); + checkState(PENDING.equals(ceQueueDto.getStatus()), "Task is in progress and can't be canceled [uuid=%s]", ceQueueDto.getUuid()); cancelImpl(dbSession, ceQueueDto); } @@ -178,11 +245,11 @@ public class CeQueueImpl implements CeQueue { private final String defaultOrganizationUuid; private final Map<String, ComponentDto> componentDtoByUuid; - public CeQueueDtoToCeTask(String defaultOrganizationUuid) { + private CeQueueDtoToCeTask(String defaultOrganizationUuid) { this(defaultOrganizationUuid, Collections.emptyMap()); } - public CeQueueDtoToCeTask(String defaultOrganizationUuid, Map<String, ComponentDto> componentDtoByUuid) { + private CeQueueDtoToCeTask(String defaultOrganizationUuid, Map<String, ComponentDto> componentDtoByUuid) { this.defaultOrganizationUuid = requireNonNull(defaultOrganizationUuid, "defaultOrganizationUuid can't be null"); this.componentDtoByUuid = componentDtoByUuid; } @@ -212,37 +279,4 @@ public class CeQueueImpl implements CeQueue { } } - private static class CeTaskSubmitToInsertedCeQueueDto implements Function<CeTaskSubmit, CeQueueDto> { - private final DbSession dbSession; - private final DbClient dbClient; - - public CeTaskSubmitToInsertedCeQueueDto(DbSession dbSession, DbClient dbClient) { - this.dbSession = dbSession; - this.dbClient = dbClient; - } - - @Override - @Nonnull - public CeQueueDto apply(@Nonnull CeTaskSubmit submission) { - CeQueueDto dto = new CeQueueDto(); - dto.setUuid(submission.getUuid()); - dto.setTaskType(submission.getType()); - dto.setComponentUuid(submission.getComponentUuid()); - dto.setStatus(CeQueueDto.Status.PENDING); - dto.setSubmitterLogin(submission.getSubmitterLogin()); - dto.setStartedAt(null); - dbClient.ceQueueDao().insert(dbSession, dto); - return dto; - } - } - - private enum CeQueueDtoToComponentUuid implements Function<CeQueueDto, String> { - INSTANCE; - - @Override - @Nullable - public String apply(@Nonnull CeQueueDto input) { - return input.getComponentUuid(); - } - } } diff --git a/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java index 961db97a11c..5ed3ab13ae1 100644 --- a/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java @@ -19,21 +19,19 @@ */ package org.sonar.ce.queue; -import static java.util.Arrays.asList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.hamcrest.Matchers.startsWith; - import java.util.List; import java.util.Optional; - +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.annotation.Nullable; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.sonar.api.utils.System2; import org.sonar.api.utils.internal.TestSystem2; import org.sonar.core.util.UuidFactory; +import org.sonar.core.util.UuidFactoryFast; import org.sonar.core.util.UuidFactoryImpl; import org.sonar.db.DbSession; import org.sonar.db.DbTester; @@ -45,6 +43,13 @@ import org.sonar.db.component.ComponentTesting; import org.sonar.server.organization.DefaultOrganizationProvider; import org.sonar.server.organization.TestDefaultOrganizationProvider; +import static com.google.common.collect.ImmutableList.of; +import static java.util.Arrays.asList; +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.startsWith; +import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT; + public class CeQueueImplTest { private static final String WORKER_UUID = "workerUuid"; @@ -94,6 +99,96 @@ public class CeQueueImplTest { } @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() { + CeTaskSubmit taskSubmit = createTaskSubmit("no_component"); + CeQueueDto dto = insertPendingInQueue(null); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isNotEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), task.get().getUuid()); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() { + String componentUuid = randomAlphabetic(5); + String otherComponentUuid = randomAlphabetic(6); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(otherComponentUuid); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isNotEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), task.get().getUuid()); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid()); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(uuids); + } + + @Test + public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + CeTask task = underTest.submit(taskSubmit); + + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), task.getUuid()); + } + + @Test + public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + CeTask task = underTest.submit(taskSubmit); + + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .hasSize(uuids.length + 1) + .contains(uuids) + .contains(task.getUuid()); + } + + @Test public void submit_fails_with_ISE_if_paused() { underTest.pauseSubmit(); @@ -131,6 +226,132 @@ public class CeQueueImplTest { } @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() { + CeTaskSubmit taskSubmit = createTaskSubmit("no_component"); + CeQueueDto dto = insertPendingInQueue(null); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() { + String componentUuid = randomAlphabetic(5); + String otherComponentUuid = randomAlphabetic(6); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(otherComponentUuid); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(uuids); + } + + @Test + public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit)); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit)); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .hasSize(uuids.length + 1) + .contains(uuids) + .contains(tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_tasks_depending_on_whether_there_is_pending_task_for_component() { + String componentUuid1 = randomAlphabetic(5); + String componentUuid2 = randomAlphabetic(6); + String componentUuid3 = randomAlphabetic(7); + String componentUuid4 = randomAlphabetic(8); + String componentUuid5 = randomAlphabetic(9); + CeTaskSubmit taskSubmit1 = createTaskSubmit("with_one_pending", componentUuid1, null); + CeQueueDto dto1 = insertPendingInQueue(componentUuid1); + CeTaskSubmit taskSubmit2 = createTaskSubmit("no_pending", componentUuid2, null); + CeTaskSubmit taskSubmit3 = createTaskSubmit("with_many_pending", componentUuid3, null); + String[] uuids3 = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid3)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + CeTaskSubmit taskSubmit4 = createTaskSubmit("no_pending_2", componentUuid4, null); + CeTaskSubmit taskSubmit5 = createTaskSubmit("with_pending_2", componentUuid5, null); + CeQueueDto dto5 = insertPendingInQueue(componentUuid5); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit1, taskSubmit2, taskSubmit3, taskSubmit4, taskSubmit5), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks) + .hasSize(2) + .extracting(CeTask::getComponentUuid) + .containsOnly(componentUuid2, componentUuid4); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .hasSize(1 + uuids3.length + 1 + tasks.size()) + .contains(dto1.getUuid()) + .contains(uuids3) + .contains(dto5.getUuid()) + .containsAll(tasks.stream().map(CeTask::getUuid).collect(Collectors.toList())); + } + + @Test public void cancel_pending() { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get(); @@ -230,4 +451,15 @@ public class CeQueueImplTest { session.commit(); return componentDto; } + + private CeQueueDto insertPendingInQueue(@Nullable String componentUuid) { + CeQueueDto dto = new CeQueueDto() + .setUuid(UuidFactoryFast.getInstance().create()) + .setTaskType("some type") + .setComponentUuid(componentUuid) + .setStatus(CeQueueDto.Status.PENDING); + db.getDbClient().ceQueueDao().insert(db.getSession(), dto); + db.commit(); + return dto; + } } |