having a report specific scheduler is useless, tasks must be submitted to and only to CeQueue
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+public interface CeProcessingScheduler {
+
+ void startScheduling();
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import org.sonar.server.util.StoppableScheduledExecutorService;
+
+/**
+ * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnable}.
+ */
+public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
+
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
+ implements CeProcessingSchedulerExecutorService {
+ private static final String THREAD_NAME_PREFIX = "ce-processor-";
+
+ public CeProcessingSchedulerExecutorServiceImpl() {
+ super(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat(THREAD_NAME_PREFIX + "%d")
+ .setPriority(Thread.MIN_PRIORITY)
+ .build()));
+ }
+
+}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import java.util.concurrent.TimeUnit;
+import org.sonar.server.computation.log.CeLogging;
+
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
+ private final CeProcessingSchedulerExecutorService executorService;
+ private final CeQueue ceQueue;
+ private final ReportTaskProcessor reportTaskProcessor;
+ private final CeLogging ceLogging;
+
+ private final long delayBetweenTasks;
+ private final long delayForFirstStart;
+ private final TimeUnit timeUnit;
+
+ public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeQueue ceQueue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
+ this.executorService = processingExecutorService;
+ this.ceQueue = ceQueue;
+ this.reportTaskProcessor = reportTaskProcessor;
+ this.ceLogging = ceLogging;
+
+ this.delayBetweenTasks = 10;
+ this.delayForFirstStart = 0;
+ this.timeUnit = TimeUnit.SECONDS;
+ }
+
+ @Override
+ public void startScheduling() {
+ executorService.scheduleAtFixedRate(new CeWorkerRunnable(ceQueue, reportTaskProcessor, ceLogging), delayForFirstStart, delayBetweenTasks, timeUnit);
+ }
+
+}
private final DbClient dbClient;
private final CEQueueStatus queueStatus;
private final CeQueueCleaner cleaner;
- private final ReportProcessingScheduler scheduler;
+ private final CeProcessingScheduler scheduler;
- public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, ReportProcessingScheduler scheduler) {
+ public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, CeProcessingScheduler scheduler) {
this.dbClient = dbClient;
this.queueStatus = queueStatus;
this.cleaner = cleaner;
try {
initJmxCounters(dbSession);
cleaner.clean(dbSession);
- scheduler.schedule();
+ scheduler.startScheduling();
} finally {
dbClient.closeSession(dbSession);
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-/**
- * Worker that executes the tasks got from the queue
- */
-public interface CeWorker extends Runnable {
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonar.server.computation;
-
-import com.google.common.base.Optional;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-import org.sonar.core.util.logs.Profiler;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.server.computation.log.CeLogging;
-
-import static java.lang.String.format;
-
-public class CeWorkerImpl implements CeWorker {
-
- private static final Logger LOG = Loggers.get(CeWorkerImpl.class);
-
- private final CeQueue queue;
- private final ReportTaskProcessor reportTaskProcessor;
- private final CeLogging ceLogging;
-
- public CeWorkerImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
- this.queue = queue;
- this.reportTaskProcessor = reportTaskProcessor;
- this.ceLogging = ceLogging;
- }
-
- @Override
- public void run() {
- CeTask task;
- try {
- Optional<CeTask> taskOpt = queue.peek();
- if (!taskOpt.isPresent()) {
- return;
- }
- task = taskOpt.get();
- } catch (Exception e) {
- LOG.error("Failed to pop the queue of analysis reports", e);
- return;
- }
-
- ceLogging.initForTask(task);
- Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
- try {
- // TODO delegate the message to the related task processor, according to task type
- reportTaskProcessor.process(task);
- queue.remove(task, CeActivityDto.Status.SUCCESS);
- } catch (Throwable e) {
- LOG.error(format("Failed to process task %s", task.getUuid()), e);
- queue.remove(task, CeActivityDto.Status.FAILED);
- } finally {
- profiler.stopInfo();
- ceLogging.clearForTask();
- }
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.server.computation;
+
+import com.google.common.base.Optional;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+import org.sonar.core.util.logs.Profiler;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.server.computation.log.CeLogging;
+
+import static java.lang.String.format;
+
+class CeWorkerRunnable implements Runnable {
+
+ private static final Logger LOG = Loggers.get(CeWorkerRunnable.class);
+
+ private final CeQueue queue;
+ private final ReportTaskProcessor reportTaskProcessor;
+ private final CeLogging ceLogging;
+
+ public CeWorkerRunnable(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
+ this.queue = queue;
+ this.reportTaskProcessor = reportTaskProcessor;
+ this.ceLogging = ceLogging;
+ }
+
+ @Override
+ public void run() {
+ Optional<CeTask> ceTask = tryAndFindTaskToExecute();
+ if (!ceTask.isPresent()) {
+ return;
+ }
+
+ executeTask(ceTask.get());
+ }
+
+ private Optional<CeTask> tryAndFindTaskToExecute() {
+ try {
+ return queue.peek();
+ } catch (Exception e) {
+ LOG.error("Failed to pop the queue of analysis reports", e);
+ }
+ return Optional.absent();
+ }
+
+ private void executeTask(CeTask task) {
+ ceLogging.initForTask(task);
+ Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
+ try {
+ // TODO delegate the message to the related task processor, according to task type
+ reportTaskProcessor.process(task);
+ queue.remove(task, CeActivityDto.Status.SUCCESS);
+ } catch (Throwable e) {
+ LOG.error(format("Failed to process task %s", task.getUuid()), e);
+ queue.remove(task, CeActivityDto.Status.FAILED);
+ } finally {
+ profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid()));
+ ceLogging.clearForTask();
+ }
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import org.sonar.server.util.StoppableScheduledExecutorService;
-
-/**
- * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorker}.
- */
-public interface ComputeEngineProcessingExecutorService extends StoppableScheduledExecutorService {
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
-
-public class ComputeEngineProcessingExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
- implements ComputeEngineProcessingExecutorService {
- private static final String THREAD_NAME_PREFIX = "ce-processor-";
-
- public ComputeEngineProcessingExecutorServiceImpl() {
- super(
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat(THREAD_NAME_PREFIX + "%d")
- .setPriority(Thread.MIN_PRIORITY)
- .build()));
- }
-
-}
@Override
protected void configureModule() {
add(
- CeWorkerImpl.class,
ContainerFactoryImpl.class,
ComputationStepExecutor.class,
ReportTaskProcessor.class,
- ReportProcessingScheduler.class,
- ReportProcessingSchedulerExecutorServiceImpl.class,
- ComputeEngineProcessingExecutorServiceImpl.class,
- ComputeEngineProcessingQueueImpl.class);
+ CeProcessingSchedulerExecutorServiceImpl.class,
+ CeProcessingSchedulerImpl.class);
}
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-public interface ComputeEngineProcessingQueue {
- /**
- * Adds a task to the Compute Engine processing queue.
- */
- void addTask(CeWorker task);
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import com.google.common.collect.Queues;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import org.sonar.api.platform.Server;
-import org.sonar.api.platform.ServerStartHandler;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-
-import static java.util.Objects.requireNonNull;
-
-public class ComputeEngineProcessingQueueImpl implements ComputeEngineProcessingQueue, ServerStartHandler {
- private static final Logger LOG = Loggers.get(ComputeEngineProcessingQueueImpl.class);
-
- private final ComputeEngineProcessingExecutorService processingService;
- private final ConcurrentLinkedQueue<CeWorker> queue = Queues.newConcurrentLinkedQueue();
-
- private final long delayBetweenTasks;
- private final long delayForFirstStart;
- private final TimeUnit timeUnit;
-
- public ComputeEngineProcessingQueueImpl(ComputeEngineProcessingExecutorService processingExecutorService) {
- this.processingService = processingExecutorService;
-
- this.delayBetweenTasks = 10;
- this.delayForFirstStart = 0;
- this.timeUnit = TimeUnit.SECONDS;
- }
-
- @Override
- public void addTask(CeWorker task) {
- requireNonNull(task, "a ComputeEngineTask can not be null");
-
- queue.add(task);
- }
-
- @Override
- public void onServerStart(Server server) {
- processingService.scheduleAtFixedRate(new ProcessHeadOfQueueRunnable(), delayForFirstStart, delayBetweenTasks, timeUnit);
- }
-
- private class ProcessHeadOfQueueRunnable implements Runnable {
- @Override
- public void run() {
- CeWorker task = queue.poll();
- if (task != null) {
- try {
- task.run();
- } catch (Throwable e) {
- // we need to catch throwable, otherwise any task throwing an exception will cancel the scheduling of
- // ProcessHeadOfQueueRunnable in processingService
- LOG.error("Compute engine task failed", e);
- }
- }
- }
- }
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonar.server.computation;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Adds tasks to the Compute Engine to process batch reports.
- */
-public class ReportProcessingScheduler {
- private final ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService;
- private final ComputeEngineProcessingQueue processingQueue;
- private final CeWorker worker;
-
- private final long delayBetweenTasks;
- private final long delayForFirstStart;
- private final TimeUnit timeUnit;
-
- public ReportProcessingScheduler(ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService,
- ComputeEngineProcessingQueue processingQueue,
- CeWorker worker) {
- this.reportProcessingSchedulerExecutorService = reportProcessingSchedulerExecutorService;
- this.processingQueue = processingQueue;
- this.worker = worker;
-
- this.delayBetweenTasks = 1;
- this.delayForFirstStart = 0;
- this.timeUnit = TimeUnit.SECONDS;
- }
-
- public void schedule() {
- reportProcessingSchedulerExecutorService.scheduleAtFixedRate(new AddReportProcessingToCEProcessingQueue(), delayForFirstStart, delayBetweenTasks, timeUnit);
- }
-
- private class AddReportProcessingToCEProcessingQueue implements Runnable {
- @Override
- public void run() {
- processingQueue.addTask(worker);
- }
- }
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import org.sonar.server.util.StoppableScheduledExecutorService;
-
-/**
- * ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueueImpl} on a regular basis.
- */
-public interface ReportProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
-
-public class ReportProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
- implements ReportProcessingSchedulerExecutorService {
- private static final String THREAD_NAME_PREFIX = "ce-report-scheduler-";
-
- public ReportProcessingSchedulerExecutorServiceImpl() {
- super(
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat(THREAD_NAME_PREFIX + "%d")
- .setPriority(Thread.MIN_PRIORITY)
- .build()));
- }
-
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.sonar.server.computation.log.CeLogging;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class CeProcessingSchedulerImplTest {
+ private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
+ private CeQueue ceQueue = mock(CeQueue.class);
+ private ReportTaskProcessor reportTaskProcessor = mock(ReportTaskProcessor.class);
+ private CeLogging ceLogging = mock(CeLogging.class);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceQueue, reportTaskProcessor, ceLogging);
+
+ @Test
+ public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
+ underTest.startScheduling();
+
+ verify(processingExecutorService).scheduleAtFixedRate(any(CeWorkerRunnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
+ verifyNoMoreInteractions(processingExecutorService);
+ }
+
+}
ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS);
CEQueueStatus queueStatus = new CEQueueStatusImpl();
CeQueueCleaner cleaner = mock(CeQueueCleaner.class);
- ReportProcessingScheduler scheduler = mock(ReportProcessingScheduler.class);
+ CeProcessingScheduler scheduler = mock(CeProcessingScheduler.class);
CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), queueStatus, cleaner, scheduler);
@Test
underTest.start();
inOrder.verify(cleaner).clean(any(DbSession.class));
- inOrder.verify(scheduler).schedule();
+ inOrder.verify(scheduler).startScheduling();
}
private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import com.google.common.base.Optional;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.sonar.db.ce.CeActivityDto;
-import org.sonar.db.ce.CeTaskTypes;
-import org.sonar.server.computation.log.CeLogging;
-
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-public class CeWorkerImplTest {
-
- CeQueue queue = mock(CeQueueImpl.class);
- ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
- CeLogging ceLogging = mock(CeLogging.class);
- CeWorker underTest = new CeWorkerImpl(queue, taskProcessor, ceLogging);
-
- @Test
- public void no_pending_tasks_in_queue() throws Exception {
- when(queue.peek()).thenReturn(Optional.<CeTask>absent());
-
- underTest.run();
-
- verifyZeroInteractions(taskProcessor, ceLogging);
- }
-
- @Test
- public void peek_and_process_task() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- when(queue.peek()).thenReturn(Optional.of(task));
-
- underTest.run();
-
- InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
- inOrder.verify(ceLogging).clearForTask();
- }
-
- @Test
- public void fail_to_process_task() throws Exception {
- CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
- when(queue.peek()).thenReturn(Optional.of(task));
- doThrow(new IllegalStateException()).when(taskProcessor).process(task);
-
- underTest.run();
-
- InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
- inOrder.verify(ceLogging).initForTask(task);
- inOrder.verify(taskProcessor).process(task);
- inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
- inOrder.verify(ceLogging).clearForTask();
- }
-}
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.server.computation;
+
+import com.google.common.base.Optional;
+import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.sonar.db.ce.CeActivityDto;
+import org.sonar.db.ce.CeTaskTypes;
+import org.sonar.server.computation.log.CeLogging;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class CeWorkerRunnableTest {
+
+ CeQueue queue = mock(CeQueueImpl.class);
+ ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
+ CeLogging ceLogging = mock(CeLogging.class);
+ CeWorkerRunnable underTest = new CeWorkerRunnable(queue, taskProcessor, ceLogging);
+
+ @Test
+ public void no_pending_tasks_in_queue() throws Exception {
+ when(queue.peek()).thenReturn(Optional.<CeTask>absent());
+
+ underTest.run();
+
+ verifyZeroInteractions(taskProcessor, ceLogging);
+ }
+
+ @Test
+ public void peek_and_process_task() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ when(queue.peek()).thenReturn(Optional.of(task));
+
+ underTest.run();
+
+ InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.SUCCESS);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+
+ @Test
+ public void fail_to_process_task() throws Exception {
+ CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
+ when(queue.peek()).thenReturn(Optional.of(task));
+ doThrow(new IllegalStateException()).when(taskProcessor).process(task);
+
+ underTest.run();
+
+ InOrder inOrder = Mockito.inOrder(ceLogging, taskProcessor, queue);
+ inOrder.verify(ceLogging).initForTask(task);
+ inOrder.verify(taskProcessor).process(task);
+ inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
+ inOrder.verify(ceLogging).clearForTask();
+ }
+}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.server.computation;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Test;
-import org.sonar.api.platform.Server;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-public class ComputeEngineProcessingQueueImplTest {
-
- @Test
- public void onServerStart_schedules_at_fixed_rate_run_head_of_queue() {
- ComputeEngineProcessingExecutorService processingExecutorService = mock(ComputeEngineProcessingExecutorService.class);
-
- ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
- underTest.onServerStart(mock(Server.class));
-
- verify(processingExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
- verifyNoMoreInteractions(processingExecutorService);
- }
-
- @Test
- public void task_in_queue_is_called_run_only_once() {
- ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(10);
- CallCounterCeWorker task = new CallCounterCeWorker();
-
- ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
- underTest.addTask(task);
- underTest.onServerStart(mock(Server.class));
-
- assertThat(task.calls).isEqualTo(1);
- }
-
- @Test
- public void tasks_are_executed_in_order_of_addition() {
- ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(10);
-
- final List<Integer> nameList = new ArrayList<>();
-
- ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
- underTest.addTask(new CeWorker() {
- @Override
- public void run() {
- nameList.add(1);
- }
- });
- underTest.addTask(new CeWorker() {
- @Override
- public void run() {
- nameList.add(2);
- }
- });
- underTest.addTask(new CeWorker() {
- @Override
- public void run() {
- nameList.add(3);
- }
- });
- underTest.addTask(new CeWorker() {
- @Override
- public void run() {
- nameList.add(4);
- }
- });
-
- underTest.onServerStart(mock(Server.class));
-
- assertThat(nameList).containsExactly(1, 2, 3, 4);
- }
-
- @Test
- public void throwable_raised_by_a_ComputeEngineTask_must_be_caught() {
- ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(1);
-
- ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
- underTest.addTask(new CeWorker() {
- @Override
- public void run() {
- throw new RuntimeException("This should be caught by the processing queue");
- }
- });
-
- underTest.onServerStart(mock(Server.class));
- }
-
- private static class CallCounterCeWorker implements CeWorker {
- int calls = 0;
-
- @Override
- public void run() {
- calls++;
- }
- }
-
- private static class ComputeEngineProcessingExecutorServiceAdapter implements ComputeEngineProcessingExecutorService {
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public void stop() {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public void shutdown() {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public boolean isShutdown() {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public boolean isTerminated() {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- throw new UnsupportedOperationException("Not implemented!");
- }
-
- @Override
- public void execute(Runnable command) {
- throw new UnsupportedOperationException("Not implemented!");
- }
- }
-
- private static class SimulateFixedRateCallsProcessingExecutorService extends ComputeEngineProcessingExecutorServiceAdapter {
- private final int simulatedCalls;
-
- private SimulateFixedRateCallsProcessingExecutorService(int simulatedCalls) {
- this.simulatedCalls = simulatedCalls;
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- // calling the runnable any number of times will only get a task run only once
- for (int i = 0; i < simulatedCalls ; i++) {
- command.run();
- }
- return null;
- }
- }
-}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package org.sonar.server.computation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ReportProcessingSchedulerTest {
-
- ReportProcessingSchedulerExecutorService batchExecutorService = mock(ReportProcessingSchedulerExecutorService.class);
- SimpleComputeEngineProcessingQueue processingQueue = new SimpleComputeEngineProcessingQueue();
- CeWorker worker = mock(CeWorker.class);
- ReportProcessingScheduler underTest = new ReportProcessingScheduler(batchExecutorService, processingQueue, worker);
-
- @Test
- public void schedule_at_fixed_rate_adding_a_ReportProcessingTask_to_the_queue() throws Exception {
- when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(1L), eq(TimeUnit.SECONDS)))
- .thenAnswer(new ExecuteFirstArgAsRunnable());
-
- underTest.schedule();
-
- assertThat(processingQueue.getTasks()).hasSize(1);
- assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class);
- }
-
- private static class SimpleComputeEngineProcessingQueue implements ComputeEngineProcessingQueue {
- private final List<CeWorker> tasks = new ArrayList<>();
-
- @Override
- public void addTask(CeWorker task) {
- tasks.add(task);
- }
-
- public List<CeWorker> getTasks() {
- return tasks;
- }
- }
-
- private static class ExecuteFirstArgAsRunnable implements Answer<Object> {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- Runnable runnable = (Runnable) invocationOnMock.getArguments()[0];
- runnable.run();
- return null;
- }
- }
-}