diff options
author | Pierre <pierre.guillot@sonarsource.com> | 2020-06-02 16:13:48 +0200 |
---|---|---|
committer | sonartech <sonartech@sonarsource.com> | 2020-06-26 20:04:56 +0000 |
commit | e4b519ed129dbc7b76eab00d6c48166a8993e35f (patch) | |
tree | 2f79b9a0bdeccd1494fcc1ac4d3a14cb78b50999 /server/sonar-ce/src/main/java/org | |
parent | ac3abae6429931aa5065b9f1ac359ff9a4ca78fd (diff) | |
download | sonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.tar.gz sonarqube-e4b519ed129dbc7b76eab00d6c48166a8993e35f.zip |
SONAR-13444 background tasks for issue indexation implementation
Diffstat (limited to 'server/sonar-ce/src/main/java/org')
5 files changed, 86 insertions, 23 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java index a455b71f39c..f7e3793a818 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java @@ -53,6 +53,7 @@ import org.sonar.ce.async.SynchronousAsyncExecution; import org.sonar.ce.cleaning.CeCleaningModule; import org.sonar.ce.cleaning.NoopCeCleaningSchedulerImpl; import org.sonar.ce.db.ReadOnlyPropertiesDao; +import org.sonar.ce.issue.index.NoAsyncIssueIndexing; import org.sonar.ce.logging.CeProcessLogging; import org.sonar.ce.monitoring.CEQueueStatusImpl; import org.sonar.ce.monitoring.DistributedCEQueueStatusImpl; @@ -64,6 +65,7 @@ import org.sonar.ce.task.projectanalysis.ProjectAnalysisTaskModule; import org.sonar.ce.task.projectanalysis.analysis.ProjectConfigurationFactory; import org.sonar.ce.task.projectanalysis.issue.AdHocRuleCreator; import org.sonar.ce.task.projectanalysis.notification.ReportAnalysisFailureNotificationModule; +import org.sonar.ce.task.projectanalysis.taskprocessor.IssueSyncTaskModule; import org.sonar.ce.taskprocessor.CeProcessingScheduler; import org.sonar.ce.taskprocessor.CeTaskProcessorModule; import org.sonar.core.component.DefaultResourceTypes; @@ -402,6 +404,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { // issues IssueStorage.class, + NoAsyncIssueIndexing.class, IssueIndexer.class, IssueIteratorFactory.class, IssueFieldsSetter.class, // used in Web Services and CE's DebtCalculator @@ -439,6 +442,7 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { CeHttpModule.class, CeTaskCommonsModule.class, ProjectAnalysisTaskModule.class, + IssueSyncTaskModule.class, CeTaskProcessorModule.class, OfficialDistribution.class, diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java new file mode 100644 index 00000000000..ba4d525c4dd --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/issue/index/NoAsyncIssueIndexing.java @@ -0,0 +1,31 @@ +/* + * SonarQube + * Copyright (C) 2009-2020 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.issue.index; + +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.server.issue.index.AsyncIssueIndexing; + +@ComputeEngineSide +public class NoAsyncIssueIndexing implements AsyncIssueIndexing { + @Override + public void triggerOnIndexCreation() { + throw new IllegalStateException("Async issue indexing should not be triggered in Compute Engine"); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 2058d7a19c5..30466805efc 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -40,6 +40,8 @@ public interface InternalCeQueue extends CeQueue { * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. * Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}. * + * @param excludeIndexationJob change the underlying request to exclude indexation tasks. + * * <p>Only a single task can be peeked by project.</p> * * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p> @@ -47,7 +49,7 @@ public interface InternalCeQueue extends CeQueue { * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING} * are ignored</p> */ - Optional<CeTask> peek(String workerUuid); + Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob); /** * Removes a task from the queue and registers it to past activities. This method diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 5bdab435748..711961b1f95 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -73,7 +73,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } @Override - public Optional<CeTask> peek(String workerUuid) { + public Optional<CeTask> peek(String workerUuid, boolean excludeIndexationJob) { requireNonNull(workerUuid, "workerUuid can't be null"); if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) { @@ -86,20 +86,20 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue dbSession.commit(); LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); } - Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid); - if (opt.isPresent()) { - CeQueueDto taskDto = opt.get(); - Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto); - Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream() - .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue)); - - CeTask task = convertToTask(dbSession, taskDto, characteristics, - ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null), - ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null)); - queueStatus.addInProgress(); - return Optional.of(task); + Optional<CeQueueDto> opt = ceQueueDao.peek(dbSession, workerUuid, excludeIndexationJob); + if (!opt.isPresent()) { + return Optional.empty(); } - return Optional.empty(); + CeQueueDto taskDto = opt.get(); + Map<String, ComponentDto> componentsByUuid = loadComponentDtos(dbSession, taskDto); + Map<String, String> characteristics = dbClient.ceTaskCharacteristicsDao().selectByTaskUuids(dbSession, singletonList(taskDto.getUuid())).stream() + .collect(uniqueIndex(CeTaskCharacteristicDto::getKey, CeTaskCharacteristicDto::getValue)); + + CeTask task = convertToTask(dbSession, taskDto, characteristics, + ofNullable(taskDto.getComponentUuid()).map(componentsByUuid::get).orElse(null), + ofNullable(taskDto.getMainComponentUuid()).map(componentsByUuid::get).orElse(null)); + queueStatus.addInProgress(); + return Optional.of(task); } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index ef6cad9a8ab..10c234639f5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -61,17 +61,21 @@ public class CeWorkerImpl implements CeWorker { private final CeWorkerController ceWorkerController; private final List<ExecutionListener> listeners; private final AtomicReference<RunningState> runningState = new AtomicReference<>(); + private boolean indexationTaskLookupEnabled; + private boolean excludeIndexationJob; public CeWorkerImpl(int ordinal, String uuid, - InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, - CeWorkerController ceWorkerController, - ExecutionListener... listeners) { + InternalCeQueue queue, CeTaskProcessorRepository taskProcessorRepository, + CeWorkerController ceWorkerController, + ExecutionListener... listeners) { this.ordinal = checkOrdinal(ordinal); this.uuid = uuid; this.queue = queue; this.taskProcessorRepository = taskProcessorRepository; this.ceWorkerController = ceWorkerController; this.listeners = Arrays.asList(listeners); + indexationTaskLookupEnabled = true; + excludeIndexationJob = false; } private static int checkOrdinal(int ordinal) { @@ -119,7 +123,7 @@ public class CeWorkerImpl implements CeWorker { localRunningState = new RunningState(currentThread); if (!runningState.compareAndSet(null, localRunningState)) { LOG.warn("Worker {} (UUID=%s) starts executing with new Thread {} while running state isn't null. " + - "Forcefully updating Workers's running state to new Thread.", + "Forcefully updating Workers's running state to new Thread.", getOrdinal(), getUUID(), currentThread); runningState.set(localRunningState); } @@ -138,7 +142,7 @@ public class CeWorkerImpl implements CeWorker { localRunningState.runningThread.setName(oldName); if (!runningState.compareAndSet(localRunningState, null)) { LOG.warn("Worker {} (UUID=%s) ending execution in Thread {} while running state has already changed." + - " Keeping this new state.", + " Keeping this new state.", getOrdinal(), getUUID(), localRunningState.runningThread); } } @@ -154,7 +158,7 @@ public class CeWorkerImpl implements CeWorker { } try (CeWorkerController.ProcessingRecorderHook processing = ceWorkerController.registerProcessingFor(this); - ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { + ExecuteTask executeTask = new ExecuteTask(localRunningState, ceTask.get())) { executeTask.run(); } catch (Exception e) { LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); @@ -164,13 +168,35 @@ public class CeWorkerImpl implements CeWorker { private Optional<CeTask> tryAndFindTaskToExecute() { try { - return queue.peek(uuid); + if (indexationTaskLookupEnabled) { + return tryAndFindTaskToExecuteIncludingIndexation(); + } else { + return queue.peek(uuid, true); + } } catch (Exception e) { LOG.error("Failed to pop the queue of analysis reports", e); } return Optional.empty(); } + private Optional<CeTask> tryAndFindTaskToExecuteIncludingIndexation() { + excludeIndexationJob = !excludeIndexationJob; + Optional<CeTask> peek = queue.peek(uuid, excludeIndexationJob); + if (peek.isPresent()) { + return peek; + } + if (excludeIndexationJob) { + peek = queue.peek(uuid, false); + if (peek.isPresent()) { + return peek; + } + // do not lookup for indexation tasks anymore + indexationTaskLookupEnabled = false; + LOG.info(String.format("worker %s found no pending task (including indexation task). Disabling indexation task lookup for this worker until next SonarQube restart.", uuid)); + } + return Optional.empty(); + } + private final class ExecuteTask implements Runnable, AutoCloseable { private final CeTask task; private final RunningState localRunningState; @@ -237,7 +263,7 @@ public class CeWorkerImpl implements CeWorker { } private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status, - @Nullable CeTaskResult taskResult, @Nullable Throwable error) { + @Nullable CeTaskResult taskResult, @Nullable Throwable error) { try { queue.remove(task, status, taskResult, error); } catch (Exception e) { |