aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce/src/main/java/org
diff options
context:
space:
mode:
authorPierre <pierre.guillot@sonarsource.com>2020-06-02 16:13:48 +0200
committersonartech <sonartech@sonarsource.com>2020-06-26 20:04:56 +0000
commite4b519ed129dbc7b76eab00d6c48166a8993e35f (patch)
tree2f79b9a0bdeccd1494fcc1ac4d3a14cb78b50999 /server/sonar-ce/src/main/java/org
parentac3abae6429931aa5065b9f1ac359ff9a4ca78fd (diff)
downloadsonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.tar.gz
sonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.zip
SONAR-13444 background tasks for issue indexation implementation
Diffstat (limited to 'server/sonar-ce/src/main/java/org')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java31
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java28
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java42
5 files changed, 86 insertions, 23 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
index a455b71f39c..f7e3793a818 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
@@ -53,6 +53,7 @@ import org.sonar.ce.async.SynchronousAsyncExecution;
import org.sonar.ce.cleaning.CeCleaningModule;
import org.sonar.ce.cleaning.NoopCeCleaningSchedulerImpl;
import org.sonar.ce.db.ReadOnlyPropertiesDao;
+import org.sonar.ce.issue.index.NoAsyncIssueIndexing;
import org.sonar.ce.logging.CeProcessLogging;
import org.sonar.ce.monitoring.CEQueueStatusImpl;
import org.sonar.ce.monitoring.DistributedCEQueueStatusImpl;
@@ -64,6 +65,7 @@ import org.sonar.ce.task.projectanalysis.ProjectAnalysisTaskModule;
import org.sonar.ce.task.projectanalysis.analysis.ProjectConfigurationFactory;
import org.sonar.ce.task.projectanalysis.issue.AdHocRuleCreator;
import org.sonar.ce.task.projectanalysis.notification.ReportAnalysisFailureNotificationModule;
+import org.sonar.ce.task.projectanalysis.taskprocessor.IssueSyncTaskModule;
import org.sonar.ce.taskprocessor.CeProcessingScheduler;
import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.core.component.DefaultResourceTypes;
@@ -402,6 +404,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {
// issues
IssueStorage.class,
+ NoAsyncIssueIndexing.class,
IssueIndexer.class,
IssueIteratorFactory.class,
IssueFieldsSetter.class, // used in Web Services and CE's DebtCalculator
@@ -439,6 +442,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {
CeHttpModule.class,
CeTaskCommonsModule.class,
ProjectAnalysisTaskModule.class,
+ IssueSyncTaskModule.class,
CeTaskProcessorModule.class,
OfficialDistribution.class,
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java
new file mode 100644
index 00000000000..ba4d525c4dd
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java
@@ -0,0 +1,31 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2020 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.issue.index;
+
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.server.issue.index.AsyncIssueIndexing;
+
+@ComputeEngineSide
+public class NoAsyncIssueIndexing implements AsyncIssueIndexing {
+ @Override
+ public void triggerOnIndexCreation() {
+ throw new IllegalStateException("Async issue indexing should not be triggered in Compute Engine");
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
index 2058d7a19c5..30466805efc 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
@@ -40,6 +40,8 @@ public interface InternalCeQueue extends CeQueue {
* The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
* Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}.
*
+ * @param excludeIndexationJob change the underlying request to exclude indexation tasks.
+ *
* <p>Only a single task can be peeked by project.</p>
*
* <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
@@ -47,7 +49,7 @@ public interface InternalCeQueue extends CeQueue {
* <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}
* are ignored</p>
*/
- Optional<CeTask> peek(String workerUuid);
+ Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob);
/**
* Removes a task from the queue and registers it to past activities. This method
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
index 5bdab435748..711961b1f95 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -73,7 +73,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
}
@Override
- public Optional<CeTask> peek(String workerUuid) {
+ public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob) {
requireNonNull(workerUuid, "workerUuid can't be null");
if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) {
@@ -86,20 +86,20 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
dbSession.commit();
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
- Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid);
- if (opt.isPresent()) {
- CeQueueDto taskDto = opt.get();
- Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto);
- Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream()
- .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue));
-
- CeTask task = convertToTask(dbSession, taskDto, characteristics,
- ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
- ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null));
- queueStatus.addInProgress();
- return Optional.of(task);
+ Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob);
+ if (!opt.isPresent()) {
+ return Optional.empty();
}
- return Optional.empty();
+ CeQueueDto taskDto = opt.get();
+ Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto);
+ Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream()
+ .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue));
+
+ CeTask task = convertToTask(dbSession, taskDto, characteristics,
+ ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null),
+ ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null));
+ queueStatus.addInProgress();
+ return Optional.of(task);
}
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
index ef6cad9a8ab..10c234639f5 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
@@ -61,17 +61,21 @@ public class CeWorkerImpl implements CeWorker {
private final CeWorkerController ceWorkerController;
private final List<ExecutionListener> listeners;
private final AtomicReference<RunningState> runningState = new AtomicReference<>();
+ private boolean indexationTaskLookupEnabled;
+ private boolean excludeIndexationJob;
public CeWorkerImpl(int ordinal, String uuid,
- InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
- CeWorkerController ceWorkerController,
- ExecutionListener... listeners) {
+ InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository,
+ CeWorkerController ceWorkerController,
+ ExecutionListener... listeners) {
this.ordinal = checkOrdinal(ordinal);
this.uuid = uuid;
this.queue = queue;
this.taskProcessorRepository = taskProcessorRepository;
this.ceWorkerController = ceWorkerController;
this.listeners = Arrays.asList(listeners);
+ indexationTaskLookupEnabled = true;
+ excludeIndexationJob = false;
}
private static int checkOrdinal(int ordinal) {
@@ -119,7 +123,7 @@ public class CeWorkerImpl implements CeWorker {
localRunningState = new RunningState(currentThread);
if (!runningState.compareAndSet(null, localRunningState)) {
LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " +
- "Forcefully updating Workers's running state to new Thread.",
+ "Forcefully updating Workers's running state to new Thread.",
getOrdinal(), getUUID(), currentThread);
runningState.set(localRunningState);
}
@@ -138,7 +142,7 @@ public class CeWorkerImpl implements CeWorker {
localRunningState.runningThread.setName(oldName);
if (!runningState.compareAndSet(localRunningState, null)) {
LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." +
- " Keeping this new state.",
+ " Keeping this new state.",
getOrdinal(), getUUID(), localRunningState.runningThread);
}
}
@@ -154,7 +158,7 @@ public class CeWorkerImpl implements CeWorker {
}
try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this);
- ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
+ ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) {
executeTask.run();
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
@@ -164,13 +168,35 @@ public class CeWorkerImpl implements CeWorker {
private Optional<CeTask> tryAndFindTaskToExecute() {
try {
- return queue.peek(uuid);
+ if (indexationTaskLookupEnabled) {
+ return tryAndFindTaskToExecuteIncludingIndexation();
+ } else {
+ return queue.peek(uuid, true);
+ }
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
}
return Optional.empty();
}
+ private Optional<CeTask> tryAndFindTaskToExecuteIncludingIndexation() {
+ excludeIndexationJob = !excludeIndexationJob;
+ Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob);
+ if (peek.isPresent()) {
+ return peek;
+ }
+ if (excludeIndexationJob) {
+ peek = queue.peek(uuid, false);
+ if (peek.isPresent()) {
+ return peek;
+ }
+ // do not lookup for indexation tasks anymore
+ indexationTaskLookupEnabled = false;
+ LOG.info(String.format("worker %s found no pending task (including indexation task). Disabling indexation task lookup for this worker until next SonarQube restart.", uuid));
+ }
+ return Optional.empty();
+ }
+
private final class ExecuteTask implements Runnable, AutoCloseable {
private final CeTask task;
private final RunningState localRunningState;
@@ -237,7 +263,7 @@ public class CeWorkerImpl implements CeWorker {
}
private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
- @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
try {
queue.remove(task, status, taskResult, error);
} catch (Exception e) {