import static com.google.common.util.concurrent.Futures.addCallback;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
- ListenableScheduledFuture<Boolean> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
addCallback(future, chainingCallback, executorService);
}
}
}
}
- private class ChainingCallback implements FutureCallback<Boolean> {
+ private class ChainingCallback implements FutureCallback<CeWorker.Result> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final CeWorker worker;
@CheckForNull
- private ListenableFuture<Boolean> workerFuture;
+ private ListenableFuture<CeWorker.Result> workerFuture;
public ChainingCallback(CeWorker worker) {
this.worker = worker;
}
@Override
- public void onSuccess(@Nullable Boolean result) {
- if (result != null && result) {
+ public void onSuccess(@Nullable CeWorker.Result result) {
+ if (result != null && result == TASK_PROCESSED) {
chainWithoutDelay();
} else {
chainWithDelay();
* {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
* {@code false} otherwise.
*/
-public interface CeWorker extends Callable<Boolean> {
+public interface CeWorker extends Callable<CeWorker.Result> {
+ enum Result {
+ /** Worker found no task to process */
+ NO_TASK,
+ /** Worker found a task and processed it (either successfully or not) */
+ TASK_PROCESSED
+ }
+
/**
* Position of the current CeWorker among all the running workers.
*/
import org.sonar.db.ce.CeActivityDto;
import static java.lang.String.format;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeWorkerImpl implements CeWorker {
}
@Override
- public Boolean call() throws Exception {
+ public Result call() throws Exception {
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
if (!ceTask.isPresent()) {
- return false;
+ return NO_TASK;
}
try {
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
}
- return true;
+ return TASK_PROCESSED;
}
private Optional<CeTask> tryAndFindTaskToExecute() {
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImplTest {
private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
@Test
public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
when(ceWorker.call())
- .thenReturn(true)
+ .thenReturn(TASK_PROCESSED)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
startSchedulingAndRun();
assertThat(processingExecutorService.getSchedulerCalls()).containsOnly(
regularDelayedPoll,
- notDelayedPoll
- );
+ notDelayedPoll);
}
@Test
assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
regularDelayedPoll,
- notDelayedPoll
- );
+ notDelayedPoll);
}
@Test
public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
when(ceWorker.call())
- .thenReturn(false)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
startSchedulingAndRun();
assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
regularDelayedPoll,
- regularDelayedPoll
- );
+ regularDelayedPoll);
}
@Test
public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
when(ceWorker.call())
- .thenReturn(true)
- .thenReturn(true)
- .thenReturn(false)
- .thenReturn(true)
- .thenReturn(false)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(NO_TASK)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(NO_TASK)
.thenThrow(new Exception("IAE should not cause scheduling to stop"))
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
startSchedulingAndRun();
notDelayedPoll,
regularDelayedPoll,
regularDelayedPoll,
- regularDelayedPoll
- );
+ regularDelayedPoll);
}
@Test
public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
when(ceWorker.call())
- .thenReturn(false)
- .thenReturn(true)
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
+ .thenReturn(NO_TASK)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
underTest.startScheduling();
regularDelayedPoll,
regularDelayedPoll,
notDelayedPoll,
- regularDelayedPoll
- );
+ regularDelayedPoll);
}
@Test
for (int i = 0; i < workerCount; i++) {
workers[i] = mock(CeWorker.class);
when(workers[i].call())
- .thenReturn(false)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
}
ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
+ when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
- .thenReturn(listenableScheduledFuture);
+ .thenReturn(listenableScheduledFuture);
underTest.startScheduling();
// No exception from TestCeWorkerFactory must be thrown
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeWorkerImplTest {
public void no_pending_tasks_in_queue() throws Exception {
when(queue.peek(anyString())).thenReturn(Optional.empty());
- assertThat(underTest.call()).isFalse();
+ assertThat(underTest.call()).isEqualTo(NO_TASK);
verifyZeroInteractions(taskProcessor, ceLogging);
}
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
when(queue.peek(anyString())).thenReturn(Optional.of(task));
- assertThat(underTest.call()).isTrue();
+ assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
when(queue.peek(anyString())).thenReturn(Optional.of(task));
- assertThat(underTest.call()).isTrue();
+ assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
- assertThat(underTest.call()).isTrue();
+ assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);