]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-6831 CeWorkerRunnable is now an interface and impl class in pico
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Mon, 28 Sep 2015 08:38:12 +0000 (10:38 +0200)
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Mon, 28 Sep 2015 10:22:13 +0000 (12:22 +0200)
server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableTest.java

index 8766d7a8e27051bd8cb237601ff6a51fe2b33204..3cd9c0684dd21d16906878ab70a7866497dcc947 100644 (file)
@@ -23,6 +23,7 @@ import org.sonar.core.platform.Module;
 import org.sonar.server.computation.container.ContainerFactoryImpl;
 import org.sonar.server.computation.queue.CeProcessingSchedulerExecutorServiceImpl;
 import org.sonar.server.computation.queue.CeProcessingSchedulerImpl;
+import org.sonar.server.computation.queue.CeWorkerRunnableImpl;
 import org.sonar.server.computation.queue.report.ReportTaskProcessor;
 
 public class ComputeEngineProcessingModule extends Module {
@@ -33,6 +34,7 @@ public class ComputeEngineProcessingModule extends Module {
       ComputationStepExecutor.class,
       ReportTaskProcessor.class,
       CeProcessingSchedulerExecutorServiceImpl.class,
+      CeWorkerRunnableImpl.class,
       CeProcessingSchedulerImpl.class);
   }
 }
index 54d0be270ae0978a3832be40400fba82f203514c..bed8e1f09f94146dc53429b418dfb2419773d835 100644 (file)
@@ -22,7 +22,7 @@ package org.sonar.server.computation.queue;
 import org.sonar.server.util.StoppableScheduledExecutorService;
 
 /**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnable}.
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
  */
 public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
 }
index 4b918246c7d10ed95c2d7f00e338b0a8ef8d2ab0..b9d32e14fb049f40efa3e07ff5480a14e9ec0f65 100644 (file)
 package org.sonar.server.computation.queue;
 
 import java.util.concurrent.TimeUnit;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-import org.sonar.server.computation.log.CeLogging;
 
 public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
   private final CeProcessingSchedulerExecutorService executorService;
-  private final CeQueue ceQueue;
-  private final ReportTaskProcessor reportTaskProcessor;
-  private final CeLogging ceLogging;
+  private final CeWorkerRunnable workerRunnable;
 
   private final long delayBetweenTasks;
   private final long delayForFirstStart;
   private final TimeUnit timeUnit;
 
-  public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeQueue ceQueue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
+  public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
     this.executorService = processingExecutorService;
-    this.ceQueue = ceQueue;
-    this.reportTaskProcessor = reportTaskProcessor;
-    this.ceLogging = ceLogging;
+    this.workerRunnable = workerRunnable;
 
     this.delayBetweenTasks = 10;
     this.delayForFirstStart = 0;
@@ -46,7 +40,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
 
   @Override
   public void startScheduling() {
-    executorService.scheduleAtFixedRate(new CeWorkerRunnable(ceQueue, reportTaskProcessor, ceLogging), delayForFirstStart, delayBetweenTasks, timeUnit);
+    executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
   }
 
 }
index 20cebe76f53d3ad2a04decf551b8b383d3c5c78f..5de95c6be16a7f5ebac1ac30d24c7e195e518c32 100644 (file)
  * along with this program; if not, write to the Free Software Foundation,
  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
  */
-
 package org.sonar.server.computation.queue;
 
