aboutsummaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-07-06 14:17:11 +0200
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-07-17 10:52:47 +0200
commit441e1e45649d7f200e03519f71a689d9b5dd7e4c (patch)
tree8470fc7b069d4d85bb350a8cee0c9c627af2cac4 /server
parent9bf5fae306b2da2da063435fcb86b505420f46e6 (diff)
downloadsonarqube-441e1e45649d7f200e03519f71a689d9b5dd7e4c.tar.gz
sonarqube-441e1e45649d7f200e03519f71a689d9b5dd7e4c.zip
SONAR-9525 make CeWorker#call return an enum rather than a Boolean
Diffstat (limited to 'server')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java11
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java9
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java8
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java57
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java10
5 files changed, 52 insertions, 43 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
index 44f22f74d30..7b12e6c7ee3 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
@@ -34,6 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration;
import static com.google.common.util.concurrent.Futures.addCallback;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
@@ -67,7 +68,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
- ListenableScheduledFuture<Boolean> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
addCallback(future, chainingCallback, executorService);
}
}
@@ -79,20 +80,20 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
}
}
- private class ChainingCallback implements FutureCallback<Boolean> {
+ private class ChainingCallback implements FutureCallback<CeWorker.Result> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final CeWorker worker;
@CheckForNull
- private ListenableFuture<Boolean> workerFuture;
+ private ListenableFuture<CeWorker.Result> workerFuture;
public ChainingCallback(CeWorker worker) {
this.worker = worker;
}
@Override
- public void onSuccess(@Nullable Boolean result) {
- if (result != null && result) {
+ public void onSuccess(@Nullable CeWorker.Result result) {
+ if (result != null && result == TASK_PROCESSED) {
chainWithoutDelay();
} else {
chainWithDelay();
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
index f0da47fc356..96bb7df9243 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorker.java
@@ -28,7 +28,14 @@ import org.sonar.ce.queue.CeTask;
* {@link Callable#call()} returns a Boolean which is {@code true} when some a {@link CeTask} was processed,
* {@code false} otherwise.
*/
-public interface CeWorker extends Callable<Boolean> {
+public interface CeWorker extends Callable<CeWorker.Result> {
+ enum Result {
+ /** Worker found no task to process */
+ NO_TASK,
+ /** Worker found a task and processed it (either successfully or not) */
+ TASK_PROCESSED
+ }
+
/**
* Position of the current CeWorker among all the running workers.
*/
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
index 6052abb03d6..44034a70cfc 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
@@ -31,6 +31,8 @@ import org.sonar.core.util.logs.Profiler;
import org.sonar.db.ce.CeActivityDto;
import static java.lang.String.format;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeWorkerImpl implements CeWorker {
@@ -52,10 +54,10 @@ public class CeWorkerImpl implements CeWorker {
}
@Override
- public Boolean call() throws Exception {
+ public Result call() throws Exception {
Optional<CeTask> ceTask = tryAndFindTaskToExecute();
if (!ceTask.isPresent()) {
- return false;
+ return NO_TASK;
}
try {
@@ -63,7 +65,7 @@ public class CeWorkerImpl implements CeWorker {
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
}
- return true;
+ return TASK_PROCESSED;
}
private Optional<CeTask> tryAndFindTaskToExecute() {
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
index dc0b2598cab..57f6c225f7a 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -60,6 +60,8 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImplTest {
private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
@@ -81,15 +83,14 @@ public class CeProcessingSchedulerImplTest {
@Test
public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
when(ceWorker.call())
- .thenReturn(true)
+ .thenReturn(TASK_PROCESSED)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
startSchedulingAndRun();
assertThat(processingExecutorService.getSchedulerCalls()).containsOnly(
regularDelayedPoll,
- notDelayedPoll
- );
+ notDelayedPoll);
}
@Test
@@ -102,36 +103,34 @@ public class CeProcessingSchedulerImplTest {
assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
regularDelayedPoll,
- notDelayedPoll
- );
+ notDelayedPoll);
}
@Test
public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
when(ceWorker.call())
- .thenReturn(false)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
startSchedulingAndRun();
assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
regularDelayedPoll,
- regularDelayedPoll
- );
+ regularDelayedPoll);
}
@Test
public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
when(ceWorker.call())
- .thenReturn(true)
- .thenReturn(true)
- .thenReturn(false)
- .thenReturn(true)
- .thenReturn(false)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(NO_TASK)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(NO_TASK)
.thenThrow(new Exception("IAE should not cause scheduling to stop"))
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
startSchedulingAndRun();
@@ -146,20 +145,19 @@ public class CeProcessingSchedulerImplTest {
notDelayedPoll,
regularDelayedPoll,
regularDelayedPoll,
- regularDelayedPoll
- );
+ regularDelayedPoll);
}
@Test
public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
when(ceWorker.call())
- .thenReturn(false)
- .thenReturn(true)
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
- .thenReturn(false)
+ .thenReturn(NO_TASK)
+ .thenReturn(TASK_PROCESSED)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
underTest.startScheduling();
@@ -185,8 +183,7 @@ public class CeProcessingSchedulerImplTest {
regularDelayedPoll,
regularDelayedPoll,
notDelayedPoll,
- regularDelayedPoll
- );
+ regularDelayedPoll);
}
@Test
@@ -198,18 +195,18 @@ public class CeProcessingSchedulerImplTest {
for (int i = 0; i < workerCount; i++) {
workers[i] = mock(CeWorker.class);
when(workers[i].call())
- .thenReturn(false)
+ .thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
}
ListenableScheduledFuture listenableScheduledFuture = mock(ListenableScheduledFuture.class);
CeProcessingSchedulerExecutorService processingExecutorService = mock(CeProcessingSchedulerExecutorService.class);
- when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class),any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
+ when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
- .thenReturn(listenableScheduledFuture);
+ .thenReturn(listenableScheduledFuture);
underTest.startScheduling();
// No exception from TestCeWorkerFactory must be thrown
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
index 041418c5cf8..73381b49dfa 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerImplTest.java
@@ -46,6 +46,8 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeWorkerImplTest {
@@ -73,7 +75,7 @@ public class CeWorkerImplTest {
public void no_pending_tasks_in_queue() throws Exception {
when(queue.peek(anyString())).thenReturn(Optional.empty());
- assertThat(underTest.call()).isFalse();
+ assertThat(underTest.call()).isEqualTo(NO_TASK);
verifyZeroInteractions(taskProcessor, ceLogging);
}
@@ -84,7 +86,7 @@ public class CeWorkerImplTest {
taskProcessorRepository.setNoProcessorForTask(CeTaskTypes.REPORT);
when(queue.peek(anyString())).thenReturn(Optional.of(task));
- assertThat(underTest.call()).isTrue();
+ assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
@@ -98,7 +100,7 @@ public class CeWorkerImplTest {
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
when(queue.peek(anyString())).thenReturn(Optional.of(task));
- assertThat(underTest.call()).isTrue();
+ assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);
@@ -114,7 +116,7 @@ public class CeWorkerImplTest {
taskProcessorRepository.setProcessorForTask(task.getType(), taskProcessor);
Throwable error = makeTaskProcessorFail(task);
- assertThat(underTest.call()).isTrue();
+ assertThat(underTest.call()).isEqualTo(TASK_PROCESSED);
verifyWorkerUuid();
inOrder.verify(ceLogging).initForTask(task);