Browse Source

SONAR-6831 remove ReportProcessingScheduler and CeWorker queue

having a report specific scheduler is useless, tasks must be submitted to and only to CeQueue
tags/5.2-RC1
Sébastien Lesaint 8 years ago
parent
commit
24e3581dd2
16 changed files with 97 additions and 537 deletions
  1. 4
    4
      server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingScheduler.java
  2. 2
    2
      server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingSchedulerExecutorService.java
  3. 3
    3
      server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingSchedulerExecutorServiceImpl.java
  4. 15
    23
      server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingSchedulerImpl.java
  5. 3
    3
      server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueInitializer.java
  6. 17
    11
      server/sonar-server/src/main/java/org/sonar/server/computation/CeWorkerRunnable.java
  7. 2
    5
      server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java
  8. 0
    27
      server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueue.java
  9. 0
    77
      server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueueImpl.java
  10. 0
    28
      server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingSchedulerExecutorService.java
  11. 0
    40
      server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingSchedulerExecutorServiceImpl.java
  12. 47
    0
      server/sonar-server/src/test/java/org/sonar/server/computation/CeProcessingSchedulerImplTest.java
  13. 2
    2
      server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueInitializerTest.java
  14. 2
    2
      server/sonar-server/src/test/java/org/sonar/server/computation/CeWorkerRunnableTest.java
  15. 0
    235
      server/sonar-server/src/test/java/org/sonar/server/computation/ComputeEngineProcessingQueueImplTest.java
  16. 0
    75
      server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingSchedulerTest.java

server/sonar-server/src/main/java/org/sonar/server/computation/CeWorker.java → server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingScheduler.java View File

@@ -19,8 +19,8 @@
*/
package org.sonar.server.computation;

