aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-03-31 11:52:17 +0200
committerEric Hartmann <hartmann.eric@gmail.Com>2017-04-27 09:23:18 +0200
commit2af0a2dd2bff90fb3a7cdcb531271d537f5c8b57 (patch)
tree488054d366f006667799c8d975492ee33ec57e6d /server/sonar-ce
parent588904e9c4460e485749effc85663e4594e16c79 (diff)
downloadsonarqube-2af0a2dd2bff90fb3a7cdcb531271d537f5c8b57.tar.gz
sonarqube-2af0a2dd2bff90fb3a7cdcb531271d537f5c8b57.zip
SONAR-9057 CE tables cleaning and purging is done by web leader
moved CE queue and taskprocessor classes (the later depending upon the former) to sonar-ce module moved CE configuration classes to sonar-ce-api module
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java4
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java6
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java85
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java53
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java78
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java251
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java69
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java26
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java29
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java81
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java137
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java33
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java34
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java105
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java32
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java137
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java23
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java53
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java101
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java57
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java550
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java66
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java542
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java148
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java78
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java230
27 files changed, 3004 insertions, 6 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java
index 5a37d1d06e0..f89cf4efd39 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java
@@ -19,10 +19,10 @@
*/
package org.sonar.ce;
+import org.sonar.ce.configuration.CeConfigurationImpl;
import org.sonar.ce.log.CeLogging;
import org.sonar.core.platform.Module;
import org.sonar.process.systeminfo.ProcessStateSystemInfo;
-import org.sonar.server.computation.configuration.CeConfigurationImpl;
import org.sonar.server.computation.monitoring.CeDatabaseMBeanImpl;
public class CeConfigurationModule extends Module {
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java
index a11e826cd12..159e0d53687 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java
@@ -22,8 +22,8 @@ package org.sonar.ce;
import org.sonar.core.platform.Module;
import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
import org.sonar.server.computation.monitoring.CeTasksMBeanImpl;
-import org.sonar.server.computation.queue.CeQueueInitializer;
-import org.sonar.server.computation.queue.InternalCeQueueImpl;
+import org.sonar.ce.queue.CeQueueInitializer;
+import org.sonar.ce.queue.InternalCeQueueImpl;
public class CeQueueModule extends Module {
@Override
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
index ed306dd165a..c36413a9ac3 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
@@ -46,7 +46,10 @@ import org.sonar.ce.CeTaskCommonsModule;
import org.sonar.ce.db.ReadOnlyPropertiesDao;
import org.sonar.ce.log.CeProcessLogging;
import org.sonar.ce.platform.ComputeEngineExtensionInstaller;
+import org.sonar.ce.queue.CeQueueCleaner;
+import org.sonar.ce.queue.PurgeCeActivities;
import org.sonar.ce.settings.ProjectSettingsFactory;
+import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.ce.user.CeUserSession;
import org.sonar.core.component.DefaultResourceTypes;
import org.sonar.core.config.CorePropertyDefinitions;
@@ -68,10 +71,7 @@ import org.sonar.process.logging.LogbackHelper;
import org.sonar.server.component.ComponentCleanerService;
import org.sonar.server.component.ComponentFinder;
import org.sonar.server.component.index.ComponentIndexer;
-import org.sonar.server.computation.queue.CeQueueCleaner;
-import org.sonar.server.computation.queue.PurgeCeActivities;
import org.sonar.server.computation.task.projectanalysis.ProjectAnalysisTaskModule;
-import org.sonar.server.computation.taskprocessor.CeTaskProcessorModule;
import org.sonar.server.debt.DebtModelPluginRepository;
import org.sonar.server.debt.DebtRulesXMLImporter;
import org.sonar.server.event.NewAlerts;
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java
new file mode 100644
index 00000000000..39dd83f88bb
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java
@@ -0,0 +1,85 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.List;
+import org.picocontainer.Startable;
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.platform.ServerUpgradeStatus;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+
+/**
+ * Cleans-up the Compute Engine queue and resets the JMX counters.
+ * CE workers must not be started before execution of this class.
+ */
+@ComputeEngineSide
+public class CeQueueCleaner implements Startable {
+
+ private static final Logger LOGGER = Loggers.get(CeQueueCleaner.class);
+
+ private final DbClient dbClient;
+ private final ServerUpgradeStatus serverUpgradeStatus;
+ private final InternalCeQueue queue;
+
+ public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, InternalCeQueue queue) {
+ this.dbClient = dbClient;
+ this.serverUpgradeStatus = serverUpgradeStatus;
+ this.queue = queue;
+ }
+
+ @Override
+ public void start() {
+ if (serverUpgradeStatus.isUpgraded()) {
+ cleanOnUpgrade();
+ } else {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ verifyConsistency(dbSession);
+ }
+ }
+ }
+
+ private void cleanOnUpgrade() {
+ // we assume that pending tasks are not compatible with the new version
+ // and can't be processed
+ LOGGER.info("Cancel all pending tasks (due to upgrade)");
+ queue.clear();
+ }
+
+ private void verifyConsistency(DbSession dbSession) {
+ // server is not being upgraded
+ dbClient.ceQueueDao().resetAllToPendingStatus(dbSession);
+ dbSession.commit();
+
+ // Reports that have been processed are not kept in database yet.
+ // They are supposed to be systematically dropped.
+ // Let's clean-up orphans if any.
+ List<String> uuids = dbClient.ceTaskInputDao().selectUuidsNotInQueue(dbSession);
+ dbClient.ceTaskInputDao().deleteByUuids(dbSession, uuids);
+ dbSession.commit();
+ }
+
+ @Override
+ public void stop() {
+ // nothing to do
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java
new file mode 100644
index 00000000000..d794a4c1ebb
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java
@@ -0,0 +1,53 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.platform.Server;
+import org.sonar.api.platform.ServerStartHandler;
+import org.sonar.ce.taskprocessor.CeProcessingScheduler;
+
+/**
+ * Cleans-up the queue, initializes JMX counters then schedule
+ * the execution of workers. That allows to not prevent workers
+ * from peeking the queue before it's ready.
+ */
+@ComputeEngineSide
+public class CeQueueInitializer implements ServerStartHandler {
+
+ private final CeProcessingScheduler scheduler;
+ private boolean done = false;
+
+ public CeQueueInitializer(CeProcessingScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public void onServerStart(Server server) {
+ if (!done) {
+ initCe();
+ this.done = true;
+ }
+ }
+
+ private void initCe() {
+ scheduler.startScheduling();
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
new file mode 100644
index 00000000000..01f5341f764
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
@@ -0,0 +1,78 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import com.google.common.base.Optional;
+import javax.annotation.Nullable;
+import org.sonar.ce.queue.CeQueue;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.CeTaskResult;
+import org.sonar.db.ce.CeActivityDto.Status;
+
+/**
+ * Queue of pending Compute Engine tasks. Both producer and consumer actions
+ * are implemented.
+ * <p>
+ * This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}.
+ * </p>
+ */
+public interface InternalCeQueue extends CeQueue {
+
+ /**
+ * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
+ * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
+ * Does not return anything if the queue is paused (see {@link #isPeekPaused()}.
+ *
+ * <p>Only a single task can be peeked by project.</p>
+ *
+ * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p>
+ *
+ * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}
+ * are ignored</p>
+ */
+ Optional<CeTask> peek(String workerUuid);
+
+ /**
+ * Removes all the tasks from the queue, whatever their status. They are marked
+ * as {@link Status#CANCELED} in past activity.
+ * This method can NOT be called when workers are being executed, as in progress
+ * tasks can't be killed.
+ *
+ * @return the number of canceled tasks
+ */
+ int clear();
+
+ /**
+ * Removes a task from the queue and registers it to past activities. This method
+ * is called by Compute Engine workers when task is processed and can include an option {@link CeTaskResult} object.
+ *
+ * @throws IllegalStateException if the task does not exist in the queue
+ * @throws IllegalArgumentException if {@code error} is non {@code null} but {@code status} is not {@link Status#FAILED}
+ */
+ void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error);
+
+ void cancelWornOuts();
+
+ void pausePeek();
+
+ void resumePeek();
+
+ boolean isPeekPaused();
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
new file mode 100644
index 00000000000..bcd6f0abccf
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
@@ -0,0 +1,251 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import com.google.common.base.Optional;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+import org.apache.log4j.Logger;
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.utils.System2;
+import org.sonar.ce.monitoring.CEQueueStatus;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.server.organization.DefaultOrganizationProvider;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+@ComputeEngineSide
+public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
+
+ private static final int MAX_EXECUTION_COUNT = 2;
+
+ private final System2 system2;
+ private final DbClient dbClient;
+ private final CEQueueStatus queueStatus;
+
+ // state
+ private AtomicBoolean peekPaused = new AtomicBoolean(false);
+
+ public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus,
+ DefaultOrganizationProvider defaultOrganizationProvider) {
+ super(dbClient, uuidFactory, defaultOrganizationProvider);
+ this.system2 = system2;
+ this.dbClient = dbClient;
+ this.queueStatus = queueStatus;
+ }
+
+ @Override
+ public Optional<CeTask> peek(String workerUuid) {
+ requireNonNull(workerUuid, "workerUuid can't be null");
+
+ if (peekPaused.get()) {
+ return Optional.absent();
+ }
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT);
+ CeTask task = null;
+ if (dto.isPresent()) {
+ task = loadTask(dbSession, dto.get());
+ queueStatus.addInProgress();
+ }
+ return Optional.fromNullable(task);
+
+ }
+ }
+
+ @Override
+ public int clear() {
+ return cancelAll(true);
+ }
+
+ @Override
+ public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED");
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
+ checkState(queueDto.isPresent(), "Task does not exist anymore: %s", task);
+ CeActivityDto activityDto = new CeActivityDto(queueDto.get());
+ activityDto.setStatus(status);
+ updateQueueStatus(status, activityDto);
+ updateTaskResult(activityDto, taskResult);
+ updateError(activityDto, error);
+ remove(dbSession, queueDto.get(), activityDto);
+ }
+ }
+
+ private static void updateTaskResult(CeActivityDto activityDto, @Nullable CeTaskResult taskResult) {
+ if (taskResult != null) {
+ java.util.Optional<String> analysisUuid = taskResult.getAnalysisUuid();
+ if (analysisUuid.isPresent()) {
+ activityDto.setAnalysisUuid(analysisUuid.get());
+ }
+ }
+ }
+
+ private static void updateError(CeActivityDto activityDto, @Nullable Throwable error) {
+ if (error == null) {
+ return;
+ }
+
+ activityDto.setErrorMessage(error.getMessage());
+ String stacktrace = getStackTraceForPersistence(error);
+ if (stacktrace != null) {
+ activityDto.setErrorStacktrace(stacktrace);
+ }
+ }
+
+ @CheckForNull
+ private static String getStackTraceForPersistence(Throwable error) {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ LineReturnEnforcedPrintStream printStream = new LineReturnEnforcedPrintStream(out)) {
+ error.printStackTrace(printStream);
+ printStream.flush();
+ return out.toString();
+ } catch (IOException e) {
+ Logger.getLogger(InternalCeQueueImpl.class).debug("Failed to getStacktrace out of error", e);
+ return null;
+ }
+ }
+
+ private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) {
+ Long startedAt = activityDto.getStartedAt();
+ if (startedAt == null) {
+ return;
+ }
+ activityDto.setExecutedAt(system2.now());
+ long executionTimeInMs = activityDto.getExecutedAt() - startedAt;
+ activityDto.setExecutionTimeMs(executionTimeInMs);
+ if (status == CeActivityDto.Status.SUCCESS) {
+ queueStatus.addSuccess(executionTimeInMs);
+ } else {
+ queueStatus.addError(executionTimeInMs);
+ }
+ }
+
+ @Override
+ public void cancelWornOuts() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ List<CeQueueDto> wornOutTasks = dbClient.ceQueueDao().selectPendingByMinimumExecutionCount(dbSession, MAX_EXECUTION_COUNT);
+ wornOutTasks.forEach(queueDto -> {
+ CeActivityDto activityDto = new CeActivityDto(queueDto);
+ activityDto.setStatus(CeActivityDto.Status.CANCELED);
+ updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto);
+ remove(dbSession, queueDto, activityDto);
+ });
+ }
+ }
+
+ @Override
+ public void pausePeek() {
+ this.peekPaused.set(true);
+ }
+
+ @Override
+ public void resumePeek() {
+ this.peekPaused.set(false);
+ }
+
+ @Override
+ public boolean isPeekPaused() {
+ return peekPaused.get();
+ }
+
+ /**
+ * A {@link PrintWriter} subclass which enforces that line returns are {@code \n} whichever the platform.
+ */
+ private static class LineReturnEnforcedPrintStream extends PrintWriter {
+
+ LineReturnEnforcedPrintStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void println() {
+ super.print('\n');
+ }
+
+ @Override
+ public void println(boolean x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(char x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(int x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(long x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(float x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(double x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(char[] x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(String x) {
+ super.print(x);
+ println();
+ }
+
+ @Override
+ public void println(Object x) {
+ super.print(x);
+ println();
+ }
+ }
+
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java
new file mode 100644
index 00000000000..04a81ef1208
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java
@@ -0,0 +1,69 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.util.Calendar;
+import java.util.Set;
+import org.sonar.api.Startable;
+import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.stream.MoreCollectors;
+import org.sonar.db.DbClient;
+import org.sonar.db.DbSession;
+import org.sonar.db.ce.CeActivityDto;
+
+@ComputeEngineSide
+public class PurgeCeActivities implements Startable {
+
+ private static final Logger LOGGER = Loggers.get(PurgeCeActivities.class);
+
+ private final DbClient dbClient;
+ private final System2 system2;
+
+ public PurgeCeActivities(DbClient dbClient, System2 system2) {
+ this.dbClient = dbClient;
+ this.system2 = system2;
+ }
+
+ @Override
+ public void start() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ Calendar sixMonthsAgo = Calendar.getInstance();
+ sixMonthsAgo.setTimeInMillis(system2.now());
+ sixMonthsAgo.add(Calendar.DATE, -180);
+
+ LOGGER.info("Delete the Compute Engine tasks created before {}", sixMonthsAgo.getTime());
+ Set<String> ceActivityUuids = dbClient.ceActivityDao().selectOlderThan(dbSession, sixMonthsAgo.getTimeInMillis())
+ .stream()
+ .map(CeActivityDto::getUuid)
+ .collect(MoreCollectors.toSet());
+ dbClient.ceActivityDao().deleteByUuids(dbSession, ceActivityUuids);
+ dbClient.ceScannerContextDao().deleteByUuids(dbSession, ceActivityUuids);
+ dbSession.commit();
+ }
+ }
+
+ @Override
+ public void stop() {
+ // nothing to do
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java
new file mode 100644
index 00000000000..831684d2e8c
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java
@@ -0,0 +1,26 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+public interface CeProcessingScheduler {
+
+ void startScheduling();
+
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java
new file mode 100644
index 00000000000..2aa14e6962d
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java
@@ -0,0 +1,29 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import org.sonar.server.util.StoppableExecutorService;
+
+/**
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}.
+ */
+public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
new file mode 100644
index 00000000000..184d32cbc15
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
@@ -0,0 +1,81 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.sonar.ce.configuration.CeConfiguration;
+import org.sonar.server.util.AbstractStoppableExecutorService;
+
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableExecutorService<ListeningScheduledExecutorService>
+ implements CeProcessingSchedulerExecutorService {
+ private static final String THREAD_NAME_PREFIX = "ce-worker-";
+
+ public CeProcessingSchedulerExecutorServiceImpl(CeConfiguration ceConfiguration) {
+ super(
+ MoreExecutors.listeningDecorator(
+ Executors.newScheduledThreadPool(ceConfiguration.getWorkerCount(),
+ new ThreadFactoryBuilder()
+ .setNameFormat(THREAD_NAME_PREFIX + "%d")
+ .setPriority(Thread.MIN_PRIORITY)
+ .build())));
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return delegate.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return delegate.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
new file mode 100644
index 00000000000..47f56af6ff1
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
@@ -0,0 +1,137 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+import org.picocontainer.Startable;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.configuration.CeConfiguration;
+
+import static com.google.common.util.concurrent.Futures.addCallback;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
+ private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
+
+ private final CeProcessingSchedulerExecutorService executorService;
+ private final CeWorkerCallable workerRunnable;
+
+ private final long delayBetweenTasks;
+ private final TimeUnit timeUnit;
+ private final ChainingCallback[] chainingCallbacks;
+
+ public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
+ CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) {
+ this.executorService = processingExecutorService;
+ this.workerRunnable = workerRunnable;
+
+ this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
+ this.timeUnit = MILLISECONDS;
+
+ int workerCount = ceConfiguration.getWorkerCount();
+ this.chainingCallbacks = new ChainingCallback[workerCount];
+ for (int i = 0; i < workerCount; i++) {
+ chainingCallbacks[i] = new ChainingCallback();
+ }
+ }
+
+ @Override
+ public void start() {
+ // nothing to do at component startup, startScheduling will be called by CeQueueInitializer
+ }
+
+ @Override
+ public void startScheduling() {
+ for (ChainingCallback chainingCallback : chainingCallbacks) {
+ ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+ addCallback(future, chainingCallback, executorService);
+ }
+ }
+
+ @Override
+ public void stop() {
+ for (ChainingCallback chainingCallback : chainingCallbacks) {
+ chainingCallback.stop();
+ }
+ }
+
+ private class ChainingCallback implements FutureCallback<Boolean> {
+ private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ @CheckForNull
+ private ListenableFuture<Boolean> workerFuture;
+
+ @Override
+ public void onSuccess(@Nullable Boolean result) {
+ if (result != null && result) {
+ chainWithoutDelay();
+ } else {
+ chainWithDelay();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof Error) {
+ LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t);
+ } else {
+ chainWithoutDelay();
+ }
+ }
+
+ private void chainWithoutDelay() {
+ if (keepRunning()) {
+ workerFuture = executorService.submit(workerRunnable);
+ }
+ addCallback();
+ }
+
+ private void chainWithDelay() {
+ if (keepRunning()) {
+ workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+ }
+ addCallback();
+ }
+
+ private void addCallback() {
+ if (workerFuture != null && keepRunning()) {
+ Futures.addCallback(workerFuture, this, executorService);
+ }
+ }
+
+ private boolean keepRunning() {
+ return keepRunning.get();
+ }
+
+ public void stop() {
+ this.keepRunning.set(false);
+ if (workerFuture != null) {
+ workerFuture.cancel(false);
+ }
+ }
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
new file mode 100644
index 00000000000..b6f08854b8c
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java
@@ -0,0 +1,33 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import org.sonar.core.platform.Module;
+
+public class CeTaskProcessorModule extends Module {
+ @Override
+ protected void configureModule() {
+ add(
+ CeTaskProcessorRepositoryImpl.class,
+ CeWorkerCallableImpl.class,
+ CeProcessingSchedulerExecutorServiceImpl.class,
+ CeProcessingSchedulerImpl.class);
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java
new file mode 100644
index 00000000000..7fbbfa3f3f7
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java
@@ -0,0 +1,34 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.taskprocessor.CeTaskProcessor;
+
+public interface CeTaskProcessorRepository {
+
+ /**
+ * @throws NullPointerException if the specified {@link CeTask} is {@code null}
+ * @throws IllegalStateException if there is no {@link CeTaskProcessor} for the specified {@link CeTask} in the repository
+ */
+ Optional<CeTaskProcessor> getForCeTask(CeTask ceTask);
+
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java
new file mode 100644
index 00000000000..24a2bf07530
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java
@@ -0,0 +1,105 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.taskprocessor.CeTaskProcessor;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.FluentIterable.from;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static java.lang.String.format;
+
+/**
+ * {@link CeTaskProcessorRepository} implementation which provides access to the {@link CeTaskProcessor} existing in the
+ * PicoContainer the current object belongs to.
+ */
+public class CeTaskProcessorRepositoryImpl implements CeTaskProcessorRepository {
+ private static final Joiner COMMA_JOINER = Joiner.on(", ");
+
+ private final Map<String, CeTaskProcessor> taskProcessorByCeTaskType;
+
+ public CeTaskProcessorRepositoryImpl(CeTaskProcessor[] taskProcessors) {
+ this.taskProcessorByCeTaskType = indexTaskProcessors(taskProcessors);
+ }
+
+ @Override
+ public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) {
+ return Optional.fromNullable(taskProcessorByCeTaskType.get(ceTask.getType()));
+ }
+
+ private static Map<String, CeTaskProcessor> indexTaskProcessors(CeTaskProcessor[] taskProcessors) {
+ Multimap<String, CeTaskProcessor> permissiveIndex = buildPermissiveCeTaskProcessorIndex(taskProcessors);
+ checkUniqueHandlerPerCeTaskType(permissiveIndex);
+ return ImmutableMap.copyOf(Maps.transformValues(permissiveIndex.asMap(), CeTaskProcessorCollectionToFirstElement.INSTANCE));
+ }
+
+ private static Multimap<String, CeTaskProcessor> buildPermissiveCeTaskProcessorIndex(CeTaskProcessor[] taskProcessors) {
+ Multimap<String, CeTaskProcessor> permissiveIndex = ArrayListMultimap.create(taskProcessors.length, 1);
+ for (CeTaskProcessor taskProcessor : taskProcessors) {
+ for (String ceTaskType : taskProcessor.getHandledCeTaskTypes()) {
+ permissiveIndex.put(ceTaskType, taskProcessor);
+ }
+ }
+ return permissiveIndex;
+ }
+
+ private static void checkUniqueHandlerPerCeTaskType(Multimap<String, CeTaskProcessor> permissiveIndex) {
+ for (Map.Entry<String, Collection<CeTaskProcessor>> entry : permissiveIndex.asMap().entrySet()) {
+ checkArgument(
+ entry.getValue().size() == 1,
+ format(
+ "There can be only one CeTaskProcessor instance registered as the processor for CeTask type %s. " +
+ "More than one found. Please fix your configuration: %s",
+ entry.getKey(),
+ COMMA_JOINER.join(from(entry.getValue()).transform(ToClassName.INSTANCE).toSortedList(CASE_INSENSITIVE_ORDER))));
+ }
+ }
+
+ private enum ToClassName implements Function<Object, String> {
+ INSTANCE;
+
+ @Override
+ @Nonnull
+ public String apply(@Nonnull Object input) {
+ return input.getClass().getName();
+ }
+ }
+
+ private enum CeTaskProcessorCollectionToFirstElement implements Function<Collection<CeTaskProcessor>, CeTaskProcessor> {
+ INSTANCE;
+
+ @Override
+ @Nonnull
+ public CeTaskProcessor apply(@Nonnull Collection<CeTaskProcessor> input) {
+ return input.iterator().next();
+ }
+ }
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java
new file mode 100644
index 00000000000..2618b37b070
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java
@@ -0,0 +1,32 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import java.util.concurrent.Callable;
+import org.sonar.ce.queue.CeQueue;
+import org.sonar.ce.queue.CeTask;
+
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
+ * {@code false} otherwise.
+ */
+public interface CeWorkerCallable extends Callable<Boolean> {
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
new file mode 100644
index 00000000000..5bb5a047068
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java
@@ -0,0 +1,137 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.base.Optional;
+import javax.annotation.Nullable;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.CeTaskResult;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+
+import static java.lang.String.format;
+
+public class CeWorkerCallableImpl implements CeWorkerCallable {
+
+ private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class);
+
+ private final InternalCeQueue queue;
+ private final CeLogging ceLogging;
+ private final CeTaskProcessorRepository taskProcessorRepository;
+
+ public CeWorkerCallableImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
+ this.queue = queue;
+ this.ceLogging = ceLogging;
+ this.taskProcessorRepository = taskProcessorRepository;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+ if (!ceTask.isPresent()) {
+ return false;
+ }
+
+ try {
+ executeTask(ceTask.get());
+ } catch (Exception e) {
+ LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
+ }
+ return true;
+ }
+
+ private Optional<CeTask> tryAndFindTaskToExecute() {
+ try {
+ return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/);
+ } catch (Exception e) {
+ LOG.error("Failed to pop the queue of analysis reports", e);
+ }
+ return Optional.absent();
+ }
+
+ private void executeTask(CeTask task) {
+ ceLogging.initForTask(task);
+ Profiler ceProfiler = startActivityProfiler(task);
+
+ CeActivityDto.Status status = CeActivityDto.Status.FAILED;
+ CeTaskResult taskResult = null;
+ Throwable error = null;
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+ if (taskProcessor.isPresent()) {
+ taskResult = taskProcessor.get().process(task);
+ status = CeActivityDto.Status.SUCCESS;
+ } else {
+ LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+ status = CeActivityDto.Status.FAILED;
+ }
+ } catch (Throwable e) {
+ LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+ error = e;
+ } finally {
+ finalizeTask(task, ceProfiler, status, taskResult, error);
+ }
+ }
+
+ private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status,
+ @Nullable CeTaskResult taskResult, @Nullable Throwable error) {
+ try {
+ queue.remove(task, status, taskResult, error);
+ } catch (Exception e) {
+ LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e);
+ } finally {
+ stopActivityProfiler(ceProfiler, task, status);
+ ceLogging.clearForTask();
+ }
+ }
+
+ private static Profiler startActivityProfiler(CeTask task) {
+ Profiler profiler = Profiler.create(LOG);
+ addContext(profiler, task);
+ return profiler.startInfo("Execute task");
+ }
+
+ private static void stopActivityProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
+ addContext(profiler, task);
+ if (status == CeActivityDto.Status.FAILED) {
+ profiler.stopError("Executed task");
+ } else {
+ profiler.stopInfo("Executed task");
+ }
+ }
+
+ private static void addContext(Profiler profiler, CeTask task) {
+ profiler
+ .logTimeLast(true)
+ .addContext("project", task.getComponentKey())
+ .addContext("type", task.getType())
+ .addContext("id", task.getUuid());
+ String submitterLogin = task.getSubmitterLogin();
+ if (submitterLogin != null) {
+ profiler.addContext("submitter", submitterLogin);
+ }
+ }
+
+}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java
new file mode 100644
index 00000000000..131bfda3efd
--- /dev/null
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+@ParametersAreNonnullByDefault
+package org.sonar.ce.taskprocessor;
+
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java
new file mode 100644
index 00000000000..6bdf086d3c7
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java
@@ -0,0 +1,53 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.configuration;
+
+import org.junit.rules.ExternalResource;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Mutable implementation of {@link CeConfiguration} as {@link org.junit.Rule}.
+ */
+public class CeConfigurationRule extends ExternalResource implements CeConfiguration {
+ private int workerCount = CeConfigurationImpl.DEFAULT_WORKER_COUNT;
+ private long queuePollingDelay = CeConfigurationImpl.DEFAULT_QUEUE_POLLING_DELAY;
+
+ @Override
+ public int getWorkerCount() {
+ return workerCount;
+ }
+
+ public CeConfigurationRule setWorkerCount(int workerCount) {
+ checkArgument(workerCount >= 1, "worker count must be >= 1");
+ this.workerCount = workerCount;
+ return this;
+ }
+
+ @Override
+ public long getQueuePollingDelay() {
+ return queuePollingDelay;
+ }
+
+ public void setQueuePollingDelay(int queuePollingDelay) {
+ checkArgument(queuePollingDelay > 0, "Queue polling delay must be >= 0");
+ this.queuePollingDelay = queuePollingDelay;
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java
new file mode 100644
index 00000000000..0397af0d4bd
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java
@@ -0,0 +1,101 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.platform.ServerUpgradeStatus;
+import org.sonar.api.utils.System2;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskInputDao;
+import org.sonar.db.ce.CeTaskTypes;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CeQueueCleanerTest {
+
+ @Rule
+ public DbTester dbTester = DbTester.create(System2.INSTANCE);
+
+ private ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class);
+ private InternalCeQueue queue = mock(InternalCeQueue.class);
+ private CeQueueCleaner underTest = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, queue);
+
+ @Test
+ public void start_resets_in_progress_tasks_to_pending() throws IOException {
+ insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
+ insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS);
+
+ underTest.start();
+
+ assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2);
+ assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0);
+ }
+
+ @Test
+ public void start_clears_queue_if_version_upgrade() {
+ when(serverUpgradeStatus.isUpgraded()).thenReturn(true);
+
+ underTest.start();
+
+ verify(queue).clear();
+ }
+
+ @Test
+ public void start_deletes_orphan_report_files() throws Exception {
+ // analysis reports are persisted but the associated
+ // task is not in the queue
+ insertInQueue("TASK_1", CeQueueDto.Status.PENDING);
+ insertTaskData("TASK_1");
+ insertTaskData("TASK_2");
+
+ underTest.start();
+
+ CeTaskInputDao dataDao = dbTester.getDbClient().ceTaskInputDao();
+ Optional<CeTaskInputDao.DataStream> task1Data = dataDao.selectData(dbTester.getSession(), "TASK_1");
+ assertThat(task1Data).isPresent();
+ task1Data.get().close();
+
+ assertThat(dataDao.selectData(dbTester.getSession(), "TASK_2")).isNotPresent();
+ }
+
+ private CeQueueDto insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {
+ CeQueueDto dto = new CeQueueDto();
+ dto.setTaskType(CeTaskTypes.REPORT);
+ dto.setComponentUuid("PROJECT_1");
+ dto.setUuid(taskUuid);
+ dto.setStatus(status);
+ dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), dto);
+ dbTester.getSession().commit();
+ return dto;
+ }
+
+ private void insertTaskData(String taskUuid) throws IOException {
+ dbTester.getDbClient().ceTaskInputDao().insert(dbTester.getSession(), taskUuid, IOUtils.toInputStream("{binary}"));
+ dbTester.getSession().commit();
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java
new file mode 100644
index 00000000000..9595e966583
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import java.io.IOException;
+import org.junit.Test;
+import org.sonar.api.platform.Server;
+import org.sonar.ce.taskprocessor.CeProcessingScheduler;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class CeQueueInitializerTest {
+
+ private Server server = mock(Server.class);
+ private CeProcessingScheduler scheduler = mock(CeProcessingScheduler.class);
+ private CeQueueInitializer underTest = new CeQueueInitializer(scheduler);
+
+ @Test
+ public void clean_queue_then_start_scheduler_of_workers() throws IOException {
+ underTest.onServerStart(server);
+
+ verify(scheduler).startScheduling();
+ }
+
+ @Test
+ public void onServerStart_has_no_effect_if_called_twice_to_support_medium_test_doing_startup_tasks_multiple_times() {
+
+ underTest.onServerStart(server);
+
+ reset(scheduler);
+
+ underTest.onServerStart(server);
+
+ verifyZeroInteractions(scheduler);
+
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
new file mode 100644
index 00000000000..c5a537880b0
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
@@ -0,0 +1,550 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import com.google.common.base.Optional;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+import org.assertj.guava.api.Assertions;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.TestSystem2;
+import org.sonar.ce.monitoring.CEQueueStatus;
+import org.sonar.core.util.UuidFactory;
+import org.sonar.core.util.UuidFactoryImpl;
+import org.sonar.db.DbSession;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.db.component.ComponentDto;
+import org.sonar.db.component.ComponentTesting;
+import org.sonar.db.organization.OrganizationDto;
+import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
+import org.sonar.server.organization.DefaultOrganization;
+import org.sonar.server.organization.DefaultOrganizationProvider;
+
+import static java.util.Arrays.asList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class InternalCeQueueImplTest {
+
+ private static final String AN_ANALYSIS_UUID = "U1";
+ private static final String WORKER_UUID_1 = "worker uuid 1";
+ private static final String WORKER_UUID_2 = "worker uuid 2";
+
+ private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public DbTester dbTester = DbTester.create(system2);
+
+ private DbSession session = dbTester.getSession();
+
+ private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
+ private CEQueueStatus queueStatus = new CEQueueStatusImpl(dbTester.getDbClient());
+ private DefaultOrganizationProvider defaultOrganizationProvider = mock(DefaultOrganizationProvider.class);
+ private InternalCeQueue underTest = new InternalCeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, defaultOrganizationProvider);
+
+ @Before
+ public void setUp() throws Exception {
+ OrganizationDto defaultOrganization = dbTester.getDefaultOrganization();
+ when(defaultOrganizationProvider.get()).thenReturn(DefaultOrganization.newBuilder()
+ .setUuid(defaultOrganization.getUuid())
+ .setKey(defaultOrganization.getKey())
+ .setName(defaultOrganization.getName())
+ .setCreatedAt(defaultOrganization.getCreatedAt())
+ .setUpdatedAt(defaultOrganization.getUpdatedAt())
+ .build());
+ }
+
+ @Test
+ public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
+ CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob");
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, null);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit);
+ }
+
+ @Test
+ public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+ ComponentDto componentDto = insertComponent(newComponentDto("PROJECT_1"));
+ CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null);
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, componentDto);
+ }
+
+ @Test
+ public void submit_returns_task_without_component_info_when_submit_has_none() {
+ CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related");
+
+ CeTask task = underTest.submit(taskSubmit);
+
+ verifyCeTask(taskSubmit, task, null);
+ }
+
+ @Test
+ public void submit_fails_with_ISE_if_paused() {
+ underTest.pauseSubmit();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Compute Engine does not currently accept new tasks");
+
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ }
+
+ @Test
+ public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() {
+ CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob");
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("some type");
+
+ List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+ assertThat(tasks).hasSize(2);
+ verifyCeTask(taskSubmit1, tasks.get(0), null);
+ verifyCeTask(taskSubmit2, tasks.get(1), null);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit1);
+ verifyCeQueueDtoForTaskSubmit(taskSubmit2);
+ }
+
+ @Test
+ public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() {
+ ComponentDto componentDto1 = insertComponent(newComponentDto("PROJECT_1"));
+ CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null);
+ CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null);
+
+ List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2));
+
+ assertThat(tasks).hasSize(2);
+ verifyCeTask(taskSubmit1, tasks.get(0), componentDto1);
+ verifyCeTask(taskSubmit2, tasks.get(1), null);
+ }
+
+ @Test
+ public void peek_throws_NPE_if_workerUUid_is_null() {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("workerUuid can't be null");
+
+ underTest.peek(null);
+ }
+
+ @Test
+ public void test_remove() {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null);
+
+ // queue is empty
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse();
+ assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse();
+
+ // available in history
+ Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+ assertThat(history.isPresent()).isTrue();
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
+ assertThat(history.get().getIsLast()).isTrue();
+ assertThat(history.get().getAnalysisUuid()).isNull();
+ }
+
+ @Test
+ public void remove_throws_IAE_if_exception_is_provided_but_status_is_SUCCESS() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Error can be provided only when status is FAILED");
+
+ underTest.remove(mock(CeTask.class), CeActivityDto.Status.SUCCESS, null, new RuntimeException("Some error"));
+ }
+
+ @Test
+ public void remove_throws_IAE_if_exception_is_provided_but_status_is_CANCELED() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Error can be provided only when status is FAILED");
+
+ underTest.remove(mock(CeTask.class), CeActivityDto.Status.CANCELED, null, new RuntimeException("Some error"));
+ }
+
+ @Test
+ public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null);
+
+ // available in history
+ Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+ assertThat(history.isPresent()).isTrue();
+ assertThat(history.get().getAnalysisUuid()).isNull();
+ }
+
+ @Test
+ public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_2);
+ underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null);
+
+ // available in history
+ Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+ assertThat(history.isPresent()).isTrue();
+ assertThat(history.get().getAnalysisUuid()).isEqualTo("U1");
+ }
+
+ @Test
+ public void remove_saves_error_message_and_stacktrace_when_exception_is_provided() {
+ Throwable error = new NullPointerException("Fake NPE to test persistence to DB");
+
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error);
+
+ Optional<CeActivityDto> activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid());
+ Assertions.assertThat(activityDto).isPresent();
+
+ assertThat(activityDto.get().getErrorMessage()).isEqualTo(error.getMessage());
+ assertThat(activityDto.get().getErrorStacktrace()).isEqualToIgnoringWhitespace(stacktraceToString(error));
+ }
+
+ @Test
+ public void remove_copies_executionCount_and_workerUuid() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setWorkerUuid("Dustin")
+ .setExecutionCount(2));
+ dbTester.commit();
+
+ underTest.remove(new CeTask.Builder()
+ .setOrganizationUuid("foo")
+ .setUuid("uuid")
+ .setType("bar")
+ .build(), CeActivityDto.Status.SUCCESS, null, null);
+
+ CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "uuid").get();
+ assertThat(dto.getExecutionCount()).isEqualTo(2);
+ assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
+ }
+
+ @Test
+ public void fail_to_remove_if_not_in_queue() throws Exception {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
+
+ expectedException.expect(IllegalStateException.class);
+
+ underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null);
+ }
+
+ @Test
+ public void test_peek() throws Exception {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ assertThat(peek.isPresent()).isTrue();
+ assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
+ assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT);
+ assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1");
+
+ // no more pending tasks
+ peek = underTest.peek(WORKER_UUID_2);
+ assertThat(peek.isPresent()).isFalse();
+ }
+
+ @Test
+ public void peek_overrides_workerUuid_to_argument() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setWorkerUuid("must be overriden"));
+ dbTester.commit();
+
+ underTest.peek(WORKER_UUID_1);
+
+ CeQueueDto ceQueueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get();
+ assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
+ }
+
+ @Test
+ public void peek_nothing_if_paused() throws Exception {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.pausePeek();
+
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ assertThat(peek.isPresent()).isFalse();
+ }
+
+ @Test
+ public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(0));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void peek_peeks_pending_tasks_with_executionCount_equal_to_1_and_increases_it() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(1));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(2);
+ }
+
+ @Test
+ public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(2));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
+ }
+
+ @Test
+ public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(2 + Math.abs(new Random().nextInt(100))));
+ dbTester.commit();
+
+ assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
+ }
+
+ @Test
+ public void cancel_pending() throws Exception {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+
+ // ignore
+ boolean canceled = underTest.cancel("UNKNOWN");
+ assertThat(canceled).isFalse();
+
+ canceled = underTest.cancel(task.getUuid());
+ assertThat(canceled).isTrue();
+ Optional<CeActivityDto> activity = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid());
+ assertThat(activity.isPresent()).isTrue();
+ assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ }
+
+ @Test
+ public void cancel_copies_executionCount_and_workerUuid() {
+ dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setWorkerUuid("Dustin")
+ .setExecutionCount(2));
+ dbTester.commit();
+
+ underTest.cancel("uuid");
+
+ CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "uuid").get();
+ assertThat(dto.getExecutionCount()).isEqualTo(2);
+ assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
+ }
+
+ @Test
+ public void fail_to_cancel_if_in_progress() throws Exception {
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage(startsWith("Task is in progress and can't be canceled"));
+
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.peek(WORKER_UUID_2);
+
+ underTest.cancel(task.getUuid());
+ }
+
+ @Test
+ public void cancelAll_pendings_but_not_in_progress() throws Exception {
+ CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2");
+ CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3");
+ underTest.peek(WORKER_UUID_2);
+
+ int canceledCount = underTest.cancelAll();
+ assertThat(canceledCount).isEqualTo(2);
+
+ Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask1.getUuid());
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask2.getUuid());
+ assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), inProgressTask.getUuid());
+ assertThat(history.isPresent()).isFalse();
+ }
+
+ @Test
+ public void cancelWornOuts_cancels_pending_tasks_with_executionCount_greater_or_equal_to_2() {
+ CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, "worker1");
+ CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+ CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, "worker1");
+ CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker1");
+ CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, "worker1");
+ CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+ CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker1");
+ CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker1");
+
+ underTest.cancelWornOuts();
+
+ verifyUnmodifiedByCancelWornOuts(u1);
+ verifyUnmodifiedByCancelWornOuts(u2);
+ verifyCanceled(u3);
+ verifyCanceled(u4);
+ verifyUnmodifiedByCancelWornOuts(u5);
+ verifyUnmodifiedByCancelWornOuts(u6);
+ verifyUnmodifiedByCancelWornOuts(u7);
+ verifyUnmodifiedByCancelWornOuts(u8);
+ }
+
+ private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) {
+ CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(original.getStatus());
+ assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+ assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+ assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
+ }
+
+ private void verifyCanceled(CeQueueDto original) {
+ Assertions.assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid())).isAbsent();
+ CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+ assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
+ }
+
+ private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid) {
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(uuid)
+ .setTaskType("foo")
+ .setStatus(status)
+ .setExecutionCount(executionCount)
+ .setWorkerUuid(workerUuid);
+ dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), dto);
+ dbTester.commit();
+ return dto;
+ }
+
+ @Test
+ public void pause_and_resume_submits() throws Exception {
+ assertThat(underTest.isSubmitPaused()).isFalse();
+ underTest.pauseSubmit();
+ assertThat(underTest.isSubmitPaused()).isTrue();
+ underTest.resumeSubmit();
+ assertThat(underTest.isSubmitPaused()).isFalse();
+ }
+
+ @Test
+ public void pause_and_resume_peeks() throws Exception {
+ assertThat(underTest.isPeekPaused()).isFalse();
+ underTest.pausePeek();
+ assertThat(underTest.isPeekPaused()).isTrue();
+ underTest.resumePeek();
+ assertThat(underTest.isPeekPaused()).isFalse();
+ }
+
+ private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) {
+ if (componentDto == null) {
+ assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid());
+ } else {
+ assertThat(task.getOrganizationUuid()).isEqualTo(componentDto.getOrganizationUuid());
+ }
+ assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid());
+ assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid());
+ assertThat(task.getType()).isEqualTo(taskSubmit.getType());
+ if (componentDto == null) {
+ assertThat(task.getComponentKey()).isNull();
+ assertThat(task.getComponentName()).isNull();
+ } else {
+ assertThat(task.getComponentKey()).isEqualTo(componentDto.key());
+ assertThat(task.getComponentName()).isEqualTo(componentDto.name());
+ }
+ assertThat(task.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+ }
+
+ private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) {
+ Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid());
+ assertThat(queueDto.isPresent()).isTrue();
+ assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType());
+ assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid());
+ assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin());
+ assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L);
+ }
+
+ private ComponentDto newComponentDto(String uuid) {
+ return ComponentTesting.newProjectDto(dbTester.getDefaultOrganization(), uuid).setName("name_" + uuid).setKey("key_" + uuid);
+ }
+
+ private CeTask submit(String reportType, String componentUuid) {
+ return underTest.submit(createTaskSubmit(reportType, componentUuid, null));
+ }
+
+ private CeTaskSubmit createTaskSubmit(String type) {
+ return createTaskSubmit(type, null, null);
+ }
+
+ private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterLogin) {
+ CeTaskSubmit.Builder submission = underTest.prepareSubmit();
+ submission.setType(type);
+ submission.setComponentUuid(componentUuid);
+ submission.setSubmitterLogin(submitterLogin);
+ return submission.build();
+ }
+
+ private CeTaskResult newTaskResult(@Nullable String analysisUuid) {
+ CeTaskResult taskResult = mock(CeTaskResult.class);
+ when(taskResult.getAnalysisUuid()).thenReturn(java.util.Optional.ofNullable(analysisUuid));
+ return taskResult;
+ }
+
+ private ComponentDto insertComponent(ComponentDto componentDto) {
+ dbTester.getDbClient().componentDao().insert(session, componentDto);
+ session.commit();
+ return componentDto;
+ }
+
+ private static String stacktraceToString(Throwable error) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ error.printStackTrace(new PrintStream(out));
+ return out.toString();
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java
new file mode 100644
index 00000000000..1fd2316eaf5
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java
@@ -0,0 +1,66 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.queue;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.sonar.api.utils.System2;
+import org.sonar.db.DbTester;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeQueueDto;
+import org.sonar.db.ce.CeTaskTypes;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class PurgeCeActivitiesTest {
+
+ private System2 system2 = spy(System2.INSTANCE);
+
+ @Rule
+ public DbTester dbTester = DbTester.create(system2);
+
+ private PurgeCeActivities underTest = new PurgeCeActivities(dbTester.getDbClient(), system2);
+
+ @Test
+ public void delete_older_than_6_months() throws Exception {
+ insertWithDate("VERY_OLD", 1_000_000_000_000L);
+ insertWithDate("RECENT", 1_500_000_000_000L);
+ when(system2.now()).thenReturn(1_500_000_000_100L);
+
+ underTest.start();
+
+ assertThat(dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "VERY_OLD").isPresent()).isFalse();
+ assertThat(dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "RECENT").isPresent()).isTrue();
+ }
+
+ private void insertWithDate(String uuid, long date) {
+ CeQueueDto queueDto = new CeQueueDto();
+ queueDto.setUuid(uuid);
+ queueDto.setTaskType(CeTaskTypes.REPORT);
+
+ CeActivityDto dto = new CeActivityDto(queueDto);
+ dto.setStatus(CeActivityDto.Status.SUCCESS);
+ when(system2.now()).thenReturn(date);
+ dbTester.getDbClient().ceActivityDao().insert(dbTester.getSession(), dto);
+ dbTester.getSession().commit();
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
new file mode 100644
index 00000000000..d3480186bd2
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -0,0 +1,542 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.sonar.ce.configuration.CeConfigurationRule;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CeProcessingSchedulerImplTest {
+ private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
+
+ @Rule
+ // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+ public Timeout timeout = Timeout.seconds(60);
+ @Rule
+ public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
+
+ private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class);
+ private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
+ private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2000L, TimeUnit.MILLISECONDS);
+ private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
+
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
+
+ @Test
+ public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(true)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getSchedulerCalls()).containsOnly(
+ regularDelayedPoll,
+ notDelayedPoll
+ );
+ }
+
+ @Test
+ public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenThrow(new Exception("Exception is followed by a poll without delay"))
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+ regularDelayedPoll,
+ notDelayedPoll
+ );
+ }
+
+ @Test
+ public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+ regularDelayedPoll,
+ regularDelayedPoll
+ );
+ }
+
+ @Test
+ public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenThrow(new Exception("IAE should not cause scheduling to stop"))
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+ regularDelayedPoll,
+ notDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ regularDelayedPoll,
+ regularDelayedPoll
+ );
+ }
+
+ @Test
+ public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ underTest.startScheduling();
+
+ int cancelledTaskFutureCount = 0;
+ int i = 0;
+ while (processingExecutorService.futures.peek() != null) {
+ Future<?> future = processingExecutorService.futures.poll();
+ if (future.isCancelled()) {
+ cancelledTaskFutureCount++;
+ } else {
+ future.get();
+ }
+ // call stop after second delayed polling
+ if (i == 1) {
+ underTest.stop();
+ }
+ i++;
+ }
+
+ assertThat(cancelledTaskFutureCount).isEqualTo(1);
+ assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+ regularDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll
+ );
+ }
+
+ @Test
+ public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws InterruptedException {
+ int workerCount = Math.abs(new Random().nextInt(10)) + 1;
+
+ ceConfiguration.setWorkerCount(workerCount);
+
+ ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
+ CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
+ CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable);
+ when(processingExecutorService.schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
+ .thenReturn(listenableScheduledFuture);
+
+ underTest.startScheduling();
+
+ verify(processingExecutorService, times(workerCount)).schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS);
+ verify(listenableScheduledFuture, times(workerCount)).addListener(any(Runnable.class), eq(processingExecutorService));
+ }
+
+ private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
+ underTest.startScheduling();
+
+ // execute future synchronously
+ processingExecutorService.runFutures();
+ }
+
+ /**
+ * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
+ * method to execute futures it creates and exposes a method to retrieve logs of calls to
+ * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by
+ * {@link CeProcessingSchedulerImpl}.
+ */
+ private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService {
+
+ private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();
+ private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService());
+
+ private final List<SchedulerCall> schedulerCalls = new ArrayList<>();
+
+ public List<SchedulerCall> getSchedulerCalls() {
+ return schedulerCalls;
+ }
+
+ public void runFutures() throws ExecutionException, InterruptedException {
+ while (futures.peek() != null) {
+ Future<?> future = futures.poll();
+ if (!future.isCancelled()) {
+ future.get();
+ }
+ }
+ }
+
+ @Override
+ public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ this.schedulerCalls.add(new SchedulerCall(callable, delay, unit));
+ return delegate.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ this.schedulerCalls.add(new SchedulerCall(task));
+ return delegate.submit(task);
+ }
+
+ @Override
+ public void stop() {
+ throw new UnsupportedOperationException("stop() not implemented");
+ }
+
+ // ////////////// delegated methods ////////////////
+
+ @Override
+ public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return delegate.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ delegate.execute(command);
+ }
+
+ /**
+ * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into
+ * {@link StubCeProcessingSchedulerExecutorService#futures}.
+ */
+ private class SynchronousStubExecutorService implements ScheduledExecutorService {
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
+ ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() {
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ command.run();
+ return null;
+ }
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
+ ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() {
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public <T> Future<T> submit(final Callable<T> task) {
+ Future<T> res = new AbstractPartiallyImplementedFuture<T>() {
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ // ///////// unsupported operations ///////////
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public void shutdown() {
+ throw new UnsupportedOperationException("shutdown() not implemented");
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ throw new UnsupportedOperationException("shutdownNow() not implemented");
+ }
+
+ @Override
+ public boolean isShutdown() {
+ throw new UnsupportedOperationException("isShutdown() not implemented");
+ }
+
+ @Override
+ public boolean isTerminated() {
+ throw new UnsupportedOperationException("isTerminated() not implemented");
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented");
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ throw new UnsupportedOperationException("submit(Runnable task) not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+ }
+ }
+ }
+
+ private static abstract class AbstractPartiallyImplementedScheduledFuture<V> extends AbstractPartiallyImplementedFuture<V> implements ScheduledFuture<V> {
+ @Override
+ public long getDelay(TimeUnit unit) {
+ throw new UnsupportedOperationException("getDelay(TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ throw new UnsupportedOperationException("compareTo(Delayed o) not implemented");
+ }
+
+ }
+
+ private static abstract class AbstractPartiallyImplementedFuture<T> implements Future<T> {
+ private boolean cancelled = false;
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ this.cancelled = true;
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return this.cancelled;
+ }
+
+ @Override
+ public boolean isDone() {
+ throw new UnsupportedOperationException("isDone() not implemented");
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("get(long timeout, TimeUnit unit) not implemented");
+ }
+ }
+
+ /**
+ * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)}
+ */
+ @Immutable
+ private static final class SchedulerCall {
+ private final Callable<?> callable;
+ private final long delay;
+ private final TimeUnit unit;
+
+ private SchedulerCall(Callable<?> callable, long delay, TimeUnit unit) {
+ this.callable = callable;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ private SchedulerCall(Callable<?> callable) {
+ this.callable = callable;
+ this.delay = -63366;
+ this.unit = TimeUnit.NANOSECONDS;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SchedulerCall that = (SchedulerCall) o;
+ return delay == that.delay && callable == that.callable && unit.equals(that.unit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(callable, delay, unit);
+ }
+
+ @Override
+ public String toString() {
+ return "SchedulerCall{" +
+ "callable=" + callable +
+ ", delay=" + delay +
+ ", unit=" + unit +
+ '}';
+ }
+ }
+
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java
new file mode 100644
index 00000000000..743f51e169d
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.assertj.guava.api.Assertions;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.CeTaskResult;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class CeTaskProcessorRepositoryImplTest {
+ private static final String SOME_CE_TASK_TYPE = "some type";
+ private static final String SOME_COMPONENT_KEY = "key";
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void constructor_accepts_empty_array_argument() {
+ new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {});
+ }
+
+ @Test
+ public void constructor_throws_IAE_if_two_TaskProcessor_handle_the_same_CeTask_type() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " +
+ "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName());
+
+ new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+ new SomeProcessor1(SOME_CE_TASK_TYPE),
+ new SomeProcessor2(SOME_CE_TASK_TYPE)
+ });
+ }
+
+ @Test
+ public void constructor_throws_IAE_if_multiple_TaskProcessor_overlap_their_supported_CeTask_type() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " +
+ "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName());
+
+ new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+ new SomeProcessor2(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE),
+ new SomeProcessor1(SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3")
+ });
+ }
+
+ @Test
+ public void getForTask_returns_absent_if_repository_is_empty() {
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {});
+
+ Assertions.assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent();
+ }
+
+ @Test
+ public void getForTask_returns_absent_if_repository_does_not_contain_matching_TaskProcessor() {
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+ createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1"),
+ createCeTaskProcessor(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE + "_3"),
+ });
+
+ Assertions.assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent();
+ }
+
+ @Test
+ public void getForTask_returns_TaskProcessor_based_on_CeTask_type_only() {
+ CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE);
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor});
+
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor);
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY + "2")).get()).isSameAs(taskProcessor);
+ }
+
+ @Test
+ public void getForTask_returns_TaskProcessor_even_if_it_is_not_specific() {
+ CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1", SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3");
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor});
+
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor);
+ }
+
+ private CeTaskProcessor createCeTaskProcessor(final String... ceTaskTypes) {
+ return new HandleTypeOnlyTaskProcessor(ceTaskTypes);
+ }
+
+ private static CeTask createCeTask(String ceTaskType, String key) {
+ return new CeTask.Builder()
+ .setOrganizationUuid("org1")
+ .setType(ceTaskType)
+ .setUuid("task_uuid_" + key)
+ .setComponentKey(key).setComponentUuid("uuid_" + key).setComponentName("name_" + key)
+ .build();
+ }
+
+ private static class HandleTypeOnlyTaskProcessor implements CeTaskProcessor {
+ private final String[] ceTaskTypes;
+
+ public HandleTypeOnlyTaskProcessor(String... ceTaskTypes) {
+ this.ceTaskTypes = ceTaskTypes;
+ }
+
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ return ImmutableSet.copyOf(ceTaskTypes);
+ }
+
+ @Override
+ public CeTaskResult process(CeTask task) {
+ throw new UnsupportedOperationException("Process is not implemented");
+ }
+ }
+
+ private static class SomeProcessor1 extends HandleTypeOnlyTaskProcessor {
+ public SomeProcessor1(String... ceTaskTypes) {
+ super(ceTaskTypes);
+ }
+ }
+
+ private static class SomeProcessor2 extends HandleTypeOnlyTaskProcessor {
+ public SomeProcessor2(String... ceTaskTypes) {
+ super(ceTaskTypes);
+ }
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java
new file mode 100644
index 00000000000..7a07b5e8b8e
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java
@@ -0,0 +1,78 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.base.Optional;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.junit.rules.ExternalResource;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.CeTaskResult;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link org.junit.Rule} that implements the {@link CeTaskProcessorRepository} interface and
+ * requires consumer to explicitly define if a specific Task type has an associated {@link CeTaskProcessor} or not.
+ */
+public class CeTaskProcessorRepositoryRule extends ExternalResource implements CeTaskProcessorRepository {
+
+ private final Map<String, CeTaskProcessor> index = new HashMap<>();
+
+ @Override
+ protected void after() {
+ index.clear();
+ }
+
+ public CeTaskProcessorRepositoryRule setNoProcessorForTask(String taskType) {
+ index.put(requireNonNull(taskType), NoCeTaskProcessor.INSTANCE);
+ return this;
+ }
+
+ public CeTaskProcessorRepositoryRule setProcessorForTask(String taskType, CeTaskProcessor taskProcessor) {
+ index.put(requireNonNull(taskType), requireNonNull(taskProcessor));
+ return this;
+ }
+
+ @Override
+ public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) {
+ CeTaskProcessor taskProcessor = index.get(ceTask.getType());
+ checkState(taskProcessor != null, "CeTaskProcessor was not set in rule for task %s", ceTask);
+ return taskProcessor instanceof NoCeTaskProcessor ? Optional.<CeTaskProcessor>absent() : Optional.of(taskProcessor);
+ }
+
+ private enum NoCeTaskProcessor implements CeTaskProcessor {
+ INSTANCE;
+
+ private static final String UOE_MESSAGE = "NoCeTaskProcessor does not implement any method since it not supposed to be ever used";
+
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ throw new UnsupportedOperationException(UOE_MESSAGE);
+ }
+
+ @Override
+ public CeTaskResult process(CeTask task) {
+ throw new UnsupportedOperationException(UOE_MESSAGE);
+ }
+ }
+}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
new file mode 100644
index 00000000000..b63a9e0dacc
--- /dev/null
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java
@@ -0,0 +1,230 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce.taskprocessor;
+
+import com.google.common.base.Optional;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.api.utils.log.LogTester;
+import org.sonar.api.utils.log.LoggerLevel;
+import org.sonar.ce.log.CeLogging;
+import org.sonar.ce.queue.CeTask;
+import org.sonar.ce.queue.InternalCeQueue;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerCallableImplTest {
+
+ private static final String UNKNOWN_WORKER_UUID = "UNKNOWN";
+
+ @Rule
+ public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
+ @Rule
+ public LogTester logTester = new LogTester();
+
+ private InternalCeQueue queue = mock(InternalCeQueue.class);
+ private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+ private CeLogging ceLogging = spy(CeLogging.class);
+ private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository);
+ private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+
+ @Test
+ public void no_pending_tasks_in_queue() throws Exception {
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.<CeTask>absent());
+
+ assertThat(underTest.call()).isFalse();
+
+ verifyZeroInteractions(taskProcessor, ceLogging);
+ }
+
+ @Test
+ public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
+ CeTask task = createCeTask(null);
+ taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+
+ assertThat(underTest.call()).isTrue();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void peek_and_process_task() throws Exception {
+ CeTask task = createCeTask(null);
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+
+ assertThat(underTest.call()).isTrue();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void fail_to_process_task() throws Exception {
+ CeTask task = createCeTask(null);
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task));
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ Throwable error = makeTaskProcessorFail(task);
+
+ assertThat(underTest.call()).isTrue();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception {
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null)));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(2);
+ for (int i = 0; i < 2; i++) {
+ assertThat(logs.get(i)).doesNotContain(" | submitter=");
+ }
+ }
+
+ @Test
+ public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception {
+ CeTask ceTask = createCeTask(null);
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
+ makeTaskProcessorFail(ceTask);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(1);
+ assertThat(logs.get(0)).doesNotContain(" | submitter=");
+ logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ for (int i = 0; i < 2; i++) {
+ assertThat(logs.get(i)).doesNotContain(" | submitter=");
+ }
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ @Test
+ public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception {
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).contains(" | submitter=FooBar");
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ @Test
+ public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception {
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor);
+ makeTaskProcessorFail(ceTask);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(1);
+ assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
+ logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ }
+
+ @Test
+ public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception {
+ logTester.setLevel(LoggerLevel.DEBUG);
+
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar")));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).contains(" | submitter=FooBar");
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty();
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ @Test
+ public void display_start_at_debug_level_stop_at_error_level_for_console_if_DEBUG_is_enabled_and_task_failed() throws Exception {
+ logTester.setLevel(LoggerLevel.DEBUG);
+
+ CeTask ceTask = createCeTask("FooBar");
+ when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask));
+ taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor);
+ makeTaskProcessorFail(ceTask);
+
+ underTest.call();
+
+ List<String> logs = logTester.logs(LoggerLevel.INFO);
+ assertThat(logs).hasSize(1);
+ assertThat(logs.iterator().next()).contains(" | submitter=FooBar");
+ logs = logTester.logs(LoggerLevel.ERROR);
+ assertThat(logs).hasSize(2);
+ assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid());
+ assertThat(logs.get(1)).contains(" | submitter=FooBar | time=");
+ assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty();
+ }
+
+ private static CeTask createCeTask(@Nullable String submitterLogin) {
+ return new CeTask.Builder()
+ .setOrganizationUuid("org1")
+ .setUuid("TASK_1").setType(CeTaskTypes.REPORT)
+ .setComponentUuid("PROJECT_1")
+ .setSubmitterLogin(submitterLogin)
+ .build();
+ }
+
+ private IllegalStateException makeTaskProcessorFail(CeTask task) {
+ IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process");
+ doThrow(error).when(taskProcessor).process(task);
+ return error;
+ }
+}