From 441e1e45649d7f200e03519f71a689d9b5dd7e4c Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Thu, 6 Jul 2017 14:17:11 +0200 Subject: [PATCH] SONAR-9525 make CeWorker#call return an enum rather than a Boolean --- .../CeProcessingSchedulerImpl.java | 11 ++-- .../org/sonar/ce/taskprocessor/CeWorker.java | 9 ++- .../sonar/ce/taskprocessor/CeWorkerImpl.java | 8 ++- .../CeProcessingSchedulerImplTest.java | 57 +++++++++---------- .../ce/taskprocessor/CeWorkerImplTest.java | 10 ++-- 5 files changed, 52 insertions(+), 43 deletions(-) diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 44f22f74d30..7b12e6c7ee3 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -34,6 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration; import static com.google.common.util.concurrent.Futures.addCallback; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable { private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); @@ -67,7 +68,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab @Override public void startScheduling() { for (ChainingCallback chainingCallback : chainingCallbacks) { - ListenableScheduledFuture future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit); + ListenableScheduledFuture future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit); addCallback(future, chainingCallback, executorService); } } @@ -79,20 +80,20 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab } } - private class ChainingCallback implements FutureCallback { + private class ChainingCallback implements FutureCallback { private final AtomicBoolean keepRunning = new AtomicBoolean(true); private final CeWorker worker; @CheckForNull - private ListenableFuture workerFuture; + private ListenableFuture workerFuture; public ChainingCallback(CeWorker worker) { this.worker = worker; } @Override - public void onSuccess(@Nullable Boolean result) { - if (result != null && result) { + public void onSuccess(@Nullable CeWorker.Result result) { + if (result != null && result == TASK_PROCESSED) { chainWithoutDelay(); } else { chainWithDelay(); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java index f0da47fc356..96bb7df9243 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java @@ -28,7 +28,14 @@ import org.sonar.ce.queue.CeTask; * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed, * {@code false} otherwise. */ -public interface CeWorker extends Callable { +public interface CeWorker extends Callable { + enum Result { + /** Worker found no task to process */ + NO_TASK, + /** Worker found a task and processed it (either successfully or not) */ + TASK_PROCESSED + } + /** * Position of the current CeWorker among all the running workers. */ diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index 6052abb03d6..44034a70cfc 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -31,6 +31,8 @@ import org.sonar.core.util.logs.Profiler; import org.sonar.db.ce.CeActivityDto; import static java.lang.String.format; +import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; +import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeWorkerImpl implements CeWorker { @@ -52,10 +54,10 @@ public class CeWorkerImpl implements CeWorker { } @Override - public Boolean call() throws Exception { + public Result call() throws Exception { Optional ceTask = tryAndFindTaskToExecute(); if (!ceTask.isPresent()) { - return false; + return NO_TASK; } try { @@ -63,7 +65,7 @@ public class CeWorkerImpl implements CeWorker { } catch (Exception e) { LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); } - return true; + return TASK_PROCESSED; } private Optional tryAndFindTaskToExecute() { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index dc0b2598cab..57f6c225f7a 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -60,6 +60,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; +import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeProcessingSchedulerImplTest { private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling"); @@ -81,15 +83,14 @@ public class CeProcessingSchedulerImplTest { @Test public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception { when(ceWorker.call()) - .thenReturn(true) + .thenReturn(TASK_PROCESSED) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); startSchedulingAndRun(); assertThat(processingExecutorService.getSchedulerCalls()).containsOnly( regularDelayedPoll, - notDelayedPoll - ); + notDelayedPoll); } @Test @@ -102,36 +103,34 @@ public class CeProcessingSchedulerImplTest { assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( regularDelayedPoll, - notDelayedPoll - ); + notDelayedPoll); } @Test public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception { when(ceWorker.call()) - .thenReturn(false) + .thenReturn(NO_TASK) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); startSchedulingAndRun(); assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( regularDelayedPoll, - regularDelayedPoll - ); + regularDelayedPoll); } @Test public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception { when(ceWorker.call()) - .thenReturn(true) - .thenReturn(true) - .thenReturn(false) - .thenReturn(true) - .thenReturn(false) + .thenReturn(TASK_PROCESSED) + .thenReturn(TASK_PROCESSED) + .thenReturn(NO_TASK) + .thenReturn(TASK_PROCESSED) + .thenReturn(NO_TASK) .thenThrow(new Exception("IAE should not cause scheduling to stop")) - .thenReturn(false) - .thenReturn(false) - .thenReturn(false) + .thenReturn(NO_TASK) + .thenReturn(NO_TASK) + .thenReturn(NO_TASK) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); startSchedulingAndRun(); @@ -146,20 +145,19 @@ public class CeProcessingSchedulerImplTest { notDelayedPoll, regularDelayedPoll, regularDelayedPoll, - regularDelayedPoll - ); + regularDelayedPoll); } @Test public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { when(ceWorker.call()) - .thenReturn(false) - .thenReturn(true) - .thenReturn(false) - .thenReturn(false) - .thenReturn(false) - .thenReturn(false) - .thenReturn(false) + .thenReturn(NO_TASK) + .thenReturn(TASK_PROCESSED) + .thenReturn(NO_TASK) + .thenReturn(NO_TASK) + .thenReturn(NO_TASK) + .thenReturn(NO_TASK) + .thenReturn(NO_TASK) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); underTest.startScheduling(); @@ -185,8 +183,7 @@ public class CeProcessingSchedulerImplTest { regularDelayedPoll, regularDelayedPoll, notDelayedPoll, - regularDelayedPoll - ); + regularDelayedPoll); } @Test @@ -198,18 +195,18 @@ public class CeProcessingSchedulerImplTest { for (int i = 0; i < workerCount; i++) { workers[i] = mock(CeWorker.class); when(workers[i].call()) - .thenReturn(false) + .thenReturn(NO_TASK) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); } ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class); CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class); - when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture); + when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture); CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers)); CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS)) - .thenReturn(listenableScheduledFuture); + .thenReturn(listenableScheduledFuture); underTest.startScheduling(); // No exception from TestCeWorkerFactory must be thrown diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java index 041418c5cf8..73381b49dfa 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java @@ -46,6 +46,8 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; +import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeWorkerImplTest { @@ -73,7 +75,7 @@ public class CeWorkerImplTest { public void no_pending_tasks_in_queue() throws Exception { when(queue.peek(anyString())).thenReturn(Optional.empty()); - assertThat(underTest.call()).isFalse(); + assertThat(underTest.call()).isEqualTo(NO_TASK); verifyZeroInteractions(taskProcessor, ceLogging); } @@ -84,7 +86,7 @@ public class CeWorkerImplTest { taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT); when(queue.peek(anyString())).thenReturn(Optional.of(task)); - assertThat(underTest.call()).isTrue(); + assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); @@ -98,7 +100,7 @@ public class CeWorkerImplTest { taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); when(queue.peek(anyString())).thenReturn(Optional.of(task)); - assertThat(underTest.call()).isTrue(); + assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); @@ -114,7 +116,7 @@ public class CeWorkerImplTest { taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor); Throwable error = makeTaskProcessorFail(task); - assertThat(underTest.call()).isTrue(); + assertThat(underTest.call()).isEqualTo(TASK_PROCESSED); verifyWorkerUuid(); inOrder.verify(ceLogging).initForTask(task); -- 2.39.5