import org.sonar.core.platform.Module;
import org.sonar.server.computation.queue.report.ReportSubmitter;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
+import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
import org.sonar.server.computation.step.ComputationStepExecutor;
public class ReportProcessingModule extends Module {
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-public interface CeProcessingScheduler {
-
- void startScheduling();
-
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import org.sonar.server.util.StoppableScheduledExecutorService;
-
-/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
- */
-public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
-
-public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
- implements CeProcessingSchedulerExecutorService {
- private static final String THREAD_NAME_PREFIX = "ce-processor-";
-
- public CeProcessingSchedulerExecutorServiceImpl() {
- super(
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat(THREAD_NAME_PREFIX + "%d")
- .setPriority(Thread.MIN_PRIORITY)
- .build()));
- }
-
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import java.util.concurrent.TimeUnit;
-
-public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
- private final CeProcessingSchedulerExecutorService executorService;
- private final CeWorkerRunnable workerRunnable;
-
- private final long delayBetweenTasks;
- private final long delayForFirstStart;
- private final TimeUnit timeUnit;
-
- public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
- this.executorService = processingExecutorService;
- this.workerRunnable = workerRunnable;
-
- this.delayBetweenTasks = 10;
- this.delayForFirstStart = 0;
- this.timeUnit = TimeUnit.SECONDS;
- }
-
- @Override
- public void startScheduling() {
- executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
- }
-
-}
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.server.computation.monitoring.CEQueueStatus;
+import org.sonar.server.computation.taskprocessor.CeProcessingScheduler;
/**
* Cleans-up the queue, initializes JMX counters then schedule
@Override
protected void configureModule() {
add(
- // queue state
- CeQueueImpl.class,
+ // queue state
+ CeQueueImpl.class,
- // queue monitoring
- CEQueueStatusImpl.class,
- ComputeEngineQueueMonitor.class,
+ // queue monitoring
+ CEQueueStatusImpl.class,
+ ComputeEngineQueueMonitor.class,
- // CE queue processing
- CeProcessingSchedulerExecutorServiceImpl.class,
- CeWorkerRunnableImpl.class,
- CeProcessingSchedulerImpl.class,
+ // queue cleaning
+ CeQueueCleaner.class,
+ CleanReportQueueListener.class,
+ ReportFiles.class,
- // queue cleaning
- CeQueueCleaner.class,
- CleanReportQueueListener.class,
- ReportFiles.class,
-
- // init queue state and queue processing
- CeQueueInitializer.class
- );
+ // init queue state and queue processing
+ CeQueueInitializer.class);
}
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-/**
- * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
- */
-public interface CeWorkerRunnable extends Runnable {
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonar.server.computation.queue;
-
-import com.google.common.base.Optional;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.server.computation.log.CeLogging;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-
-import static java.lang.String.format;
-
-public class CeWorkerRunnableImpl implements CeWorkerRunnable {
-
- private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
-
- private final CeQueue queue;
- private final ReportTaskProcessor reportTaskProcessor;
- private final CeLogging ceLogging;
-
- public CeWorkerRunnableImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
- this.queue = queue;
- this.reportTaskProcessor = reportTaskProcessor;
- this.ceLogging = ceLogging;
- }
-
- @Override
- public void run() {
- Optional<CeTask> ceTask = tryAndFindTaskToExecute();
- if (!ceTask.isPresent()) {
- return;
- }
-
- executeTask(ceTask.get());
- }
-
- private Optional<CeTask> tryAndFindTaskToExecute() {
- try {
- return queue.peek();
- } catch (Exception e) {
- LOG.error("Failed to pop the queue of analysis reports", e);
- }
- return Optional.absent();
- }
-
- private void executeTask(CeTask task) {
- // logging twice: once in sonar.log and once in CE appender
- Profiler regularProfiler = startProfiler(task);
- ceLogging.initForTask(task);
- Profiler ceProfiler = startProfiler(task);
-
- CeActivityDto.Status status = CeActivityDto.Status.FAILED;
- try {
- // TODO delegate the message to the related task processor, according to task type
- reportTaskProcessor.process(task);
- status = CeActivityDto.Status.SUCCESS;
- queue.remove(task, status);
- } catch (Throwable e) {
- LOG.error(format("Failed to execute task %s", task.getUuid()), e);
- queue.remove(task, status);
- } finally {
- // logging twice: once in sonar.log and once in CE appender
- stopProfiler(ceProfiler, task, status);
- ceLogging.clearForTask();
- stopProfiler(regularProfiler, task, status);
- }
- }
-
- private static Profiler startProfiler(CeTask task) {
- return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid());
- }
-
- private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
- if (status == CeActivityDto.Status.FAILED) {
- profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
- } else {
- profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
- }
- }
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue.report;
-
-import org.sonar.core.platform.ComponentContainer;
-import org.sonar.server.computation.step.ComputationStepExecutor;
-import org.sonar.server.computation.container.ComputeEngineContainer;
-import org.sonar.server.computation.container.ContainerFactory;
-import org.sonar.server.computation.queue.CeTask;
-
-public class ReportTaskProcessor {
-
- private final ContainerFactory containerFactory;
- private final ComponentContainer serverContainer;
-
- public ReportTaskProcessor(ContainerFactory containerFactory, ComponentContainer serverContainer) {
- this.containerFactory = containerFactory;
- this.serverContainer = serverContainer;
- }
-
- public void process(CeTask task) {
- ComputeEngineContainer ceContainer = containerFactory.create(serverContainer, task);
- try {
- ceContainer.getComponentByType(ComputationStepExecutor.class).execute();
- } finally {
- ceContainer.cleanup();
- }
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+public interface CeProcessingScheduler {
+
+ void startScheduling();
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import org.sonar.server.util.StoppableScheduledExecutorService;
+
+/**
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
+ */
+public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
+
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
+ implements CeProcessingSchedulerExecutorService {
+ private static final String THREAD_NAME_PREFIX = "ce-processor-";
+
+ public CeProcessingSchedulerExecutorServiceImpl() {
+ super(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat(THREAD_NAME_PREFIX + "%d")
+ .setPriority(Thread.MIN_PRIORITY)
+ .build()));
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.concurrent.TimeUnit;
+
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
+ private final CeProcessingSchedulerExecutorService executorService;
+ private final CeWorkerRunnable workerRunnable;
+
+ private final long delayBetweenTasks;
+ private final long delayForFirstStart;
+ private final TimeUnit timeUnit;
+
+ public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
+ this.executorService = processingExecutorService;
+ this.workerRunnable = workerRunnable;
+
+ this.delayBetweenTasks = 10;
+ this.delayForFirstStart = 0;
+ this.timeUnit = TimeUnit.SECONDS;
+ }
+
+ @Override
+ public void startScheduling() {
+ executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.Set;
+import org.sonar.server.computation.queue.CeTask;
+
+/**
+ * This interface is used to provide the processing code for {@link CeTask}s of one or more type to be called by the
+ * Compute Engine.
+ */
+public interface CeTaskProcessor {
+
+ /**
+ * The {@link CeTask#getType()} for which this {@link CeTaskProcessor} provides the processing code.
+ * <p>
+ * The match of type is done using {@link String#equals(Object)} and if more than one {@link CeTaskProcessor} declares
+ * itself had handler for the same {@link CeTask#getType()}, an error will be raised at startup and startup will
+ * fail.
+ * </p>
+ * <p>
+ * If an empty {@link Set} is returned, the {@link CeTaskProcessor} will be ignored.
+ * </p>
+ */
+ Set<String> getHandledCeTaskTypes();
+
+ /**
+ * Call the processing code for a specific {@link CeTask}.
+ * <p>
+ * The specified is guaranteed to be non {@code null} and its {@link CeTask#getType()} to be one of the values
+ * of {@link #getHandledCeTaskTypes()}.
+ * </p>
+ *
+ * @throws RuntimeException when thrown, it will be caught and logged by the Compute Engine and the processing of the
+ * specified {@link CeTask} will be flagged as failed.
+ */
+ void process(CeTask task);
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import org.sonar.core.platform.Module;
+
+public class CeTaskProcessorModule extends Module {
+ @Override
+ protected void configureModule() {
+ add(
+ CeTaskProcessorRepositoryImpl.class,
+ CeWorkerRunnableImpl.class,
+ CeProcessingSchedulerExecutorServiceImpl.class,
+ CeProcessingSchedulerImpl.class);
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.sonar.server.computation.queue.CeTask;
+
+public interface CeTaskProcessorRepository {
+
+ /**
+ * @throws NullPointerException if the specified {@link CeTask} is {@code null}
+ * @throws IllegalStateException if there is no {@link CeTaskProcessor} for the specified {@link CeTask} in the repository
+ */
+ Optional<CeTaskProcessor> getForCeTask(CeTask ceTask);
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.sonar.server.computation.queue.CeTask;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.FluentIterable.from;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static java.lang.String.format;
+
+/**
+ * {@link CeTaskProcessorRepository} implementation which provides access to the {@link CeTaskProcessor} existing in the
+ * PicoContainer the current object belongs to.
+ */
+public class CeTaskProcessorRepositoryImpl implements CeTaskProcessorRepository {
+ private static final Joiner COMMA_JOINER = Joiner.on(", ");
+
+ private final Map<String, CeTaskProcessor> taskProcessorByCeTaskType;
+
+ public CeTaskProcessorRepositoryImpl(CeTaskProcessor[] taskProcessors) {
+ this.taskProcessorByCeTaskType = indexTaskProcessors(taskProcessors);
+ }
+
+ @Override
+ public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) {
+ return Optional.fromNullable(taskProcessorByCeTaskType.get(ceTask.getType()));
+ }
+
+ private static Map<String, CeTaskProcessor> indexTaskProcessors(CeTaskProcessor[] taskProcessors) {
+ Multimap<String, CeTaskProcessor> permissiveIndex = buildPermissiveCeTaskProcessorIndex(taskProcessors);
+ checkUniqueHandlerPerCeTaskType(permissiveIndex);
+ return ImmutableMap.copyOf(Maps.transformValues(permissiveIndex.asMap(), CeTaskProcessorCollectionToFirstElement.INSTANCE));
+ }
+
+ private static Multimap<String, CeTaskProcessor> buildPermissiveCeTaskProcessorIndex(CeTaskProcessor[] taskProcessors) {
+ Multimap<String, CeTaskProcessor> permissiveIndex = ArrayListMultimap.create(taskProcessors.length, 1);
+ for (CeTaskProcessor taskProcessor : taskProcessors) {
+ for (String ceTaskType : taskProcessor.getHandledCeTaskTypes()) {
+ permissiveIndex.put(ceTaskType, taskProcessor);
+ }
+ }
+ return permissiveIndex;
+ }
+
+ private static void checkUniqueHandlerPerCeTaskType(Multimap<String, CeTaskProcessor> permissiveIndex) {
+ for (Map.Entry<String, Collection<CeTaskProcessor>> entry : permissiveIndex.asMap().entrySet()) {
+ checkArgument(
+ entry.getValue().size() == 1,
+ format(
+ "There can be only one CeTaskProcessor instance registered as the processor for CeTask type %s. " +
+ "More than one found. Please fix your configuration: %s",
+ entry.getKey(),
+ COMMA_JOINER.join(from(entry.getValue()).transform(ToClassName.INSTANCE).toSortedList(CASE_INSENSITIVE_ORDER))));
+ }
+ }
+
+ private enum ToClassName implements Function<Object, String> {
+ INSTANCE;
+
+ @Override
+ @Nonnull
+ public String apply(@Nonnull Object input) {
+ return input.getClass().getName();
+ }
+ }
+
+ private enum CeTaskProcessorCollectionToFirstElement implements Function<Collection<CeTaskProcessor>, CeTaskProcessor> {
+ INSTANCE;
+
+ @Override
+ @Nonnull
+ public CeTaskProcessor apply(@Nonnull Collection<CeTaskProcessor> input) {
+ return input.iterator().next();
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeTask;
+
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ */
+public interface CeWorkerRunnable extends Runnable {
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeTask;
+
+import static java.lang.String.format;
+
+public class CeWorkerRunnableImpl implements CeWorkerRunnable {
+
+ private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
+
+ private final CeQueue queue;
+ private final CeLogging ceLogging;
+ private final CeTaskProcessorRepository taskProcessorRepository;
+
+ public CeWorkerRunnableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
+ this.queue = queue;
+ this.ceLogging = ceLogging;
+ this.taskProcessorRepository = taskProcessorRepository;
+ }
+
+ @Override
+ public void run() {
+ Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+ if (!ceTask.isPresent()) {
+ return;
+ }
+
+ executeTask(ceTask.get());
+ }
+
+ private Optional<CeTask> tryAndFindTaskToExecute() {
+ try {
+ return queue.peek();
+ } catch (Exception e) {
+ LOG.error("Failed to pop the queue of analysis reports", e);
+ }
+ return Optional.absent();
+ }
+
+ private void executeTask(CeTask task) {
+ // logging twice: once in sonar.log and once in CE appender
+ Profiler regularProfiler = startProfiler(task);
+ ceLogging.initForTask(task);
+ Profiler ceProfiler = startProfiler(task);
+
+ CeActivityDto.Status status = CeActivityDto.Status.FAILED;
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+ if (taskProcessor.isPresent()) {
+ taskProcessor.get().process(task);
+ status = CeActivityDto.Status.SUCCESS;
+ } else {
+ LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+ status = CeActivityDto.Status.FAILED;
+ }
+ queue.remove(task, status);
+ } catch (Throwable e) {
+ LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+ queue.remove(task, status);
+ } finally {
+ // logging twice: once in sonar.log and once in CE appender
+ stopProfiler(ceProfiler, task, status);
+ ceLogging.clearForTask();
+ stopProfiler(regularProfiler, task, status);
+ }
+ }
+
+ private static Profiler startProfiler(CeTask task) {
+ return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid());
+ }
+
+ private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
+ if (status == CeActivityDto.Status.FAILED) {
+ profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
+ } else {
+ profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+@ParametersAreNonnullByDefault
+package org.sonar.server.computation.taskprocessor;
+
+import javax.annotation.ParametersAreNonnullByDefault;
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor.report;
+
+import java.util.Collections;
+import java.util.Set;
+import org.sonar.core.platform.ComponentContainer;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.taskprocessor.CeTaskProcessor;
+import org.sonar.server.computation.step.ComputationStepExecutor;
+import org.sonar.server.computation.container.ComputeEngineContainer;
+import org.sonar.server.computation.container.ContainerFactory;
+import org.sonar.server.computation.queue.CeTask;
+
+public class ReportTaskProcessor implements CeTaskProcessor {
+
+ private static final Set<String> HANDLED_TYPES = Collections.singleton(CeTaskTypes.REPORT);
+
+ private final ContainerFactory containerFactory;
+ private final ComponentContainer serverContainer;
+
+ public ReportTaskProcessor(ContainerFactory containerFactory, ComponentContainer serverContainer) {
+ this.containerFactory = containerFactory;
+ this.serverContainer = serverContainer;
+ }
+
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ return HANDLED_TYPES;
+ }
+
+ @Override
+ public void process(CeTask task) {
+ ComputeEngineContainer ceContainer = containerFactory.create(serverContainer, task);
+ try {
+ ceContainer.getComponentByType(ComputationStepExecutor.class).execute();
+ } finally {
+ ceContainer.cleanup();
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+@ParametersAreNonnullByDefault
+package org.sonar.server.computation.taskprocessor.report;
+
+import javax.annotation.ParametersAreNonnullByDefault;
import org.sonar.server.computation.CeModule;
import org.sonar.server.computation.container.ReportProcessingModule;
import org.sonar.server.computation.queue.CeQueueModule;
+import org.sonar.server.computation.taskprocessor.CeTaskProcessorModule;
import org.sonar.server.computation.ws.CeWsModule;
import org.sonar.server.config.ws.PropertiesWs;
import org.sonar.server.dashboard.template.GlobalDefaultDashboard;
// Compute engine
CeModule.class,
CeQueueModule.class,
+ CeTaskProcessorModule.class,
CeWsModule.class,
ReportProcessingModule.class,
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class CeProcessingSchedulerImplTest {
- private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
-
- @Test
- public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
- underTest.startScheduling();
-
- verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
- verifyNoMoreInteractions(processingExecutorService);
- }
-
-}
import org.sonar.server.computation.queue.report.ReportFiles;
import org.sonar.server.computation.monitoring.CEQueueStatus;
import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
+import org.sonar.server.computation.taskprocessor.CeProcessingScheduler;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import com.google.common.base.Optional;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-import org.sonar.server.computation.log.CeLogging;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class CeWorkerRunnableImplTest {
-
- CeQueue queue = mock(CeQueueImpl.class);
- ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
- CeLogging ceLogging = mock(CeLogging.class);
- CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, taskProcessor, ceLogging);
-
- @Test
- public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek()).thenReturn(Optional.<CeTask>absent());
-
- underTest.run();
-
- verifyZeroInteractions(taskProcessor, ceLogging);
- }
-
- @Test
- public void peek_and_process_task() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- when(queue.peek()).thenReturn(Optional.of(task));
-
- underTest.run();
-
- InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void fail_to_process_task() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- when(queue.peek()).thenReturn(Optional.of(task));
- doThrow(new IllegalStateException()).when(taskProcessor).process(task);
-
- underTest.run();
-
- InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
- inOrder.verify(ceLogging).clearForTask();
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class CeProcessingSchedulerImplTest {
+ private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
+ private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
+
+ @Test
+ public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
+ underTest.startScheduling();
+
+ verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
+ verifyNoMoreInteractions(processingExecutorService);
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.server.computation.queue.CeTask;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.guava.api.Assertions.assertThat;
+
+public class CeTaskProcessorRepositoryImplTest {
+ private static final String SOME_CE_TASK_TYPE = "some type";
+ private static final String SOME_COMPONENT_KEY = "key";
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void constructor_accepts_empty_array_argument() {
+ new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {});
+ }
+
+ @Test
+ public void constructor_throws_IAE_if_two_TaskProcessor_handle_the_same_CeTask_type() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " +
+ "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName());
+
+ new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+ new SomeProcessor1(SOME_CE_TASK_TYPE),
+ new SomeProcessor2(SOME_CE_TASK_TYPE)
+ });
+ }
+
+ @Test
+ public void constructor_throws_IAE_if_multiple_TaskProcessor_overlap_their_supported_CeTask_type() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " +
+ "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName());
+
+ new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+ new SomeProcessor2(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE),
+ new SomeProcessor1(SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3")
+ });
+ }
+
+ @Test
+ public void getForTask_returns_absent_if_repository_is_empty() {
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {});
+
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent();
+ }
+
+ @Test
+ public void getForTask_returns_absent_if_repository_does_not_contain_matching_TaskProcessor() {
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+ createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1"),
+ createCeTaskProcessor(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE + "_3"),
+ });
+
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent();
+ }
+
+ @Test
+ public void getForTask_returns_TaskProcessor_based_on_CeTask_type_only() {
+ CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE);
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor});
+
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor);
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY + "2")).get()).isSameAs(taskProcessor);
+ }
+
+ @Test
+ public void getForTask_returns_TaskProcessor_even_if_it_is_not_specific() {
+ CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1", SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3");
+ CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor});
+
+ assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor);
+ }
+
+ private CeTaskProcessor createCeTaskProcessor(final String... ceTaskTypes) {
+ return new HandleTypeOnlyTaskProcessor(ceTaskTypes);
+ }
+
+ private static CeTask createCeTask(String ceTaskType, String key) {
+ return new CeTask.Builder()
+ .setType(ceTaskType)
+ .setUuid("task_uuid_" + key)
+ .setComponentKey(key).setComponentUuid("uuid_" + key).setComponentName("name_" + key)
+ .build();
+ }
+
+ private static class HandleTypeOnlyTaskProcessor implements CeTaskProcessor {
+ private final String[] ceTaskTypes;
+
+ public HandleTypeOnlyTaskProcessor(String... ceTaskTypes) {
+ this.ceTaskTypes = ceTaskTypes;
+ }
+
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ return ImmutableSet.copyOf(ceTaskTypes);
+ }
+
+ @Override
+ public void process(CeTask task) {
+ throw new UnsupportedOperationException("Process is not implemented");
+ }
+ }
+
+ private static class SomeProcessor1 extends HandleTypeOnlyTaskProcessor {
+ public SomeProcessor1(String... ceTaskTypes) {
+ super(ceTaskTypes);
+ }
+ }
+
+ private static class SomeProcessor2 extends HandleTypeOnlyTaskProcessor {
+ public SomeProcessor2(String... ceTaskTypes) {
+ super(ceTaskTypes);
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.junit.rules.ExternalResource;
+import org.sonar.server.computation.queue.CeTask;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link org.junit.Rule} that implements the {@link CeTaskProcessorRepository} interface and
+ * requires consumer to explicitly define if a specific Task type has an associated {@link CeTaskProcessor} or not.
+ */
+public class CeTaskProcessorRepositoryRule extends ExternalResource implements CeTaskProcessorRepository {
+
+ private final Map<String, CeTaskProcessor> index = new HashMap<>();
+
+ @Override
+ protected void after() {
+ index.clear();
+ }
+
+ public CeTaskProcessorRepositoryRule setNoProcessorForTask(String taskType) {
+ index.put(requireNonNull(taskType), NoCeTaskProcessor.INSTANCE);
+ return this;
+ }
+
+ public CeTaskProcessorRepositoryRule setProcessorForTask(String taskType, CeTaskProcessor taskProcessor) {
+ index.put(requireNonNull(taskType), requireNonNull(taskProcessor));
+ return this;
+ }
+
+ @Override
+ public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) {
+ CeTaskProcessor taskProcessor = index.get(ceTask.getType());
+ checkState(taskProcessor != null, "CeTaskProcessor was not set in rule for task %s", ceTask);
+ return taskProcessor instanceof NoCeTaskProcessor ? Optional.<CeTaskProcessor>absent() : Optional.of(taskProcessor);
+ }
+
+ private enum NoCeTaskProcessor implements CeTaskProcessor {
+ INSTANCE;
+
+ private static final String UOE_MESSAGE = "NoCeTaskProcessor does not implement any method since it not supposed to be ever used";
+
+ @Override
+ public Set<String> getHandledCeTaskTypes() {
+ throw new UnsupportedOperationException(UOE_MESSAGE);
+ }
+
+ @Override
+ public void process(CeTask task) {
+ throw new UnsupportedOperationException(UOE_MESSAGE);
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeQueueImpl;
+import org.sonar.server.computation.queue.CeTask;
+import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerRunnableImplTest {
+
+ @Rule
+ public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
+
+ CeQueue queue = mock(CeQueueImpl.class);
+ ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+ CeLogging ceLogging = mock(CeLogging.class);
+ CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, ceLogging, taskProcessorRepository);
+ InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+
+ @Test
+ public void no_pending_tasks_in_queue() throws Exception {
+ when(queue.peek()).thenReturn(Optional.<CeTask>absent());
+
+ underTest.run();
+
+ verifyZeroInteractions(taskProcessor, ceLogging);
+ }
+
+ @Test
+ public void fail_when_no_CeTaskProcessor_is_found_in_repository() {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
+ when(queue.peek()).thenReturn(Optional.of(task));
+
+ underTest.run();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void peek_and_process_task() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ when(queue.peek()).thenReturn(Optional.of(task));
+
+ underTest.run();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void fail_to_process_task() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ when(queue.peek()).thenReturn(Optional.of(task));
+ taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+ doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
+
+ underTest.run();
+
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+}