-import com.google.common.base.Optional;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.server.computation.log.CeLogging;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-
-import static java.lang.String.format;
-
-class CeWorkerRunnable implements Runnable {
-
-  private static final Logger LOG = Loggers.get(CeWorkerRunnable.class);
-
-  private final CeQueue queue;
-  private final ReportTaskProcessor reportTaskProcessor;
-  private final CeLogging ceLogging;
-
-  public CeWorkerRunnable(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
-    this.queue = queue;
-    this.reportTaskProcessor = reportTaskProcessor;
-    this.ceLogging = ceLogging;
-  }
-
-  @Override
-  public void run() {
-    Optional<CeTask> ceTask = tryAndFindTaskToExecute();
-    if (!ceTask.isPresent()) {
-      return;
-    }
-
-    executeTask(ceTask.get());
-  }
-
-  private Optional<CeTask> tryAndFindTaskToExecute() {
-    try {
-      return queue.peek();
-    } catch (Exception e) {
-      LOG.error("Failed to pop the queue of analysis reports", e);
-    }
-    return Optional.absent();
-  }
-
-  private void executeTask(CeTask task) {
-    ceLogging.initForTask(task);
-    Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
-    try {
-      // TODO delegate the message to the related task processor, according to task type
-      reportTaskProcessor.process(task);
-      queue.remove(task, CeActivityDto.Status.SUCCESS);
-    } catch (Throwable e) {
-      LOG.error(format("Failed to process task %s", task.getUuid()), e);
-      queue.remove(task, CeActivityDto.Status.FAILED);
-    } finally {
-      profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid()));
-      ceLogging.clearForTask();
-    }
-  }
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ */
+public interface CeWorkerRunnable extends Runnable {
 }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java
new file mode 100644 (file)
index 0000000..7501cf4
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package org.sonar.server.computation.queue;
+
+import com.google.common.base.Optional;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.report.ReportTaskProcessor;
+
+import static java.lang.String.format;
+
+public class CeWorkerRunnableImpl implements CeWorkerRunnable {
+
+  private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
+
+  private final CeQueue queue;
+  private final ReportTaskProcessor reportTaskProcessor;
+  private final CeLogging ceLogging;
+
+  public CeWorkerRunnableImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
+    this.queue = queue;
+    this.reportTaskProcessor = reportTaskProcessor;
+    this.ceLogging = ceLogging;
+  }
+
+  @Override
+  public void run() {
+    Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+    if (!ceTask.isPresent()) {
+      return;
+    }
+
+    executeTask(ceTask.get());
+  }
+
+  private Optional<CeTask> tryAndFindTaskToExecute() {
+    try {
+      return queue.peek();
+    } catch (Exception e) {
+      LOG.error("Failed to pop the queue of analysis reports", e);
+    }
+    return Optional.absent();
+  }
+
+  private void executeTask(CeTask task) {
+    ceLogging.initForTask(task);
+    Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
+    try {
+      // TODO delegate the message to the related task processor, according to task type
+      reportTaskProcessor.process(task);
+      queue.remove(task, CeActivityDto.Status.SUCCESS);
+    } catch (Throwable e) {
+      LOG.error(format("Failed to process task %s", task.getUuid()), e);
+      queue.remove(task, CeActivityDto.Status.FAILED);
+    } finally {
+      profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid()));
+      ceLogging.clearForTask();
+    }
+  }
+}
index 2a031bfb52db588460c8511b529ec617e1cb5a43..b76640dd5729cf75ccc3c0017a8b55aa71d99337 100644 (file)
@@ -21,27 +21,23 @@ package org.sonar.server.computation.queue;
 
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-import org.sonar.server.computation.log.CeLogging;
 
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 public class CeProcessingSchedulerImplTest {
   private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
-  private CeQueue ceQueue = mock(CeQueue.class);
-  private ReportTaskProcessor reportTaskProcessor = mock(ReportTaskProcessor.class);
-  private CeLogging ceLogging = mock(CeLogging.class);
-  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceQueue, reportTaskProcessor, ceLogging);
+  private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
+  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
 
   @Test
   public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
     underTest.startScheduling();
 
-    verify(processingExecutorService).scheduleAtFixedRate(any(CeWorkerRunnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
+    verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
     verifyNoMoreInteractions(processingExecutorService);
   }
 
index f7ed66b6797ff51ceeddb3d65d2412f154db42a0..75578033f52ecf4297b381118df753d76b7ea500 100644 (file)
@@ -38,7 +38,7 @@ public class CeWorkerRunnableTest {
   CeQueue queue = mock(CeQueueImpl.class);
   ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
   CeLogging ceLogging = mock(CeLogging.class);
-  CeWorkerRunnable underTest = new CeWorkerRunnable(queue, taskProcessor, ceLogging);
+  CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, taskProcessor, ceLogging);
 
   @Test
   public void no_pending_tasks_in_queue() throws Exception {