]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-8985 SONAR-9040 populates CE_QUEUE.WORKER_UUID and EXECUTION_COUNT
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Mon, 27 Mar 2017 13:03:41 +0000 (15:03 +0200)
committerEric Hartmann <hartmann.eric@gmail.Com>
Thu, 27 Apr 2017 07:23:18 +0000 (09:23 +0200)
12 files changed:
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java
server/sonar-db-migration/src/main/java/org/sonar/server/platform/db/migration/version/v64/DbVersion64.java
server/sonar-db-migration/src/test/java/org/sonar/server/platform/db/migration/version/v64/DbVersion64Test.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImpl.java
server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerCallableImplTest.java

index 2489f33ca055f2c72000bfdcab2273179d5e3d79..73e90cfcfda7679f7b39c680a1e77e85f094d318 100644 (file)
@@ -109,20 +109,21 @@ public class CeQueueDao implements Dao {
     return mapper(dbSession).countByStatusAndComponentUuid(status, componentUuid);
   }
 
-  public Optional<CeQueueDto> peek(DbSession session) {
+  public Optional<CeQueueDto> peek(DbSession session, String workerUuid) {
     List<EligibleTaskDto> eligibles = mapper(session).selectEligibleForPeek(ONE_ROW_LIMIT);
     if (eligibles.isEmpty()) {
       return Optional.absent();
     }
 
     EligibleTaskDto eligible = eligibles.get(0);
-    return tryToPeek(session, eligible);
+    return tryToPeek(session, eligible, workerUuid);
   }
 
-  private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible) {
+  private Optional<CeQueueDto> tryToPeek(DbSession session, EligibleTaskDto eligible, String workerUuid) {
+    long now = system2.now();
     int touchedRows = mapper(session).updateIf(eligible.getUuid(),
-      new UpdateIf.NewProperties(IN_PROGRESS, system2.now(), system2.now()),
-      new UpdateIf.OldProperties(PENDING));
+      new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, eligible.getExecutionCount() + 1, now, now),
+      new UpdateIf.OldProperties(PENDING, eligible.getExecutionCount()));
     if (touchedRows != 1) {
       return Optional.absent();
     }
index e5912fcef358f2ab9c205cd3eea5a35389b2717a..01b552aac02266eb0e17defee2f0f05fc005426e 100644 (file)
@@ -20,6 +20,7 @@
 package org.sonar.db.ce;
 
 import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 
 import static java.util.Objects.requireNonNull;
