From 04920926c2673c265a0fdd586b1e319507a75280 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Mon, 28 Sep 2015 10:38:12 +0200 Subject: [PATCH] SONAR-6831 CeWorkerRunnable is now an interface and impl class in pico --- .../ComputeEngineProcessingModule.java | 2 + .../CeProcessingSchedulerExecutorService.java | 2 +- .../queue/CeProcessingSchedulerImpl.java | 14 +--- .../computation/queue/CeWorkerRunnable.java | 63 +-------------- .../queue/CeWorkerRunnableImpl.java | 81 +++++++++++++++++++ .../queue/CeProcessingSchedulerImplTest.java | 12 +-- .../queue/CeWorkerRunnableTest.java | 2 +- 7 files changed, 97 insertions(+), 79 deletions(-) create mode 100644 server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java index 8766d7a8e27..3cd9c0684dd 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java @@ -23,6 +23,7 @@ import org.sonar.core.platform.Module; import org.sonar.server.computation.container.ContainerFactoryImpl; import org.sonar.server.computation.queue.CeProcessingSchedulerExecutorServiceImpl; import org.sonar.server.computation.queue.CeProcessingSchedulerImpl; +import org.sonar.server.computation.queue.CeWorkerRunnableImpl; import org.sonar.server.computation.queue.report.ReportTaskProcessor; public class ComputeEngineProcessingModule extends Module { @@ -33,6 +34,7 @@ public class ComputeEngineProcessingModule extends Module { ComputationStepExecutor.class, ReportTaskProcessor.class, CeProcessingSchedulerExecutorServiceImpl.class, + CeWorkerRunnableImpl.class, CeProcessingSchedulerImpl.class); } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java index 54d0be270ae..bed8e1f09f9 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java @@ -22,7 +22,7 @@ package org.sonar.server.computation.queue; import org.sonar.server.util.StoppableScheduledExecutorService; /** - * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnable}. + * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}. */ public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService { } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java index 4b918246c7d..b9d32e14fb0 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java @@ -20,24 +20,18 @@ package org.sonar.server.computation.queue; import java.util.concurrent.TimeUnit; -import org.sonar.server.computation.queue.report.ReportTaskProcessor; -import org.sonar.server.computation.log.CeLogging; public class CeProcessingSchedulerImpl implements CeProcessingScheduler { private final CeProcessingSchedulerExecutorService executorService; - private final CeQueue ceQueue; - private final ReportTaskProcessor reportTaskProcessor; - private final CeLogging ceLogging; + private final CeWorkerRunnable workerRunnable; private final long delayBetweenTasks; private final long delayForFirstStart; private final TimeUnit timeUnit; - public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeQueue ceQueue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) { + public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) { this.executorService = processingExecutorService; - this.ceQueue = ceQueue; - this.reportTaskProcessor = reportTaskProcessor; - this.ceLogging = ceLogging; + this.workerRunnable = workerRunnable; this.delayBetweenTasks = 10; this.delayForFirstStart = 0; @@ -46,7 +40,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler { @Override public void startScheduling() { - executorService.scheduleAtFixedRate(new CeWorkerRunnable(ceQueue, reportTaskProcessor, ceLogging), delayForFirstStart, delayBetweenTasks, timeUnit); + executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit); } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java index 20cebe76f53..5de95c6be16 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java @@ -17,65 +17,10 @@ * along with this program; if not, write to the Free Software Foundation, * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ - package org.sonar.server.computation.queue; -import com.google.common.base.Optional; -import org.sonar.api.utils.log.Logger; -import org.sonar.api.utils.log.Loggers; -import org.sonar.core.util.logs.Profiler; -import org.sonar.db.ce.CeActivityDto; -import org.sonar.server.computation.log.CeLogging; -import org.sonar.server.computation.queue.report.ReportTaskProcessor; - -import static java.lang.String.format; - -class CeWorkerRunnable implements Runnable { - - private static final Logger LOG = Loggers.get(CeWorkerRunnable.class); - - private final CeQueue queue; - private final ReportTaskProcessor reportTaskProcessor; - private final CeLogging ceLogging; - - public CeWorkerRunnable(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) { - this.queue = queue; - this.reportTaskProcessor = reportTaskProcessor; - this.ceLogging = ceLogging; - } - - @Override - public void run() { - Optional ceTask = tryAndFindTaskToExecute(); - if (!ceTask.isPresent()) { - return; - } - - executeTask(ceTask.get()); - } - - private Optional tryAndFindTaskToExecute() { - try { - return queue.peek(); - } catch (Exception e) { - LOG.error("Failed to pop the queue of analysis reports", e); - } - return Optional.absent(); - } - - private void executeTask(CeTask task) { - ceLogging.initForTask(task); - Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid())); - try { - // TODO delegate the message to the related task processor, according to task type - reportTaskProcessor.process(task); - queue.remove(task, CeActivityDto.Status.SUCCESS); - } catch (Throwable e) { - LOG.error(format("Failed to process task %s", task.getUuid()), e); - queue.remove(task, CeActivityDto.Status.FAILED); - } finally { - profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid())); - ceLogging.clearForTask(); - } - } +/** + * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}. + */ +public interface CeWorkerRunnable extends Runnable { } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java new file mode 100644 index 00000000000..7501cf430a8 --- /dev/null +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java @@ -0,0 +1,81 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package org.sonar.server.computation.queue; + +import com.google.common.base.Optional; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; +import org.sonar.core.util.logs.Profiler; +import org.sonar.db.ce.CeActivityDto; +import org.sonar.server.computation.log.CeLogging; +import org.sonar.server.computation.queue.report.ReportTaskProcessor; + +import static java.lang.String.format; + +public class CeWorkerRunnableImpl implements CeWorkerRunnable { + + private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class); + + private final CeQueue queue; + private final ReportTaskProcessor reportTaskProcessor; + private final CeLogging ceLogging; + + public CeWorkerRunnableImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) { + this.queue = queue; + this.reportTaskProcessor = reportTaskProcessor; + this.ceLogging = ceLogging; + } + + @Override + public void run() { + Optional ceTask = tryAndFindTaskToExecute(); + if (!ceTask.isPresent()) { + return; + } + + executeTask(ceTask.get()); + } + + private Optional tryAndFindTaskToExecute() { + try { + return queue.peek(); + } catch (Exception e) { + LOG.error("Failed to pop the queue of analysis reports", e); + } + return Optional.absent(); + } + + private void executeTask(CeTask task) { + ceLogging.initForTask(task); + Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid())); + try { + // TODO delegate the message to the related task processor, according to task type + reportTaskProcessor.process(task); + queue.remove(task, CeActivityDto.Status.SUCCESS); + } catch (Throwable e) { + LOG.error(format("Failed to process task %s", task.getUuid()), e); + queue.remove(task, CeActivityDto.Status.FAILED); + } finally { + profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid())); + ceLogging.clearForTask(); + } + } +} diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java index 2a031bfb52d..b76640dd572 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java @@ -21,27 +21,23 @@ package org.sonar.server.computation.queue; import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.sonar.server.computation.queue.report.ReportTaskProcessor; -import org.sonar.server.computation.log.CeLogging; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; public class CeProcessingSchedulerImplTest { private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class); - private CeQueue ceQueue = mock(CeQueue.class); - private ReportTaskProcessor reportTaskProcessor = mock(ReportTaskProcessor.class); - private CeLogging ceLogging = mock(CeLogging.class); - private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceQueue, reportTaskProcessor, ceLogging); + private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class); + private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable); @Test public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() { underTest.startScheduling(); - verify(processingExecutorService).scheduleAtFixedRate(any(CeWorkerRunnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS)); + verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS)); verifyNoMoreInteractions(processingExecutorService); } diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableTest.java index f7ed66b6797..75578033f52 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableTest.java @@ -38,7 +38,7 @@ public class CeWorkerRunnableTest { CeQueue queue = mock(CeQueueImpl.class); ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class); CeLogging ceLogging = mock(CeLogging.class); - CeWorkerRunnable underTest = new CeWorkerRunnable(queue, taskProcessor, ceLogging); + CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, taskProcessor, ceLogging); @Test public void no_pending_tasks_in_queue() throws Exception { -- 2.39.5