From: Sébastien Lesaint Date: Tue, 1 Dec 2015 16:20:09 +0000 (+0100) Subject: SONAR-7088 faster consumption of pending tasks in CE X-Git-Tag: 5.3-RC1~96 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=refs%2Fpull%2F667%2Fhead;p=sonarqube.git SONAR-7088 faster consumption of pending tasks in CE --- diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java index 3fc101ed7ab..33d02bf48aa 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorService.java @@ -19,10 +19,11 @@ */ package org.sonar.server.computation.taskprocessor; -import org.sonar.server.util.StoppableScheduledExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import org.sonar.server.util.StoppableExecutorService; /** * The {@link java.util.concurrent.ExecutorService} responsible for running {@link CeWorkerRunnableImpl}. */ -public interface CeProcessingSchedulerExecutorService extends StoppableScheduledExecutorService { +public interface CeProcessingSchedulerExecutorService extends StoppableExecutorService, ListeningScheduledExecutorService { } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java index 8ea57466439..3970c17d460 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerExecutorServiceImpl.java @@ -19,22 +19,62 @@ */ package org.sonar.server.computation.taskprocessor; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import org.sonar.server.util.AbstractStoppableScheduledExecutorServiceImpl; +import java.util.concurrent.TimeUnit; +import org.sonar.server.util.AbstractStoppableExecutorService; -public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableScheduledExecutorServiceImpl +public class CeProcessingSchedulerExecutorServiceImpl extends AbstractStoppableExecutorService implements CeProcessingSchedulerExecutorService { private static final String THREAD_NAME_PREFIX = "ce-processor-"; public CeProcessingSchedulerExecutorServiceImpl() { super( - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat(THREAD_NAME_PREFIX + "%d") - .setPriority(Thread.MIN_PRIORITY) - .build())); + MoreExecutors.listeningDecorator( + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat(THREAD_NAME_PREFIX + "%d") + .setPriority(Thread.MIN_PRIORITY) + .build()))); } + @Override + public ListenableFuture submit(Callable task) { + return delegate.submit(task); + } + + @Override + public ListenableFuture submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public ListenableFuture submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public ListenableScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ListenableScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ListenableScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ListenableScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java index 4737c4010dd..f7c58020800 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImpl.java @@ -19,14 +19,24 @@ */ package org.sonar.server.computation.taskprocessor; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableScheduledFuture; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.sonar.api.utils.log.Logger; +import org.sonar.api.utils.log.Loggers; + +import static com.google.common.util.concurrent.Futures.addCallback; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; public class CeProcessingSchedulerImpl implements CeProcessingScheduler { + private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); + private final CeProcessingSchedulerExecutorService executorService; private final CeWorkerRunnable workerRunnable; private final long delayBetweenTasks; - private final long delayForFirstStart; private final TimeUnit timeUnit; public CeProcessingSchedulerImpl(CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerRunnable workerRunnable) { @@ -34,13 +44,43 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler { this.workerRunnable = workerRunnable; this.delayBetweenTasks = 2; - this.delayForFirstStart = 0; - this.timeUnit = TimeUnit.SECONDS; + this.timeUnit = SECONDS; } @Override public void startScheduling() { - executorService.scheduleAtFixedRate(workerRunnable, delayForFirstStart, delayBetweenTasks, timeUnit); + ListenableScheduledFuture future = executorService.schedule(workerRunnable, delayBetweenTasks, timeUnit); + + FutureCallback chainingCallback = new ChainingCallback(); + addCallback(future, chainingCallback, executorService); } + private class ChainingCallback implements FutureCallback { + @Override + public void onSuccess(@Nullable Boolean result) { + if (result != null && result) { + chainWithoutDelay(); + } else { + chainTask(delayBetweenTasks, timeUnit); + } + } + + @Override + public void onFailure(Throwable t) { + if (!(t instanceof Error)) { + chainWithoutDelay(); + } else { + LOG.error("Compute Engine execution failed. Scheduled processing interrupted.", t); + } + } + + private void chainWithoutDelay() { + chainTask(1, MILLISECONDS); + } + + private void chainTask(long delay, TimeUnit unit) { + ListenableScheduledFuture future = executorService.schedule(workerRunnable, delay, unit); + addCallback(future, this, executorService); + } + } } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java index 2df1a66743e..d6c8fb86b89 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnable.java @@ -19,11 +19,12 @@ */ package org.sonar.server.computation.taskprocessor; +import java.util.concurrent.Callable; import org.sonar.server.computation.queue.CeQueue; import org.sonar.server.computation.queue.CeTask; /** * Marker interface of the runnable in charge of polling the {@link CeQueue} and executing {@link CeTask}. */ -public interface CeWorkerRunnable extends Runnable { +public interface CeWorkerRunnable extends Callable { } diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java index e20b6ac647e..a7b28ee9367 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImpl.java @@ -46,13 +46,14 @@ public class CeWorkerRunnableImpl implements CeWorkerRunnable { } @Override - public void run() { + public Boolean call() throws Exception { Optional ceTask = tryAndFindTaskToExecute(); if (!ceTask.isPresent()) { - return; + return false; } executeTask(ceTask.get()); + return true; } private Optional tryAndFindTaskToExecute() { diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java index c94bbd4312d..f50ab2c8e48 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeProcessingSchedulerImplTest.java @@ -19,26 +19,429 @@ */ package org.sonar.server.computation.taskprocessor; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.same; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; public class CeProcessingSchedulerImplTest { - private CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class); - private CeWorkerRunnable workerRunnable = mock(CeWorkerRunnable.class); - private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, workerRunnable); + private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling"); + + @Rule + // due to risks of infinite chaining of tasks/futures, a timeout is required for safety + public Timeout timeout = Timeout.seconds(1); + + private CeWorkerRunnable ceWorkerRunnable = mock(CeWorkerRunnable.class); + private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); + private ScheduleCall regularDelayedPoll = new ScheduleCall(ceWorkerRunnable, 2L, TimeUnit.SECONDS); + private ScheduleCall notDelayedPoll = new ScheduleCall(ceWorkerRunnable, 1L, TimeUnit.MILLISECONDS); + + private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(processingExecutorService, ceWorkerRunnable); + + @Test + public void polls_without_delay_when_CeWorkerRunnable_returns_true() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(true) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getScheduleCalls()).containsOnly( + regularDelayedPoll, + notDelayedPoll + ); + } + + @Test + public void polls_without_delay_when_CeWorkerRunnable_throws_Exception_but_not_Error() throws Exception { + when(ceWorkerRunnable.call()) + .thenThrow(new Exception("Exception is followed by a poll without delay")) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getScheduleCalls()).containsExactly( + regularDelayedPoll, + notDelayedPoll + ); + } + + @Test + public void polls_with_regular_delay_when_CeWorkerRunnable_returns_false() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(false) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getScheduleCalls()).containsExactly( + regularDelayedPoll, + regularDelayedPoll + ); + } @Test - public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() { + public void startScheduling_schedules_CeWorkerRunnable_at_fixed_rate_run_head_of_queue() throws Exception { + when(ceWorkerRunnable.call()) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false) + .thenThrow(new Exception("IAE should not cause scheduling to stop")) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getScheduleCalls()).containsExactly( + regularDelayedPoll, + notDelayedPoll, + notDelayedPoll, + regularDelayedPoll, + notDelayedPoll, + regularDelayedPoll, + notDelayedPoll, + regularDelayedPoll, + regularDelayedPoll, + regularDelayedPoll + ); + } + + private void startSchedulingAndRun() throws ExecutionException, InterruptedException { underTest.startScheduling(); - verify(processingExecutorService).scheduleAtFixedRate(same(workerRunnable), eq(0L), eq(2L), eq(TimeUnit.SECONDS)); - verifyNoMoreInteractions(processingExecutorService); + // execute future synchronously + processingExecutorService.runFutures(); + } + + /** + * A synchronous implementation of {@link CeProcessingSchedulerExecutorService} which exposes a synchronous + * method to execute futures it creates and exposes a method to retrieve logs of calls to + * {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} which is used by + * {@link CeProcessingSchedulerImpl}. + */ + private static class StubCeProcessingSchedulerExecutorService implements CeProcessingSchedulerExecutorService { + + private final Queue> futures = new ConcurrentLinkedQueue<>(); + private final ListeningScheduledExecutorService delegate = MoreExecutors.listeningDecorator(new SynchronousStubExecutorService()); + + private final List scheduleCalls = new ArrayList<>(); + + public List getScheduleCalls() { + return scheduleCalls; + } + + public void runFutures() throws ExecutionException, InterruptedException { + while (futures.peek() != null) { + futures.poll().get(); + } + } + + @Override + public ListenableScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + this.scheduleCalls.add(new ScheduleCall(callable, delay, unit)); + return delegate.schedule(callable, delay, unit); + } + + @Override + public void stop() { + throw new UnsupportedOperationException("stop() not implemented"); + } + + // ////////////// delegated methods //////////////// + + @Override + public ListenableScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ListenableScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ListenableScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public ListenableFuture submit(Callable task) { + return delegate.submit(task); + } + + @Override + public ListenableFuture submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public ListenableFuture submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } + + /** + * A partial (only 3 methods) implementation of ScheduledExecutorService which stores futures it creates into + * {@link StubCeProcessingSchedulerExecutorService#futures}. + */ + private class SynchronousStubExecutorService implements ScheduledExecutorService { + @Override + public ScheduledFuture schedule(final Runnable command, long delay, TimeUnit unit) { + ScheduledFuture res = new AbstractPartiallyImplementedScheduledFuture() { + @Override + public Void get() throws InterruptedException, ExecutionException { + command.run(); + return null; + } + }; + futures.add(res); + return res; + } + + @Override + public ScheduledFuture schedule(final Callable callable, long delay, TimeUnit unit) { + ScheduledFuture res = new AbstractPartiallyImplementedScheduledFuture() { + + @Override + public V get() throws InterruptedException, ExecutionException { + try { + return callable.call(); + } catch (Exception e) { + throw new ExecutionException(e); + } + } + }; + futures.add(res); + return res; + } + + @Override + public void execute(Runnable command) { + command.run(); + } + + // ///////// unsupported operations /////////// + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + throw new UnsupportedOperationException("scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) not implemented"); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException("scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) not implemented"); + } + + @Override + public void shutdown() { + throw new UnsupportedOperationException("shutdown() not implemented"); + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException("shutdownNow() not implemented"); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException("isShutdown() not implemented"); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException("isTerminated() not implemented"); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("awaitTermination(long timeout, TimeUnit unit) not implemented"); + } + + @Override + public Future submit(Callable task) { + throw new UnsupportedOperationException("submit(Callable task) not implemented"); + } + + @Override + public Future submit(Runnable task, T result) { + throw new UnsupportedOperationException("submit(Runnable task, T result) not implemented"); + } + + @Override + public Future submit(Runnable task) { + throw new UnsupportedOperationException("submit(Runnable task) not implemented"); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + throw new UnsupportedOperationException("invokeAll(Collection> tasks) not implemented"); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("invokeAll(Collection> tasks, long timeout, TimeUnit unit) not implemented"); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException("invokeAny(Collection> tasks) not implemented"); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("invokeAny(Collection> tasks, long timeout, TimeUnit unit) not implemented"); + } + } + } + + private static abstract class AbstractPartiallyImplementedScheduledFuture implements ScheduledFuture { + @Override + public long getDelay(TimeUnit unit) { + throw new UnsupportedOperationException("XXX not implemented"); + } + + @Override + public int compareTo(Delayed o) { + throw new UnsupportedOperationException("XXX not implemented"); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException("XXX not implemented"); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException("XXX not implemented"); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException("XXX not implemented"); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("XXX not implemented"); + } + } + + /** + * Used to log parameters of calls to {@link CeProcessingSchedulerExecutorService#schedule(Callable, long, TimeUnit)} + */ + @Immutable + private static final class ScheduleCall { + private final Callable callable; + private final long delay; + private final TimeUnit unit; + + private ScheduleCall(Callable callable, long delay, TimeUnit unit) { + this.callable = callable; + this.delay = delay; + this.unit = unit; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScheduleCall that = (ScheduleCall) o; + return delay == that.delay && callable == that.callable && unit.equals(that.unit); + } + + @Override + public int hashCode() { + return Objects.hash(callable, delay, unit); + } + + @Override + public String toString() { + return "ScheduleCall{" + + "callable=" + callable + + ", delay=" + delay + + ", unit=" + unit + + '}'; + } } } diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java index a95d8365421..305c5e321d4 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/taskprocessor/CeWorkerRunnableImplTest.java @@ -32,6 +32,7 @@ import org.sonar.server.computation.queue.CeQueueImpl; import org.sonar.server.computation.queue.CeTask; import org.sonar.server.computation.taskprocessor.report.ReportTaskProcessor; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verifyZeroInteractions; @@ -52,18 +53,18 @@ public class CeWorkerRunnableImplTest { public void no_pending_tasks_in_queue() throws Exception { when(queue.peek()).thenReturn(Optional.absent()); - underTest.run(); + assertThat(underTest.call()).isFalse(); verifyZeroInteractions(taskProcessor, ceLogging); } @Test - public void fail_when_no_CeTaskProcessor_is_found_in_repository() { + public void fail_when_no_CeTaskProcessor_is_found_in_repository() throws Exception { CeTask task = new CeTask.Builder().setUuid("TASK_1").setType(CeTaskTypes.REPORT).setComponentUuid("PROJECT_1").setSubmitterLogin(null).build(); taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); when(queue.peek()).thenReturn(Optional.of(task)); - underTest.run(); + assertThat(underTest.call()).isTrue(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(queue).remove(task, CeActivityDto.Status.FAILED); @@ -76,7 +77,7 @@ public class CeWorkerRunnableImplTest { taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); when(queue.peek()).thenReturn(Optional.of(task)); - underTest.run(); + assertThat(underTest.call()).isTrue(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task); @@ -91,7 +92,7 @@ public class CeWorkerRunnableImplTest { taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); doThrow(new IllegalStateException("simulate exception thrown by TaskProcessor#process")).when(taskProcessor).process(task); - underTest.run(); + assertThat(underTest.call()).isTrue(); inOrder.verify(ceLogging).initForTask(task); inOrder.verify(taskProcessor).process(task);