]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-7168 add CeQueue.massSubmit 743/head
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Thu, 28 Jan 2016 15:23:54 +0000 (16:23 +0100)
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Fri, 29 Jan 2016 15:05:32 +0000 (16:05 +0100)
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueue.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueImpl.java
server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeQueueImplTest.java

index ed3bd8c48d2d2754cea73cef9d4ba12aa8dc1b4f..dfedb006a47c4d0dc83d587969ba5443e3468582 100644 (file)
@@ -20,6 +20,8 @@
 package org.sonar.server.computation.queue;
 
 import com.google.common.base.Optional;
+import java.util.Collection;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.sonar.db.ce.CeActivityDto;
 
@@ -40,11 +42,24 @@ public interface CeQueue {
 
   /**
    * Submits a task to the queue. The task is processed asynchronously.
-   * If submits are paused (see {@link #isSubmitPaused()}, then an
-   * unchecked exception is thrown.
+   * <p>
+   * This method is equivalent to calling {@code massSubmit(Collections.singletonList(submission))}.
+   * </p>
+   *
+   * @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()})
    */
   CeTask submit(CeTaskSubmit submission);
 
+  /**
+   * Submits multiple tasks to the queue at once. All tasks are processed asynchronously.
+   * <p>
+   * This method will perform significantly better that calling {@link #submit(CeTaskSubmit)} in a loop.
+   * </p>
+   *
+   * @throws IllegalStateException If submits are paused (see {@link #isSubmitPaused()})
+   */
+  List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions);
+
   /**
    * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
    * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
index 4aaca6297ba116a43eb9ab06077a03f06b5409f7..5d4cc559dc6716aa5d6c3fac41f5a37dc0cbcdcf 100644 (file)
  */
 package org.sonar.server.computation.queue;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.sonar.api.server.ServerSide;
 import org.sonar.api.utils.System2;
@@ -33,7 +41,10 @@ import org.sonar.db.component.ComponentDto;
 import org.sonar.server.computation.monitoring.CEQueueStatus;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Predicates.notNull;
+import static com.google.common.collect.FluentIterable.from;
 import static java.lang.String.format;
