diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2017-07-11 18:08:07 +0200 |
---|---|---|
committer | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2017-07-18 08:51:46 +0200 |
commit | 7210cafc32b2e496c295ef37f48938fe702f97dc (patch) | |
tree | 8cfa25aa02f4435fd68dea127da1b1b096cd550f /server/sonar-ce/src | |
parent | 7779619c9134b24eadb7b37d322a91d6b498e09d (diff) | |
download | sonarqube-7210cafc32b2e496c295ef37f48938fe702f97dc.tar.gz sonarqube-7210cafc32b2e496c295ef37f48938fe702f97dc.zip |
SONAR-9525 use 30s delay between disabled workers run
Diffstat (limited to 'server/sonar-ce/src')
2 files changed, 43 insertions, 12 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 7b12e6c7ee3..458f527f307 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -34,14 +34,13 @@ import org.sonar.ce.configuration.CeConfiguration; import static com.google.common.util.concurrent.Futures.addCallback; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable { private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); + private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds private final CeProcessingSchedulerExecutorService executorService; - - private final long delayBetweenTasks; + private final long delayBetweenEnabledTasks; private final TimeUnit timeUnit; private final ChainingCallback[] chainingCallbacks; @@ -49,7 +48,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) { this.executorService = processingExecutorService; - this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay(); + this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay(); this.timeUnit = MILLISECONDS; int threadWorkerCount = ceConfiguration.getWorkerMaxCount(); @@ -68,7 +67,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab @Override public void startScheduling() { for (ChainingCallback chainingCallback : chainingCallbacks) { - ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit); + ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit); addCallback(future, chainingCallback, executorService); } } @@ -93,10 +92,20 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab @Override public void onSuccess(@Nullable CeWorker.Result result) { - if (result != null && result == TASK_PROCESSED) { - chainWithoutDelay(); + if (result == null) { + chainWithEnabledTaskDelay(); } else { - chainWithDelay(); + switch (result) { + case DISABLED: + chainWithDisabledTaskDelay(); + break; + case NO_TASK: + chainWithEnabledTaskDelay(); + break; + case TASK_PROCESSED: + default: + chainWithoutDelay(); + } } } @@ -116,9 +125,16 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab addCallback(); } - private void chainWithDelay() { + private void chainWithEnabledTaskDelay() { + if (keepRunning()) { + workerFuture = executorService.schedule(worker, delayBetweenEnabledTasks, timeUnit); + } + addCallback(); + } + + private void chainWithDisabledTaskDelay() { if (keepRunning()) { - workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit); + workerFuture = executorService.schedule(worker, DELAY_BETWEEN_DISABLED_TASKS, timeUnit); } addCallback(); } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index 57f6c225f7a..660dcc9a5ed 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -60,6 +60,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED; import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK; import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; @@ -76,12 +77,13 @@ public class CeProcessingSchedulerImplTest { private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker); private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS); + private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS); private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker); private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); @Test - public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception { + public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception { when(ceWorker.call()) .thenReturn(TASK_PROCESSED) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -107,7 +109,7 @@ public class CeProcessingSchedulerImplTest { } @Test - public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception { + public void polls_with_regular_delay_when_CeWorkerCallable_returns_NO_TASK() throws Exception { when(ceWorker.call()) .thenReturn(NO_TASK) .thenThrow(ERROR_TO_INTERRUPT_CHAINING); @@ -120,6 +122,19 @@ public class CeProcessingSchedulerImplTest { } @Test + public void polls_with_extended_delay_when_CeWorkerCallable_returns_DISABLED() throws Exception { + when(ceWorker.call()) + .thenReturn(DISABLED) + .thenThrow(ERROR_TO_INTERRUPT_CHAINING); + + startSchedulingAndRun(); + + assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( + regularDelayedPoll, + extendedDelayedPoll); + } + + @Test public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception { when(ceWorker.call()) .thenReturn(TASK_PROCESSED) |