]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-6749 add CeTaskProcessor concept to support process Views tasks
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Tue, 29 Sep 2015 16:51:23 +0000 (18:51 +0200)
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Thu, 1 Oct 2015 11:44:45 +0000 (13:44 +0200)
31 files changed:
server/sonar-server/src/main/java/org/sonar/server/computation/container/ReportProcessingModule.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingScheduler.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorServiceImpl.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueInitializer.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeQueueModule.java
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/queue/report/ReportTaskProcessor.java [deleted file]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingScheduler.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessor.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepository.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryImpl.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/package-info.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/report/ReportTaskProcessor.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/report/package-info.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/platform/platformlevel/PlatformLevel4.java
server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java [deleted file]
server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeQueueInitializerTest.java
server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableImplTest.java [deleted file]
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryImplTest.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryRule.java [new file with mode: 0644]
server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java [new file with mode: 0644]

index 12db121dab1450f8ccbd8bfb34a4a7c2b962a5db..12d0e0e76f4f6e803857a8fc05efadce60b447d4 100644 (file)
@@ -21,7 +21,7 @@ package org.sonar.server.computation.container;
 
 import org.sonar.core.platform.Module;
 import org.sonar.server.computation.queue.report.ReportSubmitter;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
+import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
 import org.sonar.server.computation.step.ComputationStepExecutor;
 
 public class ReportProcessingModule extends Module {
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingScheduler.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingScheduler.java
deleted file mode 100644 (file)
index fa82e55..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-public interface CeProcessingScheduler {
-
-  void startScheduling();
-
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorService.java
deleted file mode 100644 (file)
index bed8e1f..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import org.sonar.server.util.StoppableScheduledExecutorService;
-
-/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
- */
-public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorServiceImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerExecutorServiceImpl.java
deleted file mode 100644 (file)
index e9eb954..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
-
-public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
-  implements CeProcessingSchedulerExecutorService {
-  private static final String THREAD_NAME_PREFIX = "ce-processor-";
-
-  public CeProcessingSchedulerExecutorServiceImpl() {
-    super(
-      Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder()
-          .setNameFormat(THREAD_NAME_PREFIX + "%d")
-          .setPriority(Thread.MIN_PRIORITY)
-          .build()));
-  }
-
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeProcessingSchedulerImpl.java
deleted file mode 100644 (file)
index b9d32e1..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import java.util.concurrent.TimeUnit;
-
-public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
-  private final CeProcessingSchedulerExecutorService executorService;
-  private final CeWorkerRunnable workerRunnable;
-
-  private final long delayBetweenTasks;
-  private final long delayForFirstStart;
-  private final TimeUnit timeUnit;
-
-  public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
-    this.executorService = processingExecutorService;
-    this.workerRunnable = workerRunnable;
-
-    this.delayBetweenTasks = 10;
-    this.delayForFirstStart = 0;
-    this.timeUnit = TimeUnit.SECONDS;
-  }
-
-  @Override
-  public void startScheduling() {
-    executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
-  }
-
-}
index 526eb16f0bd09ff53f0b263d74ee9c6bbd6d7c80..3f63721405aaa9b08fe6edf405827c50fe029544 100644 (file)
@@ -24,6 +24,7 @@ import org.sonar.api.server.ServerSide;
 import org.sonar.db.DbClient;
 import org.sonar.db.DbSession;
 import org.sonar.server.computation.monitoring.CEQueueStatus;
