]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-13400 exclude portfolio (but not applications) from worker peek during indexation
authorPierre <pierre.guillot@sonarsource.com>
Thu, 11 Jun 2020 12:53:08 +0000 (14:53 +0200)
committersonartech <sonartech@sonarsource.com>
Fri, 26 Jun 2020 20:04:57 +0000 (20:04 +0000)
server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java

index 1c155e145e66bacef49de90c5d986ca333a634be..1bc242c57028b0fbfe2f79b3c387763de6e24606 100644 (file)
@@ -396,7 +396,7 @@ public class CeQueueImplTest {
   @Test
   public void fail_to_cancel_if_in_progress() {
     submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(11)));
-    CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false).get();
+    CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false).get();
 
     expectedException.expect(IllegalStateException.class);
     expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
@@ -410,7 +410,7 @@ public class CeQueueImplTest {
     CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
     CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
 
-    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false);
+    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
 
     int canceledCount = underTest.cancelAll();
     assertThat(canceledCount).isEqualTo(2);
@@ -437,7 +437,7 @@ public class CeQueueImplTest {
   @Test
   public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
     submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
-    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false);
+    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
     // task is in-progress
 
     assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
@@ -458,7 +458,7 @@ public class CeQueueImplTest {
   @Test
   public void resumeWorkers_resumes_pausing_workers() {
     submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
-    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false);
+    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, false, false);
     // task is in-progress
 
     underTest.pauseWorkers();
@@ -480,7 +480,7 @@ public class CeQueueImplTest {
   @Test
   public void fail_in_progress_task() {
     CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
-    CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID, false).get();
+    CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID, false, false).get();
 
     underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
 
index 30466805efc86a207c6a743744bfbf28d75e23f1..be8f52d7b0881d01a482a703038840952d3c8de9 100644 (file)
@@ -41,6 +41,7 @@ public interface InternalCeQueue extends CeQueue {
    * Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}.
    *
    * @param excludeIndexationJob change the underlying request to exclude indexation tasks.
+   * @param excludeViewRefresh change the underlying request to exclude portfolios (but still include APP)
    *
    * <p>Only a single task can be peeked by project.</p>
    *
@@ -49,7 +50,7 @@ public interface InternalCeQueue extends CeQueue {
    * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}
    * are ignored</p>
    */
