summaryrefslogtreecommitdiffstats
path: root/server/sonar-ce-common
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2018-06-22 16:08:28 +0200
committersonartech <sonartech@sonarsource.com>2018-06-29 09:10:14 +0200
commit697e8d18e6a8b9dca3e345fa355c4b6228e89bdc (patch)
tree4a02e5b723379f032261e921564e7656219c3063 /server/sonar-ce-common
parent525ed917b32bfd43f2517c03d500d7a557240ce0 (diff)
downloadsonarqube-697e8d18e6a8b9dca3e345fa355c4b6228e89bdc.tar.gz
sonarqube-697e8d18e6a8b9dca3e345fa355c4b6228e89bdc.zip
create module sonar-ce-common
Diffstat (limited to 'server/sonar-ce-common')
-rw-r--r--server/sonar-ce-common/build.gradle55
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java36
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java23
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java125
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java294
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java95
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java23
-rw-r--r--server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java500
8 files changed, 1151 insertions, 0 deletions
diff --git a/server/sonar-ce-common/build.gradle b/server/sonar-ce-common/build.gradle
new file mode 100644
index 00000000000..c028c302b2f
--- /dev/null
+++ b/server/sonar-ce-common/build.gradle
@@ -0,0 +1,55 @@
+description = 'Code relating to the Compute Engine shared between the Compute Engine and the Web Server'
+
+sonarqube {
+ properties {
+ property 'sonar.projectName', "${projectTitle} :: Compute Engine :: Common"
+ }
+}
+
+sourceSets {
+ test {
+ resources {
+ srcDirs += ['src/test/projects']
+ }
+ }
+}
+
+
+import org.apache.tools.ant.filters.ReplaceTokens
+processResources {
+ filesMatching('build.properties') {
+ filter ReplaceTokens, tokens: [
+ 'buildNumber': release ? 'git rev-parse HEAD'.execute().text.trim() : 'N/A'
+ ]
+ }
+}
+
+configurations {
+ testCompile.extendsFrom compileOnly
+}
+
+dependencies {
+ // please keep the list grouped by configuration and ordered by name
+
+ compile 'org.slf4j:jul-to-slf4j'
+ compile 'org.slf4j:slf4j-api'
+ compile project(':server:sonar-db-dao')
+
+ compileOnly 'com.google.code.findbugs:jsr305'
+ compileOnly project(':server:sonar-ce-task')
+ compileOnly project(':server:sonar-server-common')
+
+ testCompile 'com.google.code.findbugs:jsr305'
+ testCompile 'com.h2database:h2'
+ testCompile 'com.tngtech.java:junit-dataprovider'
+ testCompile 'junit:junit'
+ testCompile 'org.apache.logging.log4j:log4j-api'
+ testCompile 'org.apache.logging.log4j:log4j-core'
+ testCompile 'org.assertj:assertj-core'
+ testCompile 'org.assertj:assertj-guava'
+ testCompile 'org.mockito:mockito-core'
+ testCompile project(':sonar-plugin-api')
+ testCompile project(':sonar-core')
+ testCompile project(':server:sonar-db-testing')
+ testCompile project(path: ":server:sonar-server-common", configuration: "tests")
+}
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java
new file mode 100644
index 00000000000..0b2362785dd
--- /dev/null
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java
@@ -0,0 +1,36 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.configuration;
+
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.server.ServerSide;
+
+/**
+ * When an implementation of this interface is available in Pico, the Compute Engine will use the value returned by
+ * {@link #get()} as the number of worker the Compute Engine should run on.
+ */
+@ComputeEngineSide
+@ServerSide
+public interface WorkerCountProvider {
+ /**
+ * @return an integer strictly greater than 0
+ */
+ int get();
+}
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java
new file mode 100644
index 00000000000..95986bdb22b
--- /dev/null
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+@ParametersAreNonnullByDefault
+package org.sonar.ce.configuration;
+
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java
new file mode 100644
index 00000000000..cb6810f5a86
--- /dev/null
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java
@@ -0,0 +1,125 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.sonar.ce.task.CeTask;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeQueueDto;
+
+/**
+ * Queue of pending Compute Engine tasks. Both producer and consumer actions
+ * are implemented.
+ * <p>
+ * This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}.
+ * </p>
+ */
+public interface CeQueue {
+ /**
+ * Build an instance of {@link CeTaskSubmit} required for {@link #submit(CeTaskSubmit, SubmitOption...)}. It allows
+ * to enforce that task ids are generated by the queue. It's used also for having access
+ * to the id before submitting the task to the queue.
+ */
+ CeTaskSubmit.Builder prepareSubmit();
+
+ /**
+ * Submits a task to the queue. The task is processed asynchronously.
+ * <p>
+ * Convenience method for calling {@link #submit(CeTaskSubmit, SubmitOption...)} without any {@link SubmitOption}
+ * and which does not returning an {@link Optional}.
+ * <p>
+ * This method is equivalent to calling {@link #massSubmit(Collection, SubmitOption...)} with a singleton list and no
+ * option.
+ */
+ CeTask submit(CeTaskSubmit submission);
+
+ /**
+ * Submits a task to the queue. The task is processed asynchronously.
+ * <p>
+ * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}.
+ *
+ * @return empty if {@code options} contains {@link SubmitOption#UNIQUE_QUEUE_PER_COMPONENT UNIQUE_QUEUE_PER_COMPONENT}
+ * and there's already a queued task, otherwise the created task.
+ */
+ Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options);
+
+ /**
+ * Submits multiple tasks to the queue at once. All tasks are processed asynchronously.
+ * <p>
+ * This method will perform significantly better that calling {@link #submit(CeTaskSubmit, SubmitOption...)} in a loop.
+ * </p>
+ */
+ List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options);
+
+ /**
+ * Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked
+ * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
+ */
+ void cancel(DbSession dbSession, CeQueueDto ceQueueDto);
+
+ /**
+ * Removes all the tasks from the queue, except the tasks with status
+ * {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} are ignored. They are marked
+ * as {@link org.sonar.db.ce.CeActivityDto.Status#CANCELED} in past activity.
+ * This method can be called at runtime, even if workers are being executed.
+ *
+ * @return the number of canceled tasks
+ */
+ int cancelAll();
+
+ /**
+ * Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused.
+ * The workers that are already processing tasks are not interrupted.
+ * This method is not restricted to the local workers. All the Compute Engine nodes are paused.
+ */
+ void pauseWorkers();
+
+ /**
+ * Resumes workers so that they can peek tasks from queue.
+ * This method is not restricted to the local workers. All the Compute Engine nodes are paused.
+ */
+ void resumeWorkers();
+
+ WorkersPauseStatus getWorkersPauseStatus();
+
+ enum SubmitOption {
+ UNIQUE_QUEUE_PER_COMPONENT
+ }
+
+ enum WorkersPauseStatus {
+ /**
+ * Pause triggered but at least one task is still in-progress
+ */
+ PAUSING,
+
+ /**
+ * Paused, no tasks are in-progress. Tasks are pending.
+ */
+ PAUSED,
+
+ /**
+ * Not paused nor pausing
+ */
+ RESUMED
+ }
+
+}
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
new file mode 100644
index 00000000000..01b113bd64d
--- /dev/null
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
@@ -0,0 +1,294 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.sonar.api.server.ServerSide;
+import org.sonar.ce.task.CeTask;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.stream.MoreCollectors;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.component.ComponentDto;
+import org.sonar.server.organization.DefaultOrganizationProvider;
+import org.sonar.server.property.InternalProperties;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.FluentIterable.from;
+import static java.util.Collections.singleton;
+import static java.util.Objects.requireNonNull;
+import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT;
+import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
+import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
+
+@ServerSide
+public class CeQueueImpl implements CeQueue {
+
+ private final DbClient dbClient;
+ private final UuidFactory uuidFactory;
+ private final DefaultOrganizationProvider defaultOrganizationProvider;
+
+ public CeQueueImpl(DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) {
+ this.dbClient = dbClient;
+ this.uuidFactory = uuidFactory;
+ this.defaultOrganizationProvider = defaultOrganizationProvider;
+ }
+
+ @Override
+ public CeTaskSubmit.Builder prepareSubmit() {
+ return new CeTaskSubmit.Builder(uuidFactory.create());
+ }
+
+ @Override
+ public CeTask submit(CeTaskSubmit submission) {
+ return submit(submission, EnumSet.noneOf(SubmitOption.class)).get();
+ }
+
+ @Override
+ public java.util.Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options) {
+ return submit(submission, toSet(options));
+ }
+
+ private java.util.Optional<CeTask> submit(CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT)
+ && submission.getComponentUuid() != null
+ && dbClient.ceQueueDao().countByStatusAndComponentUuid(dbSession, PENDING, submission.getComponentUuid()) > 0) {
+ return java.util.Optional.empty();
+ }
+ CeQueueDto dto = addToQueueInDb(dbSession, submission);
+ CeTask task = loadTask(dbSession, dto);
+ dbSession.commit();
+ return java.util.Optional.of(task);
+ }
+ }
+
+ @Override
+ public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options) {
+ if (submissions.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try (DbSession dbSession = dbClient.openSession(true)) {
+ List<CeQueueDto> ceQueueDtos = submissions.stream()
+ .filter(filterBySubmitOptions(options, submissions, dbSession))
+ .map(submission -> addToQueueInDb(dbSession, submission))
+ .collect(Collectors.toList());
+ List<CeTask> tasks = loadTasks(dbSession, ceQueueDtos);
+ dbSession.commit();
+ return tasks;
+ }
+ }
+
+ private Predicate<CeTaskSubmit> filterBySubmitOptions(SubmitOption[] options, Collection<CeTaskSubmit> submissions, DbSession dbSession) {
+ EnumSet<SubmitOption> submitOptions = toSet(options);
+
+ if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT)) {
+ Set<String> componentUuids = submissions.stream()
+ .map(CeTaskSubmit::getComponentUuid)
+ .filter(Objects::nonNull)
+ .collect(MoreCollectors.toSet(submissions.size()));
+ if (componentUuids.isEmpty()) {
+ return t -> true;
+ }
+ return new NoPendingTaskFilter(dbSession, componentUuids);
+ }
+
+ return t -> true;
+ }
+
+ private class NoPendingTaskFilter implements Predicate<CeTaskSubmit> {
+ private final Map<String, Integer> queuedItemsByComponentUuid;
+
+ private NoPendingTaskFilter(DbSession dbSession, Set<String> componentUuids) {
+ queuedItemsByComponentUuid = dbClient.ceQueueDao().countByStatusAndComponentUuids(dbSession, PENDING, componentUuids);
+ }
+
+ @Override
+ public boolean test(CeTaskSubmit ceTaskSubmit) {
+ String componentUuid = ceTaskSubmit.getComponentUuid();
+ return componentUuid == null || queuedItemsByComponentUuid.getOrDefault(componentUuid, 0) == 0;
+ }
+ }
+
+ private static EnumSet<SubmitOption> toSet(SubmitOption[] options) {
+ return Arrays.stream(options).collect(toEnumSet(SubmitOption.class));
+ }
+
+ private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(submission.getUuid());
+ dto.setTaskType(submission.getType());
+ dto.setComponentUuid(submission.getComponentUuid());
+ dto.setStatus(PENDING);
+ dto.setSubmitterUuid(submission.getSubmitterUuid());
+ dto.setStartedAt(null);
+ dbClient.ceQueueDao().insert(dbSession, dto);
+ return dto;
+ }
+
+ protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
+ String componentUuid = dto.getComponentUuid();
+ if (componentUuid == null) {
+ return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto);
+ }
+ Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, componentUuid);
+ if (componentDto.isPresent()) {
+ return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), ImmutableMap.of(componentUuid, componentDto.get())).apply(dto);
+ }
+ return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto);
+ }
+
+ private List<CeTask> loadTasks(DbSession dbSession, List<CeQueueDto> dtos) {
+ Set<String> componentUuids = dtos.stream()
+ .map(CeQueueDto::getComponentUuid)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao()
+ .selectByUuids(dbSession, componentUuids))
+ .uniqueIndex(ComponentDto::uuid);
+
+ return dtos.stream()
+ .map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply)
+ .collect(MoreCollectors.toList(dtos.size()));
+ }
+
+ @Override
+ public void cancel(DbSession dbSession, CeQueueDto ceQueueDto) {
+ checkState(PENDING.equals(ceQueueDto.getStatus()), "Task is in progress and can't be canceled [uuid=%s]", ceQueueDto.getUuid());
+ cancelImpl(dbSession, ceQueueDto);
+ }
+
+ private void cancelImpl(DbSession dbSession, CeQueueDto q) {
+ CeActivityDto activityDto = new CeActivityDto(q);
+ activityDto.setStatus(CeActivityDto.Status.CANCELED);
+ remove(dbSession, q, activityDto);
+ }
+
+ @Override
+ public int cancelAll() {
+ return cancelAll(false);
+ }
+
+ protected int cancelAll(boolean includeInProgress) {
+ int count = 0;
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
+ if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
+ cancelImpl(dbSession, queueDto);
+ count++;
+ }
+ }
+ return count;
+ }
+ }
+
+ @Override
+ public void pauseWorkers() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ dbClient.internalPropertiesDao().save(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE, "true");
+ dbSession.commit();
+ }
+ }
+
+ @Override
+ public void resumeWorkers() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ dbClient.internalPropertiesDao().delete(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE);
+ dbSession.commit();
+ }
+ }
+
+ @Override
+ public WorkersPauseStatus getWorkersPauseStatus() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ java.util.Optional<String> propValue = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE);
+ if (!propValue.isPresent() || !propValue.get().equals("true")) {
+ return WorkersPauseStatus.RESUMED;
+ }
+ int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS);
+ if (countInProgress > 0) {
+ return WorkersPauseStatus.PAUSING;
+ }
+ return WorkersPauseStatus.PAUSED;
+ }
+ }
+
+ protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) {
+ dbClient.ceActivityDao().insert(dbSession, activityDto);
+ dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
+ dbClient.ceTaskInputDao().deleteByUuids(dbSession, singleton(queueDto.getUuid()));
+ dbSession.commit();
+ }
+
+ private static class CeQueueDtoToCeTask implements Function<CeQueueDto, CeTask> {
+ private final String defaultOrganizationUuid;
+ private final Map<String, ComponentDto> componentDtoByUuid;
+
+ private CeQueueDtoToCeTask(String defaultOrganizationUuid) {
+ this(defaultOrganizationUuid, Collections.emptyMap());
+ }
+
+ private CeQueueDtoToCeTask(String defaultOrganizationUuid, Map<String, ComponentDto> componentDtoByUuid) {
+ this.defaultOrganizationUuid = requireNonNull(defaultOrganizationUuid, "defaultOrganizationUuid can't be null");
+ this.componentDtoByUuid = componentDtoByUuid;
+ }
+
+ @Override
+ @Nonnull
+ public CeTask apply(@Nonnull CeQueueDto dto) {
+ CeTask.Builder builder = new CeTask.Builder();
+ builder.setUuid(dto.getUuid());
+ builder.setType(dto.getTaskType());
+ builder.setSubmitterUuid(dto.getSubmitterUuid());
+ String componentUuid = dto.getComponentUuid();
+ if (componentUuid != null) {
+ builder.setComponentUuid(componentUuid);
+ ComponentDto component = componentDtoByUuid.get(componentUuid);
+ if (component != null) {
+ builder.setOrganizationUuid(component.getOrganizationUuid());
+ builder.setComponentKey(component.getDbKey());
+ builder.setComponentName(component.name());
+ }
+ }
+ // fixme this should be set from the CeQueueDto
+ if (!builder.hasOrganizationUuid()) {
+ builder.setOrganizationUuid(defaultOrganizationUuid);
+ }
+ return builder.build();
+ }
+ }
+
+}
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java
new file mode 100644
index 00000000000..c3eb9083d2d
--- /dev/null
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java
@@ -0,0 +1,95 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.Objects;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+
+import static com.google.common.base.Strings.emptyToNull;
+
+@Immutable
+public final class CeTaskSubmit {
+
+ private final String uuid;
+ private final String type;
+ private final String componentUuid;
+ private final String submitterUuid;
+
+ private CeTaskSubmit(Builder builder) {
+ this.uuid = Objects.requireNonNull(emptyToNull(builder.uuid));
+ this.type = Objects.requireNonNull(emptyToNull(builder.type));
+ this.componentUuid = emptyToNull(builder.componentUuid);
+ this.submitterUuid = emptyToNull(builder.submitterUuid);
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ @CheckForNull
+ public String getComponentUuid() {
+ return componentUuid;
+ }
+
+ @CheckForNull
+ public String getSubmitterUuid() {
+ return submitterUuid;
+ }
+
+ public static final class Builder {
+ private final String uuid;
+ private String type;
+ private String componentUuid;
+ private String submitterUuid;
+
+ public Builder(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public Builder setType(String s) {
+ this.type = s;
+ return this;
+ }
+
+ public Builder setComponentUuid(@Nullable String s) {
+ this.componentUuid = s;
+ return this;
+ }
+
+ public Builder setSubmitterUuid(@Nullable String s) {
+ this.submitterUuid = s;
+ return this;
+ }
+
+ public CeTaskSubmit build() {
+ return new CeTaskSubmit(this);
+ }
+ }
+}
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java
new file mode 100644
index 00000000000..600d3bce96e
--- /dev/null
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+@ParametersAreNonnullByDefault
+package org.sonar.ce.queue;
+
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
new file mode 100644
index 00000000000..8d0c5db35b2
--- /dev/null
+++ b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
@@ -0,0 +1,500 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2018 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.ce.task.CeTask;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.UuidFactoryFast;
+import org.sonar.core.util.UuidFactoryImpl;
+import org.sonar.db.DbSession;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.db.component.ComponentDto;
+import org.sonar.db.component.ComponentTesting;
+import org.sonar.server.organization.DefaultOrganizationProvider;
+import org.sonar.server.organization.TestDefaultOrganizationProvider;
+
+import static com.google.common.collect.ImmutableList.of;
+import static java.util.Arrays.asList;
+import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT;
+
+public class CeQueueImplTest {
+
+ private static final String WORKER_UUID = "workerUuid";
+
+ private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public DbTester db = DbTester.create(system2);
+
+ private DbSession session = db.getSession();
+
+ private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
+ private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db);
+
+ private CeQueue underTest = new CeQueueImpl(db.getDbClient(), uuidFactory, defaultOrganizationProvider);
+
+ @Test
+ public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
+ CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "submitter uuid");
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, null);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit);
+ }
+
+ @Test
+ public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+ ComponentDto componentDto = insertComponent(ComponentTesting.newPrivateProjectDto(db.organizations().insert(), "PROJECT_1"));
+ CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, componentDto);
+ }
+
+ @Test
+ public void submit_returns_task_without_component_info_when_submit_has_none() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, null);
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
+ CeQueueDto dto = insertPendingInQueue(null);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isNotEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), task.get().getUuid());
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() {
+ String componentUuid = randomAlphabetic(5);
+ String otherComponentUuid = randomAlphabetic(6);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(otherComponentUuid);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isNotEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), task.get().getUuid());
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid());
+ }
+
+ @Test
+ public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(task).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(uuids);
+ }
+
+ @Test
+ public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), task.getUuid());
+ }
+
+ @Test
+ public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .hasSize(uuids.length + 1)
+ .contains(uuids)
+ .contains(task.getUuid());
+ }
+
+ @Test
+ public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
+ CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "submitter uuid");
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
+
+ List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+ assertThat(tasks).hasSize(2);
+ verifyCeTask(taskSubmit1, tasks.get(0), null);
+ verifyCeTask(taskSubmit2, tasks.get(1), null);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit1);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit2);
+ }
+
+ @Test
+ public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+ ComponentDto componentDto1 = insertComponent(ComponentTesting.newPrivateProjectDto(db.getDefaultOrganization(), "PROJECT_1"));
+ CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null);
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null);
+
+ List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+ assertThat(tasks).hasSize(2);
+ verifyCeTask(taskSubmit1, tasks.get(0), componentDto1);
+ verifyCeTask(taskSubmit2, tasks.get(1), null);
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("no_component");
+ CeQueueDto dto = insertPendingInQueue(null);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() {
+ String componentUuid = randomAlphabetic(5);
+ String otherComponentUuid = randomAlphabetic(6);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(otherComponentUuid);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks).isEmpty();
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(uuids);
+ }
+
+ @Test
+ public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ CeQueueDto dto = insertPendingInQueue(componentUuid);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() {
+ String componentUuid = randomAlphabetic(5);
+ CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null);
+ String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit));
+
+ assertThat(tasks).hasSize(1);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .hasSize(uuids.length + 1)
+ .contains(uuids)
+ .contains(tasks.iterator().next().getUuid());
+ }
+
+ @Test
+ public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_tasks_depending_on_whether_there_is_pending_task_for_component() {
+ String componentUuid1 = randomAlphabetic(5);
+ String componentUuid2 = randomAlphabetic(6);
+ String componentUuid3 = randomAlphabetic(7);
+ String componentUuid4 = randomAlphabetic(8);
+ String componentUuid5 = randomAlphabetic(9);
+ CeTaskSubmit taskSubmit1 = createTaskSubmit("with_one_pending", componentUuid1, null);
+ CeQueueDto dto1 = insertPendingInQueue(componentUuid1);
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("no_pending", componentUuid2, null);
+ CeTaskSubmit taskSubmit3 = createTaskSubmit("with_many_pending", componentUuid3, null);
+ String[] uuids3 = IntStream.range(0, 2 + new Random().nextInt(5))
+ .mapToObj(i -> insertPendingInQueue(componentUuid3))
+ .map(CeQueueDto::getUuid)
+ .toArray(String[]::new);
+ CeTaskSubmit taskSubmit4 = createTaskSubmit("no_pending_2", componentUuid4, null);
+ CeTaskSubmit taskSubmit5 = createTaskSubmit("with_pending_2", componentUuid5, null);
+ CeQueueDto dto5 = insertPendingInQueue(componentUuid5);
+
+ List<CeTask> tasks = underTest.massSubmit(of(taskSubmit1, taskSubmit2, taskSubmit3, taskSubmit4, taskSubmit5), UNIQUE_QUEUE_PER_COMPONENT);
+
+ assertThat(tasks)
+ .hasSize(2)
+ .extracting(CeTask::getComponentUuid)
+ .containsOnly(componentUuid2, componentUuid4);
+ assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession()))
+ .extracting(CeQueueDto::getUuid)
+ .hasSize(1 + uuids3.length + 1 + tasks.size())
+ .contains(dto1.getUuid())
+ .contains(uuids3)
+ .contains(dto5.getUuid())
+ .containsAll(tasks.stream().map(CeTask::getUuid).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void cancel_pending() {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
+
+ underTest.cancel(db.getSession(), queueDto);
+
+ Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
+ assertThat(activity.isPresent()).isTrue();
+ assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ }
+
+ @Test
+ public void fail_to_cancel_if_in_progress() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID).get();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
+
+ underTest.cancel(db.getSession(), ceQueueDto);
+ }
+
+ @Test
+ public void cancelAll_pendings_but_not_in_progress() {
+ CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
+ CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
+
+ db.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
+
+ int canceledCount = underTest.cancelAll();
+ assertThat(canceledCount).isEqualTo(2);
+
+ Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask1.getUuid());
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask2.getUuid());
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), inProgressTask.getUuid());
+ assertThat(history.isPresent()).isFalse();
+ }
+
+ @Test
+ public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ // task is pending
+
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
+
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED);
+ }
+
+ @Test
+ public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ db.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
+ // task is in-progress
+
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
+
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING);
+ }
+
+ @Test
+ public void resumeWorkers_does_nothing_if_not_paused() {
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
+
+ underTest.resumeWorkers();
+
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
+ }
+
+ @Test
+ public void resumeWorkers_resumes_pausing_workers() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ db.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
+ // task is in-progress
+
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING);
+
+ underTest.resumeWorkers();
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
+ }
+
+ @Test
+ public void resumeWorkers_resumes_paused_workers() {
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED);
+
+ underTest.resumeWorkers();
+ assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
+ }
+
+ private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) {
+ if (componentDto == null) {
+ assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid());
+ } else {
+ assertThat(task.getOrganizationUuid()).isEqualTo(componentDto.getOrganizationUuid());
+ }
+ assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
+ assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid());
+ assertThat(task.getType()).isEqualTo(taskSubmit.getType());
+ if (componentDto == null) {
+ assertThat(task.getComponentKey()).isNull();
+ assertThat(task.getComponentName()).isNull();
+ } else {
+ assertThat(task.getComponentKey()).isEqualTo(componentDto.getDbKey());
+ assertThat(task.getComponentName()).isEqualTo(componentDto.name());
+ }
+ assertThat(task.getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
+ }
+
+ private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
+ Optional<CeQueueDto> queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), taskSubmit.getUuid());
+ assertThat(queueDto.isPresent()).isTrue();
+ assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
+ assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
+ assertThat(queueDto.get().getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid());
+ assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+ }
+
+ private CeTask submit(String reportType, String componentUuid) {
+ return underTest.submit(createTaskSubmit(reportType, componentUuid, null));
+ }
+
+ private CeTaskSubmit createTaskSubmit(String type) {
+ return createTaskSubmit(type, null, null);
+ }
+
+ private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterUuid) {
+ CeTaskSubmit.Builder submission = underTest.prepareSubmit();
+ submission.setType(type);
+ submission.setComponentUuid(componentUuid);
+ submission.setSubmitterUuid(submitterUuid);
+ return submission.build();
+ }
+
+ private ComponentDto insertComponent(ComponentDto componentDto) {
+ db.getDbClient().componentDao().insert(session, componentDto);
+ session.commit();
+ return componentDto;
+ }
+
+ private CeQueueDto insertPendingInQueue(@Nullable String componentUuid) {
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(UuidFactoryFast.getInstance().create())
+ .setTaskType("some type")
+ .setComponentUuid(componentUuid)
+ .setStatus(CeQueueDto.Status.PENDING);
+ db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
+ db.commit();
+ return dto;
+ }
+}