aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
authorPierre <pierre.guillot@sonarsource.com>2020-06-02 16:13:48 +0200
committersonartech <sonartech@sonarsource.com>2020-06-26 20:04:56 +0000
commite4b519ed129dbc7b76eab00d6c48166a8993e35f (patch)
tree2f79b9a0bdeccd1494fcc1ac4d3a14cb78b50999 /server/sonar-ce
parentac3abae6429931aa5065b9f1ac359ff9a4ca78fd (diff)
downloadsonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.tar.gz
sonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.zip
SONAR-13444 background tasks for issue indexation implementation
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java31
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java28
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java42
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java40
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java65
7 files changed, 139 insertions, 75 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
index a455b71f39c..f7e3793a818 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
@@ -53,6 +53,7 @@ import org.sonar.ce.async.SynchronousAsyncExecution;
import org.sonar.ce.cleaning.CeCleaningModule;
import org.sonar.ce.cleaning.NoopCeCleaningSchedulerImpl;
import org.sonar.ce.db.ReadOnlyPropertiesDao;
+import org.sonar.ce.issue.index.NoAsyncIssueIndexing;
import org.sonar.ce.logging.CeProcessLogging;
import org.sonar.ce.monitoring.CEQueueStatusImpl;
import org.sonar.ce.monitoring.DistributedCEQueueStatusImpl;
@@ -64,6 +65,7 @@ import org.sonar.ce.task.projectanalysis.ProjectAnalysisTaskModule;
import org.sonar.ce.task.projectanalysis.analysis.ProjectConfigurationFactory;
import org.sonar.ce.task.projectanalysis.issue.AdHocRuleCreator;
import org.sonar.ce.task.projectanalysis.notification.ReportAnalysisFailureNotificationModule;
+import org.sonar.ce.task.projectanalysis.taskprocessor.IssueSyncTaskModule;
import org.sonar.ce.taskprocessor.CeProcessingScheduler;
import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.core.component.DefaultResourceTypes;
@@ -402,6 +404,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {
// issues
IssueStorage.class,
+ NoAsyncIssueIndexing.class,
IssueIndexer.class,
IssueIteratorFactory.class,
IssueFieldsSetter.class, // used in Web Services and CE's DebtCalculator
@@ -439,6 +442,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {
CeHttpModule.class,
CeTaskCommonsModule.class,
ProjectAnalysisTaskModule.class,
+ IssueSyncTaskModule.class,
CeTaskProcessorModule.class,
OfficialDistribution.class,
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java
new file mode 100644
index 00000000000..ba4d525c4dd
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java
@@ -0,0 +1,31 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2020 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.issue.index;
+
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.server.issue.index.AsyncIssueIndexing;
+
+@ComputeEngineSide
+public class NoAsyncIssueIndexing implements AsyncIssueIndexing {
+ @Override
+ public void triggerOnIndexCreation() {
+ throw new IllegalStateException("Async issue indexing should not be triggered in Compute Engine");
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
index 2058d7a19c5..30466805efc 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
@@ -40,6 +40,8 @@ public interface InternalCeQueue extends CeQueue {
* The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
* Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}.
*
+ * @param excludeIndexationJob change the underlying request to exclude indexation tasks.
+ *
* <p>Only a single task can be peeked by project.</p>
*
* <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
@@ -47,7 +49,7 @@ public interface InternalCeQueue extends CeQueue {
* <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}
* are ignored</p>
*/
- Optional<CeTask> peek(String workerUuid);
+ Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob);
/**
* Removes a task from the queue and registers it to past activities. This method
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
index 5bdab435748..711961b1f95 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -73,7 +73,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
}
@Override
- public Optional<CeTask> peek(String workerUuid) {
+ public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob) {
requireNonNull(workerUuid, "workerUuid can't be null");
if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) {
@@ -86,20 +86,20 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
dbSession.commit();
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
- Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid);
- if (opt.isPresent()) {
- CeQueueDto taskDto = opt.get();
- Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto);
- Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream()
- .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue));
-
- CeTask task = convertToTask(dbSession, taskDto, characteristics,
- ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
- ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null));
- queueStatus.addInProgress();
- return Optional.of(task);
+ Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob);
+ if (!opt.isPresent()) {
+ return Optional.empty();
}
- return Optional.empty();
+ CeQueueDto taskDto = opt.get();
+ Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto);
+ Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream()
+ .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue));
+
+ CeTask task = convertToTask(dbSession, taskDto, characteristics,
+ ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
+ ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null));
+ queueStatus.addInProgress();
+ return Optional.of(task);
}
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
index ef6cad9a8ab..10c234639f5 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
@@ -61,17 +61,21 @@ public class CeWorkerImpl implements CeWorker {
private final CeWorkerController ceWorkerController;
private final List<ExecutionListener> listeners;
private final AtomicReference<RunningState> runningState = new AtomicReference<>();
+ private boolean indexationTaskLookupEnabled;
+ private boolean excludeIndexationJob;
public CeWorkerImpl(int ordinal, String uuid,
- InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- CeWorkerController ceWorkerController,
- ExecutionListener... listeners) {
+ InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
+ CeWorkerController ceWorkerController,
+ ExecutionListener... listeners) {
this.ordinal = checkOrdinal(ordinal);
this.uuid = uuid;
this.queue = queue;
this.taskProcessorRepository = taskProcessorRepository;
this.ceWorkerController = ceWorkerController;
this.listeners = Arrays.asList(listeners);
+ indexationTaskLookupEnabled = true;
+ excludeIndexationJob = false;
}
private static int checkOrdinal(int ordinal) {
@@ -119,7 +123,7 @@ public class CeWorkerImpl implements CeWorker {
localRunningState = new RunningState(currentThread);
if (!runningState.compareAndSet(null, localRunningState)) {
LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " +
- "Forcefully updating Workers's running state to new Thread.",
+ "Forcefully updating Workers's running state to new Thread.",
getOrdinal(), getUUID(), currentThread);
runningState.set(localRunningState);
}
@@ -138,7 +142,7 @@ public class CeWorkerImpl implements CeWorker {
localRunningState.runningThread.setName(oldName);
if (!runningState.compareAndSet(localRunningState, null)) {
LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." +
- " Keeping this new state.",
+ " Keeping this new state.",
getOrdinal(), getUUID(), localRunningState.runningThread);
}
}
@@ -154,7 +158,7 @@ public class CeWorkerImpl implements CeWorker {
}
try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this);
- ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
+ ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
executeTask.run();
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
@@ -164,13 +168,35 @@ public class CeWorkerImpl implements CeWorker {
private Optional<CeTask> tryAndFindTaskToExecute() {
try {
- return queue.peek(uuid);
+ if (indexationTaskLookupEnabled) {
+ return tryAndFindTaskToExecuteIncludingIndexation();
+ } else {
+ return queue.peek(uuid, true);
+ }
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
}
return Optional.empty();
}
+ private Optional<CeTask> tryAndFindTaskToExecuteIncludingIndexation() {
+ excludeIndexationJob = !excludeIndexationJob;
+ Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob);
+ if (peek.isPresent()) {
+ return peek;
+ }
+ if (excludeIndexationJob) {
+ peek = queue.peek(uuid, false);
+ if (peek.isPresent()) {
+ return peek;
+ }
+ // do not lookup for indexation tasks anymore
+ indexationTaskLookupEnabled = false;
+ LOG.info(String.format("worker %s found no pending task (including indexation task). Disabling indexation task lookup for this worker until next SonarQube restart.", uuid));
+ }
+ return Optional.empty();
+ }
+
private final class ExecuteTask implements Runnable, AutoCloseable {
private final CeTask task;
private final RunningState localRunningState;
@@ -237,7 +263,7 @@ public class CeWorkerImpl implements CeWorker {
}
private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
- @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
try {
queue.remove(task, status, taskResult, error);
} catch (Exception e) {
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
index d31c514d45a..922da41cdc3 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
@@ -159,18 +159,18 @@ public class InternalCeQueueImplTest {
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("workerUuid can't be null");
- underTest.peek(null);
+ underTest.peek(null, false);
}
@Test
public void test_remove() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
// queue is empty
assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).isPresent()).isFalse();
- assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse();
+ assertThat(underTest.peek(WORKER_UUID_2, false).isPresent()).isFalse();
// available in history
Optional<CeActivityDto> history = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
@@ -199,7 +199,7 @@ public class InternalCeQueueImplTest {
@Test
public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
// available in history
@@ -212,7 +212,7 @@ public class InternalCeQueueImplTest {
public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_2);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_2, false);
underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
// available in history
@@ -226,7 +226,7 @@ public class InternalCeQueueImplTest {
Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
Optional<CeActivityDto> activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
@@ -242,7 +242,7 @@ public class InternalCeQueueImplTest {
Throwable error = new TypedExceptionImpl("aType", "aMessage");
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
CeActivityDto activityDto = db.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()).get();
@@ -348,7 +348,7 @@ public class InternalCeQueueImplTest {
public void test_peek() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
@@ -356,7 +356,7 @@ public class InternalCeQueueImplTest {
assertThat(peek.get().getMainComponent()).contains(peek.get().getComponent().get());
// no more pending tasks
- peek = underTest.peek(WORKER_UUID_2);
+ peek = underTest.peek(WORKER_UUID_2, false);
assertThat(peek.isPresent()).isFalse();
}
@@ -366,7 +366,7 @@ public class InternalCeQueueImplTest {
ComponentDto branch = db.components().insertProjectBranch(project);
CeTask task = submit(CeTaskTypes.REPORT, branch);
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
assertThat(peek.isPresent()).isTrue();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
@@ -374,7 +374,7 @@ public class InternalCeQueueImplTest {
assertThat(peek.get().getMainComponent()).contains(new CeTask.Component(project.uuid(), project.getDbKey(), project.name()));
// no more pending tasks
- peek = underTest.peek(WORKER_UUID_2);
+ peek = underTest.peek(WORKER_UUID_2, false);
assertThat(peek.isPresent()).isFalse();
}
@@ -383,11 +383,11 @@ public class InternalCeQueueImplTest {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
underTest.pauseWorkers();
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
assertThat(peek).isEmpty();
underTest.resumeWorkers();
- peek = underTest.peek(WORKER_UUID_1);
+ peek = underTest.peek(WORKER_UUID_1, false);
assertThat(peek).isPresent();
assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
}
@@ -401,7 +401,7 @@ public class InternalCeQueueImplTest {
makeInProgress(dto, "foo");
db.commit();
- assertThat(underTest.peek(WORKER_UUID_1)).isEmpty();
+ assertThat(underTest.peek(WORKER_UUID_1, false)).isEmpty();
}
@Test
@@ -409,7 +409,7 @@ public class InternalCeQueueImplTest {
submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
when(computeEngineStatus.getStatus()).thenReturn(STOPPING);
- Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1, false);
assertThat(peek.isPresent()).isFalse();
}
@@ -421,7 +421,7 @@ public class InternalCeQueueImplTest {
.setStatus(CeQueueDto.Status.PENDING));
db.commit();
- assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+ assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("uuid");
}
@Test
@@ -430,7 +430,7 @@ public class InternalCeQueueImplTest {
CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset
- assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
+ assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("u0");
verifyUnmodifiedTask(u1);
verifyResetTask(u2);
@@ -444,7 +444,7 @@ public class InternalCeQueueImplTest {
CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
CeQueueDto u4 = insertInProgress("u4", WORKER_UUID_2);
- assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
+ assertThat(underTest.peek(WORKER_UUID_1, false).get().getUuid()).isEqualTo("u0");
verifyResetTask(u1);
verifyUnmodifiedTask(u2);
@@ -502,7 +502,7 @@ public class InternalCeQueueImplTest {
@Test
public void fail_to_cancel_if_in_progress() {
CeTask task = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
- underTest.peek(WORKER_UUID_2);
+ underTest.peek(WORKER_UUID_2, false);
CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
expectedException.expect(IllegalStateException.class);
@@ -516,7 +516,7 @@ public class InternalCeQueueImplTest {
CeTask inProgressTask = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1"));
CeTask pendingTask1 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_2"));
CeTask pendingTask2 = submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_3"));
- underTest.peek(WORKER_UUID_2);
+ underTest.peek(WORKER_UUID_2, false);
int canceledCount = underTest.cancelAll();
assertThat(canceledCount).isEqualTo(2);
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
index 5f67a7fdb32..84943ca6009 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
@@ -36,7 +36,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
-import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.sonar.api.impl.utils.TestSystem2;
import org.sonar.api.utils.MessageException;
@@ -60,9 +59,11 @@ import org.sonar.server.organization.BillingValidations;
import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -99,7 +100,7 @@ public class CeWorkerImplTest {
private CeWorker underTest = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController,
executionListener1, executionListener2);
private CeWorker underTestNoListener = new CeWorkerImpl(randomOrdinal, workerUuid, queue, taskProcessorRepository, ceWorkerController);
- private InOrder inOrder = Mockito.inOrder(taskProcessor, queue, executionListener1, executionListener2);
+ private InOrder inOrder = inOrder(taskProcessor, queue, executionListener1, executionListener2);
private final CeTask.User submitter = new CeTask.User("UUID_USER_1", "LOGIN_1");
@Before
@@ -144,7 +145,7 @@ public class CeWorkerImplTest {
@Test
public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.empty());
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
assertThat(underTest.call()).isEqualTo(NO_TASK);
@@ -153,7 +154,7 @@ public class CeWorkerImplTest {
@Test
public void no_pending_tasks_in_queue_without_listener() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.empty());
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.empty());
assertThat(underTestNoListener.call()).isEqualTo(NO_TASK);
@@ -164,7 +165,7 @@ public class CeWorkerImplTest {
public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
@@ -180,7 +181,7 @@ public class CeWorkerImplTest {
public void fail_when_no_CeTaskProcessor_is_found_in_repository_without_listener() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
@@ -193,7 +194,7 @@ public class CeWorkerImplTest {
public void peek_and_process_task() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
@@ -210,7 +211,7 @@ public class CeWorkerImplTest {
public void peek_and_process_task_without_listeners() throws Exception {
CeTask task = createCeTask(null);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
assertThat(underTestNoListener.call()).isEqualTo(TASK_PROCESSED);
@@ -223,7 +224,7 @@ public class CeWorkerImplTest {
@Test
public void fail_to_process_task() throws Exception {
CeTask task = createCeTask(null);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
@@ -241,7 +242,7 @@ public class CeWorkerImplTest {
@Test
public void fail_to_process_task_without_listeners() throws Exception {
CeTask task = createCeTask(null);
- when(queue.peek(anyString())).thenReturn(Optional.of(task));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(task));
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
@@ -255,7 +256,7 @@ public class CeWorkerImplTest {
@Test
public void log_task_characteristics() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null, "pullRequest", "123", "branch", "foo")));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@@ -270,7 +271,7 @@ public class CeWorkerImplTest {
@Test
public void do_not_log_submitter_param_if_anonymous_and_success() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(null)));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(null)));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@@ -286,7 +287,7 @@ public class CeWorkerImplTest {
@Test
public void do_not_log_submitter_param_if_anonymous_and_error() throws Exception {
CeTask ceTask = createCeTask(null);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
@@ -306,7 +307,7 @@ public class CeWorkerImplTest {
@Test
public void log_submitter_login_if_authenticated_and_success() throws Exception {
UserDto userDto = insertRandomUser();
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(toTaskSubmitter(userDto))));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@@ -322,7 +323,7 @@ public class CeWorkerImplTest {
@Test
public void log_submitterUuid_if_user_matching_submitterUuid_can_not_be_found() throws Exception {
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(new CeTask.User("UUID_USER", null))));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@@ -340,7 +341,7 @@ public class CeWorkerImplTest {
public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
UserDto userDto = insertRandomUser();
CeTask ceTask = createCeTask(toTaskSubmitter(userDto));
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
makeTaskProcessorFail(ceTask);
@@ -360,7 +361,7 @@ public class CeWorkerImplTest {
public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
logTester.setLevel(LoggerLevel.DEBUG);
- when(queue.peek(anyString())).thenReturn(Optional.of(createCeTask(submitter)));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(createCeTask(submitter)));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
underTest.call();
@@ -379,7 +380,7 @@ public class CeWorkerImplTest {
logTester.setLevel(LoggerLevel.DEBUG);
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask);
@@ -399,7 +400,7 @@ public class CeWorkerImplTest {
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_there_is_no_task_to_process() throws Exception {
String threadName = randomAlphabetic(3);
- when(queue.peek(anyString())).thenAnswer(invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
return Optional.empty();
@@ -413,7 +414,7 @@ public class CeWorkerImplTest {
@Test
public void call_sets_and_restores_thread_name_with_information_of_worker_when_a_task_is_processed() throws Exception {
String threadName = randomAlphabetic(3);
- when(queue.peek(anyString())).thenAnswer(invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
return Optional.of(createCeTask(submitter));
@@ -429,7 +430,7 @@ public class CeWorkerImplTest {
public void call_sets_and_restores_thread_name_with_information_of_worker_when_an_error_occurs() throws Exception {
String threadName = randomAlphabetic(3);
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenAnswer(invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer(invocation -> {
assertThat(Thread.currentThread().getName())
.isEqualTo("Worker " + randomOrdinal + " (UUID=" + workerUuid + ") on " + threadName);
return Optional.of(ceTask);
@@ -457,7 +458,7 @@ public class CeWorkerImplTest {
@Test
public void log_error_when_task_fails_with_not_MessageException() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask);
@@ -475,7 +476,7 @@ public class CeWorkerImplTest {
@Test
public void do_no_log_error_when_task_fails_with_MessageException() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
@@ -491,7 +492,7 @@ public class CeWorkerImplTest {
@Test
public void do_no_log_error_when_task_fails_with_BillingValidationsException() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
makeTaskProcessorFail(ceTask, new BillingValidations.BillingValidationsException("simulate MessageException thrown by TaskProcessor#process"));
@@ -507,7 +508,7 @@ public class CeWorkerImplTest {
@Test
public void log_error_when_task_was_successful_but_ending_state_can_not_be_persisted_to_db() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
doThrow(new RuntimeException("Simulate queue#remove failing")).when(queue).remove(ceTask, CeActivityDto.Status.SUCCESS, null, null);
@@ -519,7 +520,7 @@ public class CeWorkerImplTest {
@Test
public void log_error_when_task_failed_and_ending_state_can_not_be_persisted_to_db() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
IllegalStateException ex = makeTaskProcessorFail(ceTask);
RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@@ -548,7 +549,7 @@ public class CeWorkerImplTest {
@Test
public void log_error_as_suppressed_when_task_failed_with_MessageException_and_ending_state_can_not_be_persisted_to_db() throws Exception {
CeTask ceTask = createCeTask(submitter);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
MessageException ex = makeTaskProcessorFail(ceTask, MessageException.of("simulate MessageException thrown by TaskProcessor#process"));
RuntimeException runtimeException = new RuntimeException("Simulate queue#remove failing");
@@ -579,7 +580,7 @@ public class CeWorkerImplTest {
CountDownLatch inCallLatch = new CountDownLatch(1);
CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
// mock long running peek(String) call => Thread is executing call() but not running a task
- when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
inCallLatch.countDown();
try {
assertionsDoneLatch.await(10, TimeUnit.SECONDS);
@@ -614,7 +615,7 @@ public class CeWorkerImplTest {
String taskType = randomAlphabetic(12);
CeTask ceTask = mock(CeTask.class);
when(ceTask.getType()).thenReturn(taskType);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
@CheckForNull
@Override
@@ -657,7 +658,7 @@ public class CeWorkerImplTest {
CountDownLatch inCallLatch = new CountDownLatch(1);
CountDownLatch assertionsDoneLatch = new CountDownLatch(1);
// mock long running peek(String) call => Thread is executing call() but not running a task
- when(queue.peek(anyString())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
+ when(queue.peek(anyString(), anyBoolean())).thenAnswer((Answer<Optional<CeTask>>) invocation -> {
inCallLatch.countDown();
try {
assertionsDoneLatch.await(10, TimeUnit.SECONDS);
@@ -688,7 +689,7 @@ public class CeWorkerImplTest {
String taskType = randomAlphabetic(12);
CeTask ceTask = mock(CeTask.class);
when(ceTask.getType()).thenReturn(taskType);
- when(queue.peek(anyString())).thenReturn(Optional.of(ceTask));
+ when(queue.peek(anyString(), anyBoolean())).thenReturn(Optional.of(ceTask));
taskProcessorRepository.setProcessorForTask(taskType, new SimpleCeTaskProcessor() {
@CheckForNull
@@ -745,7 +746,7 @@ public class CeWorkerImplTest {
}
private void verifyWorkerUuid() {
- verify(queue).peek(workerUuidCaptor.capture());
+ verify(queue, atLeastOnce()).peek(workerUuidCaptor.capture(), anyBoolean());
assertThat(workerUuidCaptor.getValue()).isEqualTo(workerUuid);
}