From 555829f24cc59d67efa6c03f980141088e4f6db9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Thu, 28 Jan 2016 16:23:54 +0100 Subject: [PATCH] SONAR-7168 add CeQueue.massSubmit --- .../server/computation/queue/CeQueue.java | 19 ++- .../server/computation/queue/CeQueueImpl.java | 141 ++++++++++++++--- .../computation/queue/CeQueueImplTest.java | 147 +++++++++++++++--- 3 files changed, 263 insertions(+), 44 deletions(-) diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueue.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueue.java index ed3bd8c48d2..dfedb006a47 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueue.java @@ -20,6 +20,8 @@ package org.sonar.server.computation.queue; import com.google.common.base.Optional; +import java.util.Collection; +import java.util.List; import javax.annotation.Nullable; import org.sonar.db.ce.CeActivityDto; @@ -40,11 +42,24 @@ public interface CeQueue { /** * Submits a task to the queue. The task is processed asynchronously. - * If submits are paused (see {@link #isSubmitPaused()}, then an - * unchecked exception is thrown. + *

+ * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}. + *

+ * + * @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()}) */ CeTask submit(CeTaskSubmit submission); + /** + * Submits multiple tasks to the queue at once. All tasks are processed asynchronously. + *

+ * This method will perform significantly better that calling {@link #submit(CeTaskSubmit)} in a loop. + *

+ * + * @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()}) + */ + List massSubmit(Collection submissions); + /** * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueImpl.java index 4aaca6297ba..5d4cc559dc6 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueImpl.java @@ -19,8 +19,16 @@ */ package org.sonar.server.computation.queue; +import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.sonar.api.server.ServerSide; import org.sonar.api.utils.System2; @@ -33,7 +41,10 @@ import org.sonar.db.component.ComponentDto; import org.sonar.server.computation.monitoring.CEQueueStatus; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Predicates.notNull; +import static com.google.common.collect.FluentIterable.from; import static java.lang.String.format; +import static org.sonar.db.component.ComponentDtoFunctions.toUuid; @ServerSide public class CeQueueImpl implements CeQueue { @@ -68,14 +79,7 @@ public class CeQueueImpl implements CeQueue { DbSession dbSession = dbClient.openSession(false); try { - CeQueueDto dto = new CeQueueDto(); - dto.setUuid(submission.getUuid()); - dto.setTaskType(submission.getType()); - dto.setComponentUuid(submission.getComponentUuid()); - dto.setStatus(CeQueueDto.Status.PENDING); - dto.setSubmitterLogin(submission.getSubmitterLogin()); - dto.setStartedAt(null); - dbClient.ceQueueDao().insert(dbSession, dto); + CeQueueDto dto = new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient).apply(submission); CeTask task = loadTask(dbSession, dto); dbSession.commit(); queueStatus.addReceived(); @@ -86,6 +90,28 @@ public class CeQueueImpl implements CeQueue { } } + @Override + public List massSubmit(Collection submissions) { + checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks"); + if (submissions.isEmpty()) { + return Collections.emptyList(); + } + + DbSession dbSession = dbClient.openSession(true); + try { + List ceQueueDtos = from(submissions) + .transform(new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient)) + .toList(); + List tasks = loadTasks(dbSession, ceQueueDtos); + dbSession.commit(); + queueStatus.addReceived(tasks.size()); + return tasks; + + } finally { + dbClient.closeSession(dbSession); + } + } + @Override public Optional peek() { if (peekPaused.get()) { @@ -107,20 +133,28 @@ public class CeQueueImpl implements CeQueue { } private CeTask loadTask(DbSession dbSession, CeQueueDto dto) { - CeTask.Builder builder = new CeTask.Builder(); - builder.setUuid(dto.getUuid()); - builder.setType(dto.getTaskType()); - builder.setSubmitterLogin(dto.getSubmitterLogin()); - String componentUuid = dto.getComponentUuid(); - if (componentUuid != null) { - builder.setComponentUuid(componentUuid); - Optional component = dbClient.componentDao().selectByUuid(dbSession, componentUuid); - if (component.isPresent()) { - builder.setComponentKey(component.get().getKey()); - builder.setComponentName(component.get().name()); - } + if (dto.getComponentUuid() == null) { + return new CeQueueDtoToCeTask().apply(dto); } - return builder.build(); + Optional componentDto = dbClient.componentDao().selectByUuid(dbSession, dto.getComponentUuid()); + if (componentDto.isPresent()) { + return new CeQueueDtoToCeTask(ImmutableMap.of(dto.getComponentUuid(), componentDto.get())).apply(dto); + } + return new CeQueueDtoToCeTask().apply(dto); + } + + private List loadTasks(DbSession dbSession, List dtos) { + Set componentUuids = from(dtos) + .transform(CeQueueDtoToComponentUuid.INSTANCE) + .filter(notNull()) + .toSet(); + Map componentDtoByUuid = from(dbClient.componentDao() + .selectByUuids(dbSession, componentUuids)) + .uniqueIndex(toUuid()); + + return from(dtos) + .transform(new CeQueueDtoToCeTask(componentDtoByUuid)) + .toList(); } @Override @@ -253,4 +287,69 @@ public class CeQueueImpl implements CeQueue { public boolean isPeekPaused() { return peekPaused.get(); } + + private static class CeQueueDtoToCeTask implements Function { + private final Map componentDtoByUuid; + + public CeQueueDtoToCeTask() { + this.componentDtoByUuid = Collections.emptyMap(); + } + + public CeQueueDtoToCeTask(Map componentDtoByUuid) { + this.componentDtoByUuid = componentDtoByUuid; + } + + @Override + @Nonnull + public CeTask apply(@Nonnull CeQueueDto dto) { + CeTask.Builder builder = new CeTask.Builder(); + builder.setUuid(dto.getUuid()); + builder.setType(dto.getTaskType()); + builder.setSubmitterLogin(dto.getSubmitterLogin()); + String componentUuid = dto.getComponentUuid(); + if (componentUuid != null) { + builder.setComponentUuid(componentUuid); + ComponentDto component = componentDtoByUuid.get(componentUuid); + if (component != null) { + builder.setComponentKey(component.getKey()); + builder.setComponentName(component.name()); + } + } + return builder.build(); + } + } + + private static class CeTaskSubmitToInsertedCeQueueDto implements Function { + private final DbSession dbSession; + private final DbClient dbClient; + + public CeTaskSubmitToInsertedCeQueueDto(DbSession dbSession, DbClient dbClient) { + this.dbSession = dbSession; + this.dbClient = dbClient; + } + + @Override + @Nonnull + public CeQueueDto apply(@Nonnull CeTaskSubmit submission) { + CeQueueDto dto = new CeQueueDto(); + dto.setUuid(submission.getUuid()); + dto.setTaskType(submission.getType()); + dto.setComponentUuid(submission.getComponentUuid()); + dto.setStatus(CeQueueDto.Status.PENDING); + dto.setSubmitterLogin(submission.getSubmitterLogin()); + dto.setStartedAt(null); + dbClient.ceQueueDao().insert(dbSession, dto); + return dto; + } + } + + private enum CeQueueDtoToComponentUuid implements Function { + INSTANCE; + + @Override + @Nullable + public String apply(@Nonnull CeQueueDto input) { + return input.getComponentUuid(); + } + } } diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeQueueImplTest.java index 765a338505a..69acc25acc4 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeQueueImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeQueueImplTest.java @@ -20,6 +20,8 @@ package org.sonar.server.computation.queue; import com.google.common.base.Optional; +import java.util.List; +import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -28,13 +30,16 @@ import org.sonar.api.utils.System2; import org.sonar.api.utils.internal.TestSystem2; import org.sonar.core.util.UuidFactory; import org.sonar.core.util.UuidFactoryImpl; +import org.sonar.db.DbSession; import org.sonar.db.DbTester; import org.sonar.db.ce.CeActivityDto; import org.sonar.db.ce.CeQueueDto; import org.sonar.db.ce.CeTaskTypes; +import org.sonar.db.component.ComponentDto; import org.sonar.server.computation.monitoring.CEQueueStatus; import org.sonar.server.computation.monitoring.CEQueueStatusImpl; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.startsWith; import static org.mockito.Matchers.any; @@ -54,6 +59,7 @@ public class CeQueueImplTest { @Rule public DbTester dbTester = DbTester.create(system2); + DbSession session = dbTester.getSession(); UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; CEQueueStatus queueStatus = new CEQueueStatusImpl(); @@ -66,35 +72,92 @@ public class CeQueueImplTest { } @Test - public void test_submit() { - CeTaskSubmit.Builder submission = underTest.prepareSubmit(); - submission.setComponentUuid("PROJECT_1"); - submission.setType(CeTaskTypes.REPORT); - submission.setSubmitterLogin("rob"); + public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() { + CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob"); + CeTask task = underTest.submit(taskSubmit); - CeTask task = underTest.submit(submission.build()); - assertThat(task.getUuid()).isEqualTo(submission.getUuid()); - assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1"); - assertThat(task.getSubmitterLogin()).isEqualTo("rob"); + verifyCeTask(taskSubmit, task, null); + verifyCeQueueDtoForTaskSubmit(taskSubmit); + } + + @Test + public void submit_increments_receivedCount_of_QueueStatus() { + underTest.submit(createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob")); - Optional queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submission.getUuid()); - assertThat(queueDto.isPresent()).isTrue(); - assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT); - assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1"); - assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob"); - assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L); assertThat(queueStatus.getReceivedCount()).isEqualTo(1L); + + underTest.submit(createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_2", "rob")); + + assertThat(queueStatus.getReceivedCount()).isEqualTo(2L); } @Test - public void fail_to_submit_if_paused() { + public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() { + ComponentDto componentDto = insertComponent(newComponentDto("PROJECT_1")); + CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, componentDto); + } + + @Test + public void submit_returns_task_without_component_info_when_submit_has_none() { + CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related"); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, null); + } + + @Test + public void submit_fails_with_ISE_if_paused() { + underTest.pauseSubmit(); + expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Compute Engine does not currently accept new tasks"); - underTest.pauseSubmit(); submit(CeTaskTypes.REPORT, "PROJECT_1"); } + @Test + public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() { + CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob"); + CeTaskSubmit taskSubmit2 = createTaskSubmit("some type"); + + List tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2)); + + assertThat(tasks).hasSize(2); + verifyCeTask(taskSubmit1, tasks.get(0), null); + verifyCeTask(taskSubmit2, tasks.get(1), null); + verifyCeQueueDtoForTaskSubmit(taskSubmit1); + verifyCeQueueDtoForTaskSubmit(taskSubmit2); + } + + @Test + public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() { + ComponentDto componentDto1 = insertComponent(newComponentDto("PROJECT_1")); + CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null); + CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null); + + List tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2)); + + assertThat(tasks).hasSize(2); + verifyCeTask(taskSubmit1, tasks.get(0), componentDto1); + verifyCeTask(taskSubmit2, tasks.get(1), null); + } + + @Test + public void massSubmit_increments_receivedCount_of_QueueStatus() { + underTest.massSubmit(asList(createTaskSubmit("type 1"), createTaskSubmit("type 2"))); + + assertThat(queueStatus.getReceivedCount()).isEqualTo(2L); + + underTest.massSubmit(asList(createTaskSubmit("a"), createTaskSubmit("a"), createTaskSubmit("b"))); + + assertThat(queueStatus.getReceivedCount()).isEqualTo(5L); + } + @Test public void test_remove() { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); @@ -143,11 +206,11 @@ public class CeQueueImplTest { @Test public void fail_to_remove_if_not_in_queue() throws Exception { - expectedException.expect(IllegalStateException.class); CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); underTest.remove(task, CeActivityDto.Status.SUCCESS, null); - // fail + expectedException.expect(IllegalStateException.class); + underTest.remove(task, CeActivityDto.Status.SUCCESS, null); } @@ -244,11 +307,47 @@ public class CeQueueImplTest { assertThat(underTest.isPeekPaused()).isFalse(); } + private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) { + assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid()); + assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid()); + assertThat(task.getType()).isEqualTo(taskSubmit.getType()); + if (componentDto == null) { + assertThat(task.getComponentKey()).isNull(); + assertThat(task.getComponentName()).isNull(); + } else { + assertThat(task.getComponentKey()).isEqualTo(componentDto.key()); + assertThat(task.getComponentName()).isEqualTo(componentDto.name()); + } + assertThat(task.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); + } + + private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) { + Optional queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid()); + assertThat(queueDto.isPresent()).isTrue(); + assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType()); + assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); + assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); + assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L); + } + + private static ComponentDto newComponentDto(String uuid) { + return new ComponentDto().setUuid(uuid).setName("name_" + uuid).setKey("key_" + uuid); + } + private CeTask submit(String reportType, String componentUuid) { + return underTest.submit(createTaskSubmit(reportType, componentUuid, null)); + } + + private CeTaskSubmit createTaskSubmit(String type) { + return createTaskSubmit(type, null, null); + } + + private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterLogin) { CeTaskSubmit.Builder submission = underTest.prepareSubmit(); - submission.setType(reportType); + submission.setType(type); submission.setComponentUuid(componentUuid); - return underTest.submit(submission.build()); + submission.setSubmitterLogin(submitterLogin); + return submission.build(); } private CeTaskResult newTaskResult(Long snapshotId) { @@ -256,4 +355,10 @@ public class CeQueueImplTest { when(taskResult.getSnapshotId()).thenReturn(snapshotId); return taskResult; } + + private ComponentDto insertComponent(ComponentDto componentDto) { + dbTester.getDbClient().componentDao().insert(session, componentDto); + session.commit(); + return componentDto; + } } -- 2.39.5