@@ -21,7 +21,7 @@ package org.sonar.server.computation.container; | |||
import org.sonar.core.platform.Module; | |||
import org.sonar.server.computation.queue.report.ReportSubmitter; | |||
import org.sonar.server.computation.queue.report.ReportTaskProcessor; | |||
import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor; | |||
import org.sonar.server.computation.step.ComputationStepExecutor; | |||
public class ReportProcessingModule extends Module { |
@@ -24,6 +24,7 @@ import org.sonar.api.server.ServerSide; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.computation.taskprocessor.CeProcessingScheduler; | |||
/** | |||
* Cleans-up the queue, initializes JMX counters then schedule |
@@ -29,25 +29,19 @@ public class CeQueueModule extends Module { | |||
@Override | |||
protected void configureModule() { | |||
add( | |||
// queue state | |||
CeQueueImpl.class, | |||
// queue state | |||
CeQueueImpl.class, | |||
// queue monitoring | |||
CEQueueStatusImpl.class, | |||
ComputeEngineQueueMonitor.class, | |||
// queue monitoring | |||
CEQueueStatusImpl.class, | |||
ComputeEngineQueueMonitor.class, | |||
// CE queue processing | |||
CeProcessingSchedulerExecutorServiceImpl.class, | |||
CeWorkerRunnableImpl.class, | |||
CeProcessingSchedulerImpl.class, | |||
// queue cleaning | |||
CeQueueCleaner.class, | |||
CleanReportQueueListener.class, | |||
ReportFiles.class, | |||
// queue cleaning | |||
CeQueueCleaner.class, | |||
CleanReportQueueListener.class, | |||
ReportFiles.class, | |||
// init queue state and queue processing | |||
CeQueueInitializer.class | |||
); | |||
// init queue state and queue processing | |||
CeQueueInitializer.class); | |||
} | |||
} |
@@ -17,7 +17,7 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
public interface CeProcessingScheduler { | |||
@@ -17,7 +17,7 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import org.sonar.server.util.StoppableScheduledExecutorService; | |||
@@ -17,7 +17,7 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |||
import java.util.concurrent.Executors; |
@@ -17,7 +17,7 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import java.util.concurrent.TimeUnit; | |||
@@ -0,0 +1,55 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.taskprocessor; | |||
import java.util.Set; | |||
import org.sonar.server.computation.queue.CeTask; | |||
/** | |||
* This interface is used to provide the processing code for {@link CeTask}s of one or more type to be called by the | |||
* Compute Engine. | |||
*/ | |||
public interface CeTaskProcessor { | |||
/** | |||
* The {@link CeTask#getType()} for which this {@link CeTaskProcessor} provides the processing code. | |||
* <p> | |||
* The match of type is done using {@link String#equals(Object)} and if more than one {@link CeTaskProcessor} declares | |||
* itself had handler for the same {@link CeTask#getType()}, an error will be raised at startup and startup will | |||
* fail. | |||
* </p> | |||
* <p> | |||
* If an empty {@link Set} is returned, the {@link CeTaskProcessor} will be ignored. | |||
* </p> | |||
*/ | |||
Set<String> getHandledCeTaskTypes(); | |||
/** | |||
* Call the processing code for a specific {@link CeTask}. | |||
* <p> | |||
* The specified is guaranteed to be non {@code null} and its {@link CeTask#getType()} to be one of the values | |||
* of {@link #getHandledCeTaskTypes()}. | |||
* </p> | |||
* | |||
* @throws RuntimeException when thrown, it will be caught and logged by the Compute Engine and the processing of the | |||
* specified {@link CeTask} will be flagged as failed. | |||
*/ | |||
void process(CeTask task); | |||
} |
@@ -0,0 +1,33 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.taskprocessor; | |||
import org.sonar.core.platform.Module; | |||
public class CeTaskProcessorModule extends Module { | |||
@Override | |||
protected void configureModule() { | |||
add( | |||
CeTaskProcessorRepositoryImpl.class, | |||
CeWorkerRunnableImpl.class, | |||
CeProcessingSchedulerExecutorServiceImpl.class, | |||
CeProcessingSchedulerImpl.class); | |||
} | |||
} |
@@ -0,0 +1,33 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.base.Optional; | |||
import org.sonar.server.computation.queue.CeTask; | |||
public interface CeTaskProcessorRepository { | |||
/** | |||
* @throws NullPointerException if the specified {@link CeTask} is {@code null} | |||
* @throws IllegalStateException if there is no {@link CeTaskProcessor} for the specified {@link CeTask} in the repository | |||
*/ | |||
Optional<CeTaskProcessor> getForCeTask(CeTask ceTask); | |||
} |
@@ -0,0 +1,104 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.base.Function; | |||
import com.google.common.base.Joiner; | |||
import com.google.common.base.Optional; | |||
import com.google.common.collect.ArrayListMultimap; | |||
import com.google.common.collect.ImmutableMap; | |||
import com.google.common.collect.Maps; | |||
import com.google.common.collect.Multimap; | |||
import java.util.Collection; | |||
import java.util.Map; | |||
import javax.annotation.Nonnull; | |||
import org.sonar.server.computation.queue.CeTask; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
import static com.google.common.collect.FluentIterable.from; | |||
import static java.lang.String.CASE_INSENSITIVE_ORDER; | |||
import static java.lang.String.format; | |||
/** | |||
* {@link CeTaskProcessorRepository} implementation which provides access to the {@link CeTaskProcessor} existing in the | |||
* PicoContainer the current object belongs to. | |||
*/ | |||
public class CeTaskProcessorRepositoryImpl implements CeTaskProcessorRepository { | |||
private static final Joiner COMMA_JOINER = Joiner.on(", "); | |||
private final Map<String, CeTaskProcessor> taskProcessorByCeTaskType; | |||
public CeTaskProcessorRepositoryImpl(CeTaskProcessor[] taskProcessors) { | |||
this.taskProcessorByCeTaskType = indexTaskProcessors(taskProcessors); | |||
} | |||
@Override | |||
public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) { | |||
return Optional.fromNullable(taskProcessorByCeTaskType.get(ceTask.getType())); | |||
} | |||
private static Map<String, CeTaskProcessor> indexTaskProcessors(CeTaskProcessor[] taskProcessors) { | |||
Multimap<String, CeTaskProcessor> permissiveIndex = buildPermissiveCeTaskProcessorIndex(taskProcessors); | |||
checkUniqueHandlerPerCeTaskType(permissiveIndex); | |||
return ImmutableMap.copyOf(Maps.transformValues(permissiveIndex.asMap(), CeTaskProcessorCollectionToFirstElement.INSTANCE)); | |||
} | |||
private static Multimap<String, CeTaskProcessor> buildPermissiveCeTaskProcessorIndex(CeTaskProcessor[] taskProcessors) { | |||
Multimap<String, CeTaskProcessor> permissiveIndex = ArrayListMultimap.create(taskProcessors.length, 1); | |||
for (CeTaskProcessor taskProcessor : taskProcessors) { | |||
for (String ceTaskType : taskProcessor.getHandledCeTaskTypes()) { | |||
permissiveIndex.put(ceTaskType, taskProcessor); | |||
} | |||
} | |||
return permissiveIndex; | |||
} | |||
private static void checkUniqueHandlerPerCeTaskType(Multimap<String, CeTaskProcessor> permissiveIndex) { | |||
for (Map.Entry<String, Collection<CeTaskProcessor>> entry : permissiveIndex.asMap().entrySet()) { | |||
checkArgument( | |||
entry.getValue().size() == 1, | |||
format( | |||
"There can be only one CeTaskProcessor instance registered as the processor for CeTask type %s. " + | |||
"More than one found. Please fix your configuration: %s", | |||
entry.getKey(), | |||
COMMA_JOINER.join(from(entry.getValue()).transform(ToClassName.INSTANCE).toSortedList(CASE_INSENSITIVE_ORDER)))); | |||
} | |||
} | |||
private enum ToClassName implements Function<Object, String> { | |||
INSTANCE; | |||
@Override | |||
@Nonnull | |||
public String apply(@Nonnull Object input) { | |||
return input.getClass().getName(); | |||
} | |||
} | |||
private enum CeTaskProcessorCollectionToFirstElement implements Function<Collection<CeTaskProcessor>, CeTaskProcessor> { | |||
INSTANCE; | |||
@Override | |||
@Nonnull | |||
public CeTaskProcessor apply(@Nonnull Collection<CeTaskProcessor> input) { | |||
return input.iterator().next(); | |||
} | |||
} | |||
} |
@@ -17,7 +17,10 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import org.sonar.server.computation.queue.CeQueue; | |||
import org.sonar.server.computation.queue.CeTask; | |||
/** | |||
* Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}. |
@@ -18,7 +18,7 @@ | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.base.Optional; | |||
import org.sonar.api.utils.log.Logger; | |||
@@ -26,7 +26,8 @@ import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.core.util.logs.Profiler; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import org.sonar.server.computation.log.CeLogging; | |||
import org.sonar.server.computation.queue.report.ReportTaskProcessor; | |||
import org.sonar.server.computation.queue.CeQueue; | |||
import org.sonar.server.computation.queue.CeTask; | |||
import static java.lang.String.format; | |||
@@ -35,13 +36,13 @@ public class CeWorkerRunnableImpl implements CeWorkerRunnable { | |||
private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class); | |||
private final CeQueue queue; | |||
private final ReportTaskProcessor reportTaskProcessor; | |||
private final CeLogging ceLogging; | |||
private final CeTaskProcessorRepository taskProcessorRepository; | |||
public CeWorkerRunnableImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) { | |||
public CeWorkerRunnableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) { | |||
this.queue = queue; | |||
this.reportTaskProcessor = reportTaskProcessor; | |||
this.ceLogging = ceLogging; | |||
this.taskProcessorRepository = taskProcessorRepository; | |||
} | |||
@Override | |||
@@ -72,8 +73,14 @@ public class CeWorkerRunnableImpl implements CeWorkerRunnable { | |||
CeActivityDto.Status status = CeActivityDto.Status.FAILED; | |||
try { | |||
// TODO delegate the message to the related task processor, according to task type | |||
reportTaskProcessor.process(task); | |||
status = CeActivityDto.Status.SUCCESS; | |||
Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task); | |||
if (taskProcessor.isPresent()) { | |||
taskProcessor.get().process(task); | |||
status = CeActivityDto.Status.SUCCESS; | |||
} else { | |||
LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType()); | |||
status = CeActivityDto.Status.FAILED; | |||
} | |||
queue.remove(task, status); | |||
} catch (Throwable e) { | |||
LOG.error(format("Failed to execute task %s", task.getUuid()), e); |
@@ -0,0 +1,24 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
@ParametersAreNonnullByDefault | |||
package org.sonar.server.computation.taskprocessor; | |||
import javax.annotation.ParametersAreNonnullByDefault; |
@@ -17,15 +17,21 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue.report; | |||
package org.sonar.server.computation.taskprocessor.report; | |||
import java.util.Collections; | |||
import java.util.Set; | |||
import org.sonar.core.platform.ComponentContainer; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.computation.taskprocessor.CeTaskProcessor; | |||
import org.sonar.server.computation.step.ComputationStepExecutor; | |||
import org.sonar.server.computation.container.ComputeEngineContainer; | |||
import org.sonar.server.computation.container.ContainerFactory; | |||
import org.sonar.server.computation.queue.CeTask; | |||
public class ReportTaskProcessor { | |||
public class ReportTaskProcessor implements CeTaskProcessor { | |||
private static final Set<String> HANDLED_TYPES = Collections.singleton(CeTaskTypes.REPORT); | |||
private final ContainerFactory containerFactory; | |||
private final ComponentContainer serverContainer; | |||
@@ -35,6 +41,12 @@ public class ReportTaskProcessor { | |||
this.serverContainer = serverContainer; | |||
} | |||
@Override | |||
public Set<String> getHandledCeTaskTypes() { | |||
return HANDLED_TYPES; | |||
} | |||
@Override | |||
public void process(CeTask task) { | |||
ComputeEngineContainer ceContainer = containerFactory.create(serverContainer, task); | |||
try { |
@@ -0,0 +1,24 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
@ParametersAreNonnullByDefault | |||
package org.sonar.server.computation.taskprocessor.report; | |||
import javax.annotation.ParametersAreNonnullByDefault; |
@@ -64,6 +64,7 @@ import org.sonar.server.component.ws.ResourcesWs; | |||
import org.sonar.server.computation.CeModule; | |||
import org.sonar.server.computation.container.ReportProcessingModule; | |||
import org.sonar.server.computation.queue.CeQueueModule; | |||
import org.sonar.server.computation.taskprocessor.CeTaskProcessorModule; | |||
import org.sonar.server.computation.ws.CeWsModule; | |||
import org.sonar.server.config.ws.PropertiesWs; | |||
import org.sonar.server.dashboard.template.GlobalDefaultDashboard; | |||
@@ -701,6 +702,7 @@ public class PlatformLevel4 extends PlatformLevel { | |||
// Compute engine | |||
CeModule.class, | |||
CeQueueModule.class, | |||
CeTaskProcessorModule.class, | |||
CeWsModule.class, | |||
ReportProcessingModule.class, | |||
@@ -34,6 +34,7 @@ import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.computation.queue.report.ReportFiles; | |||
import org.sonar.server.computation.monitoring.CEQueueStatus; | |||
import org.sonar.server.computation.monitoring.CEQueueStatusImpl; | |||
import org.sonar.server.computation.taskprocessor.CeProcessingScheduler; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Matchers.any; |
@@ -17,7 +17,7 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import java.util.concurrent.TimeUnit; | |||
import org.junit.Test; |
@@ -0,0 +1,145 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.collect.ImmutableSet; | |||
import java.util.Set; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
import org.sonar.server.computation.queue.CeTask; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.assertj.guava.api.Assertions.assertThat; | |||
public class CeTaskProcessorRepositoryImplTest { | |||
private static final String SOME_CE_TASK_TYPE = "some type"; | |||
private static final String SOME_COMPONENT_KEY = "key"; | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
@Test | |||
public void constructor_accepts_empty_array_argument() { | |||
new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {}); | |||
} | |||
@Test | |||
public void constructor_throws_IAE_if_two_TaskProcessor_handle_the_same_CeTask_type() { | |||
expectedException.expect(IllegalArgumentException.class); | |||
expectedException.expectMessage( | |||
"There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " + | |||
"More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName()); | |||
new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] { | |||
new SomeProcessor1(SOME_CE_TASK_TYPE), | |||
new SomeProcessor2(SOME_CE_TASK_TYPE) | |||
}); | |||
} | |||
@Test | |||
public void constructor_throws_IAE_if_multiple_TaskProcessor_overlap_their_supported_CeTask_type() { | |||
expectedException.expect(IllegalArgumentException.class); | |||
expectedException.expectMessage( | |||
"There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " + | |||
"More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName()); | |||
new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] { | |||
new SomeProcessor2(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE), | |||
new SomeProcessor1(SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3") | |||
}); | |||
} | |||
@Test | |||
public void getForTask_returns_absent_if_repository_is_empty() { | |||
CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {}); | |||
assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent(); | |||
} | |||
@Test | |||
public void getForTask_returns_absent_if_repository_does_not_contain_matching_TaskProcessor() { | |||
CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] { | |||
createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1"), | |||
createCeTaskProcessor(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE + "_3"), | |||
}); | |||
assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent(); | |||
} | |||
@Test | |||
public void getForTask_returns_TaskProcessor_based_on_CeTask_type_only() { | |||
CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE); | |||
CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor}); | |||
assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor); | |||
assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY + "2")).get()).isSameAs(taskProcessor); | |||
} | |||
@Test | |||
public void getForTask_returns_TaskProcessor_even_if_it_is_not_specific() { | |||
CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1", SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3"); | |||
CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor}); | |||
assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor); | |||
} | |||
private CeTaskProcessor createCeTaskProcessor(final String... ceTaskTypes) { | |||
return new HandleTypeOnlyTaskProcessor(ceTaskTypes); | |||
} | |||
private static CeTask createCeTask(String ceTaskType, String key) { | |||
return new CeTask.Builder() | |||
.setType(ceTaskType) | |||
.setUuid("task_uuid_" + key) | |||
.setComponentKey(key).setComponentUuid("uuid_" + key).setComponentName("name_" + key) | |||
.build(); | |||
} | |||
private static class HandleTypeOnlyTaskProcessor implements CeTaskProcessor { | |||
private final String[] ceTaskTypes; | |||
public HandleTypeOnlyTaskProcessor(String... ceTaskTypes) { | |||
this.ceTaskTypes = ceTaskTypes; | |||
} | |||
@Override | |||
public Set<String> getHandledCeTaskTypes() { | |||
return ImmutableSet.copyOf(ceTaskTypes); | |||
} | |||
@Override | |||
public void process(CeTask task) { | |||
throw new UnsupportedOperationException("Process is not implemented"); | |||
} | |||
} | |||
private static class SomeProcessor1 extends HandleTypeOnlyTaskProcessor { | |||
public SomeProcessor1(String... ceTaskTypes) { | |||
super(ceTaskTypes); | |||
} | |||
} | |||
private static class SomeProcessor2 extends HandleTypeOnlyTaskProcessor { | |||
public SomeProcessor2(String... ceTaskTypes) { | |||
super(ceTaskTypes); | |||
} | |||
} | |||
} |
@@ -0,0 +1,77 @@ | |||
/* | |||
* SonarQube, open source software quality management tool. | |||
* Copyright (C) 2008-2014 SonarSource | |||
* mailto:contact AT sonarsource DOT com | |||
* | |||
* SonarQube is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* SonarQube is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.base.Optional; | |||
import java.util.HashMap; | |||
import java.util.Map; | |||
import java.util.Set; | |||
import org.junit.rules.ExternalResource; | |||
import org.sonar.server.computation.queue.CeTask; | |||
import static com.google.common.base.Preconditions.checkState; | |||
import static java.util.Objects.requireNonNull; | |||
/** | |||
* A {@link org.junit.Rule} that implements the {@link CeTaskProcessorRepository} interface and | |||
* requires consumer to explicitly define if a specific Task type has an associated {@link CeTaskProcessor} or not. | |||
*/ | |||
public class CeTaskProcessorRepositoryRule extends ExternalResource implements CeTaskProcessorRepository { | |||
private final Map<String, CeTaskProcessor> index = new HashMap<>(); | |||
@Override | |||
protected void after() { | |||
index.clear(); | |||
} | |||
public CeTaskProcessorRepositoryRule setNoProcessorForTask(String taskType) { | |||
index.put(requireNonNull(taskType), NoCeTaskProcessor.INSTANCE); | |||
return this; | |||
} | |||
public CeTaskProcessorRepositoryRule setProcessorForTask(String taskType, CeTaskProcessor taskProcessor) { | |||
index.put(requireNonNull(taskType), requireNonNull(taskProcessor)); | |||
return this; | |||
} | |||
@Override | |||
public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) { | |||
CeTaskProcessor taskProcessor = index.get(ceTask.getType()); | |||
checkState(taskProcessor != null, "CeTaskProcessor was not set in rule for task %s", ceTask); | |||
return taskProcessor instanceof NoCeTaskProcessor ? Optional.<CeTaskProcessor>absent() : Optional.of(taskProcessor); | |||
} | |||
private enum NoCeTaskProcessor implements CeTaskProcessor { | |||
INSTANCE; | |||
private static final String UOE_MESSAGE = "NoCeTaskProcessor does not implement any method since it not supposed to be ever used"; | |||
@Override | |||
public Set<String> getHandledCeTaskTypes() { | |||
throw new UnsupportedOperationException(UOE_MESSAGE); | |||
} | |||
@Override | |||
public void process(CeTask task) { | |||
throw new UnsupportedOperationException(UOE_MESSAGE); | |||
} | |||
} | |||
} |
@@ -17,16 +17,20 @@ | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.server.computation.queue; | |||
package org.sonar.server.computation.taskprocessor; | |||
import com.google.common.base.Optional; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.mockito.InOrder; | |||
import org.mockito.Mockito; | |||
import org.sonar.db.ce.CeActivityDto; | |||
import org.sonar.db.ce.CeTaskTypes; | |||
import org.sonar.server.computation.queue.report.ReportTaskProcessor; | |||
import org.sonar.server.computation.log.CeLogging; | |||
import org.sonar.server.computation.queue.CeQueue; | |||
import org.sonar.server.computation.queue.CeQueueImpl; | |||
import org.sonar.server.computation.queue.CeTask; | |||
import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor; | |||
import static org.mockito.Mockito.doThrow; | |||
import static org.mockito.Mockito.mock; | |||
@@ -35,10 +39,14 @@ import static org.mockito.Mockito.when; | |||
public class CeWorkerRunnableImplTest { | |||
@Rule | |||
public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule(); | |||
CeQueue queue = mock(CeQueueImpl.class); | |||
ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); | |||
CeLogging ceLogging = mock(CeLogging.class); | |||
CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, taskProcessor, ceLogging); | |||
CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, ceLogging, taskProcessorRepository); | |||
InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); | |||
@Test | |||
public void no_pending_tasks_in_queue() throws Exception { | |||
@@ -49,14 +57,27 @@ public class CeWorkerRunnableImplTest { | |||
verifyZeroInteractions(taskProcessor, ceLogging); | |||
} | |||
@Test | |||
public void fail_when_no_CeTaskProcessor_is_found_in_repository() { | |||
CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); | |||
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); | |||
when(queue.peek()).thenReturn(Optional.of(task)); | |||
underTest.run(); | |||
inOrder.verify(ceLogging).initForTask(task); | |||
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); | |||
inOrder.verify(ceLogging).clearForTask(); | |||
} | |||
@Test | |||
public void peek_and_process_task() throws Exception { | |||
CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); | |||
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); | |||
when(queue.peek()).thenReturn(Optional.of(task)); | |||
underTest.run(); | |||
InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); | |||
inOrder.verify(ceLogging).initForTask(task); | |||
inOrder.verify(taskProcessor).process(task); | |||
inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS); | |||
@@ -67,11 +88,11 @@ public class CeWorkerRunnableImplTest { | |||
public void fail_to_process_task() throws Exception { | |||
CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); | |||
when(queue.peek()).thenReturn(Optional.of(task)); | |||
doThrow(new IllegalStateException()).when(taskProcessor).process(task); | |||
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); | |||
doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task); | |||
underTest.run(); | |||
InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue); | |||
inOrder.verify(ceLogging).initForTask(task); | |||
inOrder.verify(taskProcessor).process(task); | |||
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); |