diff options
22 files changed, 703 insertions, 161 deletions
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineBatchExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineBatchExecutorService.java new file mode 100644 index 00000000000..3f1618ca37b --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineBatchExecutorService.java @@ -0,0 +1,29 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import java.util.concurrent.ScheduledExecutorService; +import org.sonar.server.util.StoppableExecutorService; + +/** + * ExecutorService responsible for adding {@link ReportProcessingTask} to {@link ReportQueue} on a regular basis. + */ +public interface ComputeEngineBatchExecutorService extends ScheduledExecutorService, StoppableExecutorService { +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineBatchExecutorServiceImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineBatchExecutorServiceImpl.java new file mode 100644 index 00000000000..df1c8e2cbf3 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineBatchExecutorServiceImpl.java @@ -0,0 +1,63 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.sonar.server.util.AbstractStoppableExecutorService; + +public class ComputeEngineBatchExecutorServiceImpl extends AbstractStoppableExecutorService<ScheduledExecutorService> + implements ComputeEngineBatchExecutorService { + private static final String THREAD_NAME_PREFIX = "ce-batch-scheduler-"; + + public ComputeEngineBatchExecutorServiceImpl() { + super( + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(THREAD_NAME_PREFIX + "%d") + .setPriority(Thread.MIN_PRIORITY) + .build() + )); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorService.java new file mode 100644 index 00000000000..bf75d38d47b --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorService.java @@ -0,0 +1,29 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import java.util.concurrent.ScheduledExecutorService; +import org.sonar.server.util.StoppableExecutorService; + +/** + * The {@link java.util.concurrent.ExecutorService} responsible for running {@link ComputeEngineTask}. + */ +public interface ComputeEngineProcessingExecutorService extends StoppableExecutorService, ScheduledExecutorService { +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorServiceImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorServiceImpl.java new file mode 100644 index 00000000000..cb4c4db7b3e --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorServiceImpl.java @@ -0,0 +1,62 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.sonar.server.util.AbstractStoppableExecutorService; + +public class ComputeEngineProcessingExecutorServiceImpl extends AbstractStoppableExecutorService<ScheduledExecutorService> + implements ComputeEngineProcessingExecutorService { + private static final String THREAD_NAME_PREFIX = "ce-processor-"; + + public ComputeEngineProcessingExecutorServiceImpl() { + super( + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(THREAD_NAME_PREFIX + "%d") + .setPriority(Thread.MIN_PRIORITY) + .build())); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java new file mode 100644 index 00000000000..39a21a27997 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java @@ -0,0 +1,33 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import org.sonar.core.platform.Module; + +public class ComputeEngineProcessingModule extends Module { + @Override + protected void configureModule() { + add( + ReportProcessingScheduler.class, + ComputeEngineBatchExecutorServiceImpl.class, + ComputeEngineProcessingExecutorServiceImpl.class, + ComputeEngineProcessingQueueImpl.class); + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueue.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueue.java new file mode 100644 index 00000000000..687aca20c59 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueue.java @@ -0,0 +1,27 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +public interface ComputeEngineProcessingQueue { + /** + * Adds a task to the Compute Engine processing queue. + */ + void addTask(ComputeEngineTask task); +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueueImpl.java new file mode 100644 index 00000000000..8fab03546e7 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueueImpl.java @@ -0,0 +1,78 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Queues; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import org.sonar.api.platform.Server; +import org.sonar.api.platform.ServerStartHandler; + +import static java.util.Objects.requireNonNull; + +public class ComputeEngineProcessingQueueImpl implements ComputeEngineProcessingQueue, ServerStartHandler { + private final ComputeEngineProcessingExecutorService processingService; + private final ConcurrentLinkedQueue<ComputeEngineTask> queue = Queues.newConcurrentLinkedQueue(); + + private final long delayBetweenTasks; + private final long delayForFirstStart; + private final TimeUnit timeUnit; + + public ComputeEngineProcessingQueueImpl(ComputeEngineProcessingExecutorService processingExecutorService) { + this.processingService = processingExecutorService; + + this.delayBetweenTasks = 10; + this.delayForFirstStart = 0; + this.timeUnit = TimeUnit.SECONDS; + } + + @VisibleForTesting + ComputeEngineProcessingQueueImpl(ComputeEngineProcessingExecutorService processingExecutorService, + long delayForFirstStart, long delayBetweenTasks, TimeUnit timeUnit) { + this.processingService = processingExecutorService; + + this.delayBetweenTasks = delayBetweenTasks; + this.delayForFirstStart = delayForFirstStart; + this.timeUnit = timeUnit; + } + + @Override + public void addTask(ComputeEngineTask task) { + requireNonNull(task, "a ComputeEngineTask can not be null"); + + queue.add(task); + } + + @Override + public void onServerStart(Server server) { + processingService.scheduleAtFixedRate(new ProcessHeadOfQueueRunnable(), delayForFirstStart, delayBetweenTasks, timeUnit); + } + + private class ProcessHeadOfQueueRunnable implements Runnable { + @Override + public void run() { + ComputeEngineTask task = queue.poll(); + if (task != null) { + task.run(); + } + } + } +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineTask.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineTask.java new file mode 100644 index 00000000000..626c11b207f --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineTask.java @@ -0,0 +1,26 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +/** + * A task to be executed by the Compute Engine. + */ +public interface ComputeEngineTask extends Runnable { +} diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputationThreadLauncher.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingScheduler.java index 018f2e9d687..e294f406207 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputationThreadLauncher.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingScheduler.java @@ -20,36 +20,35 @@ package org.sonar.server.computation; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.picocontainer.Startable; import org.sonar.api.platform.Server; import org.sonar.api.platform.ServerStartHandler; import org.sonar.core.platform.ComponentContainer; import org.sonar.server.computation.container.ContainerFactory; import org.sonar.server.computation.container.ContainerFactoryImpl; -public class ComputationThreadLauncher implements Startable, ServerStartHandler { - - public static final String THREAD_NAME_PREFIX = "computation-"; +/** + * Adds tasks to the Compute Engine to process batch reports. + */ +public class ReportProcessingScheduler implements ServerStartHandler { + private final ComputeEngineBatchExecutorService batchExecutorService; + private final ComputeEngineProcessingQueue processingQueue; private final ReportQueue queue; private final ComponentContainer sqContainer; - private final ScheduledExecutorService executorService; private final ContainerFactory containerFactory; private final long delayBetweenTasks; private final long delayForFirstStart; private final TimeUnit timeUnit; - public ComputationThreadLauncher(ReportQueue queue, ComponentContainer sqContainer) { + public ReportProcessingScheduler(ComputeEngineBatchExecutorService batchExecutorService, + ComputeEngineProcessingQueue processingQueue, + ReportQueue queue, ComponentContainer sqContainer) { + this.batchExecutorService = batchExecutorService; + this.processingQueue = processingQueue; this.queue = queue; this.sqContainer = sqContainer; - this.executorService = Executors.newSingleThreadScheduledExecutor(newThreadFactory()); this.containerFactory = new ContainerFactoryImpl(); this.delayBetweenTasks = 10; @@ -57,40 +56,19 @@ public class ComputationThreadLauncher implements Startable, ServerStartHandler this.timeUnit = TimeUnit.SECONDS; } - @VisibleForTesting - ComputationThreadLauncher(ReportQueue queue, ComponentContainer sqContainer, ContainerFactory containerFactory, - long delayForFirstStart, long delayBetweenTasks, TimeUnit timeUnit) { - this.queue = queue; - this.sqContainer = sqContainer; - this.containerFactory = containerFactory; - this.executorService = Executors.newSingleThreadScheduledExecutor(newThreadFactory()); - - this.delayBetweenTasks = delayBetweenTasks; - this.delayForFirstStart = delayForFirstStart; - this.timeUnit = timeUnit; - } - - @Override - public void start() { - // do nothing because we want to wait for the server to finish startup - } - - @Override - public void stop() { - executorService.shutdown(); - } - public void startAnalysisTaskNow() { - executorService.execute(new ComputationThread(queue, sqContainer, containerFactory)); + batchExecutorService.execute(new AddBatchProcessingCETaskRunnable()); } @Override public void onServerStart(Server server) { - executorService.scheduleAtFixedRate(new ComputationThread(queue, sqContainer, containerFactory), delayForFirstStart, delayBetweenTasks, timeUnit); + batchExecutorService.scheduleAtFixedRate(new AddBatchProcessingCETaskRunnable(), delayForFirstStart, delayBetweenTasks, timeUnit); } - private static ThreadFactory newThreadFactory() { - return new ThreadFactoryBuilder() - .setNameFormat(THREAD_NAME_PREFIX + "%d").setPriority(Thread.MIN_PRIORITY).build(); + private class AddBatchProcessingCETaskRunnable implements Runnable { + @Override + public void run() { + processingQueue.addTask(new ReportProcessingTask(queue, sqContainer, containerFactory)); + } } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputationThread.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingTask.java index 58b23538c7d..e312f10046c 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputationThread.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingTask.java @@ -28,17 +28,17 @@ import org.sonar.server.computation.container.ComputeEngineContainer; import org.sonar.server.computation.container.ContainerFactory; /** - * This thread pops a report from the queue and integrate it. + * This Compute Engine task pops a report from the queue and integrate it. */ -public class ComputationThread implements Runnable { +public class ReportProcessingTask implements ComputeEngineTask { - private static final Logger LOG = Loggers.get(ComputationThread.class); + private static final Logger LOG = Loggers.get(ReportProcessingTask.class); private final ReportQueue queue; private final ComponentContainer sqContainer; private final ContainerFactory containerFactory; - public ComputationThread(ReportQueue queue, ComponentContainer sqContainer, ContainerFactory containerFactory) { + public ReportProcessingTask(ReportQueue queue, ComponentContainer sqContainer, ContainerFactory containerFactory) { this.queue = queue; this.sqContainer = sqContainer; this.containerFactory = containerFactory; diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputationService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessor.java index 95e332c975e..f6255658f54 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputationService.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessor.java @@ -35,9 +35,9 @@ import static org.sonar.db.compute.AnalysisReportDto.Status.FAILED; import static org.sonar.db.compute.AnalysisReportDto.Status.SUCCESS; @ServerSide -public class ComputationService { +public class ReportProcessor { - private static final Logger LOG = Loggers.get(ComputationService.class); + private static final Logger LOG = Loggers.get(ReportProcessor.class); private final ReportQueue.Item item; private final ComputationSteps steps; @@ -45,7 +45,7 @@ public class ComputationService { private final System2 system; private final CEQueueStatus queueStatus; - public ComputationService(ReportQueue.Item item, ComputationSteps steps, ActivityManager activityManager, System2 system, + public ReportProcessor(ReportQueue.Item item, ComputationSteps steps, ActivityManager activityManager, System2 system, CEQueueStatus queueStatus) { this.item = item; this.steps = steps; diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/container/ComputeEngineContainerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/container/ComputeEngineContainerImpl.java index b17e1e50824..ac2d5851922 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/container/ComputeEngineContainerImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/container/ComputeEngineContainerImpl.java @@ -37,7 +37,7 @@ import org.sonar.api.utils.log.Loggers; import org.sonar.core.issue.tracking.Tracker; import org.sonar.core.platform.ComponentContainer; import org.sonar.core.platform.Module; -import org.sonar.server.computation.ComputationService; +import org.sonar.server.computation.ReportProcessor; import org.sonar.server.computation.ComputationTempFolderProvider; import org.sonar.server.computation.ReportQueue; import org.sonar.server.computation.activity.ActivityManager; @@ -250,14 +250,14 @@ public class ComputeEngineContainerImpl extends ComponentContainer implements Co // views ViewIndex.class, - // ComputationService - ComputationService.class); + // ReportProcessor + ReportProcessor.class); } @Override public void process() { // calls the first - getComponentByType(ComputationService.class).process(); + getComponentByType(ReportProcessor.class).process(); } @Override diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ws/SubmitReportAction.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ws/SubmitReportAction.java index 1aafbd2591b..a908efa46fe 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/ws/SubmitReportAction.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ws/SubmitReportAction.java @@ -26,7 +26,7 @@ import org.sonar.api.server.ws.Request; import org.sonar.api.server.ws.Response; import org.sonar.api.server.ws.WebService; import org.sonar.core.permission.GlobalPermissions; -import org.sonar.server.computation.ComputationThreadLauncher; +import org.sonar.server.computation.ReportProcessingScheduler; import org.sonar.server.computation.ReportQueue; import org.sonar.server.computation.monitoring.CEQueueStatus; import org.sonar.server.user.UserSession; @@ -39,11 +39,11 @@ public class SubmitReportAction implements ComputationWsAction { public static final String PARAM_REPORT_DATA = "report"; private final ReportQueue queue; - private final ComputationThreadLauncher workerLauncher; + private final ReportProcessingScheduler workerLauncher; private final UserSession userSession; private final CEQueueStatus queueStatus; - public SubmitReportAction(ReportQueue queue, ComputationThreadLauncher workerLauncher, UserSession userSession, CEQueueStatus queueStatus) { + public SubmitReportAction(ReportQueue queue, ReportProcessingScheduler workerLauncher, UserSession userSession, CEQueueStatus queueStatus) { this.queue = queue; this.workerLauncher = workerLauncher; this.userSession = userSession; diff --git a/server/sonar-server/src/main/java/org/sonar/server/platform/platformlevel/PlatformLevel4.java b/server/sonar-server/src/main/java/org/sonar/server/platform/platformlevel/PlatformLevel4.java index 534edeffa91..db5b2db3de7 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/platform/platformlevel/PlatformLevel4.java +++ b/server/sonar-server/src/main/java/org/sonar/server/platform/platformlevel/PlatformLevel4.java @@ -64,7 +64,7 @@ import org.sonar.server.component.DefaultRubyComponentService; import org.sonar.server.component.ws.ComponentsWs; import org.sonar.server.component.ws.EventsWs; import org.sonar.server.component.ws.ResourcesWs; -import org.sonar.server.computation.ComputationThreadLauncher; +import org.sonar.server.computation.ComputeEngineProcessingModule; import org.sonar.server.computation.ReportQueue; import org.sonar.server.computation.monitoring.CEQueueStatusImpl; import org.sonar.server.computation.monitoring.ComputeEngineQueueMonitor; @@ -715,7 +715,7 @@ public class PlatformLevel4 extends PlatformLevel { CEQueueStatusImpl.class, ComputeEngineQueueMonitor.class, ReportQueue.class, - ComputationThreadLauncher.class, + ComputeEngineProcessingModule.class, ComputationWs.class, IsQueueEmptyWs.class, QueueAction.class, diff --git a/server/sonar-server/src/main/java/org/sonar/server/util/AbstractStoppableExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/util/AbstractStoppableExecutorService.java index daf575eba7d..059ab25bc27 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/util/AbstractStoppableExecutorService.java +++ b/server/sonar-server/src/main/java/org/sonar/server/util/AbstractStoppableExecutorService.java @@ -36,10 +36,10 @@ import static java.lang.String.format; * Abstract implementation of StoppableExecutorService that implements the * stop() method and delegates all methods to the provided ExecutorService instance. */ -public abstract class AbstractStoppableExecutorService implements StoppableExecutorService { - private final ExecutorService delegate; +public abstract class AbstractStoppableExecutorService<T extends ExecutorService> implements StoppableExecutorService { + protected final T delegate; - public AbstractStoppableExecutorService(ExecutorService delegate) { + public AbstractStoppableExecutorService(T delegate) { this.delegate = delegate; } diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationThreadLauncherTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationThreadLauncherTest.java deleted file mode 100644 index 7d5d70e0b13..00000000000 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationThreadLauncherTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * SonarQube, open source software quality management tool. - * Copyright (C) 2008-2014 SonarSource - * mailto:contact AT sonarsource DOT com - * - * SonarQube is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * SonarQube is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package org.sonar.server.computation; - -import java.util.concurrent.TimeUnit; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.DisableOnDebug; -import org.junit.rules.TestRule; -import org.junit.rules.Timeout; -import org.sonar.api.platform.Server; -import org.sonar.core.platform.ComponentContainer; -import org.sonar.server.computation.container.ContainerFactory; - -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -public class ComputationThreadLauncherTest { - - @Rule - public TestRule timeout = new DisableOnDebug(Timeout.seconds(60)); - - ComputationThreadLauncher underTest; - ReportQueue queue; - ComponentContainer componentContainer; - ContainerFactory containerFactory; - - @Before - public void before() { - this.queue = mock(ReportQueue.class); - this.componentContainer = mock(ComponentContainer.class); - this.containerFactory = mock(ContainerFactory.class); - } - - @After - public void after() { - underTest.stop(); - } - - @Test - public void call_findAndBook_when_launching_a_recurrent_task() throws Exception { - underTest = new ComputationThreadLauncher(queue, componentContainer, containerFactory, 0, 1, TimeUnit.MILLISECONDS); - - underTest.onServerStart(mock(Server.class)); - - sleep(); - - verify(queue, atLeastOnce()).pop(); - } - - @Test - public void call_findAndBook_when_executing_task_immediately() throws Exception { - underTest = new ComputationThreadLauncher(queue, componentContainer, containerFactory, 1, 1, TimeUnit.HOURS); - underTest.start(); - - underTest.startAnalysisTaskNow(); - - sleep(); - - verify(queue, atLeastOnce()).pop(); - } - - @Test - public void test_real_constructor() throws Exception { - underTest = new ComputationThreadLauncher(queue, componentContainer); - underTest.start(); - } - - private void sleep() throws InterruptedException { - TimeUnit.MILLISECONDS.sleep(500L); - } -} diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputeEngineProcessingQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ComputeEngineProcessingQueueImplTest.java new file mode 100644 index 00000000000..7fe63ec716c --- /dev/null +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/ComputeEngineProcessingQueueImplTest.java @@ -0,0 +1,220 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.server.computation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Test; +import org.sonar.api.platform.Server; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class ComputeEngineProcessingQueueImplTest { + + @Test + public void onServerStart_schedules_at_fixed_rate_run_head_of_queue() { + ComputeEngineProcessingExecutorService processingExecutorService = mock(ComputeEngineProcessingExecutorService.class); + + ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService); + underTest.onServerStart(mock(Server.class)); + + verify(processingExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS)); + verifyNoMoreInteractions(processingExecutorService); + } + + @Test + public void task_in_queue_is_called_run_only_once() { + ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(10); + CallCounterComputeEngineTask task = new CallCounterComputeEngineTask(); + + ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService); + underTest.addTask(task); + underTest.onServerStart(mock(Server.class)); + + assertThat(task.calls).isEqualTo(1); + } + + @Test + public void tasks_are_executed_in_order_of_addition() { + ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(10); + + final List<Integer> nameList = new ArrayList<>(); + + ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService); + underTest.addTask(new ComputeEngineTask() { + @Override + public void run() { + nameList.add(1); + } + }); + underTest.addTask(new ComputeEngineTask() { + @Override + public void run() { + nameList.add(2); + } + }); + underTest.addTask(new ComputeEngineTask() { + @Override + public void run() { + nameList.add(3); + } + }); + underTest.addTask(new ComputeEngineTask() { + @Override + public void run() { + nameList.add(4); + } + }); + + underTest.onServerStart(mock(Server.class)); + + assertThat(nameList).containsExactly(1, 2, 3, 4); + } + + private static class CallCounterComputeEngineTask implements ComputeEngineTask { + int calls = 0; + + @Override + public void run() { + calls++; + } + } + + private static class ComputeEngineProcessingExecutorServiceAdapter implements ComputeEngineProcessingExecutorService { + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public void stop() { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public void shutdown() { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public List<Runnable> shutdownNow() { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public Future<?> submit(Runnable task) { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("Not implemented!"); + } + + @Override + public void execute(Runnable command) { + throw new UnsupportedOperationException("Not implemented!"); + } + } + + private static class SimulateFixedRateCallsProcessingExecutorService extends ComputeEngineProcessingExecutorServiceAdapter { + private final int simulatedCalls; + + private SimulateFixedRateCallsProcessingExecutorService(int simulatedCalls) { + this.simulatedCalls = simulatedCalls; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + // calling the runnable any number of times will only get a task run only once + for (int i = 0; i < simulatedCalls ; i++) { + command.run(); + } + return null; + } + } +} diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingSchedulerTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingSchedulerTest.java new file mode 100644 index 00000000000..8a5382b7c9f --- /dev/null +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingSchedulerTest.java @@ -0,0 +1,90 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package org.sonar.server.computation; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.sonar.api.platform.Server; +import org.sonar.core.platform.ComponentContainer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ReportProcessingSchedulerTest { + + ComputeEngineBatchExecutorService batchExecutorService = mock(ComputeEngineBatchExecutorService.class); + SimpleComputeEngineProcessingQueue processingQueue = new SimpleComputeEngineProcessingQueue(); + ReportQueue queue = mock(ReportQueue.class); + ComponentContainer componentContainer = mock(ComponentContainer.class); + + ReportProcessingScheduler underTest = new ReportProcessingScheduler(batchExecutorService, processingQueue, queue, componentContainer); + + @Test + public void schedule_at_fixed_rate_adding_a_ReportProcessingTask_to_the_queue() throws Exception { + when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS))) + .thenAnswer(new ExecuteFirstArgAsRunnable()); + + underTest.onServerStart(mock(Server.class)); + + assertThat(processingQueue.getTasks()).hasSize(1); + assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(ReportProcessingTask.class); + } + + @Test + public void adds_immediately_a_ReportProcessingTask_to_the_queue() throws Exception { + doAnswer(new ExecuteFirstArgAsRunnable()).when(batchExecutorService).execute(any(Runnable.class)); + + underTest.startAnalysisTaskNow(); + + assertThat(processingQueue.getTasks()).hasSize(1); + assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(ReportProcessingTask.class); + } + + private static class SimpleComputeEngineProcessingQueue implements ComputeEngineProcessingQueue { + private final List<ComputeEngineTask> tasks = new ArrayList<>(); + + @Override + public void addTask(ComputeEngineTask task) { + tasks.add(task); + } + + public List<ComputeEngineTask> getTasks() { + return tasks; + } + } + + private static class ExecuteFirstArgAsRunnable implements Answer<Object> { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Runnable runnable = (Runnable) invocationOnMock.getArguments()[0]; + runnable.run(); + return null; + } + } +} diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationThreadTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingTaskTest.java index 370bbe72210..65647bd9481 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationThreadTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingTaskTest.java @@ -38,7 +38,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -public class ComputationThreadTest { +public class ReportProcessingTaskTest { @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -48,7 +48,7 @@ public class ComputationThreadTest { ReportQueue queue = mock(ReportQueue.class); ComponentContainer componentContainer = mock(ComponentContainer.class); ContainerFactory containerFactory = mock(ContainerFactory.class); - ComputationThread underTest = new ComputationThread(queue, componentContainer, containerFactory); + ReportProcessingTask underTest = new ReportProcessingTask(queue, componentContainer, containerFactory); @Test public void do_nothing_if_queue_empty() { diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationServiceTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessorTest.java index a4abde6c34e..615a6dc2530 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/ComputationServiceTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessorTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class ComputationServiceTest { +public class ReportProcessorTest { @Rule public LogTester logTester = new LogTester(); @@ -58,11 +58,11 @@ public class ComputationServiceTest { System2 system = mock(System2.class); CEQueueStatus queueStatus = mock(CEQueueStatus.class); AnalysisReportDto dto = AnalysisReportDto.newForTests(1L).setProjectKey("P1").setUuid("U1").setStatus(Status.PENDING); - ComputationService underTest; + ReportProcessor underTest; @Before public void setUp() { - underTest = new ComputationService(new ReportQueue.Item(dto, new File("Do_not_care")), steps, activityManager, system, queueStatus); + underTest = new ReportProcessor(new ReportQueue.Item(dto, new File("Do_not_care")), steps, activityManager, system, queueStatus); } @Test diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ws/ComputationWsTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ws/ComputationWsTest.java index 4872feb71a7..b40aec69ade 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/ws/ComputationWsTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/ws/ComputationWsTest.java @@ -23,7 +23,7 @@ package org.sonar.server.computation.ws; import org.junit.Test; import org.sonar.api.server.ws.WebService; import org.sonar.server.activity.index.ActivityIndex; -import org.sonar.server.computation.ComputationThreadLauncher; +import org.sonar.server.computation.ReportProcessingScheduler; import org.sonar.server.computation.ReportQueue; import org.sonar.server.computation.monitoring.CEQueueStatus; import org.sonar.server.user.UserSession; @@ -36,7 +36,7 @@ public class ComputationWsTest { WsTester ws = new WsTester(new ComputationWs( new QueueAction(mock(ReportQueue.class)), - new SubmitReportAction(mock(ReportQueue.class), mock(ComputationThreadLauncher.class), mock(UserSession.class), mock(CEQueueStatus.class)), + new SubmitReportAction(mock(ReportQueue.class), mock(ReportProcessingScheduler.class), mock(UserSession.class), mock(CEQueueStatus.class)), new HistoryAction(mock(ActivityIndex.class), mock(UserSession.class)))); @Test diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/ws/SubmitReportActionTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/ws/SubmitReportActionTest.java index 90fc58320aa..18f8fdfc5d0 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/ws/SubmitReportActionTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/ws/SubmitReportActionTest.java @@ -27,7 +27,7 @@ import org.junit.Test; import org.sonar.api.server.ws.WebService; import org.sonar.core.permission.GlobalPermissions; import org.sonar.db.compute.AnalysisReportDto; -import org.sonar.server.computation.ComputationThreadLauncher; +import org.sonar.server.computation.ReportProcessingScheduler; import org.sonar.server.computation.ReportQueue; import org.sonar.server.computation.monitoring.CEQueueStatus; import org.sonar.server.exceptions.ForbiddenException; @@ -46,7 +46,7 @@ public class SubmitReportActionTest { @Rule public UserSessionRule userSessionRule = UserSessionRule.standalone(); - ComputationThreadLauncher workerLauncher = mock(ComputationThreadLauncher.class); + ReportProcessingScheduler workerLauncher = mock(ReportProcessingScheduler.class); CEQueueStatus queueStatus = mock(CEQueueStatus.class); ReportQueue queue = mock(ReportQueue.class); WsTester wsTester; |