@@ -32,11 +33,16 @@ final class UpdateIf {
   @Immutable
   public static class NewProperties {
     private final CeQueueDto.Status status;
+    private final String workerUuid;
+    private final int executionCount;
     private final Long startedAt;
     private final long updatedAt;
 
-    NewProperties(CeQueueDto.Status status, Long startedAt, long updatedAt) {
+    NewProperties(CeQueueDto.Status status, @Nullable String workerUuid, int executionCount,
+      Long startedAt, long updatedAt) {
       this.status = requireNonNull(status, "status can't be null");
+      this.workerUuid = workerUuid;
+      this.executionCount = executionCount;
       this.startedAt = startedAt;
       this.updatedAt = updatedAt;
     }
@@ -45,6 +51,15 @@ final class UpdateIf {
       return status;
     }
 
+    @CheckForNull
+    public String getWorkerUuid() {
+      return workerUuid;
+    }
+
+    public int getExecutionCount() {
+      return executionCount;
+    }
+
     @CheckForNull
     public Long getStartedAt() {
       return startedAt;
@@ -58,14 +73,20 @@ final class UpdateIf {
   @Immutable
   public static class OldProperties {
     private final CeQueueDto.Status status;
+    private final int executionCount;
 
-    OldProperties(CeQueueDto.Status status) {
+    OldProperties(CeQueueDto.Status status, int executionCount) {
       this.status = requireNonNull(status, "status can't be null");
+      this.executionCount = executionCount;
     }
 
     public CeQueueDto.Status getStatus() {
       return status;
     }
+
+    public int getExecutionCount() {
+      return executionCount;
+    }
   }
 
 }
index 9c36c58406784012b800d18f17814df502d202b7..657c8a9d526cc0101930c193138384951ef30205 100644 (file)
   <update id="updateIf" parameterType="map">
     update ce_queue set
       status=#{new.status,jdbcType=VARCHAR},
+      execution_count=#{new.executionCount,jdbcType=INTEGER},
+      worker_uuid=#{new.workerUuid,jdbcType=VARCHAR},
       started_at=#{new.startedAt,jdbcType=BIGINT},
       updated_at=#{new.updatedAt,jdbcType=BIGINT}
     where
       uuid=#{uuid,jdbcType=VARCHAR}
       and status=#{old.status,jdbcType=VARCHAR}
+      and execution_count=#{old.executionCount,jdbcType=INTEGER}
   </update>
 
   <delete id="deleteByUuid">
index 2459f78ec02faf6fd014b8eaa45208d85ce71e8c..2290193f241884c07416219c985abfcb55d781b1 100644 (file)
@@ -54,7 +54,8 @@ public class CeQueueDaoTest {
   private static final String TASK_UUID_3 = "TASK_3";
   private static final String SELECT_QUEUE_UUID_AND_STATUS_QUERY = "select uuid,status from ce_queue";
   private static final String SUBMITTER_LOGIN = "henri";
-  private static final String WORKER_UUID = "worker uuid";
+  private static final String WORKER_UUID_1 = "worker uuid 1";
+  private static final String WORKER_UUID_2 = "worker uuid 2";
   private static final int EXECUTION_COUNT = 42;
 
   private TestSystem2 system2 = new TestSystem2().setNow(INIT_TIME);
@@ -62,30 +63,31 @@ public class CeQueueDaoTest {
   @Rule
   public DbTester db = DbTester.create(system2);
 
+  private System2 mockedSystem2 = mock(System2.class);
+
   private CeQueueDao underTest = new CeQueueDao(system2);
+  private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
 
   @Test
   public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
-    System2 system2 = mock(System2.class);
-    CeQueueDao ceQueueDao = new CeQueueDao(system2);
     long now = 1_334_333L;
     CeQueueDto dto = new CeQueueDto()
       .setTaskType(CeTaskTypes.REPORT)
       .setComponentUuid(COMPONENT_UUID_1)
       .setStatus(PENDING)
       .setSubmitterLogin(SUBMITTER_LOGIN)
-      .setWorkerUuid(WORKER_UUID)
+      .setWorkerUuid(WORKER_UUID_1)
       .setExecutionCount(EXECUTION_COUNT);
 
-    mockSystem2ForSingleCall(system2, now);
-    ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
-    mockSystem2ForSingleCall(system2, now);
-    ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_2).setCreatedAt(8_000_999L).setUpdatedAt(0));
-    mockSystem2ForSingleCall(system2, now);
-    ceQueueDao.insert(db.getSession(), dto.setUuid(TASK_UUID_3).setCreatedAt(0).setUpdatedAt(8_000_999L));
-    mockSystem2ForSingleCall(system2, now);
+    mockSystem2ForSingleCall(now);
+    underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
+    mockSystem2ForSingleCall(now);
+    underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_2).setCreatedAt(8_000_999L).setUpdatedAt(0));
+    mockSystem2ForSingleCall(now);
+    underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_3).setCreatedAt(0).setUpdatedAt(8_000_999L));
+    mockSystem2ForSingleCall(now);
     String uuid4 = "uuid 4";
-    ceQueueDao.insert(db.getSession(), dto.setUuid(uuid4).setCreatedAt(6_888_777L).setUpdatedAt(8_000_999L));
+    underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(uuid4).setCreatedAt(6_888_777L).setUpdatedAt(8_000_999L));
     db.getSession().commit();
 
     Stream.of(TASK_UUID_1, TASK_UUID_2, TASK_UUID_3)
@@ -96,7 +98,7 @@ public class CeQueueDaoTest {
         assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
         assertThat(saved.getStatus()).isEqualTo(PENDING);
         assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN);
