import org.sonar.server.computation.container.ContainerFactoryImpl;
import org.sonar.server.computation.queue.CeProcessingSchedulerExecutorServiceImpl;
import org.sonar.server.computation.queue.CeProcessingSchedulerImpl;
+import org.sonar.server.computation.queue.CeWorkerRunnableImpl;
import org.sonar.server.computation.queue.report.ReportTaskProcessor;
public class ComputeEngineProcessingModule extends Module {
ComputationStepExecutor.class,
ReportTaskProcessor.class,
CeProcessingSchedulerExecutorServiceImpl.class,
+ CeWorkerRunnableImpl.class,
CeProcessingSchedulerImpl.class);
}
}
import org.sonar.server.util.StoppableScheduledExecutorService;
/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnable}.
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
*/
public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
}
package org.sonar.server.computation.queue;
import java.util.concurrent.TimeUnit;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-import org.sonar.server.computation.log.CeLogging;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
private final CeProcessingSchedulerExecutorService executorService;
- private final CeQueue ceQueue;
- private final ReportTaskProcessor reportTaskProcessor;
- private final CeLogging ceLogging;
+ private final CeWorkerRunnable workerRunnable;
private final long delayBetweenTasks;
private final long delayForFirstStart;
private final TimeUnit timeUnit;
- public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeQueue ceQueue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
+ public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
this.executorService = processingExecutorService;
- this.ceQueue = ceQueue;
- this.reportTaskProcessor = reportTaskProcessor;
- this.ceLogging = ceLogging;
+ this.workerRunnable = workerRunnable;
this.delayBetweenTasks = 10;
this.delayForFirstStart = 0;
@Override
public void startScheduling() {
- executorService.scheduleAtFixedRate(new CeWorkerRunnable(ceQueue, reportTaskProcessor, ceLogging), delayForFirstStart, delayBetweenTasks, timeUnit);
+ executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
}
}
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
-
package org.sonar.server.computation.queue;
-import com.google.common.base.Optional;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.server.computation.log.CeLogging;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-
-import static java.lang.String.format;
-
-class CeWorkerRunnable implements Runnable {
-
- private static final Logger LOG = Loggers.get(CeWorkerRunnable.class);
-
- private final CeQueue queue;
- private final ReportTaskProcessor reportTaskProcessor;
- private final CeLogging ceLogging;
-
- public CeWorkerRunnable(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
- this.queue = queue;
- this.reportTaskProcessor = reportTaskProcessor;
- this.ceLogging = ceLogging;
- }
-
- @Override
- public void run() {
- Optional<CeTask> ceTask = tryAndFindTaskToExecute();
- if (!ceTask.isPresent()) {
- return;
- }
-
- executeTask(ceTask.get());
- }
-
- private Optional<CeTask> tryAndFindTaskToExecute() {
- try {
- return queue.peek();
- } catch (Exception e) {
- LOG.error("Failed to pop the queue of analysis reports", e);
- }
- return Optional.absent();
- }
-
- private void executeTask(CeTask task) {
- ceLogging.initForTask(task);
- Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
- try {
- // TODO delegate the message to the related task processor, according to task type
- reportTaskProcessor.process(task);
- queue.remove(task, CeActivityDto.Status.SUCCESS);
- } catch (Throwable e) {
- LOG.error(format("Failed to process task %s", task.getUuid()), e);
- queue.remove(task, CeActivityDto.Status.FAILED);
- } finally {
- profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid()));
- ceLogging.clearForTask();
- }
- }
+/**
+ * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
+ */
+public interface CeWorkerRunnable extends Runnable {
}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.server.computation.queue;
+
+import com.google.common.base.Optional;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.server.computation.log.CeLogging;
+import org.sonar.server.computation.queue.report.ReportTaskProcessor;
+
+import static java.lang.String.format;
+
+public class CeWorkerRunnableImpl implements CeWorkerRunnable {
+
+ private static final Logger LOG = Loggers.get(CeWorkerRunnableImpl.class);
+
+ private final CeQueue queue;
+ private final ReportTaskProcessor reportTaskProcessor;
+ private final CeLogging ceLogging;
+
+ public CeWorkerRunnableImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
+ this.queue = queue;
+ this.reportTaskProcessor = reportTaskProcessor;
+ this.ceLogging = ceLogging;
+ }
+
+ @Override
+ public void run() {
+ Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+ if (!ceTask.isPresent()) {
+ return;
+ }
+
+ executeTask(ceTask.get());
+ }
+
+ private Optional<CeTask> tryAndFindTaskToExecute() {
+ try {
+ return queue.peek();
+ } catch (Exception e) {
+ LOG.error("Failed to pop the queue of analysis reports", e);
+ }
+ return Optional.absent();
+ }
+
+ private void executeTask(CeTask task) {
+ ceLogging.initForTask(task);
+ Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ reportTaskProcessor.process(task);
+ queue.remove(task, CeActivityDto.Status.SUCCESS);
+ } catch (Throwable e) {
+ LOG.error(format("Failed to process task %s", task.getUuid()), e);
+ queue.remove(task, CeActivityDto.Status.FAILED);
+ } finally {
+ profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid()));
+ ceLogging.clearForTask();
+ }
+ }
+}
import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.sonar.server.computation.queue.report.ReportTaskProcessor;
-import org.sonar.server.computation.log.CeLogging;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class CeProcessingSchedulerImplTest {
private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- private CeQueue ceQueue = mock(CeQueue.class);
- private ReportTaskProcessor reportTaskProcessor = mock(ReportTaskProcessor.class);
- private CeLogging ceLogging = mock(CeLogging.class);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceQueue, reportTaskProcessor, ceLogging);
+ private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
@Test
public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
underTest.startScheduling();
- verify(processingExecutorService).scheduleAtFixedRate(any(CeWorkerRunnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
+ verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
verifyNoMoreInteractions(processingExecutorService);
}
CeQueue queue = mock(CeQueueImpl.class);
ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
CeLogging ceLogging = mock(CeLogging.class);
- CeWorkerRunnable underTest = new CeWorkerRunnable(queue, taskProcessor, ceLogging);
+ CeWorkerRunnable underTest = new CeWorkerRunnableImpl(queue, taskProcessor, ceLogging);
@Test
public void no_pending_tasks_in_queue() throws Exception {