+import org.sonar.server.computation.taskprocessor.CeProcessingScheduler;
 
 /**
  * Cleans-up the queue, initializes JMX counters then schedule
index 67c5bc6de636a6f2d3764860b185b7082398578d..0af3f348ccfc1d97902d421ef702d314b23eefe2 100644 (file)
@@ -29,25 +29,19 @@ public class CeQueueModule extends Module {
   @Override
   protected void configureModule() {
     add(
-        // queue state
-        CeQueueImpl.class,
+      // queue state
+      CeQueueImpl.class,
 
-        // queue monitoring
-        CEQueueStatusImpl.class,
-        ComputeEngineQueueMonitor.class,
+      // queue monitoring
+      CEQueueStatusImpl.class,
+      ComputeEngineQueueMonitor.class,
 
-        // CE queue processing
-        CeProcessingSchedulerExecutorServiceImpl.class,
-        CeWorkerRunnableImpl.class,
-        CeProcessingSchedulerImpl.class,
+      // queue cleaning
+      CeQueueCleaner.class,
+      CleanReportQueueListener.class,
+      ReportFiles.class,
 
-        // queue cleaning
-        CeQueueCleaner.class,
-        CleanReportQueueListener.class,
-        ReportFiles.class,
-
-        // init queue state and queue processing
-        CeQueueInitializer.class
-    );
+      // init queue state and queue processing
+      CeQueueInitializer.class);
   }
 }
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnable.java
deleted file mode 100644 (file)
index 5de95c6..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-/**
- * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
- */
-public interface CeWorkerRunnable extends Runnable {
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/CeWorkerRunnableImpl.java
deleted file mode 100644 (file)
index 9b46bb3..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package org.sonar.server.computation.queue;
-
-import com.google.common.base.Optional;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.server.computation.log.CeLogging;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-
-import static java.lang.String.format;
-
-public class CeWorkerRunnableImpl implements CeWorkerRunnable {
-
-  private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
-
-  private final CeQueue queue;
-  private final ReportTaskProcessor reportTaskProcessor;
-  private final CeLogging ceLogging;
-
-  public CeWorkerRunnableImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
-    this.queue = queue;
-    this.reportTaskProcessor = reportTaskProcessor;
-    this.ceLogging = ceLogging;
-  }
-
-  @Override
-  public void run() {
-    Optional<CeTask> ceTask = tryAndFindTaskToExecute();
-    if (!ceTask.isPresent()) {
-      return;
-    }
-
-    executeTask(ceTask.get());
-  }
-
-  private Optional<CeTask> tryAndFindTaskToExecute() {
-    try {
-      return queue.peek();
-    } catch (Exception e) {
-      LOG.error("Failed to pop the queue of analysis reports", e);
-    }
-    return Optional.absent();
-  }
-
-  private void executeTask(CeTask task) {
-    // logging twice: once in sonar.log and once in CE appender
-    Profiler regularProfiler = startProfiler(task);
-    ceLogging.initForTask(task);
-    Profiler ceProfiler = startProfiler(task);
-
-    CeActivityDto.Status status = CeActivityDto.Status.FAILED;
-    try {
-      // TODO delegate the message to the related task processor, according to task type
-      reportTaskProcessor.process(task);
-      status = CeActivityDto.Status.SUCCESS;
-      queue.remove(task, status);
-    } catch (Throwable e) {
-      LOG.error(format("Failed to execute task %s", task.getUuid()), e);
-      queue.remove(task, status);
-    } finally {
-      // logging twice: once in sonar.log and once in CE appender
-      stopProfiler(ceProfiler, task, status);
-      ceLogging.clearForTask();
-      stopProfiler(regularProfiler, task, status);
-    }
-  }
-
-  private static Profiler startProfiler(CeTask task) {
-    return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid());
-  }
-
-  private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
-    if (status == CeActivityDto.Status.FAILED) {
-      profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
-    } else {
-      profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
-    }
-  }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/report/ReportTaskProcessor.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/report/ReportTaskProcessor.java
deleted file mode 100644 (file)
index ed4693f..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue.report;
-
-import org.sonar.core.platform.ComponentContainer;
-import org.sonar.server.computation.step.ComputationStepExecutor;
-import org.sonar.server.computation.container.ComputeEngineContainer;
-import org.sonar.server.computation.container.ContainerFactory;
-import org.sonar.server.computation.queue.CeTask;
-
-public class ReportTaskProcessor {
-
-  private final ContainerFactory containerFactory;
-  private final ComponentContainer serverContainer;
-
-  public ReportTaskProcessor(ContainerFactory containerFactory, ComponentContainer serverContainer) {
-    this.containerFactory = containerFactory;
-    this.serverContainer = serverContainer;
-  }
-
-  public void process(CeTask task) {
-    ComputeEngineContainer ceContainer = containerFactory.create(serverContainer, task);
-    try {
-      ceContainer.getComponentByType(ComputationStepExecutor.class).execute();
-    } finally {
-      ceContainer.cleanup();
-    }
-  }
-}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingScheduler.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingScheduler.java
new file mode 100644 (file)
index 0000000..feaf513
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+public interface CeProcessingScheduler {
+
+  void startScheduling();
+
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java
new file mode 100644 (file)
index 0000000..3fc101e
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import org.sonar.server.util.StoppableScheduledExecutorService;
+
+/**
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
+ */
+public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
new file mode 100644 (file)
index 0000000..8ea5746
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
+
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
+  implements CeProcessingSchedulerExecutorService {
+  private static final String THREAD_NAME_PREFIX = "ce-processor-";
+
+  public CeProcessingSchedulerExecutorServiceImpl() {
+    super(
+      Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+          .setNameFormat(THREAD_NAME_PREFIX + "%d")
+          .setPriority(Thread.MIN_PRIORITY)
+          .build()));
+  }
+
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java
new file mode 100644 (file)
index 0000000..d9e346a
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.concurrent.TimeUnit;
+
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
+  private final CeProcessingSchedulerExecutorService executorService;
+  private final CeWorkerRunnable workerRunnable;
+
+  private final long delayBetweenTasks;
+  private final long delayForFirstStart;
+  private final TimeUnit timeUnit;
+
+  public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
+    this.executorService = processingExecutorService;
+    this.workerRunnable = workerRunnable;
+
+    this.delayBetweenTasks = 10;
+    this.delayForFirstStart = 0;
+    this.timeUnit = TimeUnit.SECONDS;
+  }
+
+  @Override
+  public void startScheduling() {
+    executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
+  }
+
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessor.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessor.java
new file mode 100644 (file)
index 0000000..16edeb6
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.Set;
+import org.sonar.server.computation.queue.CeTask;
+
+/**
+ * This interface is used to provide the processing code for {@link CeTask}s of one or more type to be called by the
+ * Compute Engine.
+ */
+public interface CeTaskProcessor {
+
+  /**
+   * The {@link CeTask#getType()} for which this {@link CeTaskProcessor} provides the processing code.
+   * <p>
+   * The match of type is done using {@link String#equals(Object)} and if more than one {@link CeTaskProcessor} declares
+   * itself had handler for the same {@link CeTask#getType()}, an error will be raised at startup and startup will
+   * fail.
+   * </p>
+   * <p>
+   * If an empty {@link Set} is returned, the {@link CeTaskProcessor} will be ignored.
+   * </p>
+   */
+  Set<String> getHandledCeTaskTypes();
+
+  /**
+   * Call the processing code for a specific {@link CeTask}.
+   * <p>
+   * The specified is guaranteed to be non {@code null} and its {@link CeTask#getType()} to be one of the values
+   * of {@link #getHandledCeTaskTypes()}.
+   * </p>
+   *
+   * @throws RuntimeException when thrown, it will be caught and logged by the Compute Engine and the processing of the
+   *         specified {@link CeTask} will be flagged as failed.
+   */
+  void process(CeTask task);
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorModule.java
new file mode 100644 (file)
index 0000000..8ae68bd
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import org.sonar.core.platform.Module;
+
+public class CeTaskProcessorModule extends Module {
+  @Override
+  protected void configureModule() {
+    add(
+      CeTaskProcessorRepositoryImpl.class,
+      CeWorkerRunnableImpl.class,
+      CeProcessingSchedulerExecutorServiceImpl.class,
+      CeProcessingSchedulerImpl.class);
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepository.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepository.java
new file mode 100644 (file)
index 0000000..be4be61
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.sonar.server.computation.queue.CeTask;
+
+public interface CeTaskProcessorRepository {
+
+  /**
+   * @throws NullPointerException if the specified {@link CeTask} is {@code null}
+   * @throws IllegalStateException if there is no {@link CeTaskProcessor} for the specified {@link CeTask} in the repository
+   */
+  Optional<CeTaskProcessor> getForCeTask(CeTask ceTask);
+
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryImpl.java
new file mode 100644 (file)
index 0000000..4a19ff7
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.sonar.server.computation.queue.CeTask;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.FluentIterable.from;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static java.lang.String.format;
+
+/**
+ * {@link CeTaskProcessorRepository} implementation which provides access to the {@link CeTaskProcessor} existing in the
+ * PicoContainer the current object belongs to.
+ */
+public class CeTaskProcessorRepositoryImpl implements CeTaskProcessorRepository {
+  private static final Joiner COMMA_JOINER = Joiner.on(", ");
+
+  private final Map<String, CeTaskProcessor> taskProcessorByCeTaskType;
+
+  public CeTaskProcessorRepositoryImpl(CeTaskProcessor[] taskProcessors) {
+    this.taskProcessorByCeTaskType = indexTaskProcessors(taskProcessors);
+  }
+
+  @Override
+  public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) {
+    return Optional.fromNullable(taskProcessorByCeTaskType.get(ceTask.getType()));
+  }
+
+  private static Map<String, CeTaskProcessor> indexTaskProcessors(CeTaskProcessor[] taskProcessors) {
+    Multimap<String, CeTaskProcessor> permissiveIndex = buildPermissiveCeTaskProcessorIndex(taskProcessors);
+    checkUniqueHandlerPerCeTaskType(permissiveIndex);
+    return ImmutableMap.copyOf(Maps.transformValues(permissiveIndex.asMap(), CeTaskProcessorCollectionToFirstElement.INSTANCE));
+  }
+
+  private static Multimap<String, CeTaskProcessor> buildPermissiveCeTaskProcessorIndex(CeTaskProcessor[] taskProcessors) {
+    Multimap<String, CeTaskProcessor> permissiveIndex = ArrayListMultimap.create(taskProcessors.length, 1);
+    for (CeTaskProcessor taskProcessor : taskProcessors) {
+      for (String ceTaskType : taskProcessor.getHandledCeTaskTypes()) {
+        permissiveIndex.put(ceTaskType, taskProcessor);
+      }
+    }
+    return permissiveIndex;
+  }
+
+  private static void checkUniqueHandlerPerCeTaskType(Multimap<String, CeTaskProcessor> permissiveIndex) {
+    for (Map.Entry<String, Collection<CeTaskProcessor>> entry : permissiveIndex.asMap().entrySet()) {
+      checkArgument(
+        entry.getValue().size() == 1,
+        format(
+          "There can be only one CeTaskProcessor instance registered as the processor for CeTask type %s. " +
+            "More than one found. Please fix your configuration: %s",
+          entry.getKey(),
+          COMMA_JOINER.join(from(entry.getValue()).transform(ToClassName.INSTANCE).toSortedList(CASE_INSENSITIVE_ORDER))));
+    }
+  }
+
+  private enum ToClassName implements Function<Object, String> {
+    INSTANCE;
+
+    @Override
+    @Nonnull
+    public String apply(@Nonnull Object input) {
+      return input.getClass().getName();
+    }
+  }
+
+  private enum CeTaskProcessorCollectionToFirstElement implements Function<Collection<CeTaskProcessor>, CeTaskProcessor> {
+    INSTANCE;
+
+    @Override
+    @Nonnull
+    public CeTaskProcessor apply(@Nonnull Collection<CeTaskProcessor> input) {
+      return input.iterator().next();
+    }
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java
new file mode 100644 (file)
index 0000000..2df1a66
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeTask;
+
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ */
+public interface CeWorkerRunnable extends Runnable {
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java
new file mode 100644 (file)
index 0000000..e20b6ac
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeTask;
+
+import static java.lang.String.format;
+
+public class CeWorkerRunnableImpl implements CeWorkerRunnable {
+
+  private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
+
+  private final CeQueue queue;
+  private final CeLogging ceLogging;
+  private final CeTaskProcessorRepository taskProcessorRepository;
+
+  public CeWorkerRunnableImpl(CeQueue queue, CeLogging ceLogging, CeTaskProcessorRepository taskProcessorRepository) {
+    this.queue = queue;
+    this.ceLogging = ceLogging;
+    this.taskProcessorRepository = taskProcessorRepository;
+  }
+
+  @Override
+  public void run() {
+    Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+    if (!ceTask.isPresent()) {
+      return;
+    }
+
+    executeTask(ceTask.get());
+  }
+
+  private Optional<CeTask> tryAndFindTaskToExecute() {
+    try {
+      return queue.peek();
+    } catch (Exception e) {
+      LOG.error("Failed to pop the queue of analysis reports", e);
+    }
+    return Optional.absent();
+  }
+
+  private void executeTask(CeTask task) {
+    // logging twice: once in sonar.log and once in CE appender
+    Profiler regularProfiler = startProfiler(task);
+    ceLogging.initForTask(task);
+    Profiler ceProfiler = startProfiler(task);
+
+    CeActivityDto.Status status = CeActivityDto.Status.FAILED;
+    try {
+      // TODO delegate the message to the related task processor, according to task type
+      Optional<CeTaskProcessor> taskProcessor = taskProcessorRepository.getForCeTask(task);
+      if (taskProcessor.isPresent()) {
+        taskProcessor.get().process(task);
+        status = CeActivityDto.Status.SUCCESS;
+      } else {
+        LOG.error("No CeTaskProcessor is defined for task of type {}. Plugin configuration may have changed", task.getType());
+        status = CeActivityDto.Status.FAILED;
+      }
+      queue.remove(task, status);
+    } catch (Throwable e) {
+      LOG.error(format("Failed to execute task %s", task.getUuid()), e);
+      queue.remove(task, status);
+    } finally {
+      // logging twice: once in sonar.log and once in CE appender
+      stopProfiler(ceProfiler, task, status);
+      ceLogging.clearForTask();
+      stopProfiler(regularProfiler, task, status);
+    }
+  }
+
+  private static Profiler startProfiler(CeTask task) {
+    return Profiler.create(LOG).startInfo("Execute task | project={} | id={}", task.getComponentKey(), task.getUuid());
+  }
+
+  private static void stopProfiler(Profiler profiler, CeTask task, CeActivityDto.Status status) {
+    if (status == CeActivityDto.Status.FAILED) {
+      profiler.stopError("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
+    } else {
+      profiler.stopInfo("Executed task | project={} | id={}", task.getComponentKey(), task.getUuid());
+    }
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/package-info.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/package-info.java
new file mode 100644 (file)
index 0000000..58194f6
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+@ParametersAreNonnullByDefault
+package org.sonar.server.computation.taskprocessor;
+
+import javax.annotation.ParametersAreNonnullByDefault;
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/report/ReportTaskProcessor.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/report/ReportTaskProcessor.java
new file mode 100644 (file)
index 0000000..e0b1dcd
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor.report;
+
+import java.util.Collections;
+import java.util.Set;
+import org.sonar.core.platform.ComponentContainer;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.taskprocessor.CeTaskProcessor;
+import org.sonar.server.computation.step.ComputationStepExecutor;
+import org.sonar.server.computation.container.ComputeEngineContainer;
+import org.sonar.server.computation.container.ContainerFactory;
+import org.sonar.server.computation.queue.CeTask;
+
+public class ReportTaskProcessor implements CeTaskProcessor {
+
+  private static final Set<String> HANDLED_TYPES = Collections.singleton(CeTaskTypes.REPORT);
+
+  private final ContainerFactory containerFactory;
+  private final ComponentContainer serverContainer;
+
+  public ReportTaskProcessor(ContainerFactory containerFactory, ComponentContainer serverContainer) {
+    this.containerFactory = containerFactory;
+    this.serverContainer = serverContainer;
+  }
+
+  @Override
+  public Set<String> getHandledCeTaskTypes() {
+    return HANDLED_TYPES;
+  }
+
+  @Override
+  public void process(CeTask task) {
+    ComputeEngineContainer ceContainer = containerFactory.create(serverContainer, task);
+    try {
+      ceContainer.getComponentByType(ComputationStepExecutor.class).execute();
+    } finally {
+      ceContainer.cleanup();
+    }
+  }
+}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/report/package-info.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/report/package-info.java
new file mode 100644 (file)
index 0000000..c21d764
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+@ParametersAreNonnullByDefault
+package org.sonar.server.computation.taskprocessor.report;
+
+import javax.annotation.ParametersAreNonnullByDefault;
index 1af4c400a596d4e311ed9bb0ce4e1b34ec701b0b..f21981e83802fe440ff94808c9c6d6de88e18e86 100644 (file)
@@ -64,6 +64,7 @@ import org.sonar.server.component.ws.ResourcesWs;
 import org.sonar.server.computation.CeModule;
 import org.sonar.server.computation.container.ReportProcessingModule;
 import org.sonar.server.computation.queue.CeQueueModule;
+import org.sonar.server.computation.taskprocessor.CeTaskProcessorModule;
 import org.sonar.server.computation.ws.CeWsModule;
 import org.sonar.server.config.ws.PropertiesWs;
 import org.sonar.server.dashboard.template.GlobalDefaultDashboard;
@@ -701,6 +702,7 @@ public class PlatformLevel4 extends PlatformLevel {
       // Compute engine
       CeModule.class,
       CeQueueModule.class,
+      CeTaskProcessorModule.class,
       CeWsModule.class,
       ReportProcessingModule.class,
 
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeProcessingSchedulerImplTest.java
deleted file mode 100644 (file)
index b76640d..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class CeProcessingSchedulerImplTest {
-  private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
-  private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
-  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
-
-  @Test
-  public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
-    underTest.startScheduling();
-
-    verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
-    verifyNoMoreInteractions(processingExecutorService);
-  }
-
-}
index 9c09fe0610baaeb3b99c4cf553fb089e7f75b545..2fdb02bfb3956a089c2acc252e6ff49394ce3544 100644 (file)
@@ -34,6 +34,7 @@ import org.sonar.db.ce.CeTaskTypes;
 import org.sonar.server.computation.queue.report.ReportFiles;
 import org.sonar.server.computation.monitoring.CEQueueStatus;
 import org.sonar.server.computation.monitoring.CEQueueStatusImpl;
+import org.sonar.server.computation.taskprocessor.CeProcessingScheduler;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/CeWorkerRunnableImplTest.java
deleted file mode 100644 (file)
index aae8a53..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.server.computation.queue;
-
-import com.google.common.base.Optional;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-import org.sonar.server.computation.log.CeLogging;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class CeWorkerRunnableImplTest {
-
-  CeQueue queue = mock(CeQueueImpl.class);
-  ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
-  CeLogging ceLogging = mock(CeLogging.class);
-  CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, taskProcessor, ceLogging);
-
-  @Test
-  public void no_pending_tasks_in_queue() throws Exception {
-    when(queue.peek()).thenReturn(Optional.<CeTask>absent());
-
-    underTest.run();
-
-    verifyZeroInteractions(taskProcessor, ceLogging);
-  }
-
-  @Test
-  public void peek_and_process_task() throws Exception {
-    CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
-    when(queue.peek()).thenReturn(Optional.of(task));
-
-    underTest.run();
-
-    InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
-    inOrder.verify(ceLogging).initForTask(task);
-    inOrder.verify(taskProcessor).process(task);
-    inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
-    inOrder.verify(ceLogging).clearForTask();
-  }
-
-  @Test
-  public void fail_to_process_task() throws Exception {
-    CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
-    when(queue.peek()).thenReturn(Optional.of(task));
-    doThrow(new IllegalStateException()).when(taskProcessor).process(task);
-
-    underTest.run();
-
-    InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
-    inOrder.verify(ceLogging).initForTask(task);
-    inOrder.verify(taskProcessor).process(task);
-    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
-    inOrder.verify(ceLogging).clearForTask();
-  }
-}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java
new file mode 100644 (file)
index 0000000..9dfa6da
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class CeProcessingSchedulerImplTest {
+  private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
+  private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
+  private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
+
+  @Test
+  public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
+    underTest.startScheduling();
+
+    verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
+    verifyNoMoreInteractions(processingExecutorService);
+  }
+
+}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryImplTest.java
new file mode 100644 (file)
index 0000000..6b86595
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.server.computation.queue.CeTask;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.guava.api.Assertions.assertThat;
+
+public class CeTaskProcessorRepositoryImplTest {
+  private static final String SOME_CE_TASK_TYPE = "some type";
+  private static final String SOME_COMPONENT_KEY = "key";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void constructor_accepts_empty_array_argument() {
+    new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {});
+  }
+
+  @Test
+  public void constructor_throws_IAE_if_two_TaskProcessor_handle_the_same_CeTask_type() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+      "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " +
+        "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName());
+
+    new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+      new SomeProcessor1(SOME_CE_TASK_TYPE),
+      new SomeProcessor2(SOME_CE_TASK_TYPE)
+    });
+  }
+
+  @Test
+  public void constructor_throws_IAE_if_multiple_TaskProcessor_overlap_their_supported_CeTask_type() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+      "There can be only one CeTaskProcessor instance registered as the processor for CeTask type " + SOME_CE_TASK_TYPE + ". " +
+        "More than one found. Please fix your configuration: " + SomeProcessor1.class.getName() + ", " + SomeProcessor2.class.getName());
+
+    new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+      new SomeProcessor2(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE),
+      new SomeProcessor1(SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3")
+    });
+  }
+
+  @Test
+  public void getForTask_returns_absent_if_repository_is_empty() {
+    CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {});
+
+    assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent();
+  }
+
+  @Test
+  public void getForTask_returns_absent_if_repository_does_not_contain_matching_TaskProcessor() {
+    CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {
+      createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1"),
+      createCeTaskProcessor(SOME_CE_TASK_TYPE + "_2", SOME_CE_TASK_TYPE + "_3"),
+    });
+
+    assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY))).isAbsent();
+  }
+
+  @Test
+  public void getForTask_returns_TaskProcessor_based_on_CeTask_type_only() {
+    CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE);
+    CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor});
+
+    assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor);
+    assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY + "2")).get()).isSameAs(taskProcessor);
+  }
+
+  @Test
+  public void getForTask_returns_TaskProcessor_even_if_it_is_not_specific() {
+    CeTaskProcessor taskProcessor = createCeTaskProcessor(SOME_CE_TASK_TYPE + "_1", SOME_CE_TASK_TYPE, SOME_CE_TASK_TYPE + "_3");
+    CeTaskProcessorRepositoryImpl underTest = new CeTaskProcessorRepositoryImpl(new CeTaskProcessor[] {taskProcessor});
+
+    assertThat(underTest.getForCeTask(createCeTask(SOME_CE_TASK_TYPE, SOME_COMPONENT_KEY)).get()).isSameAs(taskProcessor);
+  }
+
+  private CeTaskProcessor createCeTaskProcessor(final String... ceTaskTypes) {
+    return new HandleTypeOnlyTaskProcessor(ceTaskTypes);
+  }
+
+  private static CeTask createCeTask(String ceTaskType, String key) {
+    return new CeTask.Builder()
+      .setType(ceTaskType)
+      .setUuid("task_uuid_" + key)
+      .setComponentKey(key).setComponentUuid("uuid_" + key).setComponentName("name_" + key)
+      .build();
+  }
+
+  private static class HandleTypeOnlyTaskProcessor implements CeTaskProcessor {
+    private final String[] ceTaskTypes;
+
+    public HandleTypeOnlyTaskProcessor(String... ceTaskTypes) {
+      this.ceTaskTypes = ceTaskTypes;
+    }
+
+    @Override
+    public Set<String> getHandledCeTaskTypes() {
+      return ImmutableSet.copyOf(ceTaskTypes);
+    }
+
+    @Override
+    public void process(CeTask task) {
+      throw new UnsupportedOperationException("Process is not implemented");
+    }
+  }
+
+  private static class SomeProcessor1 extends HandleTypeOnlyTaskProcessor {
+    public SomeProcessor1(String... ceTaskTypes) {
+      super(ceTaskTypes);
+    }
+  }
+
+  private static class SomeProcessor2 extends HandleTypeOnlyTaskProcessor {
+    public SomeProcessor2(String... ceTaskTypes) {
+      super(ceTaskTypes);
+    }
+  }
+}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryRule.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeTaskProcessorRepositoryRule.java
new file mode 100644 (file)
index 0000000..02932ed
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.junit.rules.ExternalResource;
+import org.sonar.server.computation.queue.CeTask;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link org.junit.Rule} that implements the {@link CeTaskProcessorRepository} interface and
+ * requires consumer to explicitly define if a specific Task type has an associated {@link CeTaskProcessor} or not.
+ */
+public class CeTaskProcessorRepositoryRule extends ExternalResource implements CeTaskProcessorRepository {
+
+  private final Map<String, CeTaskProcessor> index = new HashMap<>();
+
+  @Override
+  protected void after() {
+    index.clear();
+  }
+
+  public CeTaskProcessorRepositoryRule setNoProcessorForTask(String taskType) {
+    index.put(requireNonNull(taskType), NoCeTaskProcessor.INSTANCE);
+    return this;
+  }
+
+  public CeTaskProcessorRepositoryRule setProcessorForTask(String taskType, CeTaskProcessor taskProcessor) {
+    index.put(requireNonNull(taskType), requireNonNull(taskProcessor));
+    return this;
+  }
+
+  @Override
+  public Optional<CeTaskProcessor> getForCeTask(CeTask ceTask) {
+    CeTaskProcessor taskProcessor = index.get(ceTask.getType());
+    checkState(taskProcessor != null, "CeTaskProcessor was not set in rule for task %s", ceTask);
+    return taskProcessor instanceof NoCeTaskProcessor ? Optional.<CeTaskProcessor>absent() : Optional.of(taskProcessor);
+  }
+
+  private enum NoCeTaskProcessor implements CeTaskProcessor {
+    INSTANCE;
+
+    private static final String UOE_MESSAGE = "NoCeTaskProcessor does not implement any method since it not supposed to be ever used";
+
+    @Override
+    public Set<String> getHandledCeTaskTypes() {
+      throw new UnsupportedOperationException(UOE_MESSAGE);
+    }
+
+    @Override
+    public void process(CeTask task) {
+      throw new UnsupportedOperationException(UOE_MESSAGE);
+    }
+  }
+}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java
new file mode 100644 (file)
index 0000000..a95d836
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.server.computation.taskprocessor;
+
+import com.google.common.base.Optional;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.CeQueue;
+import org.sonar.server.computation.queue.CeQueueImpl;
+import org.sonar.server.computation.queue.CeTask;
+import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerRunnableImplTest {
+
+  @Rule
+  public CeTaskProcessorRepositoryRule taskProcessorRepository = new CeTaskProcessorRepositoryRule();
+
+  CeQueue queue = mock(CeQueueImpl.class);
+  ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+  CeLogging ceLogging = mock(CeLogging.class);
+  CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, ceLogging, taskProcessorRepository);
+  InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+
+  @Test
+  public void no_pending_tasks_in_queue() throws Exception {
+    when(queue.peek()).thenReturn(Optional.<CeTask>absent());
+
+    underTest.run();
+
+    verifyZeroInteractions(taskProcessor, ceLogging);
+  }
+
+  @Test
+  public void fail_when_no_CeTaskProcessor_is_found_in_repository() {
+    CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+    taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
+    when(queue.peek()).thenReturn(Optional.of(task));
+
+    underTest.run();
+
+    inOrder.verify(ceLogging).initForTask(task);
+    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+    inOrder.verify(ceLogging).clearForTask();
+  }
+
+  @Test
+  public void peek_and_process_task() throws Exception {
+    CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+    taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+    when(queue.peek()).thenReturn(Optional.of(task));
+
+    underTest.run();
+
+    inOrder.verify(ceLogging).initForTask(task);
+    inOrder.verify(taskProcessor).process(task);
+    inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
+    inOrder.verify(ceLogging).clearForTask();
+  }
+
+  @Test
+  public void fail_to_process_task() throws Exception {
+    CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+    when(queue.peek()).thenReturn(Optional.of(task));
+    taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
+    doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
+
+    underTest.run();
+
+    inOrder.verify(ceLogging).initForTask(task);
+    inOrder.verify(taskProcessor).process(task);
+    inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+    inOrder.verify(ceLogging).clearForTask();
+  }
+}