]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9525 make CeWorker#call return an enum rather than a Boolean
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Thu, 6 Jul 2017 12:17:11 +0000 (14:17 +0200)
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Mon, 17 Jul 2017 08:52:47 +0000 (10:52 +0200)
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java

index 44f22f74d30539034b552de3b6c84517561eb7d0..7b12e6c7ee3bc9a922a4466cffe9df5e20cb018f 100644 (file)
@@ -34,6 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration;
 
 import static com.google.common.util.concurrent.Futures.addCallback;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 
 public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
   private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
@@ -67,7 +68,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
   @Override
   public void startScheduling() {
     for (ChainingCallback chainingCallback : chainingCallbacks) {
-      ListenableScheduledFuture<Boolean> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
+      ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
       addCallback(future, chainingCallback, executorService);
     }
   }
@@ -79,20 +80,20 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
     }
   }
 
-  private class ChainingCallback implements FutureCallback<Boolean> {
+  private class ChainingCallback implements FutureCallback<CeWorker.Result> {
     private final AtomicBoolean keepRunning = new AtomicBoolean(true);
     private final CeWorker worker;
 
     @CheckForNull
-    private ListenableFuture<Boolean> workerFuture;
+    private ListenableFuture<CeWorker.Result> workerFuture;
 
     public ChainingCallback(CeWorker worker) {
       this.worker = worker;
     }
 
     @Override
-    public void onSuccess(@Nullable Boolean result) {
-      if (result != null && result) {
+    public void onSuccess(@Nullable CeWorker.Result result) {
+      if (result != null && result == TASK_PROCESSED) {
         chainWithoutDelay();
       } else {
         chainWithDelay();
index f0da47fc356d9bacfab8092a87f2f6b16001e355..96bb7df9243e561be3bfdad5b3feda20a5022709 100644 (file)
@@ -28,7 +28,14 @@ import org.sonar.ce.queue.CeTask;
  * {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
  * {@code false} otherwise.
  */
-public interface CeWorker extends Callable<Boolean> {
+public interface CeWorker extends Callable<CeWorker.Result> {
+  enum Result {
+    /** Worker found no task to process */
+    NO_TASK,
+    /** Worker found a task and processed it (either successfully or not) */
+    TASK_PROCESSED
+  }
+
   /**
    * Position of the current CeWorker among all the running workers.
    */
index 6052abb03d6f91cf7c2418db7f9a2f5b4a1595a7..44034a70cfc7957a4339083cc534c436707077e0 100644 (file)
@@ -31,6 +31,8 @@ import org.sonar.core.util.logs.Profiler;
 import org.sonar.db.ce.CeActivityDto;
 
 import static java.lang.String.format;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 
 public class CeWorkerImpl implements CeWorker {
 
@@ -52,10 +54,10 @@ public class CeWorkerImpl implements CeWorker {
   }
 
   @Override
-  public Boolean call() throws Exception {
+  public Result call() throws Exception {
     Optional<CeTask> ceTask = tryAndFindTaskToExecute();
     if (!ceTask.isPresent()) {
-      return false;
+      return NO_TASK;
     }
 
     try {
@@ -63,7 +65,7 @@ public class CeWorkerImpl implements CeWorker {
     } catch (Exception e) {
       LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
     }
-    return true;
+    return TASK_PROCESSED;
   }
 
   private Optional<CeTask> tryAndFindTaskToExecute() {
index dc0b2598cab6c6f8f4d067b0316875820651b042..57f6c225f7ae40ae1a1425cff6d3ce3e9b3a455f 100644 (file)
@@ -60,6 +60,8 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 
 public class CeProcessingSchedulerImplTest {
   private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
@@ -81,15 +83,14 @@ public class CeProcessingSchedulerImplTest {
   @Test
   public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
     when(ceWorker.call())
-      .thenReturn(true)
+      .thenReturn(TASK_PROCESSED)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
     startSchedulingAndRun();
 
     assertThat(processingExecutorService.getSchedulerCalls()).containsOnly(
       regularDelayedPoll,
-      notDelayedPoll
-      );
+      notDelayedPoll);
   }
 
   @Test
@@ -102,36 +103,34 @@ public class CeProcessingSchedulerImplTest {
 
     assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
       regularDelayedPoll,
-      notDelayedPoll
-      );
+      notDelayedPoll);
   }
 
   @Test
   public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
     when(ceWorker.call())
-      .thenReturn(false)
+      .thenReturn(NO_TASK)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
     startSchedulingAndRun();
 
     assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
       regularDelayedPoll,
-      regularDelayedPoll
-      );
+      regularDelayedPoll);
   }
 
   @Test
   public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
     when(ceWorker.call())
-      .thenReturn(true)
-      .thenReturn(true)
-      .thenReturn(false)
-      .thenReturn(true)
-      .thenReturn(false)
+      .thenReturn(TASK_PROCESSED)
+      .thenReturn(TASK_PROCESSED)
+      .thenReturn(NO_TASK)
+      .thenReturn(TASK_PROCESSED)
+      .thenReturn(NO_TASK)
       .thenThrow(new Exception("IAE should not cause scheduling to stop"))
-      .thenReturn(false)
-      .thenReturn(false)
-      .thenReturn(false)
+      .thenReturn(NO_TASK)
+      .thenReturn(NO_TASK)
+      .thenReturn(NO_TASK)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
     startSchedulingAndRun();
@@ -146,20 +145,19 @@ public class CeProcessingSchedulerImplTest {
       notDelayedPoll,
       regularDelayedPoll,
       regularDelayedPoll,
-      regularDelayedPoll
-      );
+      regularDelayedPoll);
   }
 
   @Test
   public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
     when(ceWorker.call())
-      .thenReturn(false)
-      .thenReturn(true)
-      .thenReturn(false)
-      .thenReturn(false)
-      .thenReturn(false)
-      .thenReturn(false)
-      .thenReturn(false)
+      .thenReturn(NO_TASK)
+      .thenReturn(TASK_PROCESSED)
+      .thenReturn(NO_TASK)
+      .thenReturn(NO_TASK)
+      .thenReturn(NO_TASK)
+      .thenReturn(NO_TASK)
+      .thenReturn(NO_TASK)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
 
     underTest.startScheduling();
@@ -185,8 +183,7 @@ public class CeProcessingSchedulerImplTest {
       regularDelayedPoll,
       regularDelayedPoll,
       notDelayedPoll,
-      regularDelayedPoll
-      );
+      regularDelayedPoll);
   }
 
   @Test
@@ -198,18 +195,18 @@ public class CeProcessingSchedulerImplTest {
     for (int i = 0; i < workerCount; i++) {
       workers[i] = mock(CeWorker.class);
       when(workers[i].call())
-        .thenReturn(false)
+        .thenReturn(NO_TASK)
         .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
     }
 
     ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
     CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
-    when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
+    when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
 
     CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
     CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
     when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
-        .thenReturn(listenableScheduledFuture);
+      .thenReturn(listenableScheduledFuture);
 
     underTest.startScheduling();
     // No exception from TestCeWorkerFactory must be thrown
index 041418c5cf84d1bba68c0954177d5269379e04d0..73381b49dfac7591ed4bb6778f40e4c710b33bb4 100644 (file)
@@ -46,6 +46,8 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 
 public class CeWorkerImplTest {
 
@@ -73,7 +75,7 @@ public class CeWorkerImplTest {
   public void no_pending_tasks_in_queue() throws Exception {
     when(queue.peek(anyString())).thenReturn(Optional.empty());
 
-    assertThat(underTest.call()).isFalse();
+    assertThat(underTest.call()).isEqualTo(NO_TASK);
 
     verifyZeroInteractions(taskProcessor, ceLogging);
   }
@@ -84,7 +86,7 @@ public class CeWorkerImplTest {
     taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
     when(queue.peek(anyString())).thenReturn(Optional.of(task));
 
-    assertThat(underTest.call()).isTrue();
+    assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
     verifyWorkerUuid();
     inOrder.verify(ceLogging).initForTask(task);
@@ -98,7 +100,7 @@ public class CeWorkerImplTest {
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     when(queue.peek(anyString())).thenReturn(Optional.of(task));
 
-    assertThat(underTest.call()).isTrue();
+    assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
     verifyWorkerUuid();
     inOrder.verify(ceLogging).initForTask(task);
@@ -114,7 +116,7 @@ public class CeWorkerImplTest {
     taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
     Throwable error = makeTaskProcessorFail(task);
 
-    assertThat(underTest.call()).isTrue();
+    assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
 
     verifyWorkerUuid();
     inOrder.verify(ceLogging).initForTask(task);