-  Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob);
+  Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh);
 
   /**
    * Removes a task from the queue and registers it to past activities. This method
index 711961b1f95cb2201b48a227eea0e1a128d1a74c..279b2f60021a1097a3f9d6344da298c88d625c24 100644 (file)
@@ -73,7 +73,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
   }
 
   @Override
-  public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob) {
+  public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh) {
     requireNonNull(workerUuid, "workerUuid can't be null");
 
     if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) {
@@ -86,7 +86,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
         dbSession.commit();
         LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
       }
-      Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob);
+      Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, excludeViewRefresh);
       if (!opt.isPresent()) {
         return Optional.empty();
       }
index 10c234639f52b44f1529ba0f65c7932df4abc499..c100f2e744aef585225a2f74985e72fab4f31c9a 100644 (file)
@@ -171,7 +171,7 @@ public class CeWorkerImpl implements CeWorker {
       if (indexationTaskLookupEnabled) {
         return tryAndFindTaskToExecuteIncludingIndexation();
       } else {
-        return queue.peek(uuid, true);
+        return queue.peek(uuid, true, false);
       }
     } catch (Exception e) {
       LOG.error("Failed to pop the queue of analysis reports", e);
@@ -181,12 +181,12 @@ public class CeWorkerImpl implements CeWorker {
 
   private Optional<CeTask> tryAndFindTaskToExecuteIncludingIndexation() {
     excludeIndexationJob = !excludeIndexationJob;
-    Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob);
+    Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob, true);
     if (peek.isPresent()) {
       return peek;
     }
     if (excludeIndexationJob) {
-      peek = queue.peek(uuid, false);
+      peek = queue.peek(uuid, false, true);
       if (peek.isPresent()) {
         return peek;
       }
index 922da41cdc36d0486f7d583cce6d3712c2176706..dbfcd54a60b6fa2449ad335b9c59aefbbe2935a6 100644 (file)
@@ -159,18 +159,18 @@ public class InternalCeQueueImplTest {
     expectedException.expect(NullPointerException.class);
     expectedException.expectMessage("workerUuid can't be null");
 
-    underTest.peek(null, false);
+    underTest.peek(null, false, false);
   }
 
   @Test
   public void test_remove() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
 
     // queue is empty
     assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).isPresent()).isFalse();
-    assertThat(underTest.peek(WORKER_UUID_2, false).isPresent()).isFalse();
+    assertThat(underTest.peek(WORKER_UUID_2, false, false).isPresent()).isFalse();
 
     // available in history
     Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
@@ -199,7 +199,7 @@ public class InternalCeQueueImplTest {
   @Test
   public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
 
     // available in history
@@ -212,7 +212,7 @@ public class InternalCeQueueImplTest {
   public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, false, false);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
 
     // available in history
@@ -226,7 +226,7 @@ public class InternalCeQueueImplTest {
     Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
 
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
 
     Optional<CeActivityDto> activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
@@ -242,7 +242,7 @@ public class InternalCeQueueImplTest {
     Throwable error = new TypedExceptionImpl("aType", "aMessage");
 
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
 
     CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get();
@@ -348,7 +348,7 @@ public class InternalCeQueueImplTest {
   public void test_peek() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
     assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
@@ -356,7 +356,7 @@ public class InternalCeQueueImplTest {
     assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get());
 
     // no more pending tasks
-    peek = underTest.peek(WORKER_UUID_2, false);
+    peek = underTest.peek(WORKER_UUID_2, false, false);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -366,7 +366,7 @@ public class InternalCeQueueImplTest {
     ComponentDto branch = db.components().insertProjectBranch(project);
     CeTask task = submit(CeTaskTypes.REPORT, branch);
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
     assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
@@ -374,7 +374,7 @@ public class InternalCeQueueImplTest {
     assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name()));
 
     // no more pending tasks
-    peek = underTest.peek(WORKER_UUID_2, false);
+    peek = underTest.peek(WORKER_UUID_2, false, false);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -383,11 +383,11 @@ public class InternalCeQueueImplTest {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
     underTest.pauseWorkers();
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     assertThat(peek).isEmpty();
 
     underTest.resumeWorkers();
-    peek = underTest.peek(WORKER_UUID_1, false);
+    peek = underTest.peek(WORKER_UUID_1, false, false);
     assertThat(peek).isPresent();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
   }
@@ -401,7 +401,7 @@ public class InternalCeQueueImplTest {
     makeInProgress(dto, "foo");
     db.commit();
 
-    assertThat(underTest.peek(WORKER_UUID_1, false)).isEmpty();
+    assertThat(underTest.peek(WORKER_UUID_1, false, false)).isEmpty();
   }
 
   @Test
@@ -409,7 +409,7 @@ public class InternalCeQueueImplTest {
     submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
     when(computeEngineStatus.getStatus()).thenReturn(STOPPING);
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -421,7 +421,7 @@ public class InternalCeQueueImplTest {
       .setStatus(CeQueueDto.Status.PENDING));
     db.commit();
 
-    assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("uuid");
+    assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("uuid");
   }
 
   @Test
@@ -430,7 +430,7 @@ public class InternalCeQueueImplTest {
     CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
     CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset
 
-    assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("u0");
+    assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0");
 
     verifyUnmodifiedTask(u1);
     verifyResetTask(u2);
@@ -444,7 +444,7 @@ public class InternalCeQueueImplTest {
     CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
     CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2);
 
-    assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("u0");
+    assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0");
 
     verifyResetTask(u1);
     verifyUnmodifiedTask(u2);
@@ -502,7 +502,7 @@ public class InternalCeQueueImplTest {
   @Test
   public void fail_to_cancel_if_in_progress() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    underTest.peek(WORKER_UUID_2, false);
+    underTest.peek(WORKER_UUID_2, false, false);
     CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
 
     expectedException.expect(IllegalStateException.class);
@@ -516,7 +516,7 @@ public class InternalCeQueueImplTest {
     CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
     CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2"));
     CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3"));
-    underTest.peek(WORKER_UUID_2, false);
+    underTest.peek(WORKER_UUID_2, false, false);
 
     int canceledCount = underTest.cancelAll();
     assertThat(canceledCount).isEqualTo(2);
index 84943ca60090bbcc59cff478183f6583de1e1e57..ddb5027720d03ffb5e066f330c2c71bbfbc6cef2 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.sonar.ce.taskprocessor;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -145,7 +146,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void no_pending_tasks_in_queue() throws Exception {
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
 
     assertThat(underTest.call()).isEqualTo(NO_TASK);
 
@@ -154,7 +155,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void no_pending_tasks_in_queue_without_listener() throws Exception {
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
 
     assertThat(underTestNoListener.call()).isEqualTo(NO_TASK);
 
@@ -165,7 +166,7 @@ public class CeWorkerImplTest {
   public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
@@ -181,7 +182,7 @@ public class CeWorkerImplTest {
   public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
 
@@ -194,7 +195,7 @@ public class CeWorkerImplTest {
   public void peek_and_process_task() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
@@ -211,7 +212,7 @@ public class CeWorkerImplTest {
   public void peek_and_process_task_without_listeners() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
 
@@ -224,7 +225,7 @@ public class CeWorkerImplTest {
   @Test
   public void fail_to_process_task() throws Exception {
     CeTask task = createCeTask(null);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
@@ -242,7 +243,7 @@ public class CeWorkerImplTest {
   @Test
   public void fail_to_process_task_without_listeners() throws Exception {
     CeTask task = createCeTask(null);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
@@ -256,7 +257,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void log_task_characteristics() throws Exception {
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -271,7 +272,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception {
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -287,7 +288,7 @@ public class CeWorkerImplTest {
   @Test
   public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception {
     CeTask ceTask = createCeTask(null);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -307,7 +308,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_submitter_login_if_authenticated_and_success() throws Exception {
     UserDto userDto = insertRandomUser();
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -323,7 +324,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception {
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -341,7 +342,7 @@ public class CeWorkerImplTest {
   public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
     UserDto userDto = insertRandomUser();
     CeTask ceTask = createCeTask(toTaskSubmitter(userDto));
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -361,7 +362,7 @@ public class CeWorkerImplTest {
   public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
     logTester.setLevel(LoggerLevel.DEBUG);
 
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -380,7 +381,7 @@ public class CeWorkerImplTest {
     logTester.setLevel(LoggerLevel.DEBUG);
 
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -400,7 +401,7 @@ public class CeWorkerImplTest {
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
     String threadName = randomAlphabetic(3);
-    when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
       return Optional.empty();
@@ -414,7 +415,7 @@ public class CeWorkerImplTest {
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
     String threadName = randomAlphabetic(3);
-    when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
       return Optional.of(createCeTask(submitter));
@@ -430,7 +431,7 @@ public class CeWorkerImplTest {
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
     String threadName = randomAlphabetic(3);
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
       return Optional.of(ceTask);
@@ -458,7 +459,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_when_task_fails_with_not_MessageException() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -476,7 +477,7 @@ public class CeWorkerImplTest {
   @Test
   public void do_no_log_error_when_task_fails_with_MessageException() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
 
@@ -492,7 +493,7 @@ public class CeWorkerImplTest {
   @Test
   public void do_no_log_error_when_task_fails_with_BillingValidationsException() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask, new BillingValidations.BillingValidationsException("simulate MessageException thrown by TaskProcessor#process"));
 
@@ -508,7 +509,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null);
 
@@ -520,7 +521,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     IllegalStateException ex = makeTaskProcessorFail(ceTask);
     RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@@ -549,7 +550,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
     RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@@ -580,7 +581,7 @@ public class CeWorkerImplTest {
     CountDownLatch inCallLatch = new CountDownLatch(1);
     CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
     // mock long running peek(String) call => Thread is executing call() but not running a task
-    when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
       inCallLatch.countDown();
       try {
         assertionsDoneLatch.await(10, TimeUnit.SECONDS);
@@ -615,7 +616,7 @@ public class CeWorkerImplTest {
     String taskType = randomAlphabetic(12);
     CeTask ceTask = mock(CeTask.class);
     when(ceTask.getType()).thenReturn(taskType);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
       @CheckForNull
       @Override
@@ -653,12 +654,26 @@ public class CeWorkerImplTest {
     assertThat(underTest.getCurrentTask()).isEmpty();
   }
 
+  @Test
+  public void do_not_exclude_portfolio_when_indexation_task_lookup_is_disabled() throws Exception {
+    // first call with empty queue to disable indexationTaskLookupEnabled
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+    assertThat(underTest.call()).isEqualTo(NO_TASK);
+
+    ArgumentCaptor<Boolean> booleanCaptor = ArgumentCaptor.forClass(Boolean.class);
+    // following calls should not exclude portfolios
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
+    assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
+    verify(queue, times(3)).peek(anyString(), anyBoolean(), booleanCaptor.capture());
+    assertThat(booleanCaptor.getAllValues()).containsExactly(true, true, false);
+  }
+
   @Test
   public void getCurrentTask_returns_empty_when_a_thread_is_currently_calling_call_but_not_executing_a_task() throws InterruptedException {
     CountDownLatch inCallLatch = new CountDownLatch(1);
     CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
     // mock long running peek(String) call => Thread is executing call() but not running a task
-    when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
       inCallLatch.countDown();
       try {
         assertionsDoneLatch.await(10, TimeUnit.SECONDS);
@@ -689,7 +704,7 @@ public class CeWorkerImplTest {
     String taskType = randomAlphabetic(12);
     CeTask ceTask = mock(CeTask.class);
     when(ceTask.getType()).thenReturn(taskType);
-    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
 
       @CheckForNull
@@ -746,7 +761,7 @@ public class CeWorkerImplTest {
   }
 
   private void verifyWorkerUuid() {
-    verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean());
+    verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean(), anyBoolean());
     assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid);
   }
 
index b5dd36852c0f301f08fdc92de8134cea2b7675cb..3e699d894b7006018319e0c89ef9b289548fe312 100644 (file)
@@ -166,8 +166,8 @@ public class CeQueueDao implements Dao {
     return builder.build();
   }
 
-  public Optional<CeQueueDto> peek(DbSession session, String workerUuid, boolean excludeIndexationJob) {
-    List<String> eligibles = mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION, excludeIndexationJob);
+  public Optional<CeQueueDto> peek(DbSession session, String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh) {
+    List<String> eligibles = mapper(session).selectEligibleForPeek(ONE_RESULT_PAGINATION, excludeIndexationJob, excludeViewRefresh);
     if (eligibles.isEmpty()) {
       return Optional.empty();
     }
index e7c3843d640d697fe9b1e58e93852b2c81cee4d9..fd277b2011cceafcc9919f1992530db2f7060be7 100644 (file)
@@ -36,7 +36,9 @@ public interface CeQueueMapper {
 
   int countByQuery(@Param("query") CeTaskQuery query);
 
-  List<String> selectEligibleForPeek(@Param("pagination") Pagination pagination, @Param("excludeIndexationJob") boolean excludeIndexationJob);
+  List<String> selectEligibleForPeek(@Param("pagination") Pagination pagination,
+                                     @Param("excludeIndexationJob") boolean excludeIndexationJob,
+                                     @Param("excludeViewRefresh") boolean excludeViewRefresh);
 
   @CheckForNull
   CeQueueDto selectByUuid(@Param("uuid") String uuid);
index 0d2a49d25c56dd338ada5bae4a413e97d5684ebf..5ac5844792ba45b4e8da0e6381e3de90d49a53e3 100644 (file)
   <sql id="sqlSelectEligibleForPeek">
     from
       ce_queue cq
+    <if test="excludeViewRefresh">
+        inner join components c on c.uuid = cq.main_component_uuid and c.qualifier &lt;&gt; 'VW'
+    </if>
     where
       cq.status='PENDING'
       and cq.started_at is null
 
   <sql id="orderBySelectEligibleForPeek">
     order by
-      created_at asc,
-      uuid asc
+      cq.created_at asc,
+      cq.uuid asc
   </sql>
 
   <select id="selectPending" resultType="org.sonar.db.ce.CeQueueDto">
index f58956ee4b8c49a59388b61918a5d06ad115d01b..83fdb26f1dc72e03a52f911053901a26a0871eca 100644 (file)
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.junit.Rule;
@@ -38,8 +39,8 @@ import org.sonar.api.impl.utils.AlwaysIncreasingSystem2;
 import org.sonar.api.impl.utils.TestSystem2;
 import org.sonar.api.utils.System2;
 import org.sonar.db.DbTester;
+import org.sonar.db.component.ComponentDto;
 
-import static com.google.common.collect.FluentIterable.from;
 import static com.google.common.collect.Lists.newArrayList;
 import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -392,11 +393,11 @@ public class CeQueueDaoTest {
 
   @Test
   public void peek_none_if_no_pendings() {
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false).isPresent()).isFalse();
 
     // not pending, but in progress
     makeInProgress(WORKER_UUID_1, 2_232_222L, insertPending(TASK_UUID_1, MAIN_COMPONENT_UUID_1));
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false).isPresent()).isFalse();
   }
 
   @Test
@@ -409,7 +410,7 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
 
     // peek first one
-    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false);
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
     assertThat(peek).isPresent();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
     assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
@@ -417,7 +418,7 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
 
     // peek second one
-    peek = underTest.peek(db.getSession(), WORKER_UUID_2, false);
+    peek = underTest.peek(db.getSession(), WORKER_UUID_2, false, false);
     assertThat(peek).isPresent();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
     assertThat(peek.get().getStatus()).isEqualTo(IN_PROGRESS);
@@ -425,7 +426,7 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, IN_PROGRESS);
 
     // no more pendings
-    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false).isPresent()).isFalse();
+    assertThat(underTest.peek(db.getSession(), WORKER_UUID_1, false, false).isPresent()).isFalse();
   }
 
   @Test
@@ -435,7 +436,7 @@ public class CeQueueDaoTest {
     system2.setNow(INIT_TIME + 3_000_000);
     insertPending(TASK_UUID_2, MAIN_COMPONENT_UUID_1);
 
-    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false);
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
     assertThat(peek).isPresent();
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
     assertThat(peek.get().getMainComponentUuid()).isEqualTo(MAIN_COMPONENT_UUID_1);
@@ -443,12 +444,12 @@ public class CeQueueDaoTest {
     verifyCeQueueStatuses(TASK_UUID_1, IN_PROGRESS, TASK_UUID_2, PENDING);
 
     // do not peek second task as long as the first one is in progress
-    peek = underTest.peek(db.getSession(), WORKER_UUID_1, false);
+    peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
     assertThat(peek.isPresent()).isFalse();
 
     // first one is finished
     underTest.deleteByUuid(db.getSession(), TASK_UUID_1);
-    peek = underTest.peek(db.getSession(), WORKER_UUID_2, false);
+    peek = underTest.peek(db.getSession(), WORKER_UUID_2, false, false);
     assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_2);
     assertThat(peek.get().getWorkerUuid()).isEqualTo(WORKER_UUID_2);
   }
@@ -630,6 +631,60 @@ public class CeQueueDaoTest {
     assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 3_000L)).extracting(CeQueueDto::getUuid).containsExactlyInAnyOrder(TASK_UUID_2, TASK_UUID_3);
   }
 
+  @Test
+  public void exclude_portfolios_computation_when_indexing_issues() {
+    insertBranch(MAIN_COMPONENT_UUID_1);
+    insertPending(newCeQueueDto(TASK_UUID_1)
+      .setComponentUuid(MAIN_COMPONENT_UUID_1)
+      .setMainComponentUuid(MAIN_COMPONENT_UUID_1)
+      .setStatus(PENDING)
+      .setTaskType(CeTaskTypes.BRANCH_ISSUE_SYNC)
+      .setCreatedAt(100_000L));
+
+    String view_uuid = "view_uuid";
+    insertView(view_uuid);
+    insertPending(newCeQueueDto(TASK_UUID_2)
+      .setComponentUuid(view_uuid)
+      .setMainComponentUuid(view_uuid)
+      .setStatus(PENDING)
+      .setTaskType(CeTaskTypes.REPORT)
+      .setCreatedAt(100_000L));
+
+    Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1, false, true);
+    assertThat(peek).isPresent();
+    assertThat(peek.get().getUuid()).isEqualTo(TASK_UUID_1);
+
+    Optional<CeQueueDto> peek2 = underTest.peek(db.getSession(), WORKER_UUID_1, false, false);
+    assertThat(peek2).isPresent();
+    assertThat(peek2.get().getUuid()).isEqualTo(TASK_UUID_2);
+  }
+
+  private void insertView(String view_uuid) {
+    ComponentDto view = new ComponentDto();
+    view.setQualifier("VW");
+    view.setUuid(view_uuid);
+    view.setOrganizationUuid("org_uuid");
+    view.setPrivate(false);
+    view.setRootUuid(view_uuid);
+    view.setUuidPath("uuid_path");
+    view.setProjectUuid(view_uuid);
+    db.components().insertViewAndSnapshot(view);
+    db.commit();
+  }
+
+  private void insertBranch(String uuid) {
+    ComponentDto branch = new ComponentDto();
+    branch.setQualifier("TRK");
+    branch.setUuid(uuid);
+    branch.setOrganizationUuid("org_uuid");
+    branch.setPrivate(false);
+    branch.setRootUuid(uuid);
+    branch.setUuidPath("uuid_path");
+    branch.setProjectUuid(uuid);
+    db.components().insertComponent(branch);
+    db.commit();
+  }
+
   private void insertPending(CeQueueDto dto) {
     underTest.insert(db.getSession(), dto);
     db.commit();
@@ -675,7 +730,7 @@ public class CeQueueDaoTest {
   }
 
   private static Iterable<Map<String, Object>> upperizeKeys(List<Map<String, Object>> select) {
-    return from(select).transform(new Function<Map<String, Object>, Map<String, Object>>() {
+    return select.stream().map(new Function<Map<String, Object>, Map<String, Object>>() {
       @Nullable
       @Override
       public Map<String, Object> apply(Map<String, Object> input) {
@@ -685,7 +740,7 @@ public class CeQueueDaoTest {
         }
         return res;
       }
-    });
+    }).collect(Collectors.toList());
   }
 
   private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2) {