diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2017-03-31 11:52:17 +0200 |
---|---|---|
committer | Eric Hartmann <hartmann.eric@gmail.Com> | 2017-04-27 09:23:18 +0200 |
commit | 2af0a2dd2bff90fb3a7cdcb531271d537f5c8b57 (patch) | |
tree | 488054d366f006667799c8d975492ee33ec57e6d /server/sonar-ce | |
parent | 588904e9c4460e485749effc85663e4594e16c79 (diff) | |
download | sonarqube-2af0a2dd2bff90fb3a7cdcb531271d537f5c8b57.tar.gz sonarqube-2af0a2dd2bff90fb3a7cdcb531271d537f5c8b57.zip |
SONAR-9057 CE tables cleaning and purging is done by web leader
moved CE queue and taskprocessor classes (the later depending upon the former) to sonar-ce module
moved CE configuration classes to sonar-ce-api module
Diffstat (limited to 'server/sonar-ce')
27 files changed, 3004 insertions, 6 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java index 5a37d1d06e0..f89cf4efd39 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeConfigurationModule.java @@ -19,10 +19,10 @@ */ package org.sonar.ce; +import org.sonar.ce.configuration.CeConfigurationImpl; import org.sonar.ce.log.CeLogging; import org.sonar.core.platform.Module; import org.sonar.process.systeminfo.ProcessStateSystemInfo; -import org.sonar.server.computation.configuration.CeConfigurationImpl; import org.sonar.server.computation.monitoring.CeDatabaseMBeanImpl; public class CeConfigurationModule extends Module { diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java index a11e826cd12..159e0d53687 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeQueueModule.java @@ -22,8 +22,8 @@ package org.sonar.ce; import org.sonar.core.platform.Module; import org.sonar.server.computation.monitoring.CEQueueStatusImpl; import org.sonar.server.computation.monitoring.CeTasksMBeanImpl; -import org.sonar.server.computation.queue.CeQueueInitializer; -import org.sonar.server.computation.queue.InternalCeQueueImpl; +import org.sonar.ce.queue.CeQueueInitializer; +import org.sonar.ce.queue.InternalCeQueueImpl; public class CeQueueModule extends Module { @Override diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java index ed306dd165a..c36413a9ac3 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java @@ -46,7 +46,10 @@ import org.sonar.ce.CeTaskCommonsModule; import org.sonar.ce.db.ReadOnlyPropertiesDao; import org.sonar.ce.log.CeProcessLogging; import org.sonar.ce.platform.ComputeEngineExtensionInstaller; +import org.sonar.ce.queue.CeQueueCleaner; +import org.sonar.ce.queue.PurgeCeActivities; import org.sonar.ce.settings.ProjectSettingsFactory; +import org.sonar.ce.taskprocessor.CeTaskProcessorModule; import org.sonar.ce.user.CeUserSession; import org.sonar.core.component.DefaultResourceTypes; import org.sonar.core.config.CorePropertyDefinitions; @@ -68,10 +71,7 @@ import org.sonar.process.logging.LogbackHelper; import org.sonar.server.component.ComponentCleanerService; import org.sonar.server.component.ComponentFinder; import org.sonar.server.component.index.ComponentIndexer; -import org.sonar.server.computation.queue.CeQueueCleaner; -import org.sonar.server.computation.queue.PurgeCeActivities; import org.sonar.server.computation.task.projectanalysis.ProjectAnalysisTaskModule; -import org.sonar.server.computation.taskprocessor.CeTaskProcessorModule; import org.sonar.server.debt.DebtModelPluginRepository; import org.sonar.server.debt.DebtRulesXMLImporter; import org.sonar.server.event.NewAlerts; diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java new file mode 100644 index 00000000000..39dd83f88bb --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueCleaner.java @@ -0,0 +1,85 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.List; +import org.picocontainer.Startable; +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.platform.ServerUpgradeStatus; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.db.DbClient; +import org.sonar.db.DbSession; + +/** + * Cleans-up the Compute Engine queue and resets the JMX counters. + * CE workers must not be started before execution of this class. + */ +@ComputeEngineSide +public class CeQueueCleaner implements Startable { + + private static final Logger LOGGER = Loggers.get(CeQueueCleaner.class); + + private final DbClient dbClient; + private final ServerUpgradeStatus serverUpgradeStatus; + private final InternalCeQueue queue; + + public CeQueueCleaner(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, InternalCeQueue queue) { + this.dbClient = dbClient; + this.serverUpgradeStatus = serverUpgradeStatus; + this.queue = queue; + } + + @Override + public void start() { + if (serverUpgradeStatus.isUpgraded()) { + cleanOnUpgrade(); + } else { + try (DbSession dbSession = dbClient.openSession(false)) { + verifyConsistency(dbSession); + } + } + } + + private void cleanOnUpgrade() { + // we assume that pending tasks are not compatible with the new version + // and can't be processed + LOGGER.info("Cancel all pending tasks (due to upgrade)"); + queue.clear(); + } + + private void verifyConsistency(DbSession dbSession) { + // server is not being upgraded + dbClient.ceQueueDao().resetAllToPendingStatus(dbSession); + dbSession.commit(); + + // Reports that have been processed are not kept in database yet. + // They are supposed to be systematically dropped. + // Let's clean-up orphans if any. + List<String> uuids = dbClient.ceTaskInputDao().selectUuidsNotInQueue(dbSession); + dbClient.ceTaskInputDao().deleteByUuids(dbSession, uuids); + dbSession.commit(); + } + + @Override + public void stop() { + // nothing to do + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java new file mode 100644 index 00000000000..d794a4c1ebb --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java @@ -0,0 +1,53 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.platform.Server; +import org.sonar.api.platform.ServerStartHandler; +import org.sonar.ce.taskprocessor.CeProcessingScheduler; + +/** + * Cleans-up the queue, initializes JMX counters then schedule + * the execution of workers. That allows to not prevent workers + * from peeking the queue before it's ready. + */ +@ComputeEngineSide +public class CeQueueInitializer implements ServerStartHandler { + + private final CeProcessingScheduler scheduler; + private boolean done = false; + + public CeQueueInitializer(CeProcessingScheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public void onServerStart(Server server) { + if (!done) { + initCe(); + this.done = true; + } + } + + private void initCe() { + scheduler.startScheduling(); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java new file mode 100644 index 00000000000..01f5341f764 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -0,0 +1,78 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import com.google.common.base.Optional; +import javax.annotation.Nullable; +import org.sonar.ce.queue.CeQueue; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.queue.CeTaskResult; +import org.sonar.db.ce.CeActivityDto.Status; + +/** + * Queue of pending Compute Engine tasks. Both producer and consumer actions + * are implemented. + * <p> + * This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}. + * </p> + */ +public interface InternalCeQueue extends CeQueue { + + /** + * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. + * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. + * Does not return anything if the queue is paused (see {@link #isPeekPaused()}. + * + * <p>Only a single task can be peeked by project.</p> + * + * <p>An unchecked exception may be thrown on technical errors (db connection, ...).</p> + * + * <p>Tasks which have been executed twice already but are still {@link org.sonar.db.ce.CeQueueDto.Status#PENDING} + * are ignored</p> + */ + Optional<CeTask> peek(String workerUuid); + + /** + * Removes all the tasks from the queue, whatever their status. They are marked + * as {@link Status#CANCELED} in past activity. + * This method can NOT be called when workers are being executed, as in progress + * tasks can't be killed. + * + * @return the number of canceled tasks + */ + int clear(); + + /** + * Removes a task from the queue and registers it to past activities. This method + * is called by Compute Engine workers when task is processed and can include an option {@link CeTaskResult} object. + * + * @throws IllegalStateException if the task does not exist in the queue + * @throws IllegalArgumentException if {@code error} is non {@code null} but {@code status} is not {@link Status#FAILED} + */ + void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error); + + void cancelWornOuts(); + + void pausePeek(); + + void resumePeek(); + + boolean isPeekPaused(); +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java new file mode 100644 index 00000000000..bcd6f0abccf --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -0,0 +1,251 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import com.google.common.base.Optional; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; +import org.apache.log4j.Logger; +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.utils.System2; +import org.sonar.ce.monitoring.CEQueueStatus; +import org.sonar.core.util.UuidFactory; +import org.sonar.db.DbClient; +import org.sonar.db.DbSession; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.server.organization.DefaultOrganizationProvider; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +@ComputeEngineSide +public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue { + + private static final int MAX_EXECUTION_COUNT = 2; + + private final System2 system2; + private final DbClient dbClient; + private final CEQueueStatus queueStatus; + + // state + private AtomicBoolean peekPaused = new AtomicBoolean(false); + + public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus, + DefaultOrganizationProvider defaultOrganizationProvider) { + super(dbClient, uuidFactory, defaultOrganizationProvider); + this.system2 = system2; + this.dbClient = dbClient; + this.queueStatus = queueStatus; + } + + @Override + public Optional<CeTask> peek(String workerUuid) { + requireNonNull(workerUuid, "workerUuid can't be null"); + + if (peekPaused.get()) { + return Optional.absent(); + } + try (DbSession dbSession = dbClient.openSession(false)) { + Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession, workerUuid, MAX_EXECUTION_COUNT); + CeTask task = null; + if (dto.isPresent()) { + task = loadTask(dbSession, dto.get()); + queueStatus.addInProgress(); + } + return Optional.fromNullable(task); + + } + } + + @Override + public int clear() { + return cancelAll(true); + } + + @Override + public void remove(CeTask task, CeActivityDto.Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error) { + checkArgument(error == null || status == CeActivityDto.Status.FAILED, "Error can be provided only when status is FAILED"); + try (DbSession dbSession = dbClient.openSession(false)) { + Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid()); + checkState(queueDto.isPresent(), "Task does not exist anymore: %s", task); + CeActivityDto activityDto = new CeActivityDto(queueDto.get()); + activityDto.setStatus(status); + updateQueueStatus(status, activityDto); + updateTaskResult(activityDto, taskResult); + updateError(activityDto, error); + remove(dbSession, queueDto.get(), activityDto); + } + } + + private static void updateTaskResult(CeActivityDto activityDto, @Nullable CeTaskResult taskResult) { + if (taskResult != null) { + java.util.Optional<String> analysisUuid = taskResult.getAnalysisUuid(); + if (analysisUuid.isPresent()) { + activityDto.setAnalysisUuid(analysisUuid.get()); + } + } + } + + private static void updateError(CeActivityDto activityDto, @Nullable Throwable error) { + if (error == null) { + return; + } + + activityDto.setErrorMessage(error.getMessage()); + String stacktrace = getStackTraceForPersistence(error); + if (stacktrace != null) { + activityDto.setErrorStacktrace(stacktrace); + } + } + + @CheckForNull + private static String getStackTraceForPersistence(Throwable error) { + try (ByteArrayOutputStream out = new ByteArrayOutputStream(); + LineReturnEnforcedPrintStream printStream = new LineReturnEnforcedPrintStream(out)) { + error.printStackTrace(printStream); + printStream.flush(); + return out.toString(); + } catch (IOException e) { + Logger.getLogger(InternalCeQueueImpl.class).debug("Failed to getStacktrace out of error", e); + return null; + } + } + + private void updateQueueStatus(CeActivityDto.Status status, CeActivityDto activityDto) { + Long startedAt = activityDto.getStartedAt(); + if (startedAt == null) { + return; + } + activityDto.setExecutedAt(system2.now()); + long executionTimeInMs = activityDto.getExecutedAt() - startedAt; + activityDto.setExecutionTimeMs(executionTimeInMs); + if (status == CeActivityDto.Status.SUCCESS) { + queueStatus.addSuccess(executionTimeInMs); + } else { + queueStatus.addError(executionTimeInMs); + } + } + + @Override + public void cancelWornOuts() { + try (DbSession dbSession = dbClient.openSession(false)) { + List<CeQueueDto> wornOutTasks = dbClient.ceQueueDao().selectPendingByMinimumExecutionCount(dbSession, MAX_EXECUTION_COUNT); + wornOutTasks.forEach(queueDto -> { + CeActivityDto activityDto = new CeActivityDto(queueDto); + activityDto.setStatus(CeActivityDto.Status.CANCELED); + updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto); + remove(dbSession, queueDto, activityDto); + }); + } + } + + @Override + public void pausePeek() { + this.peekPaused.set(true); + } + + @Override + public void resumePeek() { + this.peekPaused.set(false); + } + + @Override + public boolean isPeekPaused() { + return peekPaused.get(); + } + + /** + * A {@link PrintWriter} subclass which enforces that line returns are {@code \n} whichever the platform. + */ + private static class LineReturnEnforcedPrintStream extends PrintWriter { + + LineReturnEnforcedPrintStream(OutputStream out) { + super(out); + } + + @Override + public void println() { + super.print('\n'); + } + + @Override + public void println(boolean x) { + super.print(x); + println(); + } + + @Override + public void println(char x) { + super.print(x); + println(); + } + + @Override + public void println(int x) { + super.print(x); + println(); + } + + @Override + public void println(long x) { + super.print(x); + println(); + } + + @Override + public void println(float x) { + super.print(x); + println(); + } + + @Override + public void println(double x) { + super.print(x); + println(); + } + + @Override + public void println(char[] x) { + super.print(x); + println(); + } + + @Override + public void println(String x) { + super.print(x); + println(); + } + + @Override + public void println(Object x) { + super.print(x); + println(); + } + } + +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java new file mode 100644 index 00000000000..04a81ef1208 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/PurgeCeActivities.java @@ -0,0 +1,69 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.util.Calendar; +import java.util.Set; +import org.sonar.api.Startable; +import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.core.util.stream.MoreCollectors; +import org.sonar.db.DbClient; +import org.sonar.db.DbSession; +import org.sonar.db.ce.CeActivityDto; + +@ComputeEngineSide +public class PurgeCeActivities implements Startable { + + private static final Logger LOGGER = Loggers.get(PurgeCeActivities.class); + + private final DbClient dbClient; + private final System2 system2; + + public PurgeCeActivities(DbClient dbClient, System2 system2) { + this.dbClient = dbClient; + this.system2 = system2; + } + + @Override + public void start() { + try (DbSession dbSession = dbClient.openSession(false)) { + Calendar sixMonthsAgo = Calendar.getInstance(); + sixMonthsAgo.setTimeInMillis(system2.now()); + sixMonthsAgo.add(Calendar.DATE, -180); + + LOGGER.info("Delete the Compute Engine tasks created before {}", sixMonthsAgo.getTime()); + Set<String> ceActivityUuids = dbClient.ceActivityDao().selectOlderThan(dbSession, sixMonthsAgo.getTimeInMillis()) + .stream() + .map(CeActivityDto::getUuid) + .collect(MoreCollectors.toSet()); + dbClient.ceActivityDao().deleteByUuids(dbSession, ceActivityUuids); + dbClient.ceScannerContextDao().deleteByUuids(dbSession, ceActivityUuids); + dbSession.commit(); + } + } + + @Override + public void stop() { + // nothing to do + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java new file mode 100644 index 00000000000..831684d2e8c --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java @@ -0,0 +1,26 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +public interface CeProcessingScheduler { + + void startScheduling(); + +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java new file mode 100644 index 00000000000..2aa14e6962d --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorService.java @@ -0,0 +1,29 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import org.sonar.server.util.StoppableExecutorService; + +/** + * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerCallableImpl}. + */ +public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService { +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java new file mode 100644 index 00000000000..184d32cbc15 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java @@ -0,0 +1,81 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.sonar.ce.configuration.CeConfiguration; +import org.sonar.server.util.AbstractStoppableExecutorService; + +public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableExecutorService<ListeningScheduledExecutorService> + implements CeProcessingSchedulerExecutorService { + private static final String THREAD_NAME_PREFIX = "ce-worker-"; + + public CeProcessingSchedulerExecutorServiceImpl(CeConfiguration ceConfiguration) { + super( + MoreExecutors.listeningDecorator( + Executors.newScheduledThreadPool(ceConfiguration.getWorkerCount(), + new ThreadFactoryBuilder() + .setNameFormat(THREAD_NAME_PREFIX + "%d") + .setPriority(Thread.MIN_PRIORITY) + .build()))); + } + + @Override + public <T> ListenableFuture<T> submit(Callable<T> task) { + return delegate.submit(task); + } + + @Override + public <T> ListenableFuture<T> submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public ListenableFuture<?> submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java new file mode 100644 index 00000000000..47f56af6ff1 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -0,0 +1,137 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; +import org.picocontainer.Startable; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.ce.configuration.CeConfiguration; + +import static com.google.common.util.concurrent.Futures.addCallback; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable { + private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); + + private final CeProcessingSchedulerExecutorService executorService; + private final CeWorkerCallable workerRunnable; + + private final long delayBetweenTasks; + private final TimeUnit timeUnit; + private final ChainingCallback[] chainingCallbacks; + + public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration, + CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerCallable workerRunnable) { + this.executorService = processingExecutorService; + this.workerRunnable = workerRunnable; + + this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay(); + this.timeUnit = MILLISECONDS; + + int workerCount = ceConfiguration.getWorkerCount(); + this.chainingCallbacks = new ChainingCallback[workerCount]; + for (int i = 0; i < workerCount; i++) { + chainingCallbacks[i] = new ChainingCallback(); + } + } + + @Override + public void start() { + // nothing to do at component startup, startScheduling will be called by CeQueueInitializer + } + + @Override + public void startScheduling() { + for (ChainingCallback chainingCallback : chainingCallbacks) { + ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit); + addCallback(future, chainingCallback, executorService); + } + } + + @Override + public void stop() { + for (ChainingCallback chainingCallback : chainingCallbacks) { + chainingCallback.stop(); + } + } + + private class ChainingCallback implements FutureCallback<Boolean> { + private final AtomicBoolean keepRunning = new AtomicBoolean(true); + @CheckForNull + private ListenableFuture<Boolean> workerFuture; + + @Override + public void onSuccess(@Nullable Boolean result) { + if (result != null && result) { + chainWithoutDelay(); + } else { + chainWithDelay(); + } + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof Error) { + LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t); + } else { + chainWithoutDelay(); + } + } + + private void chainWithoutDelay() { + if (keepRunning()) { + workerFuture = executorService.submit(workerRunnable); + } + addCallback(); + } + + private void chainWithDelay() { + if (keepRunning()) { + workerFuture = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit); + } + addCallback(); + } + + private void addCallback() { + if (workerFuture != null && keepRunning()) { + Futures.addCallback(workerFuture, this, executorService); + } + } + + private boolean keepRunning() { + return keepRunning.get(); + } + + public void stop() { + this.keepRunning.set(false); + if (workerFuture != null) { + workerFuture.cancel(false); + } + } + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java new file mode 100644 index 00000000000..b6f08854b8c --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorModule.java @@ -0,0 +1,33 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import org.sonar.core.platform.Module; + +public class CeTaskProcessorModule extends Module { + @Override + protected void configureModule() { + add( + CeTaskProcessorRepositoryImpl.class, + CeWorkerCallableImpl.class, + CeProcessingSchedulerExecutorServiceImpl.class, + CeProcessingSchedulerImpl.class); + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java new file mode 100644 index 00000000000..7fbbfa3f3f7 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepository.java @@ -0,0 +1,34 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.base.Optional; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.taskprocessor.CeTaskProcessor; + +public interface CeTaskProcessorRepository { + + /** + * @throws NullPointerException if the specified {@link CeTask} is {@code null} + * @throws IllegalStateException if there is no {@link CeTaskProcessor} for the specified {@link CeTask} in the repository + */ + Optional<CeTaskProcessor> getForCeTask(CeTask ceTask); + +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java new file mode 100644 index 00000000000..24a2bf07530 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImpl.java @@ -0,0 +1,105 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import java.util.Collection; +import java.util.Map; +import javax.annotation.Nonnull; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.taskprocessor.CeTaskProcessor; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.FluentIterable.from; +import static java.lang.String.CASE_INSENSITIVE_ORDER; +import static java.lang.String.format; + +/** + * {@link CeTaskProcessorRepository} implementation which provides access to the {@link CeTaskProcessor} existing in the + * PicoContainer the current object belongs to. + */ +public class CeTaskProcessorRepositoryImpl implements CeTaskProcessorRepository { + private static final Joiner COMMA_JOINER = Joiner.on(", "); + + private final Map<String, CeTaskProcessor> taskProcessorByCeTaskType; + + public CeTaskProcessorRepositoryImpl(CeTaskProcessor[] taskProcessors) { + this.taskProcessorByCeTaskType = indexTaskProcessors(taskProcessors); + } + + @Override + public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) { + return Optional.fromNullable(taskProcessorByCeTaskType.get(ceTask.getType())); + } + + private static Map<String, CeTaskProcessor> indexTaskProcessors(CeTaskProcessor[] taskProcessors) { + Multimap<String, CeTaskProcessor> permissiveIndex = buildPermissiveCeTaskProcessorIndex(taskProcessors); + checkUniqueHandlerPerCeTaskType(permissiveIndex); + return ImmutableMap.copyOf(Maps.transformValues(permissiveIndex.asMap(), CeTaskProcessorCollectionToFirstElement.INSTANCE)); + } + + private static Multimap<String, CeTaskProcessor> buildPermissiveCeTaskProcessorIndex(CeTaskProcessor[] taskProcessors) { + Multimap<String, CeTaskProcessor> permissiveIndex = ArrayListMultimap.create(taskProcessors.length, 1); + for (CeTaskProcessor taskProcessor : taskProcessors) { + for (String ceTaskType : taskProcessor.getHandledCeTaskTypes()) { + permissiveIndex.put(ceTaskType, taskProcessor); + } + } + return permissiveIndex; + } + + private static void checkUniqueHandlerPerCeTaskType(Multimap<String, CeTaskProcessor> permissiveIndex) { + for (Map.Entry<String, Collection<CeTaskProcessor>> entry : permissiveIndex.asMap().entrySet()) { + checkArgument( + entry.getValue().size() == 1, + format( + "There can be only one CeTaskProcessor instance registered as the processor for CeTask type %s. " + + "More than one found. Please fix your configuration: %s", + entry.getKey(), + COMMA_JOINER.join(from(entry.getValue()).transform(ToClassName.INSTANCE).toSortedList(CASE_INSENSITIVE_ORDER)))); + } + } + + private enum ToClassName implements Function<Object, String> { + INSTANCE; + + @Override + @Nonnull + public String apply(@Nonnull Object input) { + return input.getClass().getName(); + } + } + + private enum CeTaskProcessorCollectionToFirstElement implements Function<Collection<CeTaskProcessor>, CeTaskProcessor> { + INSTANCE; + + @Override + @Nonnull + public CeTaskProcessor apply(@Nonnull Collection<CeTaskProcessor> input) { + return input.iterator().next(); + } + } +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java new file mode 100644 index 00000000000..2618b37b070 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallable.java @@ -0,0 +1,32 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import java.util.concurrent.Callable; +import org.sonar.ce.queue.CeQueue; +import org.sonar.ce.queue.CeTask; + +/** + * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}. + * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed, + * {@code false} otherwise. + */ +public interface CeWorkerCallable extends Callable<Boolean> { +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java new file mode 100644 index 00000000000..5bb5a047068 --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerCallableImpl.java @@ -0,0 +1,137 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.base.Optional; +import javax.annotation.Nullable; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.ce.log.CeLogging; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.queue.CeTaskResult; +import org.sonar.ce.queue.InternalCeQueue; +import org.sonar.core.util.logs.Profiler; +import org.sonar.db.ce.CeActivityDto; + +import static java.lang.String.format; + +public class CeWorkerCallableImpl implements CeWorkerCallable { + + private static final Logger LOG = Loggers.get(CeWorkerCallableImpl.class); + + private final InternalCeQueue queue; + private final CeLogging ceLogging; + private final CeTaskProcessorRepository taskProcessorRepository; + + public CeWorkerCallableImpl(InternalCeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) { + this.queue = queue; + this.ceLogging = ceLogging; + this.taskProcessorRepository = taskProcessorRepository; + } + + @Override + public Boolean call() throws Exception { + Optional<CeTask> ceTask = tryAndFindTaskToExecute(); + if (!ceTask.isPresent()) { + return false; + } + + try { + executeTask(ceTask.get()); + } catch (Exception e) { + LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); + } + return true; + } + + private Optional<CeTask> tryAndFindTaskToExecute() { + try { + return queue.peek("UNKNOWN" /*FIXME provide a real worker uuid*/); + } catch (Exception e) { + LOG.error("Failed to pop the queue of analysis reports", e); + } + return Optional.absent(); + } + + private void executeTask(CeTask task) { + ceLogging.initForTask(task); + Profiler ceProfiler = startActivityProfiler(task); + + CeActivityDto.Status status = CeActivityDto.Status.FAILED; + CeTaskResult taskResult = null; + Throwable error = null; + try { + // TODO delegate the message to the related task processor, according to task type + Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task); + if (taskProcessor.isPresent()) { + taskResult = taskProcessor.get().process(task); + status = CeActivityDto.Status.SUCCESS; + } else { + LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); + status = CeActivityDto.Status.FAILED; + } + } catch (Throwable e) { + LOG.error(format("Failed to execute task %s", task.getUuid()), e); + error = e; + } finally { + finalizeTask(task, ceProfiler, status, taskResult, error); + } + } + + private void finalizeTask(CeTask task, Profiler ceProfiler, CeActivityDto.Status status, + @Nullable CeTaskResult taskResult, @Nullable Throwable error) { + try { + queue.remove(task, status, taskResult, error); + } catch (Exception e) { + LOG.error(format("Failed to finalize task with uuid '%s' and persist its state to db", task.getUuid()), e); + } finally { + stopActivityProfiler(ceProfiler, task, status); + ceLogging.clearForTask(); + } + } + + private static Profiler startActivityProfiler(CeTask task) { + Profiler profiler = Profiler.create(LOG); + addContext(profiler, task); + return profiler.startInfo("Execute task"); + } + + private static void stopActivityProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) { + addContext(profiler, task); + if (status == CeActivityDto.Status.FAILED) { + profiler.stopError("Executed task"); + } else { + profiler.stopInfo("Executed task"); + } + } + + private static void addContext(Profiler profiler, CeTask task) { + profiler + .logTimeLast(true) + .addContext("project", task.getComponentKey()) + .addContext("type", task.getType()) + .addContext("id", task.getUuid()); + String submitterLogin = task.getSubmitterLogin(); + if (submitterLogin != null) { + profiler.addContext("submitter", submitterLogin); + } + } + +} diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java new file mode 100644 index 00000000000..131bfda3efd --- /dev/null +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/package-info.java @@ -0,0 +1,23 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +@ParametersAreNonnullByDefault +package org.sonar.ce.taskprocessor; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java new file mode 100644 index 00000000000..6bdf086d3c7 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java @@ -0,0 +1,53 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.configuration; + +import org.junit.rules.ExternalResource; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Mutable implementation of {@link CeConfiguration} as {@link org.junit.Rule}. + */ +public class CeConfigurationRule extends ExternalResource implements CeConfiguration { + private int workerCount = CeConfigurationImpl.DEFAULT_WORKER_COUNT; + private long queuePollingDelay = CeConfigurationImpl.DEFAULT_QUEUE_POLLING_DELAY; + + @Override + public int getWorkerCount() { + return workerCount; + } + + public CeConfigurationRule setWorkerCount(int workerCount) { + checkArgument(workerCount >= 1, "worker count must be >= 1"); + this.workerCount = workerCount; + return this; + } + + @Override + public long getQueuePollingDelay() { + return queuePollingDelay; + } + + public void setQueuePollingDelay(int queuePollingDelay) { + checkArgument(queuePollingDelay > 0, "Queue polling delay must be >= 0"); + this.queuePollingDelay = queuePollingDelay; + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java new file mode 100644 index 00000000000..0397af0d4bd --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueCleanerTest.java @@ -0,0 +1,101 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.io.IOException; +import java.util.Optional; +import org.apache.commons.io.IOUtils; +import org.junit.Rule; +import org.junit.Test; +import org.sonar.api.platform.ServerUpgradeStatus; +import org.sonar.api.utils.System2; +import org.sonar.db.DbTester; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskInputDao; +import org.sonar.db.ce.CeTaskTypes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CeQueueCleanerTest { + + @Rule + public DbTester dbTester = DbTester.create(System2.INSTANCE); + + private ServerUpgradeStatus serverUpgradeStatus = mock(ServerUpgradeStatus.class); + private InternalCeQueue queue = mock(InternalCeQueue.class); + private CeQueueCleaner underTest = new CeQueueCleaner(dbTester.getDbClient(), serverUpgradeStatus, queue); + + @Test + public void start_resets_in_progress_tasks_to_pending() throws IOException { + insertInQueue("TASK_1", CeQueueDto.Status.PENDING); + insertInQueue("TASK_2", CeQueueDto.Status.IN_PROGRESS); + + underTest.start(); + + assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.PENDING)).isEqualTo(2); + assertThat(dbTester.getDbClient().ceQueueDao().countByStatus(dbTester.getSession(), CeQueueDto.Status.IN_PROGRESS)).isEqualTo(0); + } + + @Test + public void start_clears_queue_if_version_upgrade() { + when(serverUpgradeStatus.isUpgraded()).thenReturn(true); + + underTest.start(); + + verify(queue).clear(); + } + + @Test + public void start_deletes_orphan_report_files() throws Exception { + // analysis reports are persisted but the associated + // task is not in the queue + insertInQueue("TASK_1", CeQueueDto.Status.PENDING); + insertTaskData("TASK_1"); + insertTaskData("TASK_2"); + + underTest.start(); + + CeTaskInputDao dataDao = dbTester.getDbClient().ceTaskInputDao(); + Optional<CeTaskInputDao.DataStream> task1Data = dataDao.selectData(dbTester.getSession(), "TASK_1"); + assertThat(task1Data).isPresent(); + task1Data.get().close(); + + assertThat(dataDao.selectData(dbTester.getSession(), "TASK_2")).isNotPresent(); + } + + private CeQueueDto insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException { + CeQueueDto dto = new CeQueueDto(); + dto.setTaskType(CeTaskTypes.REPORT); + dto.setComponentUuid("PROJECT_1"); + dto.setUuid(taskUuid); + dto.setStatus(status); + dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), dto); + dbTester.getSession().commit(); + return dto; + } + + private void insertTaskData(String taskUuid) throws IOException { + dbTester.getDbClient().ceTaskInputDao().insert(dbTester.getSession(), taskUuid, IOUtils.toInputStream("{binary}")); + dbTester.getSession().commit(); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java new file mode 100644 index 00000000000..9595e966583 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java @@ -0,0 +1,57 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import java.io.IOException; +import org.junit.Test; +import org.sonar.api.platform.Server; +import org.sonar.ce.taskprocessor.CeProcessingScheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class CeQueueInitializerTest { + + private Server server = mock(Server.class); + private CeProcessingScheduler scheduler = mock(CeProcessingScheduler.class); + private CeQueueInitializer underTest = new CeQueueInitializer(scheduler); + + @Test + public void clean_queue_then_start_scheduler_of_workers() throws IOException { + underTest.onServerStart(server); + + verify(scheduler).startScheduling(); + } + + @Test + public void onServerStart_has_no_effect_if_called_twice_to_support_medium_test_doing_startup_tasks_multiple_times() { + + underTest.onServerStart(server); + + reset(scheduler); + + underTest.onServerStart(server); + + verifyZeroInteractions(scheduler); + + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java new file mode 100644 index 00000000000..c5a537880b0 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -0,0 +1,550 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import com.google.common.base.Optional; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.List; +import java.util.Random; +import javax.annotation.Nullable; +import org.assertj.guava.api.Assertions; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.api.utils.System2; +import org.sonar.api.utils.internal.TestSystem2; +import org.sonar.ce.monitoring.CEQueueStatus; +import org.sonar.core.util.UuidFactory; +import org.sonar.core.util.UuidFactoryImpl; +import org.sonar.db.DbSession; +import org.sonar.db.DbTester; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskTypes; +import org.sonar.db.component.ComponentDto; +import org.sonar.db.component.ComponentTesting; +import org.sonar.db.organization.OrganizationDto; +import org.sonar.server.computation.monitoring.CEQueueStatusImpl; +import org.sonar.server.organization.DefaultOrganization; +import org.sonar.server.organization.DefaultOrganizationProvider; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class InternalCeQueueImplTest { + + private static final String AN_ANALYSIS_UUID = "U1"; + private static final String WORKER_UUID_1 = "worker uuid 1"; + private static final String WORKER_UUID_2 = "worker uuid 2"; + + private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule + public DbTester dbTester = DbTester.create(system2); + + private DbSession session = dbTester.getSession(); + + private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; + private CEQueueStatus queueStatus = new CEQueueStatusImpl(dbTester.getDbClient()); + private DefaultOrganizationProvider defaultOrganizationProvider = mock(DefaultOrganizationProvider.class); + private InternalCeQueue underTest = new InternalCeQueueImpl(system2, dbTester.getDbClient(), uuidFactory, queueStatus, defaultOrganizationProvider); + + @Before + public void setUp() throws Exception { + OrganizationDto defaultOrganization = dbTester.getDefaultOrganization(); + when(defaultOrganizationProvider.get()).thenReturn(DefaultOrganization.newBuilder() + .setUuid(defaultOrganization.getUuid()) + .setKey(defaultOrganization.getKey()) + .setName(defaultOrganization.getName()) + .setCreatedAt(defaultOrganization.getCreatedAt()) + .setUpdatedAt(defaultOrganization.getUpdatedAt()) + .build()); + } + + @Test + public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() { + CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob"); + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, null); + verifyCeQueueDtoForTaskSubmit(taskSubmit); + } + + @Test + public void submit_populates_component_name_and_key_of_CeTask_if_component_exists() { + ComponentDto componentDto = insertComponent(newComponentDto("PROJECT_1")); + CeTaskSubmit taskSubmit = createTaskSubmit(CeTaskTypes.REPORT, componentDto.uuid(), null); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, componentDto); + } + + @Test + public void submit_returns_task_without_component_info_when_submit_has_none() { + CeTaskSubmit taskSubmit = createTaskSubmit("not cpt related"); + + CeTask task = underTest.submit(taskSubmit); + + verifyCeTask(taskSubmit, task, null); + } + + @Test + public void submit_fails_with_ISE_if_paused() { + underTest.pauseSubmit(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Compute Engine does not currently accept new tasks"); + + submit(CeTaskTypes.REPORT, "PROJECT_1"); + } + + @Test + public void massSubmit_returns_tasks_for_each_CeTaskSubmit_populated_from_CeTaskSubmit_and_creates_CeQueue_row_for_each() { + CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, "PROJECT_1", "rob"); + CeTaskSubmit taskSubmit2 = createTaskSubmit("some type"); + + List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2)); + + assertThat(tasks).hasSize(2); + verifyCeTask(taskSubmit1, tasks.get(0), null); + verifyCeTask(taskSubmit2, tasks.get(1), null); + verifyCeQueueDtoForTaskSubmit(taskSubmit1); + verifyCeQueueDtoForTaskSubmit(taskSubmit2); + } + + @Test + public void massSubmit_populates_component_name_and_key_of_CeTask_if_component_exists() { + ComponentDto componentDto1 = insertComponent(newComponentDto("PROJECT_1")); + CeTaskSubmit taskSubmit1 = createTaskSubmit(CeTaskTypes.REPORT, componentDto1.uuid(), null); + CeTaskSubmit taskSubmit2 = createTaskSubmit("something", "non existing component uuid", null); + + List<CeTask> tasks = underTest.massSubmit(asList(taskSubmit1, taskSubmit2)); + + assertThat(tasks).hasSize(2); + verifyCeTask(taskSubmit1, tasks.get(0), componentDto1); + verifyCeTask(taskSubmit2, tasks.get(1), null); + } + + @Test + public void peek_throws_NPE_if_workerUUid_is_null() { + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("workerUuid can't be null"); + + underTest.peek(null); + } + + @Test + public void test_remove() { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, null, null); + + // queue is empty + assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), task.getUuid()).isPresent()).isFalse(); + assertThat(underTest.peek(WORKER_UUID_2).isPresent()).isFalse(); + + // available in history + Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); + assertThat(history.isPresent()).isTrue(); + assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS); + assertThat(history.get().getIsLast()).isTrue(); + assertThat(history.get().getAnalysisUuid()).isNull(); + } + + @Test + public void remove_throws_IAE_if_exception_is_provided_but_status_is_SUCCESS() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Error can be provided only when status is FAILED"); + + underTest.remove(mock(CeTask.class), CeActivityDto.Status.SUCCESS, null, new RuntimeException("Some error")); + } + + @Test + public void remove_throws_IAE_if_exception_is_provided_but_status_is_CANCELED() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Error can be provided only when status is FAILED"); + + underTest.remove(mock(CeTask.class), CeActivityDto.Status.CANCELED, null, new RuntimeException("Some error")); + } + + @Test + public void remove_does_not_set_analysisUuid_in_CeActivity_when_CeTaskResult_has_no_analysis_uuid() { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(null), null); + + // available in history + Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); + assertThat(history.isPresent()).isTrue(); + assertThat(history.get().getAnalysisUuid()).isNull(); + } + + @Test + public void remove_sets_analysisUuid_in_CeActivity_when_CeTaskResult_has_analysis_uuid() { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + + Optional<CeTask> peek = underTest.peek(WORKER_UUID_2); + underTest.remove(peek.get(), CeActivityDto.Status.SUCCESS, newTaskResult(AN_ANALYSIS_UUID), null); + + // available in history + Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); + assertThat(history.isPresent()).isTrue(); + assertThat(history.get().getAnalysisUuid()).isEqualTo("U1"); + } + + @Test + public void remove_saves_error_message_and_stacktrace_when_exception_is_provided() { + Throwable error = new NullPointerException("Fake NPE to test persistence to DB"); + + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + underTest.remove(peek.get(), CeActivityDto.Status.FAILED, null, error); + + Optional<CeActivityDto> activityDto = dbTester.getDbClient().ceActivityDao().selectByUuid(session, task.getUuid()); + Assertions.assertThat(activityDto).isPresent(); + + assertThat(activityDto.get().getErrorMessage()).isEqualTo(error.getMessage()); + assertThat(activityDto.get().getErrorStacktrace()).isEqualToIgnoringWhitespace(stacktraceToString(error)); + } + + @Test + public void remove_copies_executionCount_and_workerUuid() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setWorkerUuid("Dustin") + .setExecutionCount(2)); + dbTester.commit(); + + underTest.remove(new CeTask.Builder() + .setOrganizationUuid("foo") + .setUuid("uuid") + .setType("bar") + .build(), CeActivityDto.Status.SUCCESS, null, null); + + CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "uuid").get(); + assertThat(dto.getExecutionCount()).isEqualTo(2); + assertThat(dto.getWorkerUuid()).isEqualTo("Dustin"); + } + + @Test + public void fail_to_remove_if_not_in_queue() throws Exception { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null); + + expectedException.expect(IllegalStateException.class); + + underTest.remove(task, CeActivityDto.Status.SUCCESS, null, null); + } + + @Test + public void test_peek() throws Exception { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + assertThat(peek.isPresent()).isTrue(); + assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); + assertThat(peek.get().getType()).isEqualTo(CeTaskTypes.REPORT); + assertThat(peek.get().getComponentUuid()).isEqualTo("PROJECT_1"); + + // no more pending tasks + peek = underTest.peek(WORKER_UUID_2); + assertThat(peek.isPresent()).isFalse(); + } + + @Test + public void peek_overrides_workerUuid_to_argument() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setWorkerUuid("must be overriden")); + dbTester.commit(); + + underTest.peek(WORKER_UUID_1); + + CeQueueDto ceQueueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get(); + assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1); + } + + @Test + public void peek_nothing_if_paused() throws Exception { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + underTest.pausePeek(); + + Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); + assertThat(peek.isPresent()).isFalse(); + } + + @Test + public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(0)); + dbTester.commit(); + + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid"); + assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(1); + } + + @Test + public void peek_peeks_pending_tasks_with_executionCount_equal_to_1_and_increases_it() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(1)); + dbTester.commit(); + + assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid"); + assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get().getExecutionCount()).isEqualTo(2); + } + + @Test + public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(2)); + dbTester.commit(); + + assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse(); + } + + @Test + public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(2 + Math.abs(new Random().nextInt(100)))); + dbTester.commit(); + + assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse(); + } + + @Test + public void cancel_pending() throws Exception { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + + // ignore + boolean canceled = underTest.cancel("UNKNOWN"); + assertThat(canceled).isFalse(); + + canceled = underTest.cancel(task.getUuid()); + assertThat(canceled).isTrue(); + Optional<CeActivityDto> activity = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), task.getUuid()); + assertThat(activity.isPresent()).isTrue(); + assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + } + + @Test + public void cancel_copies_executionCount_and_workerUuid() { + dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setWorkerUuid("Dustin") + .setExecutionCount(2)); + dbTester.commit(); + + underTest.cancel("uuid"); + + CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "uuid").get(); + assertThat(dto.getExecutionCount()).isEqualTo(2); + assertThat(dto.getWorkerUuid()).isEqualTo("Dustin"); + } + + @Test + public void fail_to_cancel_if_in_progress() throws Exception { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage(startsWith("Task is in progress and can't be canceled")); + + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + underTest.peek(WORKER_UUID_2); + + underTest.cancel(task.getUuid()); + } + + @Test + public void cancelAll_pendings_but_not_in_progress() throws Exception { + CeTask inProgressTask = submit(CeTaskTypes.REPORT, "PROJECT_1"); + CeTask pendingTask1 = submit(CeTaskTypes.REPORT, "PROJECT_2"); + CeTask pendingTask2 = submit(CeTaskTypes.REPORT, "PROJECT_3"); + underTest.peek(WORKER_UUID_2); + + int canceledCount = underTest.cancelAll(); + assertThat(canceledCount).isEqualTo(2); + + Optional<CeActivityDto> history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask1.getUuid()); + assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), pendingTask2.getUuid()); + assertThat(history.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + history = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), inProgressTask.getUuid()); + assertThat(history.isPresent()).isFalse(); + } + + @Test + public void cancelWornOuts_cancels_pending_tasks_with_executionCount_greater_or_equal_to_2() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, "worker1"); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, "worker1"); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker1"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, "worker1"); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker1"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker1"); + + underTest.cancelWornOuts(); + + verifyUnmodifiedByCancelWornOuts(u1); + verifyUnmodifiedByCancelWornOuts(u2); + verifyCanceled(u3); + verifyCanceled(u4); + verifyUnmodifiedByCancelWornOuts(u5); + verifyUnmodifiedByCancelWornOuts(u6); + verifyUnmodifiedByCancelWornOuts(u7); + verifyUnmodifiedByCancelWornOuts(u8); + } + + private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(original.getStatus()); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt()); + } + + private void verifyCanceled(CeQueueDto original) { + Assertions.assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid())).isAbsent(); + CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); + } + + private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid) { + CeQueueDto dto = new CeQueueDto() + .setUuid(uuid) + .setTaskType("foo") + .setStatus(status) + .setExecutionCount(executionCount) + .setWorkerUuid(workerUuid); + dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), dto); + dbTester.commit(); + return dto; + } + + @Test + public void pause_and_resume_submits() throws Exception { + assertThat(underTest.isSubmitPaused()).isFalse(); + underTest.pauseSubmit(); + assertThat(underTest.isSubmitPaused()).isTrue(); + underTest.resumeSubmit(); + assertThat(underTest.isSubmitPaused()).isFalse(); + } + + @Test + public void pause_and_resume_peeks() throws Exception { + assertThat(underTest.isPeekPaused()).isFalse(); + underTest.pausePeek(); + assertThat(underTest.isPeekPaused()).isTrue(); + underTest.resumePeek(); + assertThat(underTest.isPeekPaused()).isFalse(); + } + + private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) { + if (componentDto == null) { + assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid()); + } else { + assertThat(task.getOrganizationUuid()).isEqualTo(componentDto.getOrganizationUuid()); + } + assertThat(task.getUuid()).isEqualTo(taskSubmit.getUuid()); + assertThat(task.getComponentUuid()).isEqualTo(task.getComponentUuid()); + assertThat(task.getType()).isEqualTo(taskSubmit.getType()); + if (componentDto == null) { + assertThat(task.getComponentKey()).isNull(); + assertThat(task.getComponentName()).isNull(); + } else { + assertThat(task.getComponentKey()).isEqualTo(componentDto.key()); + assertThat(task.getComponentName()).isEqualTo(componentDto.name()); + } + assertThat(task.getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); + } + + private void verifyCeQueueDtoForTaskSubmit(CeTaskSubmit taskSubmit) { + Optional<CeQueueDto> queueDto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), taskSubmit.getUuid()); + assertThat(queueDto.isPresent()).isTrue(); + assertThat(queueDto.get().getTaskType()).isEqualTo(taskSubmit.getType()); + assertThat(queueDto.get().getComponentUuid()).isEqualTo(taskSubmit.getComponentUuid()); + assertThat(queueDto.get().getSubmitterLogin()).isEqualTo(taskSubmit.getSubmitterLogin()); + assertThat(queueDto.get().getCreatedAt()).isEqualTo(1_450_000_000_000L); + } + + private ComponentDto newComponentDto(String uuid) { + return ComponentTesting.newProjectDto(dbTester.getDefaultOrganization(), uuid).setName("name_" + uuid).setKey("key_" + uuid); + } + + private CeTask submit(String reportType, String componentUuid) { + return underTest.submit(createTaskSubmit(reportType, componentUuid, null)); + } + + private CeTaskSubmit createTaskSubmit(String type) { + return createTaskSubmit(type, null, null); + } + + private CeTaskSubmit createTaskSubmit(String type, @Nullable String componentUuid, @Nullable String submitterLogin) { + CeTaskSubmit.Builder submission = underTest.prepareSubmit(); + submission.setType(type); + submission.setComponentUuid(componentUuid); + submission.setSubmitterLogin(submitterLogin); + return submission.build(); + } + + private CeTaskResult newTaskResult(@Nullable String analysisUuid) { + CeTaskResult taskResult = mock(CeTaskResult.class); + when(taskResult.getAnalysisUuid()).thenReturn(java.util.Optional.ofNullable(analysisUuid)); + return taskResult; + } + + private ComponentDto insertComponent(ComponentDto componentDto) { + dbTester.getDbClient().componentDao().insert(session, componentDto); + session.commit(); + return componentDto; + } + + private static String stacktraceToString(Throwable error) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + error.printStackTrace(new PrintStream(out)); + return out.toString(); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java new file mode 100644 index 00000000000..1fd2316eaf5 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/PurgeCeActivitiesTest.java @@ -0,0 +1,66 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.queue; + +import org.junit.Rule; +import org.junit.Test; +import org.sonar.api.utils.System2; +import org.sonar.db.DbTester; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeTaskTypes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class PurgeCeActivitiesTest { + + private System2 system2 = spy(System2.INSTANCE); + + @Rule + public DbTester dbTester = DbTester.create(system2); + + private PurgeCeActivities underTest = new PurgeCeActivities(dbTester.getDbClient(), system2); + + @Test + public void delete_older_than_6_months() throws Exception { + insertWithDate("VERY_OLD", 1_000_000_000_000L); + insertWithDate("RECENT", 1_500_000_000_000L); + when(system2.now()).thenReturn(1_500_000_000_100L); + + underTest.start(); + + assertThat(dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "VERY_OLD").isPresent()).isFalse(); + assertThat(dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), "RECENT").isPresent()).isTrue(); + } + + private void insertWithDate(String uuid, long date) { + CeQueueDto queueDto = new CeQueueDto(); + queueDto.setUuid(uuid); + queueDto.setTaskType(CeTaskTypes.REPORT); + + CeActivityDto dto = new CeActivityDto(queueDto); + dto.setStatus(CeActivityDto.Status.SUCCESS); + when(system2.now()).thenReturn(date); + dbTester.getDbClient().ceActivityDao().insert(dbTester.getSession(), dto); + dbTester.getSession().commit(); + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java new file mode 100644 index 00000000000..d3480186bd2 --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -0,0 +1,542 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.sonar.ce.configuration.CeConfigurationRule; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CeProcessingSchedulerImplTest { + private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling"); + + @Rule + // due to risks of infinite chaining of tasks/futures, a timeout is required for safety + public Timeout timeout = Timeout.seconds(60); + @Rule + public CeConfigurationRule ceConfiguration = new CeConfigurationRule(); + + private CeWorkerCallable ceWorkerRunnable = mock(CeWorkerCallable.class); + private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); + private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2000L, TimeUnit.MILLISECONDS); + private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable); + + private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable); + + @Test + public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(true) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getSchedulerCalls()).containsOnly( + regularDelayedPoll, + notDelayedPoll + ); + } + + @Test + public void polls_without_delay_when_CeWorkerCallable_throws_Exception_but_not_Error() throws Exception { + when(ceWorkerRunnable.call()) + .thenThrow(new Exception("Exception is followed by a poll without delay")) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( + regularDelayedPoll, + notDelayedPoll + ); + } + + @Test + public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(false) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( + regularDelayedPoll, + regularDelayedPoll + ); + } + + @Test + public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false) + .thenThrow(new Exception("IAE should not cause scheduling to stop")) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( + regularDelayedPoll, + notDelayedPoll, + notDelayedPoll, + regularDelayedPoll, + notDelayedPoll, + regularDelayedPoll, + notDelayedPoll, + regularDelayedPoll, + regularDelayedPoll, + regularDelayedPoll + ); + } + + @Test + public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + underTest.startScheduling(); + + int cancelledTaskFutureCount = 0; + int i = 0; + while (processingExecutorService.futures.peek() != null) { + Future<?> future = processingExecutorService.futures.poll(); + if (future.isCancelled()) { + cancelledTaskFutureCount++; + } else { + future.get(); + } + // call stop after second delayed polling + if (i == 1) { + underTest.stop(); + } + i++; + } + + assertThat(cancelledTaskFutureCount).isEqualTo(1); + assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( + regularDelayedPoll, + regularDelayedPoll, + notDelayedPoll, + regularDelayedPoll + ); + } + + @Test + public void when_workerCount_is_more_than_1_as_many_CeWorkerCallable_are_scheduled() throws InterruptedException { + int workerCount = Math.abs(new Random().nextInt(10)) + 1; + + ceConfiguration.setWorkerCount(workerCount); + + ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class); + CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class); + CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerRunnable); + when(processingExecutorService.schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS)) + .thenReturn(listenableScheduledFuture); + + underTest.startScheduling(); + + verify(processingExecutorService, times(workerCount)).schedule(ceWorkerRunnable, ceConfiguration.getQueuePollingDelay(), MILLISECONDS); + verify(listenableScheduledFuture, times(workerCount)).addListener(any(Runnable.class), eq(processingExecutorService)); + } + + private void startSchedulingAndRun() throws ExecutionException, InterruptedException { + underTest.startScheduling(); + + // execute future synchronously + processingExecutorService.runFutures(); + } + + /** + * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous + * method to execute futures it creates and exposes a method to retrieve logs of calls to + * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by + * {@link CeProcessingSchedulerImpl}. + */ + private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService { + + private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>(); + private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService()); + + private final List<SchedulerCall> schedulerCalls = new ArrayList<>(); + + public List<SchedulerCall> getSchedulerCalls() { + return schedulerCalls; + } + + public void runFutures() throws ExecutionException, InterruptedException { + while (futures.peek() != null) { + Future<?> future = futures.poll(); + if (!future.isCancelled()) { + future.get(); + } + } + } + + @Override + public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + this.schedulerCalls.add(new SchedulerCall(callable, delay, unit)); + return delegate.schedule(callable, delay, unit); + } + + @Override + public <T> ListenableFuture<T> submit(Callable<T> task) { + this.schedulerCalls.add(new SchedulerCall(task)); + return delegate.submit(task); + } + + @Override + public void stop() { + throw new UnsupportedOperationException("stop() not implemented"); + } + + // ////////////// delegated methods //////////////// + + @Override + public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public <T> ListenableFuture<T> submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public ListenableFuture<?> submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + + /** + * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into + * {@link StubCeProcessingSchedulerExecutorService#futures}. + */ + private class SynchronousStubExecutorService implements ScheduledExecutorService { + @Override + public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) { + ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() { + @Override + public Void get() throws InterruptedException, ExecutionException { + command.run(); + return null; + } + }; + futures.add(res); + return res; + } + + @Override + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) { + ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() { + + @Override + public V get() throws InterruptedException, ExecutionException { + try { + return callable.call(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + }; + futures.add(res); + return res; + } + + @Override + public <T> Future<T> submit(final Callable<T> task) { + Future<T> res = new AbstractPartiallyImplementedFuture<T>() { + + @Override + public T get() throws InterruptedException, ExecutionException { + try { + return task.call(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + + }; + futures.add(res); + return res; + } + + @Override + public void execute(Runnable command) { + command.run(); + } + + // ///////// unsupported operations /////////// + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented"); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented"); + } + + @Override + public void shutdown() { + throw new UnsupportedOperationException("shutdown() not implemented"); + } + + @Override + public List<Runnable> shutdownNow() { + throw new UnsupportedOperationException("shutdownNow() not implemented"); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException("isShutdown() not implemented"); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException("isTerminated() not implemented"); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented"); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented"); + } + + @Override + public Future<?> submit(Runnable task) { + throw new UnsupportedOperationException("submit(Runnable task) not implemented"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented"); + } + } + } + + private static abstract class AbstractPartiallyImplementedScheduledFuture<V> extends AbstractPartiallyImplementedFuture<V> implements ScheduledFuture<V> { + @Override + public long getDelay(TimeUnit unit) { + throw new UnsupportedOperationException("getDelay(TimeUnit unit) not implemented"); + } + + @Override + public int compareTo(Delayed o) { + throw new UnsupportedOperationException("compareTo(Delayed o) not implemented"); + } + + } + + private static abstract class AbstractPartiallyImplementedFuture<T> implements Future<T> { + private boolean cancelled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return this.cancelled; + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException("isDone() not implemented"); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("get(long timeout, TimeUnit unit) not implemented"); + } + } + + /** + * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} + */ + @Immutable + private static final class SchedulerCall { + private final Callable<?> callable; + private final long delay; + private final TimeUnit unit; + + private SchedulerCall(Callable<?> callable, long delay, TimeUnit unit) { + this.callable = callable; + this.delay = delay; + this.unit = unit; + } + + private SchedulerCall(Callable<?> callable) { + this.callable = callable; + this.delay = -63366; + this.unit = TimeUnit.NANOSECONDS; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchedulerCall that = (SchedulerCall) o; + return delay == that.delay && callable == that.callable && unit.equals(that.unit); + } + + @Override + public int hashCode() { + return Objects.hash(callable, delay, unit); + } + + @Override + public String toString() { + return "SchedulerCall{" + + "callable=" + callable + + ", delay=" + delay + + ", unit=" + unit + + '}'; + } + } + +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java new file mode 100644 index 00000000000..743f51e169d --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryImplTest.java @@ -0,0 +1,148 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.assertj.guava.api.Assertions; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.queue.CeTaskResult; + +import static org.assertj.core.api.Assertions.assertThat; + + +public class CeTaskProcessorRepositoryImplTest { + private static final String SOME_CE_TASK_TYPE = "some type"; + private static final String SOME_COMPONENT_KEY = "key"; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void constructor_accepts_empty_array_argument() { + new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {}); + } + + @Test + public void constructor_throws_IAE_if_two_TaskProcessor_handle_the_same_CeTask_type() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " + + "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName()); + + new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] { + new SomeProcessor1(SOME_CE_TASK_TYPE), + new SomeProcessor2(SOME_CE_TASK_TYPE) + }); + } + + @Test + public void constructor_throws_IAE_if_multiple_TaskProcessor_overlap_their_supported_CeTask_type() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " + + "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName()); + + new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] { + new SomeProcessor2(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE), + new SomeProcessor1(SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3") + }); + } + + @Test + public void getForTask_returns_absent_if_repository_is_empty() { + CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {}); + + Assertions.assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent(); + } + + @Test + public void getForTask_returns_absent_if_repository_does_not_contain_matching_TaskProcessor() { + CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] { + createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1"), + createCeTaskProcessor(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE + "_3"), + }); + + Assertions.assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent(); + } + + @Test + public void getForTask_returns_TaskProcessor_based_on_CeTask_type_only() { + CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE); + CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor}); + + assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor); + assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY + "2")).get()).isSameAs(taskProcessor); + } + + @Test + public void getForTask_returns_TaskProcessor_even_if_it_is_not_specific() { + CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1", SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3"); + CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor}); + + assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor); + } + + private CeTaskProcessor createCeTaskProcessor(final String... ceTaskTypes) { + return new HandleTypeOnlyTaskProcessor(ceTaskTypes); + } + + private static CeTask createCeTask(String ceTaskType, String key) { + return new CeTask.Builder() + .setOrganizationUuid("org1") + .setType(ceTaskType) + .setUuid("task_uuid_" + key) + .setComponentKey(key).setComponentUuid("uuid_" + key).setComponentName("name_" + key) + .build(); + } + + private static class HandleTypeOnlyTaskProcessor implements CeTaskProcessor { + private final String[] ceTaskTypes; + + public HandleTypeOnlyTaskProcessor(String... ceTaskTypes) { + this.ceTaskTypes = ceTaskTypes; + } + + @Override + public Set<String> getHandledCeTaskTypes() { + return ImmutableSet.copyOf(ceTaskTypes); + } + + @Override + public CeTaskResult process(CeTask task) { + throw new UnsupportedOperationException("Process is not implemented"); + } + } + + private static class SomeProcessor1 extends HandleTypeOnlyTaskProcessor { + public SomeProcessor1(String... ceTaskTypes) { + super(ceTaskTypes); + } + } + + private static class SomeProcessor2 extends HandleTypeOnlyTaskProcessor { + public SomeProcessor2(String... ceTaskTypes) { + super(ceTaskTypes); + } + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java new file mode 100644 index 00000000000..7a07b5e8b8e --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeTaskProcessorRepositoryRule.java @@ -0,0 +1,78 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.base.Optional; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.junit.rules.ExternalResource; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.queue.CeTaskResult; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +/** + * A {@link org.junit.Rule} that implements the {@link CeTaskProcessorRepository} interface and + * requires consumer to explicitly define if a specific Task type has an associated {@link CeTaskProcessor} or not. + */ +public class CeTaskProcessorRepositoryRule extends ExternalResource implements CeTaskProcessorRepository { + + private final Map<String, CeTaskProcessor> index = new HashMap<>(); + + @Override + protected void after() { + index.clear(); + } + + public CeTaskProcessorRepositoryRule setNoProcessorForTask(String taskType) { + index.put(requireNonNull(taskType), NoCeTaskProcessor.INSTANCE); + return this; + } + + public CeTaskProcessorRepositoryRule setProcessorForTask(String taskType, CeTaskProcessor taskProcessor) { + index.put(requireNonNull(taskType), requireNonNull(taskProcessor)); + return this; + } + + @Override + public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) { + CeTaskProcessor taskProcessor = index.get(ceTask.getType()); + checkState(taskProcessor != null, "CeTaskProcessor was not set in rule for task %s", ceTask); + return taskProcessor instanceof NoCeTaskProcessor ? Optional.<CeTaskProcessor>absent() : Optional.of(taskProcessor); + } + + private enum NoCeTaskProcessor implements CeTaskProcessor { + INSTANCE; + + private static final String UOE_MESSAGE = "NoCeTaskProcessor does not implement any method since it not supposed to be ever used"; + + @Override + public Set<String> getHandledCeTaskTypes() { + throw new UnsupportedOperationException(UOE_MESSAGE); + } + + @Override + public CeTaskResult process(CeTask task) { + throw new UnsupportedOperationException(UOE_MESSAGE); + } + } +} diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java new file mode 100644 index 00000000000..b63a9e0dacc --- /dev/null +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerCallableImplTest.java @@ -0,0 +1,230 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.ce.taskprocessor; + +import com.google.common.base.Optional; +import java.util.List; +import javax.annotation.Nullable; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; +import org.sonar.api.utils.log.LogTester; +import org.sonar.api.utils.log.LoggerLevel; +import org.sonar.ce.log.CeLogging; +import org.sonar.ce.queue.CeTask; +import org.sonar.ce.queue.InternalCeQueue; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.db.ce.CeTaskTypes; +import org.sonar.server.computation.task.projectanalysis.taskprocessor.ReportTaskProcessor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class CeWorkerCallableImplTest { + + private static final String UNKNOWN_WORKER_UUID = "UNKNOWN"; + + @Rule + public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); + @Rule + public LogTester logTester = new LogTester(); + + private InternalCeQueue queue = mock(InternalCeQueue.class); + private ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); + private CeLogging ceLogging = spy(CeLogging.class); + private CeWorkerCallable underTest = new CeWorkerCallableImpl(queue, ceLogging, taskProcessorRepository); + private InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); + + @Test + public void no_pending_tasks_in_queue() throws Exception { + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.<CeTask>absent()); + + assertThat(underTest.call()).isFalse(); + + verifyZeroInteractions(taskProcessor, ceLogging); + } + + @Test + public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { + CeTask task = createCeTask(null); + taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + + assertThat(underTest.call()).isTrue(); + + inOrder.verify(ceLogging).initForTask(task); + inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, null); + inOrder.verify(ceLogging).clearForTask(); + } + + @Test + public void peek_and_process_task() throws Exception { + CeTask task = createCeTask(null); + taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + + assertThat(underTest.call()).isTrue(); + + inOrder.verify(ceLogging).initForTask(task); + inOrder.verify(taskProcessor).process(task); + inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS, null, null); + inOrder.verify(ceLogging).clearForTask(); + } + + @Test + public void fail_to_process_task() throws Exception { + CeTask task = createCeTask(null); + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(task)); + taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); + Throwable error = makeTaskProcessorFail(task); + + assertThat(underTest.call()).isTrue(); + + inOrder.verify(ceLogging).initForTask(task); + inOrder.verify(taskProcessor).process(task); + inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED, null, error); + inOrder.verify(ceLogging).clearForTask(); + } + + @Test + public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_success() throws Exception { + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask(null))); + taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); + + underTest.call(); + + List<String> logs = logTester.logs(LoggerLevel.INFO); + assertThat(logs).hasSize(2); + for (int i = 0; i < 2; i++) { + assertThat(logs.get(i)).doesNotContain(" | submitter="); + } + } + + @Test + public void do_not_display_submitter_param_in_log_when_submitterLogin_is_not_set_in_case_of_error() throws Exception { + CeTask ceTask = createCeTask(null); + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); + makeTaskProcessorFail(ceTask); + + underTest.call(); + + List<String> logs = logTester.logs(LoggerLevel.INFO); + assertThat(logs).hasSize(1); + assertThat(logs.get(0)).doesNotContain(" | submitter="); + logs = logTester.logs(LoggerLevel.ERROR); + assertThat(logs).hasSize(2); + for (int i = 0; i < 2; i++) { + assertThat(logs.get(i)).doesNotContain(" | submitter="); + } + assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty(); + } + + @Test + public void display_submitterLogin_in_logs_when_set_in_case_of_success() throws Exception { + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); + taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); + + underTest.call(); + + List<String> logs = logTester.logs(LoggerLevel.INFO); + assertThat(logs).hasSize(2); + assertThat(logs.get(0)).contains(" | submitter=FooBar"); + assertThat(logs.get(1)).contains(" | submitter=FooBar | time="); + assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty(); + assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty(); + } + + @Test + public void display_submitterLogin_in_logs_when_set_in_case_of_error() throws Exception { + CeTask ceTask = createCeTask("FooBar"); + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + taskProcessorRepository.setProcessorForTask(ceTask.getType(), taskProcessor); + makeTaskProcessorFail(ceTask); + + underTest.call(); + + List<String> logs = logTester.logs(LoggerLevel.INFO); + assertThat(logs).hasSize(1); + assertThat(logs.iterator().next()).contains(" | submitter=FooBar"); + logs = logTester.logs(LoggerLevel.ERROR); + assertThat(logs).hasSize(2); + assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid()); + assertThat(logs.get(1)).contains(" | submitter=FooBar | time="); + } + + @Test + public void display_start_stop_at_debug_level_for_console_if_DEBUG_is_enabled_and_task_successful() throws Exception { + logTester.setLevel(LoggerLevel.DEBUG); + + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(createCeTask("FooBar"))); + taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); + + underTest.call(); + + List<String> logs = logTester.logs(LoggerLevel.INFO); + assertThat(logs).hasSize(2); + assertThat(logs.get(0)).contains(" | submitter=FooBar"); + assertThat(logs.get(1)).contains(" | submitter=FooBar | time="); + assertThat(logTester.logs(LoggerLevel.ERROR)).isEmpty(); + assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty(); + } + + @Test + public void display_start_at_debug_level_stop_at_error_level_for_console_if_DEBUG_is_enabled_and_task_failed() throws Exception { + logTester.setLevel(LoggerLevel.DEBUG); + + CeTask ceTask = createCeTask("FooBar"); + when(queue.peek(UNKNOWN_WORKER_UUID)).thenReturn(Optional.of(ceTask)); + taskProcessorRepository.setProcessorForTask(CeTaskTypes.REPORT, taskProcessor); + makeTaskProcessorFail(ceTask); + + underTest.call(); + + List<String> logs = logTester.logs(LoggerLevel.INFO); + assertThat(logs).hasSize(1); + assertThat(logs.iterator().next()).contains(" | submitter=FooBar"); + logs = logTester.logs(LoggerLevel.ERROR); + assertThat(logs).hasSize(2); + assertThat(logs.get(0)).isEqualTo("Failed to execute task " + ceTask.getUuid()); + assertThat(logs.get(1)).contains(" | submitter=FooBar | time="); + assertThat(logTester.logs(LoggerLevel.DEBUG)).isEmpty(); + } + + private static CeTask createCeTask(@Nullable String submitterLogin) { + return new CeTask.Builder() + .setOrganizationUuid("org1") + .setUuid("TASK_1").setType(CeTaskTypes.REPORT) + .setComponentUuid("PROJECT_1") + .setSubmitterLogin(submitterLogin) + .build(); + } + + private IllegalStateException makeTaskProcessorFail(CeTask task) { + IllegalStateException error = new IllegalStateException("simulate exception thrown by TaskProcessor#process"); + doThrow(error).when(taskProcessor).process(task); + return error; + } +} |