diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2018-06-22 16:08:28 +0200 |
---|---|---|
committer | sonartech <sonartech@sonarsource.com> | 2018-06-29 09:10:14 +0200 |
commit | 697e8d18e6a8b9dca3e345fa355c4b6228e89bdc (patch) | |
tree | 4a02e5b723379f032261e921564e7656219c3063 /server/sonar-ce-common | |
parent | 525ed917b32bfd43f2517c03d500d7a557240ce0 (diff) | |
download | sonarqube-697e8d18e6a8b9dca3e345fa355c4b6228e89bdc.tar.gz sonarqube-697e8d18e6a8b9dca3e345fa355c4b6228e89bdc.zip |
create module sonar-ce-common
Diffstat (limited to 'server/sonar-ce-common')
8 files changed, 1151 insertions, 0 deletions
diff --git a/server/sonar-ce-common/build.gradle b/server/sonar-ce-common/build.gradle new file mode 100644 index 00000000000..c028c302b2f --- /dev/null +++ b/server/sonar-ce-common/build.gradle @@ -0,0 +1,55 @@ +description = 'Code relating to the Compute Engine shared between the Compute Engine and the Web Server' + +sonarqube { + properties { + property 'sonar.projectName', "${projectTitle} :: Compute Engine :: Common" + } +} + +sourceSets { + test { + resources { + srcDirs += ['src/test/projects'] + } + } +} + + +import org.apache.tools.ant.filters.ReplaceTokens +processResources { + filesMatching('build.properties') { + filter ReplaceTokens, tokens: [ + 'buildNumber': release ? 'git rev-parse HEAD'.execute().text.trim() : 'N/A' + ] + } +} + +configurations { + testCompile.extendsFrom compileOnly +} + +dependencies { + // please keep the list grouped by configuration and ordered by name + + compile 'org.slf4j:jul-to-slf4j' + compile 'org.slf4j:slf4j-api' + compile project(':server:sonar-db-dao') + + compileOnly 'com.google.code.findbugs:jsr305' + compileOnly project(':server:sonar-ce-task') + compileOnly project(':server:sonar-server-common') + + testCompile 'com.google.code.findbugs:jsr305' + testCompile 'com.h2database:h2' + testCompile 'com.tngtech.java:junit-dataprovider' + testCompile 'junit:junit' + testCompile 'org.apache.logging.log4j:log4j-api' + testCompile 'org.apache.logging.log4j:log4j-core' + testCompile 'org.assertj:assertj-core' + testCompile 'org.assertj:assertj-guava' + testCompile 'org.mockito:mockito-core' + testCompile project(':sonar-plugin-api') + testCompile project(':sonar-core') + testCompile project(':server:sonar-db-testing') + testCompile project(path: ":server:sonar-server-common", configuration: "tests") +} diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java new file mode 100644 index 00000000000..0b2362785dd --- /dev/null +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/WorkerCountProvider.java @@ -0,0 +1,36 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.configuration; + +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.server.ServerSide; + +/** + * When an implementation of this interface is available in Pico, the Compute Engine will use the value returned by + * {@link #get()} as the number of worker the Compute Engine should run on. + */ +@ComputeEngineSide +@ServerSide +public interface WorkerCountProvider { + /** + * @return an integer strictly greater than 0 + */ + int get(); +} diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java new file mode 100644 index 00000000000..95986bdb22b --- /dev/null +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/configuration/package-info.java @@ -0,0 +1,23 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +@ParametersAreNonnullByDefault +package org.sonar.ce.configuration; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java new file mode 100644 index 00000000000..cb6810f5a86 --- /dev/null +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java @@ -0,0 +1,125 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.sonar.ce.task.CeTask; +import org.sonar.db.DbSession; +import org.sonar.db.ce.CeQueueDto; + +/** + * Queue of pending Compute Engine tasks. Both producer and consumer actions + * are implemented. + * <p> + * This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}. + * </p> + */ +public interface CeQueue { + /** + * Build an instance of {@link CeTaskSubmit} required for {@link #submit(CeTaskSubmit, SubmitOption...)}. It allows + * to enforce that task ids are generated by the queue. It's used also for having access + * to the id before submitting the task to the queue. + */ + CeTaskSubmit.Builder prepareSubmit(); + + /** + * Submits a task to the queue. The task is processed asynchronously. + * <p> + * Convenience method for calling {@link #submit(CeTaskSubmit, SubmitOption...)} without any {@link SubmitOption} + * and which does not returning an {@link Optional}. + * <p> + * This method is equivalent to calling {@link #massSubmit(Collection, SubmitOption...)} with a singleton list and no + * option. + */ + CeTask submit(CeTaskSubmit submission); + + /** + * Submits a task to the queue. The task is processed asynchronously. + * <p> + * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}. + * + * @return empty if {@code options} contains {@link SubmitOption#UNIQUE_QUEUE_PER_COMPONENT UNIQUE_QUEUE_PER_COMPONENT} + * and there's already a queued task, otherwise the created task. + */ + Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options); + + /** + * Submits multiple tasks to the queue at once. All tasks are processed asynchronously. + * <p> + * This method will perform significantly better that calling {@link #submit(CeTaskSubmit, SubmitOption...)} in a loop. + * </p> + */ + List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options); + + /** + * Cancels a task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. An unchecked + * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. + */ + void cancel(DbSession dbSession, CeQueueDto ceQueueDto); + + /** + * Removes all the tasks from the queue, except the tasks with status + * {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} are ignored. They are marked + * as {@link org.sonar.db.ce.CeActivityDto.Status#CANCELED} in past activity. + * This method can be called at runtime, even if workers are being executed. + * + * @return the number of canceled tasks + */ + int cancelAll(); + + /** + * Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused. + * The workers that are already processing tasks are not interrupted. + * This method is not restricted to the local workers. All the Compute Engine nodes are paused. + */ + void pauseWorkers(); + + /** + * Resumes workers so that they can peek tasks from queue. + * This method is not restricted to the local workers. All the Compute Engine nodes are paused. + */ + void resumeWorkers(); + + WorkersPauseStatus getWorkersPauseStatus(); + + enum SubmitOption { + UNIQUE_QUEUE_PER_COMPONENT + } + + enum WorkersPauseStatus { + /** + * Pause triggered but at least one task is still in-progress + */ + PAUSING, + + /** + * Paused, no tasks are in-progress. Tasks are pending. + */ + PAUSED, + + /** + * Not paused nor pausing + */ + RESUMED + } + +} diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java new file mode 100644 index 00000000000..01b113bd64d --- /dev/null +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java @@ -0,0 +1,294 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import org.sonar.api.server.ServerSide; +import org.sonar.ce.task.CeTask; +import org.sonar.core.util.UuidFactory; +import org.sonar.core.util.stream.MoreCollectors; +import org.sonar.db.DbClient; +import org.sonar.db.DbSession; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.component.ComponentDto; +import org.sonar.server.organization.DefaultOrganizationProvider; +import org.sonar.server.property.InternalProperties; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.FluentIterable.from; +import static java.util.Collections.singleton; +import static java.util.Objects.requireNonNull; +import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT; +import static org.sonar.core.util.stream.MoreCollectors.toEnumSet; +import static org.sonar.db.ce.CeQueueDto.Status.PENDING; + +@ServerSide +public class CeQueueImpl implements CeQueue { + + private final DbClient dbClient; + private final UuidFactory uuidFactory; + private final DefaultOrganizationProvider defaultOrganizationProvider; + + public CeQueueImpl(DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) { + this.dbClient = dbClient; + this.uuidFactory = uuidFactory; + this.defaultOrganizationProvider = defaultOrganizationProvider; + } + + @Override + public CeTaskSubmit.Builder prepareSubmit() { + return new CeTaskSubmit.Builder(uuidFactory.create()); + } + + @Override + public CeTask submit(CeTaskSubmit submission) { + return submit(submission, EnumSet.noneOf(SubmitOption.class)).get(); + } + + @Override + public java.util.Optional<CeTask> submit(CeTaskSubmit submission, SubmitOption... options) { + return submit(submission, toSet(options)); + } + + private java.util.Optional<CeTask> submit(CeTaskSubmit submission, EnumSet<SubmitOption> submitOptions) { + try (DbSession dbSession = dbClient.openSession(false)) { + if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT) + && submission.getComponentUuid() != null + && dbClient.ceQueueDao().countByStatusAndComponentUuid(dbSession, PENDING, submission.getComponentUuid()) > 0) { + return java.util.Optional.empty(); + } + CeQueueDto dto = addToQueueInDb(dbSession, submission); + CeTask task = loadTask(dbSession, dto); + dbSession.commit(); + return java.util.Optional.of(task); + } + } + + @Override + public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions, SubmitOption... options) { + if (submissions.isEmpty()) { + return Collections.emptyList(); + } + + try (DbSession dbSession = dbClient.openSession(true)) { + List<CeQueueDto> ceQueueDtos = submissions.stream() + .filter(filterBySubmitOptions(options, submissions, dbSession)) + .map(submission -> addToQueueInDb(dbSession, submission)) + .collect(Collectors.toList()); + List<CeTask> tasks = loadTasks(dbSession, ceQueueDtos); + dbSession.commit(); + return tasks; + } + } + + private Predicate<CeTaskSubmit> filterBySubmitOptions(SubmitOption[] options, Collection<CeTaskSubmit> submissions, DbSession dbSession) { + EnumSet<SubmitOption> submitOptions = toSet(options); + + if (submitOptions.contains(UNIQUE_QUEUE_PER_COMPONENT)) { + Set<String> componentUuids = submissions.stream() + .map(CeTaskSubmit::getComponentUuid) + .filter(Objects::nonNull) + .collect(MoreCollectors.toSet(submissions.size())); + if (componentUuids.isEmpty()) { + return t -> true; + } + return new NoPendingTaskFilter(dbSession, componentUuids); + } + + return t -> true; + } + + private class NoPendingTaskFilter implements Predicate<CeTaskSubmit> { + private final Map<String, Integer> queuedItemsByComponentUuid; + + private NoPendingTaskFilter(DbSession dbSession, Set<String> componentUuids) { + queuedItemsByComponentUuid = dbClient.ceQueueDao().countByStatusAndComponentUuids(dbSession, PENDING, componentUuids); + } + + @Override + public boolean test(CeTaskSubmit ceTaskSubmit) { + String componentUuid = ceTaskSubmit.getComponentUuid(); + return componentUuid == null || queuedItemsByComponentUuid.getOrDefault(componentUuid, 0) == 0; + } + } + + private static EnumSet<SubmitOption> toSet(SubmitOption[] options) { + return Arrays.stream(options).collect(toEnumSet(SubmitOption.class)); + } + + private CeQueueDto addToQueueInDb(DbSession dbSession, CeTaskSubmit submission) { + CeQueueDto dto = new CeQueueDto(); + dto.setUuid(submission.getUuid()); + dto.setTaskType(submission.getType()); + dto.setComponentUuid(submission.getComponentUuid()); + dto.setStatus(PENDING); + dto.setSubmitterUuid(submission.getSubmitterUuid()); + dto.setStartedAt(null); + dbClient.ceQueueDao().insert(dbSession, dto); + return dto; + } + + protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) { + String componentUuid = dto.getComponentUuid(); + if (componentUuid == null) { + return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto); + } + Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, componentUuid); + if (componentDto.isPresent()) { + return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), ImmutableMap.of(componentUuid, componentDto.get())).apply(dto); + } + return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto); + } + + private List<CeTask> loadTasks(DbSession dbSession, List<CeQueueDto> dtos) { + Set<String> componentUuids = dtos.stream() + .map(CeQueueDto::getComponentUuid) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao() + .selectByUuids(dbSession, componentUuids)) + .uniqueIndex(ComponentDto::uuid); + + return dtos.stream() + .map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply) + .collect(MoreCollectors.toList(dtos.size())); + } + + @Override + public void cancel(DbSession dbSession, CeQueueDto ceQueueDto) { + checkState(PENDING.equals(ceQueueDto.getStatus()), "Task is in progress and can't be canceled [uuid=%s]", ceQueueDto.getUuid()); + cancelImpl(dbSession, ceQueueDto); + } + + private void cancelImpl(DbSession dbSession, CeQueueDto q) { + CeActivityDto activityDto = new CeActivityDto(q); + activityDto.setStatus(CeActivityDto.Status.CANCELED); + remove(dbSession, q, activityDto); + } + + @Override + public int cancelAll() { + return cancelAll(false); + } + + protected int cancelAll(boolean includeInProgress) { + int count = 0; + try (DbSession dbSession = dbClient.openSession(false)) { + for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { + if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) { + cancelImpl(dbSession, queueDto); + count++; + } + } + return count; + } + } + + @Override + public void pauseWorkers() { + try (DbSession dbSession = dbClient.openSession(false)) { + dbClient.internalPropertiesDao().save(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE, "true"); + dbSession.commit(); + } + } + + @Override + public void resumeWorkers() { + try (DbSession dbSession = dbClient.openSession(false)) { + dbClient.internalPropertiesDao().delete(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE); + dbSession.commit(); + } + } + + @Override + public WorkersPauseStatus getWorkersPauseStatus() { + try (DbSession dbSession = dbClient.openSession(false)) { + java.util.Optional<String> propValue = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE); + if (!propValue.isPresent() || !propValue.get().equals("true")) { + return WorkersPauseStatus.RESUMED; + } + int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS); + if (countInProgress > 0) { + return WorkersPauseStatus.PAUSING; + } + return WorkersPauseStatus.PAUSED; + } + } + + protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) { + dbClient.ceActivityDao().insert(dbSession, activityDto); + dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid()); + dbClient.ceTaskInputDao().deleteByUuids(dbSession, singleton(queueDto.getUuid())); + dbSession.commit(); + } + + private static class CeQueueDtoToCeTask implements Function<CeQueueDto, CeTask> { + private final String defaultOrganizationUuid; + private final Map<String, ComponentDto> componentDtoByUuid; + + private CeQueueDtoToCeTask(String defaultOrganizationUuid) { + this(defaultOrganizationUuid, Collections.emptyMap()); + } + + private CeQueueDtoToCeTask(String defaultOrganizationUuid, Map<String, ComponentDto> componentDtoByUuid) { + this.defaultOrganizationUuid = requireNonNull(defaultOrganizationUuid, "defaultOrganizationUuid can't be null"); + this.componentDtoByUuid = componentDtoByUuid; + } + + @Override + @Nonnull + public CeTask apply(@Nonnull CeQueueDto dto) { + CeTask.Builder builder = new CeTask.Builder(); + builder.setUuid(dto.getUuid()); + builder.setType(dto.getTaskType()); + builder.setSubmitterUuid(dto.getSubmitterUuid()); + String componentUuid = dto.getComponentUuid(); + if (componentUuid != null) { + builder.setComponentUuid(componentUuid); + ComponentDto component = componentDtoByUuid.get(componentUuid); + if (component != null) { + builder.setOrganizationUuid(component.getOrganizationUuid()); + builder.setComponentKey(component.getDbKey()); + builder.setComponentName(component.name()); + } + } + // fixme this should be set from the CeQueueDto + if (!builder.hasOrganizationUuid()) { + builder.setOrganizationUuid(defaultOrganizationUuid); + } + return builder.build(); + } + } + +} diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java new file mode 100644 index 00000000000..c3eb9083d2d --- /dev/null +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeTaskSubmit.java @@ -0,0 +1,95 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.Objects; +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import static com.google.common.base.Strings.emptyToNull; + +@Immutable +public final class CeTaskSubmit { + + private final String uuid; + private final String type; + private final String componentUuid; + private final String submitterUuid; + + private CeTaskSubmit(Builder builder) { + this.uuid = Objects.requireNonNull(emptyToNull(builder.uuid)); + this.type = Objects.requireNonNull(emptyToNull(builder.type)); + this.componentUuid = emptyToNull(builder.componentUuid); + this.submitterUuid = emptyToNull(builder.submitterUuid); + } + + public String getType() { + return type; + } + + public String getUuid() { + return uuid; + } + + @CheckForNull + public String getComponentUuid() { + return componentUuid; + } + + @CheckForNull + public String getSubmitterUuid() { + return submitterUuid; + } + + public static final class Builder { + private final String uuid; + private String type; + private String componentUuid; + private String submitterUuid; + + public Builder(String uuid) { + this.uuid = uuid; + } + + public String getUuid() { + return uuid; + } + + public Builder setType(String s) { + this.type = s; + return this; + } + + public Builder setComponentUuid(@Nullable String s) { + this.componentUuid = s; + return this; + } + + public Builder setSubmitterUuid(@Nullable String s) { + this.submitterUuid = s; + return this; + } + + public CeTaskSubmit build() { + return new CeTaskSubmit(this); + } + } +} diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java new file mode 100644 index 00000000000..600d3bce96e --- /dev/null +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/package-info.java @@ -0,0 +1,23 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +@ParametersAreNonnullByDefault +package org.sonar.ce.queue; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java new file mode 100644 index 00000000000..8d0c5db35b2 --- /dev/null +++ b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java @@ -0,0 +1,500 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.annotation.Nullable; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.internal.TestSystem2; +import org.sonar.ce.task.CeTask; +import org.sonar.core.util.UuidFactory; +import org.sonar.core.util.UuidFactoryFast; +import org.sonar.core.util.UuidFactoryImpl; +import org.sonar.db.DbSession; +import org.sonar.db.DbTester; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskTypes; +import org.sonar.db.component.ComponentDto; +import org.sonar.db.component.ComponentTesting; +import org.sonar.server.organization.DefaultOrganizationProvider; +import org.sonar.server.organization.TestDefaultOrganizationProvider; + +import static com.google.common.collect.ImmutableList.of; +import static java.util.Arrays.asList; +import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.startsWith; +import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT; + +public class CeQueueImplTest { + + private static final String WORKER_UUID = "workerUuid"; + + private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule + public DbTester db = DbTester.create(system2); + + private DbSession session = db.getSession(); + + private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; + private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db); + + private CeQueue underTest = new CeQueueImpl(db.getDbClient(), uuidFactory, defaultOrganizationProvider); + + @Test + public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() { + CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "submitter uuid"); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, null); + verifyCeQueueDtoForTaskSubmit(taskSubmit); + } + + @Test + public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() { + ComponentDto componentDto = insertComponent(ComponentTesting.newPrivateProjectDto(db.organizations().insert(), "PROJECT_1")); + CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, componentDto); + } + + @Test + public void submit_returns_task_without_component_info_when_submit_has_none() { + CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related"); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, null); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() { + CeTaskSubmit taskSubmit = createTaskSubmit("no_component"); + CeQueueDto dto = insertPendingInQueue(null); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isNotEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), task.get().getUuid()); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() { + String componentUuid = randomAlphabetic(5); + String otherComponentUuid = randomAlphabetic(6); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(otherComponentUuid); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isNotEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), task.get().getUuid()); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid()); + } + + @Test + public void submit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + Optional<CeTask> task = underTest.submit(taskSubmit, UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(task).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(uuids); + } + + @Test + public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + CeTask task = underTest.submit(taskSubmit); + + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), task.getUuid()); + } + + @Test + public void submit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + CeTask task = underTest.submit(taskSubmit); + + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .hasSize(uuids.length + 1) + .contains(uuids) + .contains(task.getUuid()); + } + + @Test + public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() { + CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "submitter uuid"); + CeTaskSubmit taskSubmit2 = createTaskSubmit("some type"); + + List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2)); + + assertThat(tasks).hasSize(2); + verifyCeTask(taskSubmit1, tasks.get(0), null); + verifyCeTask(taskSubmit2, tasks.get(1), null); + verifyCeQueueDtoForTaskSubmit(taskSubmit1); + verifyCeQueueDtoForTaskSubmit(taskSubmit2); + } + + @Test + public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() { + ComponentDto componentDto1 = insertComponent(ComponentTesting.newPrivateProjectDto(db.getDefaultOrganization(), "PROJECT_1")); + CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null); + CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null); + + List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2)); + + assertThat(tasks).hasSize(2); + verifyCeTask(taskSubmit1, tasks.get(0), componentDto1); + verifyCeTask(taskSubmit2, tasks.get(1), null); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_without_component_when_there_is_a_pending_task_without_component() { + CeTaskSubmit taskSubmit = createTaskSubmit("no_component"); + CeQueueDto dto = insertPendingInQueue(null); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_a_pending_task_for_another_component() { + String componentUuid = randomAlphabetic(5); + String otherComponentUuid = randomAlphabetic(6); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(otherComponentUuid); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_does_not_create_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks).isEmpty(); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(uuids); + } + + @Test + public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_one_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + CeQueueDto dto = insertPendingInQueue(componentUuid); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit)); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .containsOnly(dto.getUuid(), tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_without_UNIQUE_QUEUE_PER_COMPONENT_creates_task_when_there_is_many_pending_task_for_component() { + String componentUuid = randomAlphabetic(5); + CeTaskSubmit taskSubmit = createTaskSubmit("with_component", componentUuid, null); + String[] uuids = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit)); + + assertThat(tasks).hasSize(1); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .hasSize(uuids.length + 1) + .contains(uuids) + .contains(tasks.iterator().next().getUuid()); + } + + @Test + public void massSubmit_with_UNIQUE_QUEUE_PER_COMPONENT_creates_tasks_depending_on_whether_there_is_pending_task_for_component() { + String componentUuid1 = randomAlphabetic(5); + String componentUuid2 = randomAlphabetic(6); + String componentUuid3 = randomAlphabetic(7); + String componentUuid4 = randomAlphabetic(8); + String componentUuid5 = randomAlphabetic(9); + CeTaskSubmit taskSubmit1 = createTaskSubmit("with_one_pending", componentUuid1, null); + CeQueueDto dto1 = insertPendingInQueue(componentUuid1); + CeTaskSubmit taskSubmit2 = createTaskSubmit("no_pending", componentUuid2, null); + CeTaskSubmit taskSubmit3 = createTaskSubmit("with_many_pending", componentUuid3, null); + String[] uuids3 = IntStream.range(0, 2 + new Random().nextInt(5)) + .mapToObj(i -> insertPendingInQueue(componentUuid3)) + .map(CeQueueDto::getUuid) + .toArray(String[]::new); + CeTaskSubmit taskSubmit4 = createTaskSubmit("no_pending_2", componentUuid4, null); + CeTaskSubmit taskSubmit5 = createTaskSubmit("with_pending_2", componentUuid5, null); + CeQueueDto dto5 = insertPendingInQueue(componentUuid5); + + List<CeTask> tasks = underTest.massSubmit(of(taskSubmit1, taskSubmit2, taskSubmit3, taskSubmit4, taskSubmit5), UNIQUE_QUEUE_PER_COMPONENT); + + assertThat(tasks) + .hasSize(2) + .extracting(CeTask::getComponentUuid) + .containsOnly(componentUuid2, componentUuid4); + assertThat(db.getDbClient().ceQueueDao().selectAllInAscOrder(db.getSession())) + .extracting(CeQueueDto::getUuid) + .hasSize(1 + uuids3.length + 1 + tasks.size()) + .contains(dto1.getUuid()) + .contains(uuids3) + .contains(dto5.getUuid()) + .containsAll(tasks.stream().map(CeTask::getUuid).collect(Collectors.toList())); + } + + @Test + public void cancel_pending() { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get(); + + underTest.cancel(db.getSession(), queueDto); + + Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid()); + assertThat(activity.isPresent()).isTrue(); + assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + } + + @Test + public void fail_to_cancel_if_in_progress() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID).get(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage(startsWith("Task is in progress and can't be canceled")); + + underTest.cancel(db.getSession(), ceQueueDto); + } + + @Test + public void cancelAll_pendings_but_not_in_progress() { + CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1"); + CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2"); + CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3"); + + db.getDbClient().ceQueueDao().peek(session, WORKER_UUID); + + int canceledCount = underTest.cancelAll(); + assertThat(canceledCount).isEqualTo(2); + + Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask1.getUuid()); + assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), pendingTask2.getUuid()); + assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), inProgressTask.getUuid()); + assertThat(history.isPresent()).isFalse(); + } + + @Test + public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + // task is pending + + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); + + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED); + } + + @Test + public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + db.getDbClient().ceQueueDao().peek(session, WORKER_UUID); + // task is in-progress + + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); + + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING); + } + + @Test + public void resumeWorkers_does_nothing_if_not_paused() { + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); + + underTest.resumeWorkers(); + + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); + } + + @Test + public void resumeWorkers_resumes_pausing_workers() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + db.getDbClient().ceQueueDao().peek(session, WORKER_UUID); + // task is in-progress + + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING); + + underTest.resumeWorkers(); + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); + } + + @Test + public void resumeWorkers_resumes_paused_workers() { + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED); + + underTest.resumeWorkers(); + assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); + } + + private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) { + if (componentDto == null) { + assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid()); + } else { + assertThat(task.getOrganizationUuid()).isEqualTo(componentDto.getOrganizationUuid()); + } + assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid()); + assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid()); + assertThat(task.getType()).isEqualTo(taskSubmit.getType()); + if (componentDto == null) { + assertThat(task.getComponentKey()).isNull(); + assertThat(task.getComponentName()).isNull(); + } else { + assertThat(task.getComponentKey()).isEqualTo(componentDto.getDbKey()); + assertThat(task.getComponentName()).isEqualTo(componentDto.name()); + } + assertThat(task.getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid()); + } + + private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) { + Optional<CeQueueDto> queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), taskSubmit.getUuid()); + assertThat(queueDto.isPresent()).isTrue(); + assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType()); + assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); + assertThat(queueDto.get().getSubmitterUuid()).isEqualTo(taskSubmit.getSubmitterUuid()); + assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L); + } + + private CeTask submit(String reportType, String componentUuid) { + return underTest.submit(createTaskSubmit(reportType, componentUuid, null)); + } + + private CeTaskSubmit createTaskSubmit(String type) { + return createTaskSubmit(type, null, null); + } + + private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterUuid) { + CeTaskSubmit.Builder submission = underTest.prepareSubmit(); + submission.setType(type); + submission.setComponentUuid(componentUuid); + submission.setSubmitterUuid(submitterUuid); + return submission.build(); + } + + private ComponentDto insertComponent(ComponentDto componentDto) { + db.getDbClient().componentDao().insert(session, componentDto); + session.commit(); + return componentDto; + } + + private CeQueueDto insertPendingInQueue(@Nullable String componentUuid) { + CeQueueDto dto = new CeQueueDto() + .setUuid(UuidFactoryFast.getInstance().create()) + .setTaskType("some type") + .setComponentUuid(componentUuid) + .setStatus(CeQueueDto.Status.PENDING); + db.getDbClient().ceQueueDao().insert(db.getSession(), dto); + db.commit(); + return dto; + } +} |