]> source.dussan.org Git - sonarqube.git/blob
82b412fd43f101c1a293b02cf84fa8088daa251c
[sonarqube.git] /
1 /*
2  * SonarQube, open source software quality management tool.
3  * Copyright (C) 2008-2014 SonarSource
4  * mailto:contact AT sonarsource DOT com
5  *
6  * SonarQube is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 3 of the License, or (at your option) any later version.
10  *
11  * SonarQube is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with this program; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19  */
20 package org.sonar.server.computation.taskprocessor;
21
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListenableScheduledFuture;
24 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.Objects;
30 import java.util.Queue;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.Delayed;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import javax.annotation.Nullable;
41 import javax.annotation.concurrent.Immutable;
42 import org.junit.Rule;
43 import org.junit.Test;
44 import org.junit.rules.Timeout;
45
46 import static org.assertj.core.api.Assertions.assertThat;
47 import static org.mockito.Mockito.mock;
48 import static org.mockito.Mockito.when;
49
50 public class CeProcessingSchedulerImplTest {
51   private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
52
53   @Rule
54   // due to risks of infinite chaining of tasks/futures, a timeout is required for safety
55   public Timeout timeout = Timeout.seconds(60);
56
57   private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class);
58   private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
59   private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS);
60   private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorkerRunnable);
61
62   private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable);
63
64   @Test
65   public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception {
66     when(ceWorkerRunnable.call())
67         .thenReturn(true)
68         .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
69
70     startSchedulingAndRun();
71
72     assertThat(processingExecutorService.getSchedulerCalls()).containsOnly(
73         regularDelayedPoll,
74         notDelayedPoll
75     );
76   }
77
78   @Test
79   public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception {
80     when(ceWorkerRunnable.call())
81         .thenThrow(new Exception("Exception is followed by a poll without delay"))
82         .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
83
84     startSchedulingAndRun();
85
86     assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
87         regularDelayedPoll,
88         notDelayedPoll
89     );
90   }
91
92   @Test
93   public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception {
94     when(ceWorkerRunnable.call())
95         .thenReturn(false)
96         .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
97
98     startSchedulingAndRun();
99
100     assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
101         regularDelayedPoll,
102         regularDelayedPoll
103     );
104   }
105
106   @Test
107   public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception {
108     when(ceWorkerRunnable.call())
109         .thenReturn(true)
110         .thenReturn(true)
111         .thenReturn(false)
112         .thenReturn(true)
113         .thenReturn(false)
114         .thenThrow(new Exception("IAE should not cause scheduling to stop"))
115         .thenReturn(false)
116         .thenReturn(false)
117         .thenReturn(false)
118         .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
119
120     startSchedulingAndRun();
121
122     assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
123         regularDelayedPoll,
124         notDelayedPoll,
125         notDelayedPoll,
126         regularDelayedPoll,
127         notDelayedPoll,
128         regularDelayedPoll,
129         notDelayedPoll,
130         regularDelayedPoll,
131         regularDelayedPoll,
132         regularDelayedPoll
133     );
134   }
135
136   @Test
137   public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
138     when(ceWorkerRunnable.call())
139         .thenReturn(false)
140         .thenReturn(true)
141         .thenReturn(false)
142         .thenReturn(false)
143         .thenReturn(false)
144         .thenReturn(false)
145         .thenReturn(false)
146         .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
147
148     underTest.startScheduling();
149
150     int cancelledTaskFutureCount = 0;
151     int i = 0;
152     while (processingExecutorService.futures.peek() != null) {
153       Future<?> future = processingExecutorService.futures.poll();
154       if (future.isCancelled()) {
155         cancelledTaskFutureCount++;
156       } else {
157         future.get();
158       }
159       // call stop after second delayed polling
160       if (i == 1) {
161         underTest.stop();
162       }
163       i++;
164     }
165
166     assertThat(cancelledTaskFutureCount).isEqualTo(1);
167     assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
168         regularDelayedPoll,
169         regularDelayedPoll,
170         notDelayedPoll,
171         regularDelayedPoll
172     );
173   }
174
175   private void startSchedulingAndRun() throws ExecutionException, InterruptedException {
176     underTest.startScheduling();
177
178     // execute future synchronously
179     processingExecutorService.runFutures();
180   }
181
182   /**
183    * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous
184    * method to execute futures it creates and exposes a method to retrieve logs of calls to
185    * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by
186    * {@link CeProcessingSchedulerImpl}.
187    */
188   private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService {
189
190     private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();
191     private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService());
192
193     private final List<SchedulerCall> schedulerCalls = new ArrayList<>();
194
195     public List<SchedulerCall> getSchedulerCalls() {
196       return schedulerCalls;
197     }
198
199     public void runFutures() throws ExecutionException, InterruptedException {
200       while (futures.peek() != null) {
201         Future<?> future = futures.poll();
202         if (!future.isCancelled()) {
203           future.get();
204         }
205       }
206     }
207
208     @Override
209     public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
210       this.schedulerCalls.add(new SchedulerCall(callable, delay, unit));
211       return delegate.schedule(callable, delay, unit);
212     }
213
214     @Override
215     public <T> ListenableFuture<T> submit(Callable<T> task) {
216       this.schedulerCalls.add(new SchedulerCall(task));
217       return delegate.submit(task);
218     }
219
220     @Override
221     public void stop() {
222       throw new UnsupportedOperationException("stop() not implemented");
223     }
224
225     // ////////////// delegated methods ////////////////
226
227     @Override
228     public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
229       return delegate.schedule(command, delay, unit);
230     }
231
232     @Override
233     public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
234       return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
235     }
236
237     @Override
238     public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
239       return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
240     }
241
242     @Override
243     public void shutdown() {
244       delegate.shutdown();
245     }
246
247     @Override
248     public List<Runnable> shutdownNow() {
249       return delegate.shutdownNow();
250     }
251
252     @Override
253     public boolean isShutdown() {
254       return delegate.isShutdown();
255     }
256
257     @Override
258     public boolean isTerminated() {
259       return delegate.isTerminated();
260     }
261
262     @Override
263     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
264       return delegate.awaitTermination(timeout, unit);
265     }
266
267     @Override
268     public <T> ListenableFuture<T> submit(Runnable task, T result) {
269       return delegate.submit(task, result);
270     }
271
272     @Override
273     public ListenableFuture<?> submit(Runnable task) {
274       return delegate.submit(task);
275     }
276
277     @Override
278     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
279       return delegate.invokeAll(tasks);
280     }
281
282     @Override
283     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
284       return delegate.invokeAll(tasks, timeout, unit);
285     }
286
287     @Override
288     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
289       return delegate.invokeAny(tasks);
290     }
291
292     @Override
293     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
294       return delegate.invokeAny(tasks, timeout, unit);
295     }
296
297     @Override
298     public void execute(Runnable command) {
299       delegate.execute(command);
300     }
301
302     /**
303      * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into
304      * {@link StubCeProcessingSchedulerExecutorService#futures}.
305      */
306     private class SynchronousStubExecutorService implements ScheduledExecutorService {
307       @Override
308       public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
309         ScheduledFuture<Void> res = new AbstractPartiallyImplementedScheduledFuture<Void>() {
310           @Override
311           public Void get() throws InterruptedException, ExecutionException {
312             command.run();
313             return null;
314           }
315         };
316         futures.add(res);
317         return res;
318       }
319
320       @Override
321       public <V> ScheduledFuture<V> schedule(final Callable<V> callable, long delay, TimeUnit unit) {
322         ScheduledFuture<V> res = new AbstractPartiallyImplementedScheduledFuture<V>() {
323
324           @Override
325           public V get() throws InterruptedException, ExecutionException {
326             try {
327               return callable.call();
328             } catch (Exception e) {
329               throw new ExecutionException(e);
330             }
331           }
332         };
333         futures.add(res);
334         return res;
335       }
336
337       @Override
338       public <T> Future<T> submit(final Callable<T> task) {
339         Future<T> res = new AbstractPartiallyImplementedFuture<T>() {
340
341           @Override
342           public T get() throws InterruptedException, ExecutionException {
343             try {
344               return task.call();
345             } catch (Exception e) {
346               throw new ExecutionException(e);
347             }
348           }
349
350         };
351         futures.add(res);
352         return res;
353       }
354
355       @Override
356       public void execute(Runnable command) {
357         command.run();
358       }
359
360       // ///////// unsupported operations ///////////
361
362       @Override
363       public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
364         throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented");
365       }
366
367       @Override
368       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
369         throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented");
370       }
371
372       @Override
373       public void shutdown() {
374         throw new UnsupportedOperationException("shutdown() not implemented");
375       }
376
377       @Override
378       public List<Runnable> shutdownNow() {
379         throw new UnsupportedOperationException("shutdownNow() not implemented");
380       }
381
382       @Override
383       public boolean isShutdown() {
384         throw new UnsupportedOperationException("isShutdown() not implemented");
385       }
386
387       @Override
388       public boolean isTerminated() {
389         throw new UnsupportedOperationException("isTerminated() not implemented");
390       }
391
392       @Override
393       public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
394         throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented");
395       }
396
397       @Override
398       public <T> Future<T> submit(Runnable task, T result) {
399         throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented");
400       }
401
402       @Override
403       public Future<?> submit(Runnable task) {
404         throw new UnsupportedOperationException("submit(Runnable task) not implemented");
405       }
406
407       @Override
408       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
409         throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks) not implemented");
410       }
411
412       @Override
413       public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
414         throw new UnsupportedOperationException("invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
415       }
416
417       @Override
418       public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
419         throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks) not implemented");
420       }
421
422       @Override
423       public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
424         throw new UnsupportedOperationException("invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) not implemented");
425       }
426     }
427   }
428
429   private static abstract class AbstractPartiallyImplementedScheduledFuture<V> extends AbstractPartiallyImplementedFuture<V> implements ScheduledFuture<V> {
430     @Override
431     public long getDelay(TimeUnit unit) {
432       throw new UnsupportedOperationException("getDelay(TimeUnit unit) not implemented");
433     }
434
435     @Override
436     public int compareTo(Delayed o) {
437       throw new UnsupportedOperationException("compareTo(Delayed o) not implemented");
438     }
439
440   }
441
442   private static abstract class AbstractPartiallyImplementedFuture<T> implements Future<T> {
443     private boolean cancelled = false;
444
445     @Override
446     public boolean cancel(boolean mayInterruptIfRunning) {
447       this.cancelled = true;
448       return true;
449     }
450
451     @Override
452     public boolean isCancelled() {
453       return this.cancelled;
454     }
455
456     @Override
457     public boolean isDone() {
458       throw new UnsupportedOperationException("isDone() not implemented");
459     }
460
461     @Override
462     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
463       throw new UnsupportedOperationException("get(long timeout, TimeUnit unit) not implemented");
464     }
465   }
466
467   /**
468    * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)}
469    */
470   @Immutable
471   private static final class SchedulerCall {
472     private final Callable<?> callable;
473     private final long delay;
474     private final TimeUnit unit;
475
476     private SchedulerCall(Callable<?> callable, long delay, TimeUnit unit) {
477       this.callable = callable;
478       this.delay = delay;
479       this.unit = unit;
480     }
481
482     private SchedulerCall(Callable<?> callable) {
483       this.callable = callable;
484       this.delay = -63366;
485       this.unit = TimeUnit.NANOSECONDS;
486     }
487
488     @Override
489     public boolean equals(@Nullable Object o) {
490       if (this == o) {
491         return true;
492       }
493       if (o == null || getClass() != o.getClass()) {
494         return false;
495       }
496       SchedulerCall that = (SchedulerCall) o;
497       return delay == that.delay && callable == that.callable && unit.equals(that.unit);
498     }
499
500     @Override
501     public int hashCode() {
502       return Objects.hash(callable, delay, unit);
503     }
504
505     @Override
506     public String toString() {
507       return "SchedulerCall{" +
508           "callable=" + callable +
509           ", delay=" + delay +
510           ", unit=" + unit +
511           '}';
512     }
513   }
514
515 }