/**
* Worker that executes the tasks got from the queue
*/
public interface CeWorker extends Runnable {
public interface CeProcessingScheduler {
void startScheduling();
}

server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorService.java → server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingSchedulerExecutorService.java View File

@@ -22,7 +22,7 @@ package org.sonar.server.computation;
import org.sonar.server.util.StoppableScheduledExecutorService;

/**
* The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorker}.
* The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnable}.
*/
public interface ComputeEngineProcessingExecutorService extends StoppableScheduledExecutorService {
public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
}

server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingExecutorServiceImpl.java → server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingSchedulerExecutorServiceImpl.java View File

@@ -24,11 +24,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;

public class ComputeEngineProcessingExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
implements ComputeEngineProcessingExecutorService {
public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
implements CeProcessingSchedulerExecutorService {
private static final String THREAD_NAME_PREFIX = "ce-processor-";

public ComputeEngineProcessingExecutorServiceImpl() {
public CeProcessingSchedulerExecutorServiceImpl() {
super(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()

server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingScheduler.java → server/sonar-server/src/main/java/org/sonar/server/computation/CeProcessingSchedulerImpl.java View File

@@ -17,43 +17,35 @@
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.server.computation;

import java.util.concurrent.TimeUnit;
import org.sonar.server.computation.log.CeLogging;

/**
* Adds tasks to the Compute Engine to process batch reports.
*/
public class ReportProcessingScheduler {
private final ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService;
private final ComputeEngineProcessingQueue processingQueue;
private final CeWorker worker;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
private final CeProcessingSchedulerExecutorService executorService;
private final CeQueue ceQueue;
private final ReportTaskProcessor reportTaskProcessor;
private final CeLogging ceLogging;

private final long delayBetweenTasks;
private final long delayForFirstStart;
private final TimeUnit timeUnit;

public ReportProcessingScheduler(ReportProcessingSchedulerExecutorService reportProcessingSchedulerExecutorService,
ComputeEngineProcessingQueue processingQueue,
CeWorker worker) {
this.reportProcessingSchedulerExecutorService = reportProcessingSchedulerExecutorService;
this.processingQueue = processingQueue;
this.worker = worker;
public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeQueue ceQueue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
this.executorService = processingExecutorService;
this.ceQueue = ceQueue;
this.reportTaskProcessor = reportTaskProcessor;
this.ceLogging = ceLogging;

this.delayBetweenTasks = 1;
this.delayBetweenTasks = 10;
this.delayForFirstStart = 0;
this.timeUnit = TimeUnit.SECONDS;
}

public void schedule() {
reportProcessingSchedulerExecutorService.scheduleAtFixedRate(new AddReportProcessingToCEProcessingQueue(), delayForFirstStart, delayBetweenTasks, timeUnit);
@Override
public void startScheduling() {
executorService.scheduleAtFixedRate(new CeWorkerRunnable(ceQueue, reportTaskProcessor, ceLogging), delayForFirstStart, delayBetweenTasks, timeUnit);
}

private class AddReportProcessingToCEProcessingQueue implements Runnable {
@Override
public void run() {
processingQueue.addTask(worker);
}
}
}

+ 3
- 3
server/sonar-server/src/main/java/org/sonar/server/computation/CeQueueInitializer.java View File

@@ -36,9 +36,9 @@ public class CeQueueInitializer implements Startable {
private final DbClient dbClient;
private final CEQueueStatus queueStatus;
private final CeQueueCleaner cleaner;
private final ReportProcessingScheduler scheduler;
private final CeProcessingScheduler scheduler;

public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, ReportProcessingScheduler scheduler) {
public CeQueueInitializer(DbClient dbClient, CEQueueStatus queueStatus, CeQueueCleaner cleaner, CeProcessingScheduler scheduler) {
this.dbClient = dbClient;
this.queueStatus = queueStatus;
this.cleaner = cleaner;
@@ -51,7 +51,7 @@ public class CeQueueInitializer implements Startable {
try {
initJmxCounters(dbSession);
cleaner.clean(dbSession);
scheduler.schedule();
scheduler.startScheduling();

} finally {
dbClient.closeSession(dbSession);

server/sonar-server/src/main/java/org/sonar/server/computation/CeWorkerImpl.java → server/sonar-server/src/main/java/org/sonar/server/computation/CeWorkerRunnable.java View File

@@ -29,15 +29,15 @@ import org.sonar.server.computation.log.CeLogging;

import static java.lang.String.format;

public class CeWorkerImpl implements CeWorker {
class CeWorkerRunnable implements Runnable {

private static final Logger LOG = Loggers.get(CeWorkerImpl.class);
private static final Logger LOG = Loggers.get(CeWorkerRunnable.class);

private final CeQueue queue;
private final ReportTaskProcessor reportTaskProcessor;
private final CeLogging ceLogging;

public CeWorkerImpl(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
public CeWorkerRunnable(CeQueue queue, ReportTaskProcessor reportTaskProcessor, CeLogging ceLogging) {
this.queue = queue;
this.reportTaskProcessor = reportTaskProcessor;
this.ceLogging = ceLogging;
@@ -45,18 +45,24 @@ public class CeWorkerImpl implements CeWorker {

@Override
public void run() {
CeTask task;
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
if (!ceTask.isPresent()) {
return;
}

executeTask(ceTask.get());
}

private Optional<CeTask> tryAndFindTaskToExecute() {
try {
Optional<CeTask> taskOpt = queue.peek();
if (!taskOpt.isPresent()) {
return;
}
task = taskOpt.get();
return queue.peek();
} catch (Exception e) {
LOG.error("Failed to pop the queue of analysis reports", e);
return;
}
return Optional.absent();
}

private void executeTask(CeTask task) {
ceLogging.initForTask(task);
Profiler profiler = Profiler.create(LOG).startInfo(format("Analysis of project %s (report %s)", task.getComponentKey(), task.getUuid()));
try {
@@ -67,7 +73,7 @@ public class CeWorkerImpl implements CeWorker {
LOG.error(format("Failed to process task %s", task.getUuid()), e);
queue.remove(task, CeActivityDto.Status.FAILED);
} finally {
profiler.stopInfo();
profiler.stopInfo(String.format("Total thread execution of project %s (report %s)", task.getComponentUuid(), task.getUuid()));
ceLogging.clearForTask();
}
}

+ 2
- 5
server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingModule.java View File

@@ -26,13 +26,10 @@ public class ComputeEngineProcessingModule extends Module {
@Override
protected void configureModule() {
add(
CeWorkerImpl.class,
ContainerFactoryImpl.class,
ComputationStepExecutor.class,
ReportTaskProcessor.class,
ReportProcessingScheduler.class,
ReportProcessingSchedulerExecutorServiceImpl.class,
ComputeEngineProcessingExecutorServiceImpl.class,
ComputeEngineProcessingQueueImpl.class);
CeProcessingSchedulerExecutorServiceImpl.class,
CeProcessingSchedulerImpl.class);
}
}

+ 0
- 27
server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueue.java View File

@@ -1,27 +0,0 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

public interface ComputeEngineProcessingQueue {
/**
* Adds a task to the Compute Engine processing queue.
*/
void addTask(CeWorker task);
}

+ 0
- 77
server/sonar-server/src/main/java/org/sonar/server/computation/ComputeEngineProcessingQueueImpl.java View File

@@ -1,77 +0,0 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import com.google.common.collect.Queues;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.sonar.api.platform.Server;
import org.sonar.api.platform.ServerStartHandler;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;

import static java.util.Objects.requireNonNull;

public class ComputeEngineProcessingQueueImpl implements ComputeEngineProcessingQueue, ServerStartHandler {
private static final Logger LOG = Loggers.get(ComputeEngineProcessingQueueImpl.class);

private final ComputeEngineProcessingExecutorService processingService;
private final ConcurrentLinkedQueue<CeWorker> queue = Queues.newConcurrentLinkedQueue();

private final long delayBetweenTasks;
private final long delayForFirstStart;
private final TimeUnit timeUnit;

public ComputeEngineProcessingQueueImpl(ComputeEngineProcessingExecutorService processingExecutorService) {
this.processingService = processingExecutorService;

this.delayBetweenTasks = 10;
this.delayForFirstStart = 0;
this.timeUnit = TimeUnit.SECONDS;
}

@Override
public void addTask(CeWorker task) {
requireNonNull(task, "a ComputeEngineTask can not be null");

queue.add(task);
}

@Override
public void onServerStart(Server server) {
processingService.scheduleAtFixedRate(new ProcessHeadOfQueueRunnable(), delayForFirstStart, delayBetweenTasks, timeUnit);
}

private class ProcessHeadOfQueueRunnable implements Runnable {
@Override
public void run() {
CeWorker task = queue.poll();
if (task != null) {
try {
task.run();
} catch (Throwable e) {
// we need to catch throwable, otherwise any task throwing an exception will cancel the scheduling of
// ProcessHeadOfQueueRunnable in processingService
LOG.error("Compute engine task failed", e);
}
}
}
}
}

+ 0
- 28
server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingSchedulerExecutorService.java View File

@@ -1,28 +0,0 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import org.sonar.server.util.StoppableScheduledExecutorService;

/**
* ExecutorService responsible for adding {@link CeWorkerImpl} to {@link CeQueueImpl} on a regular basis.
*/
public interface ReportProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
}

+ 0
- 40
server/sonar-server/src/main/java/org/sonar/server/computation/ReportProcessingSchedulerExecutorServiceImpl.java View File

@@ -1,40 +0,0 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;

public class ReportProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
implements ReportProcessingSchedulerExecutorService {
private static final String THREAD_NAME_PREFIX = "ce-report-scheduler-";

public ReportProcessingSchedulerExecutorServiceImpl() {
super(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat(THREAD_NAME_PREFIX + "%d")
.setPriority(Thread.MIN_PRIORITY)
.build()));
}

}

+ 47
- 0
server/sonar-server/src/test/java/org/sonar/server/computation/CeProcessingSchedulerImplTest.java View File

@@ -0,0 +1,47 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.sonar.server.computation.log.CeLogging;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class CeProcessingSchedulerImplTest {
private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
private CeQueue ceQueue = mock(CeQueue.class);
private ReportTaskProcessor reportTaskProcessor = mock(ReportTaskProcessor.class);
private CeLogging ceLogging = mock(CeLogging.class);
private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceQueue, reportTaskProcessor, ceLogging);

@Test
public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
underTest.startScheduling();

verify(processingExecutorService).scheduleAtFixedRate(any(CeWorkerRunnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
verifyNoMoreInteractions(processingExecutorService);
}

}

+ 2
- 2
server/sonar-server/src/test/java/org/sonar/server/computation/CeQueueInitializerTest.java View File

@@ -50,7 +50,7 @@ public class CeQueueInitializerTest {
ReportFiles reportFiles = mock(ReportFiles.class, Mockito.RETURNS_DEEP_STUBS);
CEQueueStatus queueStatus = new CEQueueStatusImpl();
CeQueueCleaner cleaner = mock(CeQueueCleaner.class);
ReportProcessingScheduler scheduler = mock(ReportProcessingScheduler.class);
CeProcessingScheduler scheduler = mock(CeProcessingScheduler.class);
CeQueueInitializer underTest = new CeQueueInitializer(dbTester.getDbClient(), queueStatus, cleaner, scheduler);

@Test
@@ -79,7 +79,7 @@ public class CeQueueInitializerTest {
underTest.start();

inOrder.verify(cleaner).clean(any(DbSession.class));
inOrder.verify(scheduler).schedule();
inOrder.verify(scheduler).startScheduling();
}

private void insertInQueue(String taskUuid, CeQueueDto.Status status) throws IOException {

server/sonar-server/src/test/java/org/sonar/server/computation/CeWorkerImplTest.java → server/sonar-server/src/test/java/org/sonar/server/computation/CeWorkerRunnableTest.java View File

@@ -32,12 +32,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

public class CeWorkerImplTest {
public class CeWorkerRunnableTest {

CeQueue queue = mock(CeQueueImpl.class);
ReportTaskProcessor taskProcessor = mock(ReportTaskProcessor.class);
CeLogging ceLogging = mock(CeLogging.class);
CeWorker underTest = new CeWorkerImpl(queue, taskProcessor, ceLogging);
CeWorkerRunnable underTest = new CeWorkerRunnable(queue, taskProcessor, ceLogging);

@Test
public void no_pending_tasks_in_queue() throws Exception {

+ 0
- 235
server/sonar-server/src/test/java/org/sonar/server/computation/ComputeEngineProcessingQueueImplTest.java View File

@@ -1,235 +0,0 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.sonar.api.platform.Server;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class ComputeEngineProcessingQueueImplTest {

@Test
public void onServerStart_schedules_at_fixed_rate_run_head_of_queue() {
ComputeEngineProcessingExecutorService processingExecutorService = mock(ComputeEngineProcessingExecutorService.class);

ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
underTest.onServerStart(mock(Server.class));

verify(processingExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(10L), eq(TimeUnit.SECONDS));
verifyNoMoreInteractions(processingExecutorService);
}

@Test
public void task_in_queue_is_called_run_only_once() {
ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(10);
CallCounterCeWorker task = new CallCounterCeWorker();

ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
underTest.addTask(task);
underTest.onServerStart(mock(Server.class));

assertThat(task.calls).isEqualTo(1);
}

@Test
public void tasks_are_executed_in_order_of_addition() {
ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(10);

final List<Integer> nameList = new ArrayList<>();

ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
underTest.addTask(new CeWorker() {
@Override
public void run() {
nameList.add(1);
}
});
underTest.addTask(new CeWorker() {
@Override
public void run() {
nameList.add(2);
}
});
underTest.addTask(new CeWorker() {
@Override
public void run() {
nameList.add(3);
}
});
underTest.addTask(new CeWorker() {
@Override
public void run() {
nameList.add(4);
}
});

underTest.onServerStart(mock(Server.class));

assertThat(nameList).containsExactly(1, 2, 3, 4);
}

@Test
public void throwable_raised_by_a_ComputeEngineTask_must_be_caught() {
ComputeEngineProcessingExecutorServiceAdapter processingExecutorService = new SimulateFixedRateCallsProcessingExecutorService(1);

ComputeEngineProcessingQueueImpl underTest = new ComputeEngineProcessingQueueImpl(processingExecutorService);
underTest.addTask(new CeWorker() {
@Override
public void run() {
throw new RuntimeException("This should be caught by the processing queue");
}
});

underTest.onServerStart(mock(Server.class));
}

private static class CallCounterCeWorker implements CeWorker {
int calls = 0;

@Override
public void run() {
calls++;
}
}

private static class ComputeEngineProcessingExecutorServiceAdapter implements ComputeEngineProcessingExecutorService {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public void stop() {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public void shutdown() {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public boolean isShutdown() {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public boolean isTerminated() {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <T> Future<T> submit(Callable<T> task) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public Future<?> submit(Runnable task) {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException("Not implemented!");
}

@Override
public void execute(Runnable command) {
throw new UnsupportedOperationException("Not implemented!");
}
}

private static class SimulateFixedRateCallsProcessingExecutorService extends ComputeEngineProcessingExecutorServiceAdapter {
private final int simulatedCalls;

private SimulateFixedRateCallsProcessingExecutorService(int simulatedCalls) {
this.simulatedCalls = simulatedCalls;
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// calling the runnable any number of times will only get a task run only once
for (int i = 0; i < simulatedCalls ; i++) {
command.run();
}
return null;
}
}
}

+ 0
- 75
server/sonar-server/src/test/java/org/sonar/server/computation/ReportProcessingSchedulerTest.java View File

@@ -1,75 +0,0 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.server.computation;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReportProcessingSchedulerTest {

ReportProcessingSchedulerExecutorService batchExecutorService = mock(ReportProcessingSchedulerExecutorService.class);
SimpleComputeEngineProcessingQueue processingQueue = new SimpleComputeEngineProcessingQueue();
CeWorker worker = mock(CeWorker.class);
ReportProcessingScheduler underTest = new ReportProcessingScheduler(batchExecutorService, processingQueue, worker);

@Test
public void schedule_at_fixed_rate_adding_a_ReportProcessingTask_to_the_queue() throws Exception {
when(batchExecutorService.scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(1L), eq(TimeUnit.SECONDS)))
.thenAnswer(new ExecuteFirstArgAsRunnable());

underTest.schedule();

assertThat(processingQueue.getTasks()).hasSize(1);
assertThat(processingQueue.getTasks().iterator().next()).isInstanceOf(CeWorker.class);
}

private static class SimpleComputeEngineProcessingQueue implements ComputeEngineProcessingQueue {
private final List<CeWorker> tasks = new ArrayList<>();

@Override
public void addTask(CeWorker task) {
tasks.add(task);
}

public List<CeWorker> getTasks() {
return tasks;
}
}

private static class ExecuteFirstArgAsRunnable implements Answer<Object> {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Runnable runnable = (Runnable) invocationOnMock.getArguments()[0];
runnable.run();
return null;
}
}
}

Loading…
Cancel
Save