From 2a35942c35b9cf3a0b931b71906b4bd303b0934f Mon Sep 17 00:00:00 2001 From: Pierre Date: Wed, 14 Apr 2021 09:54:33 +0200 Subject: [PATCH] SONAR-14698 Live index use all workers --- .../org/sonar/ce/queue/InternalCeQueue.java | 3 +- .../sonar/ce/queue/InternalCeQueueImpl.java | 15 ++++- .../CeProcessingSchedulerImpl.java | 2 +- .../sonar/ce/taskprocessor/CeWorkerImpl.java | 29 +------- .../ce/queue/InternalCeQueueImplTest.java | 40 +++++------ .../ce/taskprocessor/CeWorkerImplTest.java | 66 +++++++++---------- 6 files changed, 70 insertions(+), 85 deletions(-) diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 0e0903ae90e..3e834cdd8af 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -41,7 +41,6 @@ public interface InternalCeQueue extends CeQueue { * Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}. * * @param excludeIndexationJob change the underlying request to exclude indexation tasks. - * @param excludeViewRefresh change the underlying request to exclude portfolios (but still include APP) * *

Only a single task can be peeked by project.

* @@ -50,7 +49,7 @@ public interface InternalCeQueue extends CeQueue { *

Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING} * are ignored

*/ - Optional peek(String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh); + Optional peek(String workerUuid, boolean excludeIndexationJob); /** * Removes a task from the queue and registers it to past activities. This method diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index e386d7ac899..d3ebb0fec99 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -72,7 +72,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } @Override - public Optional peek(String workerUuid, boolean excludeIndexationJob, boolean excludeViewRefresh) { + public Optional peek(String workerUuid, boolean excludeIndexationJob) { requireNonNull(workerUuid, "workerUuid can't be null"); if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) { @@ -85,7 +85,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue dbSession.commit(); LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); } - Optional opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, excludeViewRefresh); + Optional opt = findPendingTask(workerUuid, dbSession, ceQueueDao, excludeIndexationJob); if (!opt.isPresent()) { return Optional.empty(); } @@ -102,6 +102,17 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } } + private Optional findPendingTask(String workerUuid, DbSession dbSession, CeQueueDao ceQueueDao, boolean excludeIndexationJob) { + // try to find tasks including indexation job & excluding app/portfolio + // and if no match, try the opposite + // when excludeIndexationJob is false, search first excluding indexation jobs and including app/portfolio, then the opposite + Optional opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob, !excludeIndexationJob); + if (!opt.isPresent()) { + opt = ceQueueDao.peek(dbSession, workerUuid, !excludeIndexationJob, excludeIndexationJob); + } + return opt; + } + @Override public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED"); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 3e92f875dab..fd8cc7a2186 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -207,8 +207,8 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler { public void stop(boolean interrupt) { keepRunning = false; + interrupted = true; if (workerFuture != null) { - interrupted = true; workerFuture.cancel(interrupt); } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index 9bbff219f88..89d2f75dc0b 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -61,7 +61,6 @@ public class CeWorkerImpl implements CeWorker { private final CeWorkerController ceWorkerController; private final List listeners; private final AtomicReference runningState = new AtomicReference<>(); - private boolean indexationTaskLookupEnabled; private boolean excludeIndexationJob; public CeWorkerImpl(int ordinal, String uuid, @@ -74,8 +73,7 @@ public class CeWorkerImpl implements CeWorker { this.taskProcessorRepository = taskProcessorRepository; this.ceWorkerController = ceWorkerController; this.listeners = Arrays.asList(listeners); - indexationTaskLookupEnabled = true; - excludeIndexationJob = false; + this.excludeIndexationJob = true; } private static int checkOrdinal(int ordinal) { @@ -167,36 +165,15 @@ public class CeWorkerImpl implements CeWorker { } private Optional tryAndFindTaskToExecute() { + excludeIndexationJob = !excludeIndexationJob; try { - if (indexationTaskLookupEnabled) { - return tryAndFindTaskToExecuteIncludingIndexation(); - } else { - return queue.peek(uuid, true, false); - } + return queue.peek(uuid, excludeIndexationJob); } catch (Exception e) { LOG.error("Failed to pop the queue of analysis reports", e); } return Optional.empty(); } - private Optional tryAndFindTaskToExecuteIncludingIndexation() { - excludeIndexationJob = !excludeIndexationJob; - Optional peek = queue.peek(uuid, excludeIndexationJob, true); - if (peek.isPresent()) { - return peek; - } - if (excludeIndexationJob) { - peek = queue.peek(uuid, false, true); - if (peek.isPresent()) { - return peek; - } - // do not lookup for indexation tasks anymore - indexationTaskLookupEnabled = false; - LOG.info(String.format("worker %s found no pending task (including indexation task). Disabling indexation task lookup for this worker until next SonarQube restart.", uuid)); - } - return Optional.empty(); - } - private final class ExecuteTask implements Runnable, AutoCloseable { private final CeTask task; private final RunningState localRunningState; diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 33b96e58c02..cd4f7368ebd 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -147,18 +147,18 @@ public class InternalCeQueueImplTest { expectedException.expect(NullPointerException.class); expectedException.expectMessage("workerUuid can't be null"); - underTest.peek(null, false, false); + underTest.peek(null, true); } @Test public void test_remove() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null); // queue is empty assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid())).isNotPresent(); - assertThat(underTest.peek(WORKER_UUID_2, false, false)).isNotPresent(); + assertThat(underTest.peek(WORKER_UUID_2, true)).isNotPresent(); // available in history Optional history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid()); @@ -187,7 +187,7 @@ public class InternalCeQueueImplTest { @Test public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null); // available in history @@ -200,7 +200,7 @@ public class InternalCeQueueImplTest { public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional peek = underTest.peek(WORKER_UUID_2, false, false); + Optional peek = underTest.peek(WORKER_UUID_2, true); underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null); // available in history @@ -214,7 +214,7 @@ public class InternalCeQueueImplTest { Throwable error = new NullPointerException("Fake NPE to test persistence to DB"); CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); Optional activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()); @@ -230,7 +230,7 @@ public class InternalCeQueueImplTest { Throwable error = new TypedExceptionImpl("aType", "aMessage"); CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get(); @@ -335,7 +335,7 @@ public class InternalCeQueueImplTest { public void test_peek() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); assertThat(peek.isPresent()).isTrue(); assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT); @@ -343,7 +343,7 @@ public class InternalCeQueueImplTest { assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get()); // no more pending tasks - peek = underTest.peek(WORKER_UUID_2, false, false); + peek = underTest.peek(WORKER_UUID_2, true); assertThat(peek.isPresent()).isFalse(); } @@ -353,7 +353,7 @@ public class InternalCeQueueImplTest { ComponentDto branch = db.components().insertProjectBranch(project); CeTask task = submit(CeTaskTypes.REPORT, branch); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); assertThat(peek.isPresent()).isTrue(); assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT); @@ -361,7 +361,7 @@ public class InternalCeQueueImplTest { assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name())); // no more pending tasks - peek = underTest.peek(WORKER_UUID_2, false, false); + peek = underTest.peek(WORKER_UUID_2, true); assertThat(peek.isPresent()).isFalse(); } @@ -370,11 +370,11 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); underTest.pauseWorkers(); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); assertThat(peek).isEmpty(); underTest.resumeWorkers(); - peek = underTest.peek(WORKER_UUID_1, false, false); + peek = underTest.peek(WORKER_UUID_1, true); assertThat(peek).isPresent(); assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); } @@ -388,7 +388,7 @@ public class InternalCeQueueImplTest { makeInProgress(dto, "foo"); db.commit(); - assertThat(underTest.peek(WORKER_UUID_1, false, false)).isEmpty(); + assertThat(underTest.peek(WORKER_UUID_1, true)).isEmpty(); } @Test @@ -396,7 +396,7 @@ public class InternalCeQueueImplTest { submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); when(computeEngineStatus.getStatus()).thenReturn(STOPPING); - Optional peek = underTest.peek(WORKER_UUID_1, false, false); + Optional peek = underTest.peek(WORKER_UUID_1, true); assertThat(peek.isPresent()).isFalse(); } @@ -408,7 +408,7 @@ public class InternalCeQueueImplTest { .setStatus(CeQueueDto.Status.PENDING)); db.commit(); - assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("uuid"); + assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("uuid"); } @Test @@ -417,7 +417,7 @@ public class InternalCeQueueImplTest { CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset - assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0"); + assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0"); verifyUnmodifiedTask(u1); verifyResetTask(u2); @@ -431,7 +431,7 @@ public class InternalCeQueueImplTest { CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1); CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2); - assertThat(underTest.peek(WORKER_UUID_1, false, false).get().getUuid()).isEqualTo("u0"); + assertThat(underTest.peek(WORKER_UUID_1, true).get().getUuid()).isEqualTo("u0"); verifyResetTask(u1); verifyUnmodifiedTask(u2); @@ -489,7 +489,7 @@ public class InternalCeQueueImplTest { @Test public void fail_to_cancel_if_in_progress() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - underTest.peek(WORKER_UUID_2, false, false); + underTest.peek(WORKER_UUID_2, true); CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get(); expectedException.expect(IllegalStateException.class); @@ -503,7 +503,7 @@ public class InternalCeQueueImplTest { CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2")); CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3")); - underTest.peek(WORKER_UUID_2, false, false); + underTest.peek(WORKER_UUID_2, true); int canceledCount = underTest.cancelAll(); assertThat(canceledCount).isEqualTo(2); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java index 49051202383..60a3253ef4b 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java @@ -146,7 +146,7 @@ public class CeWorkerImplTest { @Test public void no_pending_tasks_in_queue() throws Exception { - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty()); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); assertThat(underTest.call()).isEqualTo(NO_TASK); @@ -155,7 +155,7 @@ public class CeWorkerImplTest { @Test public void no_pending_tasks_in_queue_without_listener() throws Exception { - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty()); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); assertThat(underTestNoListener.call()).isEqualTo(NO_TASK); @@ -166,7 +166,7 @@ public class CeWorkerImplTest { public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); @@ -182,7 +182,7 @@ public class CeWorkerImplTest { public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED); @@ -195,7 +195,7 @@ public class CeWorkerImplTest { public void peek_and_process_task() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); @@ -212,7 +212,7 @@ public class CeWorkerImplTest { public void peek_and_process_task_without_listeners() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED); @@ -225,7 +225,7 @@ public class CeWorkerImplTest { @Test public void fail_to_process_task() throws Exception { CeTask task = createCeTask(null); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); @@ -243,7 +243,7 @@ public class CeWorkerImplTest { @Test public void fail_to_process_task_without_listeners() throws Exception { CeTask task = createCeTask(null); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); @@ -257,7 +257,7 @@ public class CeWorkerImplTest { @Test public void log_task_characteristics() throws Exception { - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo"))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo"))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -272,7 +272,7 @@ public class CeWorkerImplTest { @Test public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception { - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(null))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -288,7 +288,7 @@ public class CeWorkerImplTest { @Test public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception { CeTask ceTask = createCeTask(null); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); @@ -308,7 +308,7 @@ public class CeWorkerImplTest { @Test public void log_submitter_login_if_authenticated_and_success() throws Exception { UserDto userDto = insertRandomUser(); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto)))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto)))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -324,7 +324,7 @@ public class CeWorkerImplTest { @Test public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception { - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null)))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null)))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -342,7 +342,7 @@ public class CeWorkerImplTest { public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { UserDto userDto = insertRandomUser(); CeTask ceTask = createCeTask(toTaskSubmitter(userDto)); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); @@ -362,7 +362,7 @@ public class CeWorkerImplTest { public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { logTester.setLevel(LoggerLevel.DEBUG); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -381,7 +381,7 @@ public class CeWorkerImplTest { logTester.setLevel(LoggerLevel.DEBUG); CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask); @@ -401,7 +401,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception { String threadName = randomAlphabetic(3); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); return Optional.empty(); @@ -415,7 +415,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception { String threadName = randomAlphabetic(3); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); return Optional.of(createCeTask(submitter)); @@ -431,7 +431,7 @@ public class CeWorkerImplTest { public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception { String threadName = randomAlphabetic(3); CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer(invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); return Optional.of(ceTask); @@ -459,7 +459,7 @@ public class CeWorkerImplTest { @Test public void log_error_when_task_fails_with_not_MessageException() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask); @@ -477,7 +477,7 @@ public class CeWorkerImplTest { @Test public void do_no_log_error_when_task_fails_with_MessageException() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process")); @@ -493,7 +493,7 @@ public class CeWorkerImplTest { @Test public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null); @@ -505,7 +505,7 @@ public class CeWorkerImplTest { @Test public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); IllegalStateException ex = makeTaskProcessorFail(ceTask); RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing"); @@ -534,7 +534,7 @@ public class CeWorkerImplTest { @Test public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process")); RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing"); @@ -565,7 +565,7 @@ public class CeWorkerImplTest { CountDownLatch inCallLatch = new CountDownLatch(1); CountDownLatch assertionsDoneLatch = new CountDownLatch(1); // mock long running peek(String) call => Thread is executing call() but not running a task - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer>) invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer>) invocation -> { inCallLatch.countDown(); try { assertionsDoneLatch.await(10, TimeUnit.SECONDS); @@ -600,7 +600,7 @@ public class CeWorkerImplTest { String taskType = randomAlphabetic(12); CeTask ceTask = mock(CeTask.class); when(ceTask.getType()).thenReturn(taskType); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { @CheckForNull @Override @@ -641,15 +641,13 @@ public class CeWorkerImplTest { @Test public void do_not_exclude_portfolio_when_indexation_task_lookup_is_disabled() throws Exception { // first call with empty queue to disable indexationTaskLookupEnabled - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.empty()); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); assertThat(underTest.call()).isEqualTo(NO_TASK); - ArgumentCaptor booleanCaptor = ArgumentCaptor.forClass(Boolean.class); // following calls should not exclude portfolios - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); - verify(queue, times(3)).peek(anyString(), anyBoolean(), booleanCaptor.capture()); - assertThat(booleanCaptor.getAllValues()).containsExactly(true, true, false); + verify(queue, times(2)).peek(anyString(), anyBoolean()); } @Test @@ -657,7 +655,7 @@ public class CeWorkerImplTest { CountDownLatch inCallLatch = new CountDownLatch(1); CountDownLatch assertionsDoneLatch = new CountDownLatch(1); // mock long running peek(String) call => Thread is executing call() but not running a task - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenAnswer((Answer>) invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer>) invocation -> { inCallLatch.countDown(); try { assertionsDoneLatch.await(10, TimeUnit.SECONDS); @@ -688,7 +686,7 @@ public class CeWorkerImplTest { String taskType = randomAlphabetic(12); CeTask ceTask = mock(CeTask.class); when(ceTask.getType()).thenReturn(taskType); - when(queue.peek(anyString(), anyBoolean(), anyBoolean())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { @CheckForNull @@ -745,7 +743,7 @@ public class CeWorkerImplTest { } private void verifyWorkerUuid() { - verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean(), anyBoolean()); + verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean()); assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid); } -- 2.39.5