+import static org.sonar.db.component.ComponentDtoFunctions.toUuid;
 
 @ServerSide
 public class CeQueueImpl implements CeQueue {
@@ -68,14 +79,7 @@ public class CeQueueImpl implements CeQueue {
 
     DbSession dbSession = dbClient.openSession(false);
     try {
-      CeQueueDto dto = new CeQueueDto();
-      dto.setUuid(submission.getUuid());
-      dto.setTaskType(submission.getType());
-      dto.setComponentUuid(submission.getComponentUuid());
-      dto.setStatus(CeQueueDto.Status.PENDING);
-      dto.setSubmitterLogin(submission.getSubmitterLogin());
-      dto.setStartedAt(null);
-      dbClient.ceQueueDao().insert(dbSession, dto);
+      CeQueueDto dto = new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient).apply(submission);
       CeTask task = loadTask(dbSession, dto);
       dbSession.commit();
       queueStatus.addReceived();
@@ -86,6 +90,28 @@ public class CeQueueImpl implements CeQueue {
     }
   }
 
+  @Override
+  public List<CeTask> massSubmit(Collection<CeTaskSubmit> submissions) {
+    checkState(!submitPaused.get(), "Compute Engine does not currently accept new tasks");
+    if (submissions.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    DbSession dbSession = dbClient.openSession(true);
+    try {
+      List<CeQueueDto> ceQueueDtos = from(submissions)
+        .transform(new CeTaskSubmitToInsertedCeQueueDto(dbSession, dbClient))
+        .toList();
+      List<CeTask> tasks = loadTasks(dbSession, ceQueueDtos);
+      dbSession.commit();
+      queueStatus.addReceived(tasks.size());
+      return tasks;
+
+    } finally {
+      dbClient.closeSession(dbSession);
+    }
+  }
+
   @Override
   public Optional<CeTask> peek() {
     if (peekPaused.get()) {
@@ -107,20 +133,28 @@ public class CeQueueImpl implements CeQueue {
   }
 
   private CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
-    CeTask.Builder builder = new CeTask.Builder();
-    builder.setUuid(dto.getUuid());
-    builder.setType(dto.getTaskType());
-    builder.setSubmitterLogin(dto.getSubmitterLogin());
-    String componentUuid = dto.getComponentUuid();
-    if (componentUuid != null) {
-      builder.setComponentUuid(componentUuid);
-      Optional<ComponentDto> component = dbClient.componentDao().selectByUuid(dbSession, componentUuid);
-      if (component.isPresent()) {
-        builder.setComponentKey(component.get().getKey());
-        builder.setComponentName(component.get().name());
-      }
+    if (dto.getComponentUuid() == null) {
+      return new CeQueueDtoToCeTask().apply(dto);
     }
-    return builder.build();
+    Optional<ComponentDto> componentDto = dbClient.componentDao().selectByUuid(dbSession, dto.getComponentUuid());
+    if (componentDto.isPresent()) {
+      return new CeQueueDtoToCeTask(ImmutableMap.of(dto.getComponentUuid(), componentDto.get())).apply(dto);
+    }
+    return new CeQueueDtoToCeTask().apply(dto);
+  }
+
+  private List<CeTask> loadTasks(DbSession dbSession, List<CeQueueDto> dtos) {
+    Set<String> componentUuids = from(dtos)
+      .transform(CeQueueDtoToComponentUuid.INSTANCE)
+      .filter(notNull())
+      .toSet();
+    Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao()
+      .selectByUuids(dbSession, componentUuids))
+      .uniqueIndex(toUuid());
+
+    return from(dtos)
+      .transform(new CeQueueDtoToCeTask(componentDtoByUuid))
+      .toList();
   }
 
   @Override
@@ -253,4 +287,69 @@ public class CeQueueImpl implements CeQueue {
   public boolean isPeekPaused() {
     return peekPaused.get();
   }
+
+  private static class CeQueueDtoToCeTask implements Function<CeQueueDto, CeTask> {
+    private final Map<String, ComponentDto> componentDtoByUuid;
+
+    public CeQueueDtoToCeTask() {
+      this.componentDtoByUuid = Collections.emptyMap();
+    }
+
+    public CeQueueDtoToCeTask(Map<String, ComponentDto> componentDtoByUuid) {
+      this.componentDtoByUuid = componentDtoByUuid;
+    }
+
+    @Override
+    @Nonnull
+    public CeTask apply(@Nonnull CeQueueDto dto) {
+      CeTask.Builder builder = new CeTask.Builder();
+      builder.setUuid(dto.getUuid());
+      builder.setType(dto.getTaskType());
+      builder.setSubmitterLogin(dto.getSubmitterLogin());
+      String componentUuid = dto.getComponentUuid();
+      if (componentUuid != null) {
+        builder.setComponentUuid(componentUuid);
+        ComponentDto component = componentDtoByUuid.get(componentUuid);
+        if (component != null) {
+          builder.setComponentKey(component.getKey());
+          builder.setComponentName(component.name());
+        }
+      }
+      return builder.build();
+    }
+  }
+
+  private static class CeTaskSubmitToInsertedCeQueueDto implements Function<CeTaskSubmit, CeQueueDto> {
+    private final DbSession dbSession;
+    private final DbClient dbClient;
+
+    public CeTaskSubmitToInsertedCeQueueDto(DbSession dbSession, DbClient dbClient) {
+      this.dbSession = dbSession;
+      this.dbClient = dbClient;
+    }
+
+    @Override
+    @Nonnull
+    public CeQueueDto apply(@Nonnull CeTaskSubmit submission) {
+      CeQueueDto dto = new CeQueueDto();
+      dto.setUuid(submission.getUuid());
+      dto.setTaskType(submission.getType());
+      dto.setComponentUuid(submission.getComponentUuid());
+      dto.setStatus(CeQueueDto.Status.PENDING);
+      dto.setSubmitterLogin(submission.getSubmitterLogin());
+      dto.setStartedAt(null);
+      dbClient.ceQueueDao().insert(dbSession, dto);
+      return dto;
+    }
+  }
+
+  private enum CeQueueDtoToComponentUuid implements Function<CeQueueDto, String> {
+    INSTANCE;
+
+    @Override
+    @Nullable
+    public String apply(@Nonnull CeQueueDto input) {
+      return input.getComponentUuid();
+    }
+  }
 }
index 765a338505a57787eaef5b98007d3ea9af5f0c5c..69acc25acc4399a3c36a52c423b134cb0e0e9ad4 100644 (file)
@@ -20,6 +20,8 @@
 package org.sonar.server.computation.queue;
 
 import com.google.common.base.Optional;
+import java.util.List;
+import javax.annotation.Nullable;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -28,13 +30,16 @@ import org.sonar.api.utils.System2;
 import org.sonar.api.utils.internal.TestSystem2;
 import org.sonar.core.util.UuidFactory;
 import org.sonar.core.util.UuidFactoryImpl;
+import org.sonar.db.DbSession;
 import org.sonar.db.DbTester;
 import org.sonar.db.ce.CeActivityDto;
 import org.sonar.db.ce.CeQueueDto;
 import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.db.component.ComponentDto;
 import org.sonar.server.computation.monitoring.CEQueueStatus;
 import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
 
+import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.startsWith;
 import static org.mockito.Matchers.any;
@@ -54,6 +59,7 @@ public class CeQueueImplTest {
 
   @Rule
   public DbTester dbTester = DbTester.create(system2);
+  DbSession session = dbTester.getSession();
 
   UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
   CEQueueStatus queueStatus = new CEQueueStatusImpl();
@@ -66,35 +72,92 @@ public class CeQueueImplTest {
   }
 
   @Test
-  public void test_submit() {
-    CeTaskSubmit.Builder submission = underTest.prepareSubmit();
-    submission.setComponentUuid("PROJECT_1");
-    submission.setType(CeTaskTypes.REPORT);
-    submission.setSubmitterLogin("rob");
+  public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
+    CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob");
+    CeTask task = underTest.submit(taskSubmit);
 
-    CeTask task = underTest.submit(submission.build());
-    assertThat(task.getUuid()).isEqualTo(submission.getUuid());
-    assertThat(task.getComponentUuid()).isEqualTo("PROJECT_1");
-    assertThat(task.getSubmitterLogin()).isEqualTo("rob");
+    verifyCeTask(taskSubmit, task, null);
+    verifyCeQueueDtoForTaskSubmit(taskSubmit);
+  }
+
+  @Test
+  public void submit_increments_receivedCount_of_QueueStatus() {
+    underTest.submit(createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob"));
 
-    Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), submission.getUuid());
-    assertThat(queueDto.isPresent()).isTrue();
-    assertThat(queueDto.get().getTaskType()).isEqualTo(CeTaskTypes.REPORT);
-    assertThat(queueDto.get().getComponentUuid()).isEqualTo("PROJECT_1");
-    assertThat(queueDto.get().getSubmitterLogin()).isEqualTo("rob");
-    assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
     assertThat(queueStatus.getReceivedCount()).isEqualTo(1L);
+
+    underTest.submit(createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_2", "rob"));
+
+    assertThat(queueStatus.getReceivedCount()).isEqualTo(2L);
   }
 
   @Test
-  public void fail_to_submit_if_paused() {
+  public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+    ComponentDto componentDto = insertComponent(newComponentDto("PROJECT_1"));
+    CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null);
+
+    CeTask task = underTest.submit(taskSubmit);
+
+    verifyCeTask(taskSubmit, task, componentDto);
+  }
+
+  @Test
+  public void submit_returns_task_without_component_info_when_submit_has_none() {
+    CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
+
+    CeTask task = underTest.submit(taskSubmit);
+
+    verifyCeTask(taskSubmit, task, null);
+  }
+
+  @Test
+  public void submit_fails_with_ISE_if_paused() {
+    underTest.pauseSubmit();
+
     expectedException.expect(IllegalStateException.class);
     expectedException.expectMessage("Compute Engine does not currently accept new tasks");
-    underTest.pauseSubmit();
 
     submit(CeTaskTypes.REPORT, "PROJECT_1");
   }
 
+  @Test
+  public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
+    CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob");
+    CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
+
+    List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+    assertThat(tasks).hasSize(2);
+    verifyCeTask(taskSubmit1, tasks.get(0), null);
+    verifyCeTask(taskSubmit2, tasks.get(1), null);
+    verifyCeQueueDtoForTaskSubmit(taskSubmit1);
+    verifyCeQueueDtoForTaskSubmit(taskSubmit2);
+  }
+
+  @Test
+  public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+    ComponentDto componentDto1 = insertComponent(newComponentDto("PROJECT_1"));
+    CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null);
+    CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null);
+
+    List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+    assertThat(tasks).hasSize(2);
+    verifyCeTask(taskSubmit1, tasks.get(0), componentDto1);
+    verifyCeTask(taskSubmit2, tasks.get(1), null);
+  }
+
+  @Test
+  public void massSubmit_increments_receivedCount_of_QueueStatus() {
+    underTest.massSubmit(asList(createTaskSubmit("type 1"), createTaskSubmit("type 2")));
+
+    assertThat(queueStatus.getReceivedCount()).isEqualTo(2L);
+
+    underTest.massSubmit(asList(createTaskSubmit("a"), createTaskSubmit("a"), createTaskSubmit("b")));
+
+    assertThat(queueStatus.getReceivedCount()).isEqualTo(5L);
+  }
+
   @Test
   public void test_remove() {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
@@ -143,11 +206,11 @@ public class CeQueueImplTest {
 
   @Test
   public void fail_to_remove_if_not_in_queue() throws Exception {
-    expectedException.expect(IllegalStateException.class);
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
     underTest.remove(task, CeActivityDto.Status.SUCCESS, null);
 
-    // fail
+    expectedException.expect(IllegalStateException.class);
+
     underTest.remove(task, CeActivityDto.Status.SUCCESS, null);
   }
 
@@ -244,11 +307,47 @@ public class CeQueueImplTest {
     assertThat(underTest.isPeekPaused()).isFalse();
   }
 
+  private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) {
+    assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
+    assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid());
+    assertThat(task.getType()).isEqualTo(taskSubmit.getType());
+    if (componentDto == null) {
+      assertThat(task.getComponentKey()).isNull();
+      assertThat(task.getComponentName()).isNull();
+    } else {
+      assertThat(task.getComponentKey()).isEqualTo(componentDto.key());
+      assertThat(task.getComponentName()).isEqualTo(componentDto.name());
+    }
+    assertThat(task.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+  }
+
+  private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
+    Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid());
+    assertThat(queueDto.isPresent()).isTrue();
+    assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
+    assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
+    assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+    assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+  }
+
+  private static ComponentDto newComponentDto(String uuid) {
+    return new ComponentDto().setUuid(uuid).setName("name_" + uuid).setKey("key_" + uuid);
+  }
+
   private CeTask submit(String reportType, String componentUuid) {
+    return underTest.submit(createTaskSubmit(reportType, componentUuid, null));
+  }
+
+  private CeTaskSubmit createTaskSubmit(String type) {
+    return createTaskSubmit(type, null, null);
+  }
+
+  private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterLogin) {
     CeTaskSubmit.Builder submission = underTest.prepareSubmit();
-    submission.setType(reportType);
+    submission.setType(type);
     submission.setComponentUuid(componentUuid);
-    return underTest.submit(submission.build());
+    submission.setSubmitterLogin(submitterLogin);
+    return submission.build();
   }
 
   private CeTaskResult newTaskResult(Long snapshotId) {
@@ -256,4 +355,10 @@ public class CeQueueImplTest {
     when(taskResult.getSnapshotId()).thenReturn(snapshotId);
     return taskResult;
   }
+
+  private ComponentDto insertComponent(ComponentDto componentDto) {
+    dbTester.getDbClient().componentDao().insert(session, componentDto);
+    session.commit();
+    return componentDto;
+  }
 }