-        assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID);
+        assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
         assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
         assertThat(saved.getCreatedAt()).isEqualTo(now);
         assertThat(saved.getUpdatedAt()).isEqualTo(now);
@@ -108,18 +110,13 @@ public class CeQueueDaoTest {
     assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
     assertThat(saved.getStatus()).isEqualTo(PENDING);
     assertThat(saved.getSubmitterLogin()).isEqualTo(SUBMITTER_LOGIN);
-    assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID);
+    assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
     assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
     assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L);
     assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L);
     assertThat(saved.getStartedAt()).isNull();
   }
 
-  private void mockSystem2ForSingleCall(System2 system2, long now) {
-    Mockito.reset(system2);
-    when(system2.now()).thenReturn(now).thenThrow(new IllegalStateException("now should be called only once"));
-  }
-
   @Test
   public void test_selectByUuid() {
     insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
@@ -182,13 +179,57 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING, TASK_UUID_3, PENDING);
   }
 
+  @Test
+  public void resetAllToPendingStatus_updates_updatedAt() {
+    long now = 1_334_333L;
+    insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
+    insert(TASK_UUID_2, COMPONENT_UUID_1, IN_PROGRESS);
+    mockSystem2ForSingleCall(now);
+
+    underTestWithSystem2Mock.resetAllToPendingStatus(db.getSession());
+
+    assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getUpdatedAt()).isEqualTo(now);
+    assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_2).get().getUpdatedAt()).isEqualTo(now);
+  }
+
+  @Test
+  public void resetAllToPendingStatus_resets_startedAt() {
+    assertThat(insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING).getStartedAt()).isNull();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).get().getUuid()).isEqualTo(TASK_UUID_1);
+    assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNotNull();
+
+    underTest.resetAllToPendingStatus(db.getSession());
+
+    assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1).get().getStartedAt()).isNull();
+  }
+
+  @Test
+  public void resetAllToPendingStatus_does_not_reset_workerUuid_nor_executionCount() {
+    CeQueueDto dto = new CeQueueDto()
+      .setUuid(TASK_UUID_1)
+      .setTaskType(CeTaskTypes.REPORT)
+      .setComponentUuid(COMPONENT_UUID_1)
+      .setStatus(IN_PROGRESS)
+      .setSubmitterLogin(SUBMITTER_LOGIN)
+      .setWorkerUuid(WORKER_UUID_1)
+      .setExecutionCount(EXECUTION_COUNT);
+    underTest.insert(db.getSession(), dto);
+    db.commit();
+
+    underTest.resetAllToPendingStatus(db.getSession());
+
+    CeQueueDto saved = underTest.selectByUuid(db.getSession(), TASK_UUID_1).get();
+    assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+    assertThat(saved.getExecutionCount()).isEqualTo(EXECUTION_COUNT);
+  }
+
   @Test
   public void peek_none_if_no_pendings() throws Exception {
-    assertThat(underTest.peek(db.getSession()).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
 
     // not pending, but in progress
     insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
-    assertThat(underTest.peek(db.getSession()).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
   }
 
   @Test
@@ -201,21 +242,25 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
 
     // peek first one
-    Optional<CeQueueDto> peek = underTest.peek(db.getSession());
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
     assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
+    assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+    assertThat(peek.get().getExecutionCount()).isEqualTo(1);
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
 
     // peek second one
-    peek = underTest.peek(db.getSession());
+    peek = underTest.peek(db.getSession(), WORKER_UUID_2);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
     assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
+    assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
+    assertThat(peek.get().getExecutionCount()).isEqualTo(1);
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);
 
     // no more pendings
-    assertThat(underTest.peek(db.getSession()).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
   }
 
   @Test
@@ -225,19 +270,23 @@ public class CeQueueDaoTest {
     system2.setNow(INIT_TIME + 3_000_000);
     insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
 
-    Optional<CeQueueDto> peek = underTest.peek(db.getSession());
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
+    assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+    assertThat(peek.get().getExecutionCount()).isEqualTo(1);
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
 
     // do not peek second task as long as the first one is in progress
-    peek = underTest.peek(db.getSession());
+    peek = underTest.peek(db.getSession(), WORKER_UUID_1);
     assertThat(peek.isPresent()).isFalse();
 
     // first one is finished
     underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
-    peek = underTest.peek(db.getSession());
+    peek = underTest.peek(db.getSession(), WORKER_UUID_2);
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
+    assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
+    assertThat(peek.get().getExecutionCount()).isEqualTo(1);
   }
 
   @Test
@@ -372,7 +421,7 @@ public class CeQueueDaoTest {
     db.commit();
   }
 
-  private void insert(String uuid, String componentUuid, CeQueueDto.Status status) {
+  private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
     CeQueueDto dto = new CeQueueDto();
     dto.setUuid(uuid);
     dto.setTaskType(CeTaskTypes.REPORT);
@@ -381,6 +430,7 @@ public class CeQueueDaoTest {
     dto.setSubmitterLogin("henri");
     underTest.insert(db.getSession(), dto);
     db.getSession().commit();
+    return dto;
   }
 
   private static Iterable<Map<String, Object>> upperizeKeys(List<Map<String, Object>> select) {
@@ -417,4 +467,11 @@ public class CeQueueDaoTest {
   private static Map<String, Object> rowMap(String uuid, CeQueueDto.Status status) {
     return ImmutableMap.of("UUID", uuid, "STATUS", status.name());
   }
+
+  private void mockSystem2ForSingleCall(long now) {
+    Mockito.reset(mockedSystem2);
+    when(mockedSystem2.now())
+      .thenReturn(now)
+      .thenThrow(new IllegalStateException("now should be called only once"));
+  }
 }
index d2fa0263fd8bba9c7c866fbbe6251526f9cd19f5..8c619f53a08b78ad7773ec236c48483cb67a8547 100644 (file)
@@ -57,9 +57,8 @@ public class DbVersion64 implements DbVersion {
       .add(1625, "Populate column ORGANIZATIONS.DEFAULT_GROUP_ID", PopulateColumnDefaultGroupIdOfOrganizations.class)
       .add(1626, "Clean orphan rows in table GROUPS_USERS", CleanOrphanRowsInGroupsUsers.class)
       .add(1627, "Delete permission templates linked to removed users", DeletePermissionTemplatesLinkedToRemovedUsers.class)
-    ;
-      .add(1628, "Add columns CE_QUEUE.WORKER_UUID and EXECUTION_COUNT", AddCeQueueWorkerUuidAndExecutionCount.class);
-      .add(1629, "Make CE_QUEUE.EXECUTION_COUNT not nullable", MakeCeQueueExecutionCountNotNullable.class);
+      .add(1628, "Add columns CE_QUEUE.WORKER_UUID and EXECUTION_COUNT", AddCeQueueWorkerUuidAndExecutionCount.class)
+      .add(1629, "Make CE_QUEUE.EXECUTION_COUNT not nullable", MakeCeQueueExecutionCountNotNullable.class)
       .add(1630, "Add columns CE_ACTIVITY.WORKER_UUID and EXECUTION_COUNT", AddCeActivityWorkerUuidAndExecutionCount.class)
       .add(1631, "Make columns CE_ACTIVITY.EXECUTION_COUNT not nullable", MakeCeActivityExecutionCountNotNullable.class);
   }
index cd0e721c77b7597d6ad84cb638e809584688f393..e1cfd088962485ad551376a0465c35cab92de59d 100644 (file)
@@ -35,7 +35,7 @@ public class DbVersion64Test {
 
   @Test
   public void verify_migration_count() {
-    verifyMigrationCount(underTest, 31);
+    verifyMigrationCount(underTest, 32);
   }
 }
 
index 2cc2f066887cba86c8d3e4175a1f2762d45e1390..a0280f3a667f140f2944ab218f81a799d56405a7 100644 (file)
@@ -46,7 +46,7 @@ public interface InternalCeQueue extends CeQueue {
    *
    * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
    */
-  Optional<CeTask> peek();
+  Optional<CeTask> peek(String workerUuid);
 
   /**
    * Removes all the tasks from the queue, whatever their status. They are marked
index 072e55bc174e8cbd83532b33ac09e814ae15f336..d1aaf4786eb4a7bece4c3aa832096786f23fc60c 100644 (file)
@@ -43,6 +43,7 @@ import org.sonar.server.organization.DefaultOrganizationProvider;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
 
 @ComputeEngineSide
 public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
@@ -63,12 +64,14 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
   }
 
   @Override
-  public Optional<CeTask> peek() {
+  public Optional<CeTask> peek(String workerUuid) {
+    requireNonNull(workerUuid, "workerUuid can't be null");
+
     if (peekPaused.get()) {
       return Optional.absent();
     }
     try (DbSession dbSession = dbClient.openSession(false)) {
-      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
+      Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid);
       CeTask task = null;
       if (dto.isPresent()) {
         task = loadTask(dbSession, dto.get());
index 1580220f351443f48f2046835132135d7a2c7a92..a21c74cffa40f78daf8b2811994396ffa32bff6f 100644 (file)
@@ -64,7 +64,7 @@ public class CeWorkerCallableImpl implements CeWorkerCallable {
 
   private Optional<CeTask> tryAndFindTaskToExecute() {
     try {
-      return queue.peek();
+      return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/);
     } catch (Exception e) {
       LOG.error("Failed to pop the queue of analysis reports", e);
     }
index 8a876ff533cd0176b8891e3b028a700ca75e84cd..2c7a38560c33ebaf7f5516363ada58dfa25e16d1 100644 (file)
@@ -45,6 +45,7 @@ import static org.hamcrest.Matchers.startsWith;
 
 public class CeQueueImplTest {
 
+  private static final String WORKER_UUID = "workerUuid";
   private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
 
   @Rule
@@ -146,7 +147,7 @@ public class CeQueueImplTest {
 
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
 
-    dbTester.getDbClient().ceQueueDao().peek(session);
+    dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
 
     underTest.cancel(task.getUuid());
   }
@@ -157,7 +158,7 @@ public class CeQueueImplTest {
     CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
     CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
 
-    dbTester.getDbClient().ceQueueDao().peek(session);
+    dbTester.getDbClient().ceQueueDao().peek(session, WORKER_UUID);
 
     int canceledCount = underTest.cancelAll();
     assertThat(canceledCount).isEqualTo(2);
index 6b4c2c29cb70db9cca78e798bcf6a4c1a430ce34..e577395d822b2c705e8a88c7bd90f8e058ab4b1c 100644 (file)
@@ -56,6 +56,8 @@ import static org.mockito.Mockito.when;
 public class InternalCeQueueImplTest {
 
   private static final String AN_ANALYSIS_UUID = "U1";
+  private static final String WORKER_UUID_1 = "worker uuid 1";
+  private static final String WORKER_UUID_2 = "worker uuid 2";
 
   private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
 
@@ -136,15 +138,23 @@ public class InternalCeQueueImplTest {
     verifyCeTask(taskSubmit2, tasks.get(1), null);
   }
 
+  @Test
+  public void peek_throws_NPE_if_workerUUid_is_null() {
+    expectedException.expect(NullPointerException.class);
+    expectedException.expectMessage("workerUuid can't be null");
+
+    underTest.peek(null);
+  }
+
   @Test
   public void test_remove() {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    Optional<CeTask> peek = underTest.peek();
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
 
     // queue is empty
     assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
-    assertThat(underTest.peek().isPresent()).isFalse();
+    assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse();
 
     // available in history
     Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
@@ -173,7 +183,7 @@ public class InternalCeQueueImplTest {
   @Test
   public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    Optional<CeTask> peek = underTest.peek();
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
 
     // available in history
@@ -186,7 +196,7 @@ public class InternalCeQueueImplTest {
   public void remove_sets_snapshotId_in_CeActivity_when_CeTaskResult_has_no_snapshot_id() {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
 
-    Optional<CeTask> peek = underTest.peek();
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_2);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
 
     // available in history
@@ -200,7 +210,7 @@ public class InternalCeQueueImplTest {
     Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
 
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    Optional<CeTask> peek = underTest.peek();
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
     underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
 
     Optional<CeActivityDto> activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
@@ -251,14 +261,14 @@ public class InternalCeQueueImplTest {
   public void test_peek() throws Exception {
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
 
-    Optional<CeTask> peek = underTest.peek();
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
     assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
     assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
 
     // no more pending tasks
-    peek = underTest.peek();
+    peek = underTest.peek(WORKER_UUID_2);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -284,7 +294,7 @@ public class InternalCeQueueImplTest {
     submit(CeTaskTypes.REPORT, "PROJECT_1");
     underTest.pausePeek();
 
-    Optional<CeTask> peek = underTest.peek();
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -326,7 +336,7 @@ public class InternalCeQueueImplTest {
     expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
 
     CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
-    underTest.peek();
+    underTest.peek(WORKER_UUID_2);
 
     underTest.cancel(task.getUuid());
   }
@@ -336,7 +346,7 @@ public class InternalCeQueueImplTest {
     CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
     CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
     CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
-    underTest.peek();
+    underTest.peek(WORKER_UUID_2);
 
     int canceledCount = underTest.cancelAll();
     assertThat(canceledCount).isEqualTo(2);
index d7959753c6731814304055b42d50596c1779751d..5de587e43b531c9660f705876f90604e11499699 100644 (file)
@@ -44,6 +44,8 @@ import static org.mockito.Mockito.when;
 
 public class CeWorkerCallableImplTest {
 
+  private static final String UNKNOWN_WORKER_UUID = "UNKNOWN";
+
   @Rule
   public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
   @Rule
@@ -57,7 +59,7 @@ public class CeWorkerCallableImplTest {
 
   @Test
   public void no_pending_tasks_in_queue() throws Exception {
-    when(queue.peek()).thenReturn(Optional.<CeTask>absent());
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.<CeTask>absent());
 
     assertThat(underTest.call()).isFalse();
 
@@ -68,7 +70,7 @@ public class CeWorkerCallableImplTest {
   public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek()).thenReturn(Optional.of(task));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isTrue();
 
@@ -81,7 +83,7 @@ public class CeWorkerCallableImplTest {
   public void peek_and_process_task() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek()).thenReturn(Optional.of(task));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isTrue();
 
@@ -94,7 +96,7 @@ public class CeWorkerCallableImplTest {
   @Test
   public void fail_to_process_task() throws Exception {
     CeTask task = createCeTask(null);
-    when(queue.peek()).thenReturn(Optional.of(task));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
@@ -108,7 +110,7 @@ public class CeWorkerCallableImplTest {
 
   @Test
   public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
-    when(queue.peek()).thenReturn(Optional.of(createCeTask(null)));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null)));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -123,7 +125,7 @@ public class CeWorkerCallableImplTest {
   @Test
   public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
     CeTask ceTask = createCeTask(null);
-    when(queue.peek()).thenReturn(Optional.of(ceTask));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -142,7 +144,7 @@ public class CeWorkerCallableImplTest {
 
   @Test
   public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
-    when(queue.peek()).thenReturn(Optional.of(createCeTask("FooBar")));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -158,7 +160,7 @@ public class CeWorkerCallableImplTest {
   @Test
   public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
     CeTask ceTask = createCeTask("FooBar");
-    when(queue.peek()).thenReturn(Optional.of(ceTask));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -177,7 +179,7 @@ public class CeWorkerCallableImplTest {
   public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
     logTester.setLevel(LoggerLevel.DEBUG);
 
-    when(queue.peek()).thenReturn(Optional.of(createCeTask("FooBar")));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -195,7 +197,7 @@ public class CeWorkerCallableImplTest {
     logTester.setLevel(LoggerLevel.DEBUG);
 
     CeTask ceTask = createCeTask("FooBar");
-    when(queue.peek()).thenReturn(Optional.of(ceTask));
+    when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask);