diff options
author | Pierre <pierre.guillot@sonarsource.com> | 2020-06-02 16:13:48 +0200 |
---|---|---|
committer | sonartech <sonartech@sonarsource.com> | 2020-06-26 20:04:56 +0000 |
commit | e4b519ed129dbc7b76eab00d6c48166a8993e35f (patch) | |
tree | 2f79b9a0bdeccd1494fcc1ac4d3a14cb78b50999 /server/sonar-ce | |
parent | ac3abae6429931aa5065b9f1ac359ff9a4ca78fd (diff) | |
download | sonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.tar.gz sonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.zip |
SONAR-13444 background tasks for issue indexation implementation
Diffstat (limited to 'server/sonar-ce')
7 files changed, 139 insertions, 75 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java index a455b71f39c..f7e3793a818 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java @@ -53,6 +53,7 @@ import org.sonar.ce.async.SynchronousAsyncExecution; import org.sonar.ce.cleaning.CeCleaningModule; import org.sonar.ce.cleaning.NoopCeCleaningSchedulerImpl; import org.sonar.ce.db.ReadOnlyPropertiesDao; +import org.sonar.ce.issue.index.NoAsyncIssueIndexing; import org.sonar.ce.logging.CeProcessLogging; import org.sonar.ce.monitoring.CEQueueStatusImpl; import org.sonar.ce.monitoring.DistributedCEQueueStatusImpl; @@ -64,6 +65,7 @@ import org.sonar.ce.task.projectanalysis.ProjectAnalysisTaskModule; import org.sonar.ce.task.projectanalysis.analysis.ProjectConfigurationFactory; import org.sonar.ce.task.projectanalysis.issue.AdHocRuleCreator; import org.sonar.ce.task.projectanalysis.notification.ReportAnalysisFailureNotificationModule; +import org.sonar.ce.task.projectanalysis.taskprocessor.IssueSyncTaskModule; import org.sonar.ce.taskprocessor.CeProcessingScheduler; import org.sonar.ce.taskprocessor.CeTaskProcessorModule; import org.sonar.core.component.DefaultResourceTypes; @@ -402,6 +404,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { // issues IssueStorage.class, + NoAsyncIssueIndexing.class, IssueIndexer.class, IssueIteratorFactory.class, IssueFieldsSetter.class, // used in Web Services and CE's DebtCalculator @@ -439,6 +442,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { CeHttpModule.class, CeTaskCommonsModule.class, ProjectAnalysisTaskModule.class, + IssueSyncTaskModule.class, CeTaskProcessorModule.class, OfficialDistribution.class, diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java new file mode 100644 index 00000000000..ba4d525c4dd --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java @@ -0,0 +1,31 @@ +/* + * SonarQube + * Copyright (C) 2009-2020 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.issue.index; + +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.server.issue.index.AsyncIssueIndexing; + +@ComputeEngineSide +public class NoAsyncIssueIndexing implements AsyncIssueIndexing { + @Override + public void triggerOnIndexCreation() { + throw new IllegalStateException("Async issue indexing should not be triggered in Compute Engine"); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 2058d7a19c5..30466805efc 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -40,6 +40,8 @@ public interface InternalCeQueue extends CeQueue { * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. * Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}. * + * @param excludeIndexationJob change the underlying request to exclude indexation tasks. + * * <p>Only a single task can be peeked by project.</p> * * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p> @@ -47,7 +49,7 @@ public interface InternalCeQueue extends CeQueue { * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING} * are ignored</p> */ - Optional<CeTask> peek(String workerUuid); + Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob); /** * Removes a task from the queue and registers it to past activities. This method diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 5bdab435748..711961b1f95 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -73,7 +73,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } @Override - public Optional<CeTask> peek(String workerUuid) { + public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob) { requireNonNull(workerUuid, "workerUuid can't be null"); if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) { @@ -86,20 +86,20 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue dbSession.commit(); LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); } - Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid); - if (opt.isPresent()) { - CeQueueDto taskDto = opt.get(); - Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto); - Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream() - .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue)); - - CeTask task = convertToTask(dbSession, taskDto, characteristics, - ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null), - ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null)); - queueStatus.addInProgress(); - return Optional.of(task); + Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob); + if (!opt.isPresent()) { + return Optional.empty(); } - return Optional.empty(); + CeQueueDto taskDto = opt.get(); + Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto); + Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream() + .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue)); + + CeTask task = convertToTask(dbSession, taskDto, characteristics, + ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null), + ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null)); + queueStatus.addInProgress(); + return Optional.of(task); } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index ef6cad9a8ab..10c234639f5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -61,17 +61,21 @@ public class CeWorkerImpl implements CeWorker { private final CeWorkerController ceWorkerController; private final List<ExecutionListener> listeners; private final AtomicReference<RunningState> runningState = new AtomicReference<>(); + private boolean indexationTaskLookupEnabled; + private boolean excludeIndexationJob; public CeWorkerImpl(int ordinal, String uuid, - InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, - CeWorkerController ceWorkerController, - ExecutionListener... listeners) { + InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, + CeWorkerController ceWorkerController, + ExecutionListener... listeners) { this.ordinal = checkOrdinal(ordinal); this.uuid = uuid; this.queue = queue; this.taskProcessorRepository = taskProcessorRepository; this.ceWorkerController = ceWorkerController; this.listeners = Arrays.asList(listeners); + indexationTaskLookupEnabled = true; + excludeIndexationJob = false; } private static int checkOrdinal(int ordinal) { @@ -119,7 +123,7 @@ public class CeWorkerImpl implements CeWorker { localRunningState = new RunningState(currentThread); if (!runningState.compareAndSet(null, localRunningState)) { LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " + - "Forcefully updating Workers's running state to new Thread.", + "Forcefully updating Workers's running state to new Thread.", getOrdinal(), getUUID(), currentThread); runningState.set(localRunningState); } @@ -138,7 +142,7 @@ public class CeWorkerImpl implements CeWorker { localRunningState.runningThread.setName(oldName); if (!runningState.compareAndSet(localRunningState, null)) { LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." + - " Keeping this new state.", + " Keeping this new state.", getOrdinal(), getUUID(), localRunningState.runningThread); } } @@ -154,7 +158,7 @@ public class CeWorkerImpl implements CeWorker { } try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this); - ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { + ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { executeTask.run(); } catch (Exception e) { LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); @@ -164,13 +168,35 @@ public class CeWorkerImpl implements CeWorker { private Optional<CeTask> tryAndFindTaskToExecute() { try { - return queue.peek(uuid); + if (indexationTaskLookupEnabled) { + return tryAndFindTaskToExecuteIncludingIndexation(); + } else { + return queue.peek(uuid, true); + } } catch (Exception e) { LOG.error("Failed to pop the queue of analysis reports", e); } return Optional.empty(); } + private Optional<CeTask> tryAndFindTaskToExecuteIncludingIndexation() { + excludeIndexationJob = !excludeIndexationJob; + Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob); + if (peek.isPresent()) { + return peek; + } + if (excludeIndexationJob) { + peek = queue.peek(uuid, false); + if (peek.isPresent()) { + return peek; + } + // do not lookup for indexation tasks anymore + indexationTaskLookupEnabled = false; + LOG.info(String.format("worker %s found no pending task (including indexation task). Disabling indexation task lookup for this worker until next SonarQube restart.", uuid)); + } + return Optional.empty(); + } + private final class ExecuteTask implements Runnable, AutoCloseable { private final CeTask task; private final RunningState localRunningState; @@ -237,7 +263,7 @@ public class CeWorkerImpl implements CeWorker { } private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status, - @Nullable CeTaskResult taskResult, @Nullable Throwable error) { + @Nullable CeTaskResult taskResult, @Nullable Throwable error) { try { queue.remove(task, status, taskResult, error); } catch (Exception e) { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index d31c514d45a..922da41cdc3 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -159,18 +159,18 @@ public class InternalCeQueueImplTest { expectedException.expect(NullPointerException.class); expectedException.expectMessage("workerUuid can't be null"); - underTest.peek(null); + underTest.peek(null, false); } @Test public void test_remove() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null); // queue is empty assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).isPresent()).isFalse(); - assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse(); + assertThat(underTest.peek(WORKER_UUID_2, false).isPresent()).isFalse(); // available in history Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid()); @@ -199,7 +199,7 @@ public class InternalCeQueueImplTest { @Test public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null); // available in history @@ -212,7 +212,7 @@ public class InternalCeQueueImplTest { public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_2); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, false); underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null); // available in history @@ -226,7 +226,7 @@ public class InternalCeQueueImplTest { Throwable error = new NullPointerException("Fake NPE to test persistence to DB"); CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); Optional<CeActivityDto> activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()); @@ -242,7 +242,7 @@ public class InternalCeQueueImplTest { Throwable error = new TypedExceptionImpl("aType", "aMessage"); CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get(); @@ -348,7 +348,7 @@ public class InternalCeQueueImplTest { public void test_peek() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); assertThat(peek.isPresent()).isTrue(); assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT); @@ -356,7 +356,7 @@ public class InternalCeQueueImplTest { assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get()); // no more pending tasks - peek = underTest.peek(WORKER_UUID_2); + peek = underTest.peek(WORKER_UUID_2, false); assertThat(peek.isPresent()).isFalse(); } @@ -366,7 +366,7 @@ public class InternalCeQueueImplTest { ComponentDto branch = db.components().insertProjectBranch(project); CeTask task = submit(CeTaskTypes.REPORT, branch); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); assertThat(peek.isPresent()).isTrue(); assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT); @@ -374,7 +374,7 @@ public class InternalCeQueueImplTest { assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name())); // no more pending tasks - peek = underTest.peek(WORKER_UUID_2); + peek = underTest.peek(WORKER_UUID_2, false); assertThat(peek.isPresent()).isFalse(); } @@ -383,11 +383,11 @@ public class InternalCeQueueImplTest { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); underTest.pauseWorkers(); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); assertThat(peek).isEmpty(); underTest.resumeWorkers(); - peek = underTest.peek(WORKER_UUID_1); + peek = underTest.peek(WORKER_UUID_1, false); assertThat(peek).isPresent(); assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); } @@ -401,7 +401,7 @@ public class InternalCeQueueImplTest { makeInProgress(dto, "foo"); db.commit(); - assertThat(underTest.peek(WORKER_UUID_1)).isEmpty(); + assertThat(underTest.peek(WORKER_UUID_1, false)).isEmpty(); } @Test @@ -409,7 +409,7 @@ public class InternalCeQueueImplTest { submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); when(computeEngineStatus.getStatus()).thenReturn(STOPPING); - Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false); assertThat(peek.isPresent()).isFalse(); } @@ -421,7 +421,7 @@ public class InternalCeQueueImplTest { .setStatus(CeQueueDto.Status.PENDING)); db.commit(); - assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid"); + assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("uuid"); } @Test @@ -430,7 +430,7 @@ public class InternalCeQueueImplTest { CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset - assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); + assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("u0"); verifyUnmodifiedTask(u1); verifyResetTask(u2); @@ -444,7 +444,7 @@ public class InternalCeQueueImplTest { CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1); CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2); - assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); + assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("u0"); verifyResetTask(u1); verifyUnmodifiedTask(u2); @@ -502,7 +502,7 @@ public class InternalCeQueueImplTest { @Test public void fail_to_cancel_if_in_progress() { CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); - underTest.peek(WORKER_UUID_2); + underTest.peek(WORKER_UUID_2, false); CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get(); expectedException.expect(IllegalStateException.class); @@ -516,7 +516,7 @@ public class InternalCeQueueImplTest { CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2")); CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3")); - underTest.peek(WORKER_UUID_2); + underTest.peek(WORKER_UUID_2, false); int canceledCount = underTest.cancelAll(); assertThat(canceledCount).isEqualTo(2); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java index 5f67a7fdb32..84943ca6009 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java @@ -36,7 +36,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; -import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.sonar.api.impl.utils.TestSystem2; import org.sonar.api.utils.MessageException; @@ -60,9 +59,11 @@ import org.sonar.server.organization.BillingValidations; import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.*; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -99,7 +100,7 @@ public class CeWorkerImplTest { private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController, executionListener1, executionListener2); private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController); - private InOrder inOrder = Mockito.inOrder(taskProcessor, queue, executionListener1, executionListener2); + private InOrder inOrder = inOrder(taskProcessor, queue, executionListener1, executionListener2); private final CeTask.User submitter = new CeTask.User("UUID_USER_1", "LOGIN_1"); @Before @@ -144,7 +145,7 @@ public class CeWorkerImplTest { @Test public void no_pending_tasks_in_queue() throws Exception { - when(queue.peek(anyString())).thenReturn(Optional.empty()); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); assertThat(underTest.call()).isEqualTo(NO_TASK); @@ -153,7 +154,7 @@ public class CeWorkerImplTest { @Test public void no_pending_tasks_in_queue_without_listener() throws Exception { - when(queue.peek(anyString())).thenReturn(Optional.empty()); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty()); assertThat(underTestNoListener.call()).isEqualTo(NO_TASK); @@ -164,7 +165,7 @@ public class CeWorkerImplTest { public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek(anyString())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); @@ -180,7 +181,7 @@ public class CeWorkerImplTest { public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); - when(queue.peek(anyString())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED); @@ -193,7 +194,7 @@ public class CeWorkerImplTest { public void peek_and_process_task() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek(anyString())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); @@ -210,7 +211,7 @@ public class CeWorkerImplTest { public void peek_and_process_task_without_listeners() throws Exception { CeTask task = createCeTask(null); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); - when(queue.peek(anyString())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED); @@ -223,7 +224,7 @@ public class CeWorkerImplTest { @Test public void fail_to_process_task() throws Exception { CeTask task = createCeTask(null); - when(queue.peek(anyString())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); @@ -241,7 +242,7 @@ public class CeWorkerImplTest { @Test public void fail_to_process_task_without_listeners() throws Exception { CeTask task = createCeTask(null); - when(queue.peek(anyString())).thenReturn(Optional.of(task)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task)); taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); @@ -255,7 +256,7 @@ public class CeWorkerImplTest { @Test public void log_task_characteristics() throws Exception { - when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo"))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo"))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -270,7 +271,7 @@ public class CeWorkerImplTest { @Test public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception { - when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -286,7 +287,7 @@ public class CeWorkerImplTest { @Test public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception { CeTask ceTask = createCeTask(null); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); @@ -306,7 +307,7 @@ public class CeWorkerImplTest { @Test public void log_submitter_login_if_authenticated_and_success() throws Exception { UserDto userDto = insertRandomUser(); - when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto)))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto)))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -322,7 +323,7 @@ public class CeWorkerImplTest { @Test public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception { - when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null)))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null)))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -340,7 +341,7 @@ public class CeWorkerImplTest { public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { UserDto userDto = insertRandomUser(); CeTask ceTask = createCeTask(toTaskSubmitter(userDto)); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); makeTaskProcessorFail(ceTask); @@ -360,7 +361,7 @@ public class CeWorkerImplTest { public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { logTester.setLevel(LoggerLevel.DEBUG); - when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(submitter))); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter))); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); underTest.call(); @@ -379,7 +380,7 @@ public class CeWorkerImplTest { logTester.setLevel(LoggerLevel.DEBUG); CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask); @@ -399,7 +400,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception { String threadName = randomAlphabetic(3); - when(queue.peek(anyString())).thenAnswer(invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); return Optional.empty(); @@ -413,7 +414,7 @@ public class CeWorkerImplTest { @Test public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception { String threadName = randomAlphabetic(3); - when(queue.peek(anyString())).thenAnswer(invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); return Optional.of(createCeTask(submitter)); @@ -429,7 +430,7 @@ public class CeWorkerImplTest { public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception { String threadName = randomAlphabetic(3); CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenAnswer(invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> { assertThat(Thread.currentThread().getName()) .isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName); return Optional.of(ceTask); @@ -457,7 +458,7 @@ public class CeWorkerImplTest { @Test public void log_error_when_task_fails_with_not_MessageException() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask); @@ -475,7 +476,7 @@ public class CeWorkerImplTest { @Test public void do_no_log_error_when_task_fails_with_MessageException() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process")); @@ -491,7 +492,7 @@ public class CeWorkerImplTest { @Test public void do_no_log_error_when_task_fails_with_BillingValidationsException() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); makeTaskProcessorFail(ceTask, new BillingValidations.BillingValidationsException("simulate MessageException thrown by TaskProcessor#process")); @@ -507,7 +508,7 @@ public class CeWorkerImplTest { @Test public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null); @@ -519,7 +520,7 @@ public class CeWorkerImplTest { @Test public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); IllegalStateException ex = makeTaskProcessorFail(ceTask); RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing"); @@ -548,7 +549,7 @@ public class CeWorkerImplTest { @Test public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception { CeTask ceTask = createCeTask(submitter); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process")); RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing"); @@ -579,7 +580,7 @@ public class CeWorkerImplTest { CountDownLatch inCallLatch = new CountDownLatch(1); CountDownLatch assertionsDoneLatch = new CountDownLatch(1); // mock long running peek(String) call => Thread is executing call() but not running a task - when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { inCallLatch.countDown(); try { assertionsDoneLatch.await(10, TimeUnit.SECONDS); @@ -614,7 +615,7 @@ public class CeWorkerImplTest { String taskType = randomAlphabetic(12); CeTask ceTask = mock(CeTask.class); when(ceTask.getType()).thenReturn(taskType); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { @CheckForNull @Override @@ -657,7 +658,7 @@ public class CeWorkerImplTest { CountDownLatch inCallLatch = new CountDownLatch(1); CountDownLatch assertionsDoneLatch = new CountDownLatch(1); // mock long running peek(String) call => Thread is executing call() but not running a task - when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { + when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> { inCallLatch.countDown(); try { assertionsDoneLatch.await(10, TimeUnit.SECONDS); @@ -688,7 +689,7 @@ public class CeWorkerImplTest { String taskType = randomAlphabetic(12); CeTask ceTask = mock(CeTask.class); when(ceTask.getType()).thenReturn(taskType); - when(queue.peek(anyString())).thenReturn(Optional.of(ceTask)); + when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask)); taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() { @CheckForNull @@ -745,7 +746,7 @@ public class CeWorkerImplTest { } private void verifyWorkerUuid() { - verify(queue).peek(workerUuidCaptor.capture()); + verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean()); assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid); } |