2 * SonarQube, open source software quality management tool.
3 * Copyright (C) 2008-2014 SonarSource
4 * mailto:contact AT sonarsource DOT com
6 * SonarQube is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 3 of the License, or (at your option) any later version.
11 * SonarQube is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public License
17 * along with this program; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20 package org.sonar.server.computation.taskprocessor;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListenableScheduledFuture;
24 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.Objects;
30 import java.util.Queue;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.Delayed;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import javax.annotation.Nullable;
41 import javax.annotation.concurrent.Immutable;
42 import org.junit.Rule;
43 import org.junit.Test;
44 import org.junit.rules.Timeout;
46 import static org.assertj.core.api.Assertions.assertThat;
47 import static org.mockito.Mockito.mock;
48 import static org.mockito.Mockito.when;
50 public class CeProcessingSchedulerImplTest {
51 private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
54 // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
55 public Timeout timeout = Timeout.seconds(60);
57 private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class);
58 private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
59 private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS);
60 private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
62 private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable);
65 public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception {
66 when(ceWorkerRunnable.call())
68 .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
70 startSchedulingAndRun();
72 assertThat(processingExecutorService.getSchedulerCalls()).containsOnly(
79 public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception {
80 when(ceWorkerRunnable.call())
81 .thenThrow(new Exception("Exception is followed by a poll without delay"))
82 .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
84 startSchedulingAndRun();
86 assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
93 public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception {
94 when(ceWorkerRunnable.call())
96 .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
98 startSchedulingAndRun();
100 assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
107 public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception {
108 when(ceWorkerRunnable.call())
114 .thenThrow(new Exception("IAE should not cause scheduling to stop"))
118 .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
120 startSchedulingAndRun();
122 assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
137 public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
138 when(ceWorkerRunnable.call())
146 .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
148 underTest.startScheduling();
150 int cancelledTaskFutureCount = 0;
152 while (processingExecutorService.futures.peek() != null) {
153 Future<?> future = processingExecutorService.futures.poll();
154 if (future.isCancelled()) {
155 cancelledTaskFutureCount++;
159 // call stop after second delayed polling
166 assertThat(cancelledTaskFutureCount).isEqualTo(1);
167 assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
175 private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
176 underTest.startScheduling();
178 // execute future synchronously
179 processingExecutorService.runFutures();
183 * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
184 * method to execute futures it creates and exposes a method to retrieve logs of calls to
185 * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by
186 * {@link CeProcessingSchedulerImpl}.
188 private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService {
190 private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();
191 private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService());
193 private final List<SchedulerCall> schedulerCalls = new ArrayList<>();
195 public List<SchedulerCall> getSchedulerCalls() {
196 return schedulerCalls;
199 public void runFutures() throws ExecutionException, InterruptedException {
200 while (futures.peek() != null) {
201 Future<?> future = futures.poll();
202 if (!future.isCancelled()) {
209 public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
210 this.schedulerCalls.add(new SchedulerCall(callable, delay, unit));
211 return delegate.schedule(callable, delay, unit);
215 public <T> ListenableFuture<T> submit(Callable<T> task) {
216 this.schedulerCalls.add(new SchedulerCall(task));
217 return delegate.submit(task);
222 throw new UnsupportedOperationException("stop() not implemented");
225 // ////////////// delegated methods ////////////////
228 public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
229 return delegate.schedule(command, delay, unit);
233 public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
234 return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
238 public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
239 return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
243 public void shutdown() {
248 public List<Runnable> shutdownNow() {
249 return delegate.shutdownNow();
253 public boolean isShutdown() {
254 return delegate.isShutdown();
258 public boolean isTerminated() {
259 return delegate.isTerminated();
263 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
264 return delegate.awaitTermination(timeout, unit);
268 public <T> ListenableFuture<T> submit(Runnable task, T result) {
269 return delegate.submit(task, result);
273 public ListenableFuture<?> submit(Runnable task) {
274 return delegate.submit(task);
278 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
279 return delegate.invokeAll(tasks);
283 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
284 return delegate.invokeAll(tasks, timeout, unit);
288 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
289 return delegate.invokeAny(tasks);
293 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
294 return delegate.invokeAny(tasks, timeout, unit);
298 public void execute(Runnable command) {
299 delegate.execute(command);
303 * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into
304 * {@link StubCeProcessingSchedulerExecutorService#futures}.
306 private class SynchronousStubExecutorService implements ScheduledExecutorService {
308 public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
309 ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() {
311 public Void get() throws InterruptedException, ExecutionException {
321 public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
322 ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() {
325 public V get() throws InterruptedException, ExecutionException {
327 return callable.call();
328 } catch (Exception e) {
329 throw new ExecutionException(e);
338 public <T> Future<T> submit(final Callable<T> task) {
339 Future<T> res = new AbstractPartiallyImplementedFuture<T>() {
342 public T get() throws InterruptedException, ExecutionException {
345 } catch (Exception e) {
346 throw new ExecutionException(e);
356 public void execute(Runnable command) {
360 // ///////// unsupported operations ///////////
363 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
364 throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented");
368 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
369 throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented");
373 public void shutdown() {
374 throw new UnsupportedOperationException("shutdown() not implemented");
378 public List<Runnable> shutdownNow() {
379 throw new UnsupportedOperationException("shutdownNow() not implemented");
383 public boolean isShutdown() {
384 throw new UnsupportedOperationException("isShutdown() not implemented");
388 public boolean isTerminated() {
389 throw new UnsupportedOperationException("isTerminated() not implemented");
393 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
394 throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented");
398 public <T> Future<T> submit(Runnable task, T result) {
399 throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented");
403 public Future<?> submit(Runnable task) {
404 throw new UnsupportedOperationException("submit(Runnable task) not implemented");
408 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
409 throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented");
413 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
414 throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
418 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
419 throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented");
423 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
424 throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
429 private static abstract class AbstractPartiallyImplementedScheduledFuture<V> extends AbstractPartiallyImplementedFuture<V> implements ScheduledFuture<V> {
431 public long getDelay(TimeUnit unit) {
432 throw new UnsupportedOperationException("getDelay(TimeUnit unit) not implemented");
436 public int compareTo(Delayed o) {
437 throw new UnsupportedOperationException("compareTo(Delayed o) not implemented");
442 private static abstract class AbstractPartiallyImplementedFuture<T> implements Future<T> {
443 private boolean cancelled = false;
446 public boolean cancel(boolean mayInterruptIfRunning) {
447 this.cancelled = true;
452 public boolean isCancelled() {
453 return this.cancelled;
457 public boolean isDone() {
458 throw new UnsupportedOperationException("isDone() not implemented");
462 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
463 throw new UnsupportedOperationException("get(long timeout, TimeUnit unit) not implemented");
468 * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)}
471 private static final class SchedulerCall {
472 private final Callable<?> callable;
473 private final long delay;
474 private final TimeUnit unit;
476 private SchedulerCall(Callable<?> callable, long delay, TimeUnit unit) {
477 this.callable = callable;
482 private SchedulerCall(Callable<?> callable) {
483 this.callable = callable;
485 this.unit = TimeUnit.NANOSECONDS;
489 public boolean equals(@Nullable Object o) {
493 if (o == null || getClass() != o.getClass()) {
496 SchedulerCall that = (SchedulerCall) o;
497 return delay == that.delay && callable == that.callable && unit.equals(that.unit);
501 public int hashCode() {
502 return Objects.hash(callable, delay, unit);
506 public String toString() {
507 return "SchedulerCall{" +
508 "callable=" + callable +