import static com.google.common.util.concurrent.Futures.addCallback;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
+ private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds
private final CeProcessingSchedulerExecutorService executorService;
-
- private final long delayBetweenTasks;
+ private final long delayBetweenEnabledTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
this.executorService = processingExecutorService;
- this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
+ this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
this.timeUnit = MILLISECONDS;
int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
- ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit);
addCallback(future, chainingCallback, executorService);
}
}
@Override
public void onSuccess(@Nullable CeWorker.Result result) {
- if (result != null && result == TASK_PROCESSED) {
- chainWithoutDelay();
+ if (result == null) {
+ chainWithEnabledTaskDelay();
} else {
- chainWithDelay();
+ switch (result) {
+ case DISABLED:
+ chainWithDisabledTaskDelay();
+ break;
+ case NO_TASK:
+ chainWithEnabledTaskDelay();
+ break;
+ case TASK_PROCESSED:
+ default:
+ chainWithoutDelay();
+ }
}
}
addCallback();
}
- private void chainWithDelay() {
+ private void chainWithEnabledTaskDelay() {
+ if (keepRunning()) {
+ workerFuture = executorService.schedule(worker, delayBetweenEnabledTasks, timeUnit);
+ }
+ addCallback();
+ }
+
+ private void chainWithDisabledTaskDelay() {
if (keepRunning()) {
- workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit);
+ workerFuture = executorService.schedule(worker, DELAY_BETWEEN_DISABLED_TASKS, timeUnit);
}
addCallback();
}
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED;
import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
+ private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
@Test
- public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
+ public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception {
when(ceWorker.call())
.thenReturn(TASK_PROCESSED)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
}
@Test
- public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
+ public void polls_with_regular_delay_when_CeWorkerCallable_returns_NO_TASK() throws Exception {
when(ceWorker.call())
.thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
regularDelayedPoll);
}
+ @Test
+ public void polls_with_extended_delay_when_CeWorkerCallable_returns_DISABLED() throws Exception {
+ when(ceWorker.call())
+ .thenReturn(DISABLED)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+ regularDelayedPoll,
+ extendedDelayedPoll);
+ }
+
@Test
public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
when(ceWorker.call())