]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9525 use 30s delay between disabled workers run 2218/head
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Tue, 11 Jul 2017 16:08:07 +0000 (18:08 +0200)
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Tue, 18 Jul 2017 06:51:46 +0000 (08:51 +0200)
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java

index 7b12e6c7ee3bc9a922a4466cffe9df5e20cb018f..458f527f30785fa35bb13ba1dc6deb016300098c 100644 (file)
@@ -34,14 +34,13 @@ import org.sonar.ce.configuration.CeConfiguration;
 
 import static com.google.common.util.concurrent.Futures.addCallback;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 
 public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
   private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
+  private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds
 
   private final CeProcessingSchedulerExecutorService executorService;
-
-  private final long delayBetweenTasks;
+  private final long delayBetweenEnabledTasks;
   private final TimeUnit timeUnit;
   private final ChainingCallback[] chainingCallbacks;
 
@@ -49,7 +48,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
     CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
     this.executorService = processingExecutorService;
 
-    this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
+    this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
     this.timeUnit = MILLISECONDS;
 
     int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
@@ -68,7 +67,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
   @Override
   public void startScheduling() {
     for (ChainingCallback chainingCallback : chainingCallbacks) {
-      ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
+      ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit);
       addCallback(future, chainingCallback, executorService);
     }
   }
@@ -93,10 +92,20 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
 
     @Override
     public void onSuccess(@Nullable CeWorker.Result result) {
-      if (result != null && result == TASK_PROCESSED) {
-        chainWithoutDelay();
+      if (result == null) {
+        chainWithEnabledTaskDelay();
       } else {
-        chainWithDelay();
+        switch (result) {
+          case DISABLED:
+            chainWithDisabledTaskDelay();
+            break;
+          case NO_TASK:
+            chainWithEnabledTaskDelay();
+            break;
+          case TASK_PROCESSED:
+          default:
+            chainWithoutDelay();
+        }
       }
     }
 
@@ -116,9 +125,16 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
       addCallback();
     }
 
-    private void chainWithDelay() {
+    private void chainWithEnabledTaskDelay() {
+      if (keepRunning()) {
+        workerFuture = executorService.schedule(worker, delayBetweenEnabledTasks, timeUnit);
+      }
+      addCallback();
+    }
+
+    private void chainWithDisabledTaskDelay() {
       if (keepRunning()) {
-        workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit);
+        workerFuture = executorService.schedule(worker, DELAY_BETWEEN_DISABLED_TASKS, timeUnit);
       }
       addCallback();
     }
index 57f6c225f7ae40ae1a1425cff6d3ce3e9b3a455f..660dcc9a5ed114ab7c0006214a1cae9cdf0912d2 100644 (file)
@@ -60,6 +60,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED;
 import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
 import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
 
@@ -76,12 +77,13 @@ public class CeProcessingSchedulerImplTest {
   private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
   private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
   private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
+  private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
   private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
 
   private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
 
   @Test
-  public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
+  public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception {
     when(ceWorker.call())
       .thenReturn(TASK_PROCESSED)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -107,7 +109,7 @@ public class CeProcessingSchedulerImplTest {
   }
 
   @Test
-  public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
+  public void polls_with_regular_delay_when_CeWorkerCallable_returns_NO_TASK() throws Exception {
     when(ceWorker.call())
       .thenReturn(NO_TASK)
       .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -119,6 +121,19 @@ public class CeProcessingSchedulerImplTest {
       regularDelayedPoll);
   }
 
+  @Test
+  public void polls_with_extended_delay_when_CeWorkerCallable_returns_DISABLED() throws Exception {
+    when(ceWorker.call())
+      .thenReturn(DISABLED)
+      .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+    startSchedulingAndRun();
+
+    assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+      regularDelayedPoll,
+      extendedDelayedPoll);
+  }
+
   @Test
   public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
     when(ceWorker.call())