*/
package org.sonar.server.computation.taskprocessor;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl;
+import java.util.concurrent.TimeUnit;
+import org.sonar.server.util.AbstractStoppableExecutorService;
-public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl<ScheduledExecutorService>
+public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableExecutorService<ListeningScheduledExecutorService>
implements CeProcessingSchedulerExecutorService {
private static final String THREAD_NAME_PREFIX = "ce-processor-";
public CeProcessingSchedulerExecutorServiceImpl() {
super(
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat(THREAD_NAME_PREFIX + "%d")
- .setPriority(Thread.MIN_PRIORITY)
- .build()));
+ MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat(THREAD_NAME_PREFIX + "%d")
+ .setPriority(Thread.MIN_PRIORITY)
+ .build())));
}
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return delegate.schedule(command, delay, unit);
+ }
+
+ @Override
+ public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return delegate.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
}
*/
package org.sonar.server.computation.taskprocessor;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class CeProcessingSchedulerImplTest {
- private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable);
+ private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
+
+ @Rule
+ // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+ public Timeout timeout = Timeout.seconds(1);
+
+ private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class);
+ private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
+ private ScheduleCall regularDelayedPoll = new ScheduleCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS);
+ private ScheduleCall notDelayedPoll = new ScheduleCall(ceWorkerRunnable, 1L, TimeUnit.MILLISECONDS);
+
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable);
+
+ @Test
+ public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(true)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsOnly(
+ regularDelayedPoll,
+ notDelayedPoll
+ );
+ }
+
+ @Test
+ public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenThrow(new Exception("Exception is followed by a poll without delay"))
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+ regularDelayedPoll,
+ notDelayedPoll
+ );
+ }
+
+ @Test
+ public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+ regularDelayedPoll,
+ regularDelayedPoll
+ );
+ }
@Test
- public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() {
+ public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception {
+ when(ceWorkerRunnable.call())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenReturn(true)
+ .thenReturn(false)
+ .thenThrow(new Exception("IAE should not cause scheduling to stop"))
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenReturn(false)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getScheduleCalls()).containsExactly(
+ regularDelayedPoll,
+ notDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ notDelayedPoll,
+ regularDelayedPoll,
+ regularDelayedPoll,
+ regularDelayedPoll
+ );
+ }
+
+ private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
underTest.startScheduling();
- verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(2L), eq(TimeUnit.SECONDS));
- verifyNoMoreInteractions(processingExecutorService);
+ // execute future synchronously
+ processingExecutorService.runFutures();
+ }
+
+ /**
+ * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
+ * method to execute futures it creates and exposes a method to retrieve logs of calls to
+ * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by
+ * {@link CeProcessingSchedulerImpl}.
+ */
+ private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService {
+
+ private final Queue<ScheduledFuture<?>> futures = new ConcurrentLinkedQueue<>();
+ private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService());
+
+ private final List<ScheduleCall> scheduleCalls = new ArrayList<>();
+
+ public List<ScheduleCall> getScheduleCalls() {
+ return scheduleCalls;
+ }
+
+ public void runFutures() throws ExecutionException, InterruptedException {
+ while (futures.peek() != null) {
+ futures.poll().get();
+ }
+ }
+
+ @Override
+ public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ this.scheduleCalls.add(new ScheduleCall(callable, delay, unit));
+ return delegate.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public void stop() {
+ throw new UnsupportedOperationException("stop() not implemented");
+ }
+
+ // ////////////// delegated methods ////////////////
+
+ @Override
+ public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return delegate.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ delegate.execute(command);
+ }
+
+ /**
+ * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into
+ * {@link StubCeProcessingSchedulerExecutorService#futures}.
+ */
+ private class SynchronousStubExecutorService implements ScheduledExecutorService {
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
+ ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() {
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ command.run();
+ return null;
+ }
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
+ ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() {
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ }
+ };
+ futures.add(res);
+ return res;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ // ///////// unsupported operations ///////////
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public void shutdown() {
+ throw new UnsupportedOperationException("shutdown() not implemented");
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ throw new UnsupportedOperationException("shutdownNow() not implemented");
+ }
+
+ @Override
+ public boolean isShutdown() {
+ throw new UnsupportedOperationException("isShutdown() not implemented");
+ }
+
+ @Override
+ public boolean isTerminated() {
+ throw new UnsupportedOperationException("isTerminated() not implemented");
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ throw new UnsupportedOperationException("submit(Callable<T> task) not implemented");
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented");
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ throw new UnsupportedOperationException("submit(Runnable task) not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
+ }
+ }
+ }
+
+ private static abstract class AbstractPartiallyImplementedScheduledFuture<V> implements ScheduledFuture<V> {
+ @Override
+ public long getDelay(TimeUnit unit) {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public boolean isCancelled() {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public boolean isDone() {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException("XXX not implemented");
+ }
+ }
+
+ /**
+ * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)}
+ */
+ @Immutable
+ private static final class ScheduleCall {
+ private final Callable<?> callable;
+ private final long delay;
+ private final TimeUnit unit;
+
+ private ScheduleCall(Callable<?> callable, long delay, TimeUnit unit) {
+ this.callable = callable;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ScheduleCall that = (ScheduleCall) o;
+ return delay == that.delay && callable == that.callable && unit.equals(that.unit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(callable, delay, unit);
+ }
+
+ @Override
+ public String toString() {
+ return "ScheduleCall{" +
+ "callable=" + callable +
+ ", delay=" + delay +
+ ", unit=" + unit +
+ '}';
+ }
}
}