aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2015-12-01 17:20:09 +0100
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>2015-12-02 18:13:05 +0100
commitef483f44a0f330c58b16ef843bc722b4e87743a7 (patch)
treeb92cb3c6b2a7dcdf9f3ec27fe7cd1b8fcff5fd43
parent99751f508494d59c1394d374163d86a27868106f (diff)
downloadsonarqube-ef483f44a0f330c58b16ef843bc722b4e87743a7.tar.gz
sonarqube-ef483f44a0f330c58b16ef843bc722b4e87743a7.zip
SONAR-7088 faster consumption of pending tasks in CE
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java5
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java56
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java48
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java3
-rw-r--r--server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java5
-rw-r--r--server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java423
-rw-r--r--server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java11
7 files changed, 519 insertions, 32 deletions
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java
index 3fc101ed7ab..33d02bf48aa 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java
@@ -19,10 +19,11 @@
*/
package org.sonar.server.computation.taskprocessor;
-import org.sonar.server.util.StoppableScheduledExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import org.sonar.server.util.StoppableExecutorService;
/**
* The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}.
*/
-public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService {
+public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService {
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
index 8ea57466439..3970c17d460 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java
@@ -19,22 +19,62 @@
*/
package org.sonar.server.computation.taskprocessor;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
+import java.util.concurrent.TimeUnit;
+import org.sonar.server.util.AbstractStoppableExecutorService;
-public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableExecutorService<ListeningScheduledExecutorService>
implements CeProcessingSchedulerExecutorService {
private static final String THREAD_NAME_PREFIX = "ce-processor-";
public CeProcessingSchedulerExecutorServiceImpl() {
super(
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat(THREAD_NAME_PREFIX + "%d")
- .setPriority(Thread.MIN_PRIORITY)
- .build()));
+ MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat(THREAD_NAME_PREFIX + "%d")
+ .setPriority(Thread.MIN_PRIORITY)
+ .build())));
}
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return delegate.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return delegate.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java
index 4737c4010dd..f7c58020800 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java
@@ -19,14 +19,24 @@
*/
package org.sonar.server.computation.taskprocessor;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.sonar.api.utils.log.Logger;
+import org.sonar.api.utils.log.Loggers;
+
+import static com.google.common.util.concurrent.Futures.addCallback;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
+ private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
+
private final CeProcessingSchedulerExecutorService executorService;
private final CeWorkerRunnable workerRunnable;
private final long delayBetweenTasks;
- private final long delayForFirstStart;
private final TimeUnit timeUnit;
public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) {
@@ -34,13 +44,43 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
this.workerRunnable = workerRunnable;
this.delayBetweenTasks = 2;
- this.delayForFirstStart = 0;
- this.timeUnit = TimeUnit.SECONDS;
+ this.timeUnit = SECONDS;
}
@Override
public void startScheduling() {
- executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit);
+
+ FutureCallback<Boolean> chainingCallback = new ChainingCallback();
+ addCallback(future, chainingCallback, executorService);
}
+ private class ChainingCallback implements FutureCallback<Boolean> {
+ @Override
+ public void onSuccess(@Nullable Boolean result) {
+ if (result != null && result) {
+ chainWithoutDelay();
+ } else {
+ chainTask(delayBetweenTasks, timeUnit);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (!(t instanceof Error)) {
+ chainWithoutDelay();
+ } else {
+ LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t);
+ }
+ }
+
+ private void chainWithoutDelay() {
+ chainTask(1, MILLISECONDS);
+ }
+
+ private void chainTask(long delay, TimeUnit unit) {
+ ListenableScheduledFuture<Boolean> future = executorService.schedule(workerRunnable, delay, unit);
+ addCallback(future, this, executorService);
+ }
+ }
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java
index 2df1a66743e..d6c8fb86b89 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java
@@ -19,11 +19,12 @@
*/
package org.sonar.server.computation.taskprocessor;
+import java.util.concurrent.Callable;
import org.sonar.server.computation.queue.CeQueue;
import org.sonar.server.computation.queue.CeTask;
/**
* Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}.
*/
-public interface CeWorkerRunnable extends Runnable {
+public interface CeWorkerRunnable extends Callable<Boolean> {
}
diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java
index e20b6ac647e..a7b28ee9367 100644
--- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java
+++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java
@@ -46,13 +46,14 @@ public class CeWorkerRunnableImpl implements CeWorkerRunnable {
}
@Override
- public void run() {
+ public Boolean call() throws Exception {
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
if (!ceTask.isPresent()) {
- return;
+ return false;
}
executeTask(ceTask.get());
+ return true;
}
private Optional<CeTask> tryAndFindTaskToExecute() {
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java
index c94bbd4312d..f50ab2c8e48 100644
--- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java
+++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -19,26 +19,429 @@
*/
package org.sonar.server.computation.taskprocessor;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class CeProcessingSchedulerImplTest {
- private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
+ private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
+
+ @Rule
+ // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+ public Timeout timeout = Timeout.seconds(1);
+
+ private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class);
+ private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
+ private ScheduleCall regularDelayedPoll = new ScheduleCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS);
+ private ScheduleCall notDelayedPoll = new ScheduleCall(ceWorkerRunnable, 1L, TimeUnit.MILLISECONDS);
+
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable);
+
+ @Test
+ public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(true)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsOnly(
+ regularDelayedPoll,
+ notDelayedPoll
+ );
+ }
+
+ @Test
+ public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenThrow(new Exception("Exception is followed by a poll without delay"))
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+ regularDelayedPoll,
+ notDelayedPoll
+ );
+ }
+
+ @Test
+ public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+ regularDelayedPoll,
+ regularDelayedPoll
+ );
+ }
@Test
- public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
+ public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenThrow(new Exception("IAE should not cause scheduling to stop"))
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+ regularDelayedPoll,
+ notDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ regularDelayedPoll,
+ regularDelayedPoll
+ );
+ }
+
+ private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
underTest.startScheduling();
- verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(2L), eq(TimeUnit.SECONDS));
- verifyNoMoreInteractions(processingExecutorService);
+ // execute future synchronously
+ processingExecutorService.runFutures();
+ }
+
+ /**
+ * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
+ * method to execute futures it creates and exposes a method to retrieve logs of calls to
+ * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by
+ * {@link CeProcessingSchedulerImpl}.
+ */
+ private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService {
+
+ private final Queue<ScheduledFuture<?>> futures = new ConcurrentLinkedQueue<>();
+ private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService());
+
+ private final List<ScheduleCall> scheduleCalls = new ArrayList<>();
+
+ public List<ScheduleCall> getScheduleCalls() {
+ return scheduleCalls;
+ }
+
+ public void runFutures() throws ExecutionException, InterruptedException {
+ while (futures.peek() != null) {
+ futures.poll().get();
+ }
+ }
+
+ @Override
+ public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ this.scheduleCalls.add(new ScheduleCall(callable, delay, unit));
+ return delegate.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public void stop() {
+ throw new UnsupportedOperationException("stop() not implemented");
+ }
+
+ // ////////////// delegated methods ////////////////
+
+ @Override
+ public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return delegate.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ delegate.execute(command);
+ }
+
+ /**
+ * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into
+ * {@link StubCeProcessingSchedulerExecutorService#futures}.
+ */
+ private class SynchronousStubExecutorService implements ScheduledExecutorService {
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
+ ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() {
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ command.run();
+ return null;
+ }
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
+ ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() {
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ // ///////// unsupported operations ///////////
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public void shutdown() {
+ throw new UnsupportedOperationException("shutdown() not implemented");
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ throw new UnsupportedOperationException("shutdownNow() not implemented");
+ }
+
+ @Override
+ public boolean isShutdown() {
+ throw new UnsupportedOperationException("isShutdown() not implemented");
+ }
+
+ @Override
+ public boolean isTerminated() {
+ throw new UnsupportedOperationException("isTerminated() not implemented");
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ throw new UnsupportedOperationException("submit(Callable<T> task) not implemented");
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented");
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ throw new UnsupportedOperationException("submit(Runnable task) not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+ }
+ }
+ }
+
+ private static abstract class AbstractPartiallyImplementedScheduledFuture<V> implements ScheduledFuture<V> {
+ @Override
+ public long getDelay(TimeUnit unit) {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public boolean isCancelled() {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public boolean isDone() {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+ }
+
+ /**
+ * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)}
+ */
+ @Immutable
+ private static final class ScheduleCall {
+ private final Callable<?> callable;
+ private final long delay;
+ private final TimeUnit unit;
+
+ private ScheduleCall(Callable<?> callable, long delay, TimeUnit unit) {
+ this.callable = callable;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ScheduleCall that = (ScheduleCall) o;
+ return delay == that.delay && callable == that.callable && unit.equals(that.unit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(callable, delay, unit);
+ }
+
+ @Override
+ public String toString() {
+ return "ScheduleCall{" +
+ "callable=" + callable +
+ ", delay=" + delay +
+ ", unit=" + unit +
+ '}';
+ }
}
}
diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java
index a95d8365421..305c5e321d4 100644
--- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java
+++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java
@@ -32,6 +32,7 @@ import org.sonar.server.computation.queue.CeQueueImpl;
import org.sonar.server.computation.queue.CeTask;
import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
@@ -52,18 +53,18 @@ public class CeWorkerRunnableImplTest {
public void no_pending_tasks_in_queue() throws Exception {
when(queue.peek()).thenReturn(Optional.<CeTask>absent());
- underTest.run();
+ assertThat(underTest.call()).isFalse();
verifyZeroInteractions(taskProcessor, ceLogging);
}
@Test
- public void fail_when_no_CeTaskProcessor_is_found_in_repository() {
+ public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception {
CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build();
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
when(queue.peek()).thenReturn(Optional.of(task));
- underTest.run();
+ assertThat(underTest.call()).isTrue();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED);
@@ -76,7 +77,7 @@ public class CeWorkerRunnableImplTest {
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
when(queue.peek()).thenReturn(Optional.of(task));
- underTest.run();
+ assertThat(underTest.call()).isTrue();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(taskProcessor).process(task);
@@ -91,7 +92,7 @@ public class CeWorkerRunnableImplTest {
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task);
- underTest.run();
+ assertThat(underTest.call()).isTrue();
inOrder.verify(ceLogging).initForTask(task);
inOrder.verify(taskProcessor).process(task);