*/
package org.sonar.server.computation.queue;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.sonar.api.server.ServerSide;
import org.sonar.api.utils.System2;
import org.sonar.server.computation.monitoring.CEQueueStatus;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Predicates.notNull;
+import static com.google.common.collect.FluentIterable.from;
import static java.lang.String.format;
+import static org.sonar.db.component.ComponentDtoFunctions.toUuid;
@ServerSide
public class CeQueueImpl implements CeQueue {
DbSession dbSession = dbClient.openSession(false);
try {
- CeQueueDto dto = new CeQueueDto();
- dto.setUuid(submission.getUuid());
- dto.setTaskType(submission.getType());
- dto.setComponentUuid(submission.getComponentUuid());
- dto.setStatus(CeQueueDto.Status.PENDING);
- dto.setSubmitterLogin(submission.getSubmitterLogin());
- dto.setStartedAt(null);
- dbClient.ceQueueDao().insert(dbSession, dto);
+ CeQueueDto dto = new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient).apply(submission);
CeTask task = loadTask(dbSession, dto);
dbSession.commit();
queueStatus.addReceived();
}
}
+ @Override
+ public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions) {
+ checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
+ if (submissions.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ DbSession dbSession = dbClient.openSession(true);
+ try {
+ List<CeQueueDto> ceQueueDtos = from(submissions)
+ .transform(new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient))
+ .toList();
+ List<CeTask> tasks = loadTasks(dbSession, ceQueueDtos);
+ dbSession.commit();
+ queueStatus.addReceived(tasks.size());
+ return tasks;
+
+ } finally {
+ dbClient.closeSession(dbSession);
+ }
+ }
+
@Override
public Optional<CeTask> peek() {
if (peekPaused.get()) {
}
private CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
- CeTask.Builder builder = new CeTask.Builder();
- builder.setUuid(dto.getUuid());
- builder.setType(dto.getTaskType());
- builder.setSubmitterLogin(dto.getSubmitterLogin());
- String componentUuid = dto.getComponentUuid();
- if (componentUuid != null) {
- builder.setComponentUuid(componentUuid);
- Optional<ComponentDto> component = dbClient.componentDao().selectByUuid(dbSession, componentUuid);
- if (component.isPresent()) {
- builder.setComponentKey(component.get().getKey());
- builder.setComponentName(component.get().name());
- }
+ if (dto.getComponentUuid() == null) {
+ return new CeQueueDtoToCeTask().apply(dto);
}
- return builder.build();
+ Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, dto.getComponentUuid());
+ if (componentDto.isPresent()) {
+ return new CeQueueDtoToCeTask(ImmutableMap.of(dto.getComponentUuid(), componentDto.get())).apply(dto);
+ }
+ return new CeQueueDtoToCeTask().apply(dto);
+ }
+
+ private List<CeTask> loadTasks(DbSession dbSession, List<CeQueueDto> dtos) {
+ Set<String> componentUuids = from(dtos)
+ .transform(CeQueueDtoToComponentUuid.INSTANCE)
+ .filter(notNull())
+ .toSet();
+ Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao()
+ .selectByUuids(dbSession, componentUuids))
+ .uniqueIndex(toUuid());
+
+ return from(dtos)
+ .transform(new CeQueueDtoToCeTask(componentDtoByUuid))
+ .toList();
}
@Override
public boolean isPeekPaused() {
return peekPaused.get();
}
+
+ private static class CeQueueDtoToCeTask implements Function<CeQueueDto, CeTask> {
+ private final Map<String, ComponentDto> componentDtoByUuid;
+
+ public CeQueueDtoToCeTask() {
+ this.componentDtoByUuid = Collections.emptyMap();
+ }
+
+ public CeQueueDtoToCeTask(Map<String, ComponentDto> componentDtoByUuid) {
+ this.componentDtoByUuid = componentDtoByUuid;
+ }
+
+ @Override
+ @Nonnull
+ public CeTask apply(@Nonnull CeQueueDto dto) {
+ CeTask.Builder builder = new CeTask.Builder();
+ builder.setUuid(dto.getUuid());
+ builder.setType(dto.getTaskType());
+ builder.setSubmitterLogin(dto.getSubmitterLogin());
+ String componentUuid = dto.getComponentUuid();
+ if (componentUuid != null) {
+ builder.setComponentUuid(componentUuid);
+ ComponentDto component = componentDtoByUuid.get(componentUuid);
+ if (component != null) {
+ builder.setComponentKey(component.getKey());
+ builder.setComponentName(component.name());
+ }
+ }
+ return builder.build();
+ }
+ }
+
+ private static class CeTaskSubmitToInsertedCeQueueDto implements Function<CeTaskSubmit, CeQueueDto> {
+ private final DbSession dbSession;
+ private final DbClient dbClient;
+
+ public CeTaskSubmitToInsertedCeQueueDto(DbSession dbSession, DbClient dbClient) {
+ this.dbSession = dbSession;
+ this.dbClient = dbClient;
+ }
+
+ @Override
+ @Nonnull
+ public CeQueueDto apply(@Nonnull CeTaskSubmit submission) {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setUuid(submission.getUuid());
+ dto.setTaskType(submission.getType());
+ dto.setComponentUuid(submission.getComponentUuid());
+ dto.setStatus(CeQueueDto.Status.PENDING);
+ dto.setSubmitterLogin(submission.getSubmitterLogin());
+ dto.setStartedAt(null);
+ dbClient.ceQueueDao().insert(dbSession, dto);
+ return dto;
+ }
+ }
+
+ private enum CeQueueDtoToComponentUuid implements Function<CeQueueDto, String> {
+ INSTANCE;
+
+ @Override
+ @Nullable
+ public String apply(@Nonnull CeQueueDto input) {
+ return input.getComponentUuid();
+ }
+ }
}
package org.sonar.server.computation.queue;
import com.google.common.base.Optional;
+import java.util.List;
+import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.sonar.api.utils.internal.TestSystem2;
import org.sonar.core.util.UuidFactory;
import org.sonar.core.util.UuidFactoryImpl;
+import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.db.component.ComponentDto;
import org.sonar.server.computation.monitoring.CEQueueStatus;
import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
+import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Matchers.any;
@Rule
public DbTester dbTester = DbTester.create(system2);
+ DbSession session = dbTester.getSession();
UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
CEQueueStatus queueStatus = new CEQueueStatusImpl();
}
@Test
- public void test_submit() {
- CeTaskSubmit.Builder submission = underTest.prepareSubmit();
- submission.setComponentUuid("PROJECT_1");
- submission.setType(CeTaskTypes.REPORT);
- submission.setSubmitterLogin("rob");
+ public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
+ CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob");
+ CeTask task = underTest.submit(taskSubmit);
- CeTask task = underTest.submit(submission.build());
- assertThat(task.getUuid()).isEqualTo(submission.getUuid());
- assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1");
- assertThat(task.getSubmitterLogin()).isEqualTo("rob");
+ verifyCeTask(taskSubmit, task, null);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit);
+ }
+
+ @Test
+ public void submit_increments_receivedCount_of_QueueStatus() {
+ underTest.submit(createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob"));
- Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submission.getUuid());
- assertThat(queueDto.isPresent()).isTrue();
- assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT);
- assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1");
- assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob");
- assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
assertThat(queueStatus.getReceivedCount()).isEqualTo(1L);
+
+ underTest.submit(createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_2", "rob"));
+
+ assertThat(queueStatus.getReceivedCount()).isEqualTo(2L);
}
@Test
- public void fail_to_submit_if_paused() {
+ public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+ ComponentDto componentDto = insertComponent(newComponentDto("PROJECT_1"));
+ CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, componentDto);
+ }
+
+ @Test
+ public void submit_returns_task_without_component_info_when_submit_has_none() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, null);
+ }
+
+ @Test
+ public void submit_fails_with_ISE_if_paused() {
+ underTest.pauseSubmit();
+
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Compute Engine does not currently accept new tasks");
- underTest.pauseSubmit();
submit(CeTaskTypes.REPORT, "PROJECT_1");
}
+ @Test
+ public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
+ CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob");
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
+
+ List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+ assertThat(tasks).hasSize(2);
+ verifyCeTask(taskSubmit1, tasks.get(0), null);
+ verifyCeTask(taskSubmit2, tasks.get(1), null);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit1);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit2);
+ }
+
+ @Test
+ public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+ ComponentDto componentDto1 = insertComponent(newComponentDto("PROJECT_1"));
+ CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null);
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null);
+
+ List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+ assertThat(tasks).hasSize(2);
+ verifyCeTask(taskSubmit1, tasks.get(0), componentDto1);
+ verifyCeTask(taskSubmit2, tasks.get(1), null);
+ }
+
+ @Test
+ public void massSubmit_increments_receivedCount_of_QueueStatus() {
+ underTest.massSubmit(asList(createTaskSubmit("type 1"), createTaskSubmit("type 2")));
+
+ assertThat(queueStatus.getReceivedCount()).isEqualTo(2L);
+
+ underTest.massSubmit(asList(createTaskSubmit("a"), createTaskSubmit("a"), createTaskSubmit("b")));
+
+ assertThat(queueStatus.getReceivedCount()).isEqualTo(5L);
+ }
+
@Test
public void test_remove() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
@Test
public void fail_to_remove_if_not_in_queue() throws Exception {
- expectedException.expect(IllegalStateException.class);
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
underTest.remove(task, CeActivityDto.Status.SUCCESS, null);
- // fail
+ expectedException.expect(IllegalStateException.class);
+
underTest.remove(task, CeActivityDto.Status.SUCCESS, null);
}
assertThat(underTest.isPeekPaused()).isFalse();
}
+ private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) {
+ assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
+ assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid());
+ assertThat(task.getType()).isEqualTo(taskSubmit.getType());
+ if (componentDto == null) {
+ assertThat(task.getComponentKey()).isNull();
+ assertThat(task.getComponentName()).isNull();
+ } else {
+ assertThat(task.getComponentKey()).isEqualTo(componentDto.key());
+ assertThat(task.getComponentName()).isEqualTo(componentDto.name());
+ }
+ assertThat(task.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+ }
+
+ private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
+ Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid());
+ assertThat(queueDto.isPresent()).isTrue();
+ assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
+ assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
+ assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+ assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+ }
+
+ private static ComponentDto newComponentDto(String uuid) {
+ return new ComponentDto().setUuid(uuid).setName("name_" + uuid).setKey("key_" + uuid);
+ }
+
private CeTask submit(String reportType, String componentUuid) {
+ return underTest.submit(createTaskSubmit(reportType, componentUuid, null));
+ }
+
+ private CeTaskSubmit createTaskSubmit(String type) {
+ return createTaskSubmit(type, null, null);
+ }
+
+ private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterLogin) {
CeTaskSubmit.Builder submission = underTest.prepareSubmit();
- submission.setType(reportType);
+ submission.setType(type);
submission.setComponentUuid(componentUuid);
- return underTest.submit(submission.build());
+ submission.setSubmitterLogin(submitterLogin);
+ return submission.build();
}
private CeTaskResult newTaskResult(Long snapshotId) {
when(taskResult.getSnapshotId()).thenReturn(snapshotId);
return taskResult;
}
+
+ private ComponentDto insertComponent(ComponentDto componentDto) {
+ dbTester.getDbClient().componentDao().insert(session, componentDto);
+ session.commit();
+ return componentDto;
+ }
}