]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-14698 Live index use all workers
authorPierre <pierre.guillot@sonarsource.com>
Wed, 14 Apr 2021 07:54:33 +0000 (09:54 +0200)
committersonartech <sonartech@sonarsource.com>
Fri, 16 Apr 2021 20:03:44 +0000 (20:03 +0000)
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java

index 0e0903ae90e752e75b4e0fb68a6269e5fb175cbb..3e834cdd8afa7355b55e76fb43b1659b8017af7f 100644 (file)
@@ -41,7 +41,6 @@ public interface InternalCeQueue extends CeQueue {
    * Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}.
    *
    * @param excludeIndexationJob change the underlying request to exclude indexation tasks.
-   * @param excludeViewRefresh change the underlying request to exclude portfolios (but still include APP)
    *
    * <p>Only a single task can be peeked by project.</p>
    *
@@ -50,7 +49,7 @@ public interface InternalCeQueue extends CeQueue {
    * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}
    * are ignored</p>
    */
-  Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh);
+  Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob);
 
   /**
    * Removes a task from the queue and registers it to past activities. This method
index e386d7ac899c2b184a5f787188b2e08572b07e02..d3ebb0fec999deca73502368688094f3c8623ea6 100644 (file)
@@ -72,7 +72,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
   }
 
   @Override
-  public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh) {
+  public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob) {
     requireNonNull(workerUuid, "workerUuid can't be null");
 
     if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) {
@@ -85,7 +85,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
         dbSession.commit();
         LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
       }
-      Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, excludeViewRefresh);
+      Optional<CeQueueDto> opt = findPendingTask(workerUuid, dbSession, ceQueueDao, excludeIndexationJob);
       if (!opt.isPresent()) {
         return Optional.empty();
       }
@@ -102,6 +102,17 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
     }
   }
 
+  private Optional<CeQueueDto> findPendingTask(String workerUuid, DbSession dbSession, CeQueueDao ceQueueDao, boolean excludeIndexationJob) {
+    // try to find tasks including indexation job & excluding app/portfolio
+    // and if no match, try the opposite
+    // when excludeIndexationJob is false, search first excluding indexation jobs and including app/portfolio, then the opposite
+    Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, !excludeIndexationJob);
+    if (!opt.isPresent()) {
+      opt = ceQueueDao.peek(dbSession, workerUuid, !excludeIndexationJob, excludeIndexationJob);
+    }
+    return opt;
+  }
+
   @Override
   public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
     checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
index 3e92f875dab4ec9a2a4f3c9e9db4afd4e6eafb0a..fd8cc7a2186f8842f7aebe2608065cfea50e8f95 100644 (file)
@@ -207,8 +207,8 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
 
     public void stop(boolean interrupt) {
       keepRunning = false;
+      interrupted = true;
       if (workerFuture != null) {
-        interrupted = true;
         workerFuture.cancel(interrupt);
       }
     }
index 9bbff219f880e33207c3cc67c7834188216e4eec..89d2f75dc0b8e8a6187583870414278b273cdfda 100644 (file)
@@ -61,7 +61,6 @@ public class CeWorkerImpl implements CeWorker {
   private final CeWorkerController ceWorkerController;
   private final List<ExecutionListener> listeners;
   private final AtomicReference<RunningState> runningState = new AtomicReference<>();
-  private boolean indexationTaskLookupEnabled;
   private boolean excludeIndexationJob;
 
   public CeWorkerImpl(int ordinal, String uuid,
@@ -74,8 +73,7 @@ public class CeWorkerImpl implements CeWorker {
     this.taskProcessorRepository = taskProcessorRepository;
     this.ceWorkerController = ceWorkerController;
     this.listeners = Arrays.asList(listeners);
-    indexationTaskLookupEnabled = true;
-    excludeIndexationJob = false;
+    this.excludeIndexationJob = true;
   }
 
   private static int checkOrdinal(int ordinal) {
@@ -167,36 +165,15 @@ public class CeWorkerImpl implements CeWorker {
   }
 
   private Optional<CeTask> tryAndFindTaskToExecute() {
+    excludeIndexationJob = !excludeIndexationJob;
     try {
-      if (indexationTaskLookupEnabled) {
-        return tryAndFindTaskToExecuteIncludingIndexation();
-      } else {
-        return queue.peek(uuid, true, false);
-      }
+      return queue.peek(uuid, excludeIndexationJob);
     } catch (Exception e) {
       LOG.error("Failed to pop the queue of analysis reports", e);
     }
     return Optional.empty();
   }
 
-  private Optional<CeTask> tryAndFindTaskToExecuteIncludingIndexation() {
-    excludeIndexationJob = !excludeIndexationJob;
-    Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob, true);
-    if (peek.isPresent()) {
-      return peek;
-    }
-    if (excludeIndexationJob) {
-      peek = queue.peek(uuid, false, true);
-      if (peek.isPresent()) {
-        return peek;
-      }
-      // do not lookup for indexation tasks anymore
-      indexationTaskLookupEnabled = false;
-      LOG.info(String.format("worker %s found no pending task (including indexation task). Disabling indexation task lookup for this worker until next SonarQube restart.", uuid));
-    }
-    return Optional.empty();
-  }
-
   private final class ExecuteTask implements Runnable, AutoCloseable {
     private final CeTask task;
     private final RunningState localRunningState;
index 33b96e58c0258da936cc1b43a1eccdc5567cc3c5..cd4f7368ebd7760007f4c096899f77b2f8c6fc18 100644 (file)
@@ -147,18 +147,18 @@ public class InternalCeQueueImplTest {
     expectedException.expect(NullPointerException.class);
     expectedException.expectMessage("workerUuid can't be null");
 
-    underTest.peek(null, false, false);
+    underTest.peek(null, true);
   }
 
   @Test
   public void test_remove() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
 
     // queue is empty
     assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid())).isNotPresent();
-    assertThat(underTest.peek(WORKER_UUID_2, false, false)).isNotPresent();
+    assertThat(underTest.peek(WORKER_UUID_2, true)).isNotPresent();
 
     // available in history
     Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
@@ -187,7 +187,7 @@ public class InternalCeQueueImplTest {
   @Test
   public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
 
     // available in history
@@ -200,7 +200,7 @@ public class InternalCeQueueImplTest {
   public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, true);
     underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
 
     // available in history
@@ -214,7 +214,7 @@ public class InternalCeQueueImplTest {
     Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
 
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
 
     Optional<CeActivityDto> activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
@@ -230,7 +230,7 @@ public class InternalCeQueueImplTest {
     Throwable error = new TypedExceptionImpl("aType", "aMessage");
 
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
 
     CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get();
@@ -335,7 +335,7 @@ public class InternalCeQueueImplTest {
   public void test_peek() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
     assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
@@ -343,7 +343,7 @@ public class InternalCeQueueImplTest {
     assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get());
 
     // no more pending tasks
-    peek = underTest.peek(WORKER_UUID_2, false, false);
+    peek = underTest.peek(WORKER_UUID_2, true);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -353,7 +353,7 @@ public class InternalCeQueueImplTest {
     ComponentDto branch = db.components().insertProjectBranch(project);
     CeTask task = submit(CeTaskTypes.REPORT, branch);
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     assertThat(peek.isPresent()).isTrue();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
     assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
@@ -361,7 +361,7 @@ public class InternalCeQueueImplTest {
     assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name()));
 
     // no more pending tasks
-    peek = underTest.peek(WORKER_UUID_2, false, false);
+    peek = underTest.peek(WORKER_UUID_2, true);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -370,11 +370,11 @@ public class InternalCeQueueImplTest {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
     underTest.pauseWorkers();
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     assertThat(peek).isEmpty();
 
     underTest.resumeWorkers();
-    peek = underTest.peek(WORKER_UUID_1, false, false);
+    peek = underTest.peek(WORKER_UUID_1, true);
     assertThat(peek).isPresent();
     assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
   }
@@ -388,7 +388,7 @@ public class InternalCeQueueImplTest {
     makeInProgress(dto, "foo");
     db.commit();
 
-    assertThat(underTest.peek(WORKER_UUID_1, false, false)).isEmpty();
+    assertThat(underTest.peek(WORKER_UUID_1, true)).isEmpty();
   }
 
   @Test
@@ -396,7 +396,7 @@ public class InternalCeQueueImplTest {
     submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
     when(computeEngineStatus.getStatus()).thenReturn(STOPPING);
 
-    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false, false);
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, true);
     assertThat(peek.isPresent()).isFalse();
   }
 
@@ -408,7 +408,7 @@ public class InternalCeQueueImplTest {
       .setStatus(CeQueueDto.Status.PENDING));
     db.commit();
 
-    assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("uuid");
+    assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("uuid");
   }
 
   @Test
@@ -417,7 +417,7 @@ public class InternalCeQueueImplTest {
     CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
     CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset
 
-    assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0");
+    assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0");
 
     verifyUnmodifiedTask(u1);
     verifyResetTask(u2);
@@ -431,7 +431,7 @@ public class InternalCeQueueImplTest {
     CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
     CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2);
 
-    assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0");
+    assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0");
 
     verifyResetTask(u1);
     verifyUnmodifiedTask(u2);
@@ -489,7 +489,7 @@ public class InternalCeQueueImplTest {
   @Test
   public void fail_to_cancel_if_in_progress() {
     CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
-    underTest.peek(WORKER_UUID_2, false, false);
+    underTest.peek(WORKER_UUID_2, true);
     CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
 
     expectedException.expect(IllegalStateException.class);
@@ -503,7 +503,7 @@ public class InternalCeQueueImplTest {
     CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
     CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2"));
     CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3"));
-    underTest.peek(WORKER_UUID_2, false, false);
+    underTest.peek(WORKER_UUID_2, true);
 
     int canceledCount = underTest.cancelAll();
     assertThat(canceledCount).isEqualTo(2);
index 490512023831cbb7d43ee8668049a03040cdcd41..60a3253ef4bd29f5b202c495716724b95fed39a5 100644 (file)
@@ -146,7 +146,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void no_pending_tasks_in_queue() throws Exception {
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
 
     assertThat(underTest.call()).isEqualTo(NO_TASK);
 
@@ -155,7 +155,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void no_pending_tasks_in_queue_without_listener() throws Exception {
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
 
     assertThat(underTestNoListener.call()).isEqualTo(NO_TASK);
 
@@ -166,7 +166,7 @@ public class CeWorkerImplTest {
   public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
@@ -182,7 +182,7 @@ public class CeWorkerImplTest {
   public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
 
@@ -195,7 +195,7 @@ public class CeWorkerImplTest {
   public void peek_and_process_task() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
@@ -212,7 +212,7 @@ public class CeWorkerImplTest {
   public void peek_and_process_task_without_listeners() throws Exception {
     CeTask task = createCeTask(null);
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
 
     assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
 
@@ -225,7 +225,7 @@ public class CeWorkerImplTest {
   @Test
   public void fail_to_process_task() throws Exception {
     CeTask task = createCeTask(null);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
@@ -243,7 +243,7 @@ public class CeWorkerImplTest {
   @Test
   public void fail_to_process_task_without_listeners() throws Exception {
     CeTask task = createCeTask(null);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
@@ -257,7 +257,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void log_task_characteristics() throws Exception {
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -272,7 +272,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception {
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -288,7 +288,7 @@ public class CeWorkerImplTest {
   @Test
   public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception {
     CeTask ceTask = createCeTask(null);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -308,7 +308,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_submitter_login_if_authenticated_and_success() throws Exception {
     UserDto userDto = insertRandomUser();
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -324,7 +324,7 @@ public class CeWorkerImplTest {
 
   @Test
   public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception {
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -342,7 +342,7 @@ public class CeWorkerImplTest {
   public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
     UserDto userDto = insertRandomUser();
     CeTask ceTask = createCeTask(toTaskSubmitter(userDto));
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -362,7 +362,7 @@ public class CeWorkerImplTest {
   public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
     logTester.setLevel(LoggerLevel.DEBUG);
 
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
 
     underTest.call();
@@ -381,7 +381,7 @@ public class CeWorkerImplTest {
     logTester.setLevel(LoggerLevel.DEBUG);
 
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -401,7 +401,7 @@ public class CeWorkerImplTest {
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
     String threadName = randomAlphabetic(3);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
+    when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
       return Optional.empty();
@@ -415,7 +415,7 @@ public class CeWorkerImplTest {
   @Test
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
     String threadName = randomAlphabetic(3);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
+    when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
       return Optional.of(createCeTask(submitter));
@@ -431,7 +431,7 @@ public class CeWorkerImplTest {
   public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
     String threadName = randomAlphabetic(3);
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> {
+    when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
       assertThat(Thread.currentThread().getName())
         .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
       return Optional.of(ceTask);
@@ -459,7 +459,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_when_task_fails_with_not_MessageException() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask);
 
@@ -477,7 +477,7 @@ public class CeWorkerImplTest {
   @Test
   public void do_no_log_error_when_task_fails_with_MessageException() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
 
@@ -493,7 +493,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null);
 
@@ -505,7 +505,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     IllegalStateException ex = makeTaskProcessorFail(ceTask);
     RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@@ -534,7 +534,7 @@ public class CeWorkerImplTest {
   @Test
   public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception {
     CeTask ceTask = createCeTask(submitter);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
     MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
     RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@@ -565,7 +565,7 @@ public class CeWorkerImplTest {
     CountDownLatch inCallLatch = new CountDownLatch(1);
     CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
     // mock long running peek(String) call => Thread is executing call() but not running a task
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+    when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
       inCallLatch.countDown();
       try {
         assertionsDoneLatch.await(10, TimeUnit.SECONDS);
@@ -600,7 +600,7 @@ public class CeWorkerImplTest {
     String taskType = randomAlphabetic(12);
     CeTask ceTask = mock(CeTask.class);
     when(ceTask.getType()).thenReturn(taskType);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
       @CheckForNull
       @Override
@@ -641,15 +641,13 @@ public class CeWorkerImplTest {
   @Test
   public void do_not_exclude_portfolio_when_indexation_task_lookup_is_disabled() throws Exception {
     // first call with empty queue to disable indexationTaskLookupEnabled
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty());
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
     assertThat(underTest.call()).isEqualTo(NO_TASK);
 
-    ArgumentCaptor<Boolean> booleanCaptor = ArgumentCaptor.forClass(Boolean.class);
     // following calls should not exclude portfolios
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
     assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
-    verify(queue, times(3)).peek(anyString(), anyBoolean(), booleanCaptor.capture());
-    assertThat(booleanCaptor.getAllValues()).containsExactly(true, true, false);
+    verify(queue, times(2)).peek(anyString(), anyBoolean());
   }
 
   @Test
@@ -657,7 +655,7 @@ public class CeWorkerImplTest {
     CountDownLatch inCallLatch = new CountDownLatch(1);
     CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
     // mock long running peek(String) call => Thread is executing call() but not running a task
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+    when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
       inCallLatch.countDown();
       try {
         assertionsDoneLatch.await(10, TimeUnit.SECONDS);
@@ -688,7 +686,7 @@ public class CeWorkerImplTest {
     String taskType = randomAlphabetic(12);
     CeTask ceTask = mock(CeTask.class);
     when(ceTask.getType()).thenReturn(taskType);
-    when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask));
+    when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
     taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
 
       @CheckForNull
@@ -745,7 +743,7 @@ public class CeWorkerImplTest {
   }
 
   private void verifyWorkerUuid() {
-    verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean(), anyBoolean());
+    verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean());
     assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid);
   }