aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce/src
diff options
context:
space:
mode:
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-07-11 18:08:07 +0200
committerSébastien Lesaint <sebastien.lesaint@sonarsource.com>2017-07-18 08:51:46 +0200
commit7210cafc32b2e496c295ef37f48938fe702f97dc (patch)
tree8cfa25aa02f4435fd68dea127da1b1b096cd550f /server/sonar-ce/src
parent7779619c9134b24eadb7b37d322a91d6b498e09d (diff)
downloadsonarqube-7210cafc32b2e496c295ef37f48938fe702f97dc.tar.gz
sonarqube-7210cafc32b2e496c295ef37f48938fe702f97dc.zip
SONAR-9525 use 30s delay between disabled workers run
Diffstat (limited to 'server/sonar-ce/src')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java36
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java19
2 files changed, 43 insertions, 12 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
index 7b12e6c7ee3..458f527f307 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
@@ -34,14 +34,13 @@ import org.sonar.ce.configuration.CeConfiguration;
import static com.google.common.util.concurrent.Futures.addCallback;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
+ private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds
private final CeProcessingSchedulerExecutorService executorService;
-
- private final long delayBetweenTasks;
+ private final long delayBetweenEnabledTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
@@ -49,7 +48,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
this.executorService = processingExecutorService;
- this.delayBetweenTasks = ceConfiguration.getQueuePollingDelay();
+ this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
this.timeUnit = MILLISECONDS;
int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
@@ -68,7 +67,7 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
@Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
- ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenTasks, timeUnit);
+ ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit);
addCallback(future, chainingCallback, executorService);
}
}
@@ -93,10 +92,20 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
@Override
public void onSuccess(@Nullable CeWorker.Result result) {
- if (result != null && result == TASK_PROCESSED) {
- chainWithoutDelay();
+ if (result == null) {
+ chainWithEnabledTaskDelay();
} else {
- chainWithDelay();
+ switch (result) {
+ case DISABLED:
+ chainWithDisabledTaskDelay();
+ break;
+ case NO_TASK:
+ chainWithEnabledTaskDelay();
+ break;
+ case TASK_PROCESSED:
+ default:
+ chainWithoutDelay();
+ }
}
}
@@ -116,9 +125,16 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
addCallback();
}
- private void chainWithDelay() {
+ private void chainWithEnabledTaskDelay() {
+ if (keepRunning()) {
+ workerFuture = executorService.schedule(worker, delayBetweenEnabledTasks, timeUnit);
+ }
+ addCallback();
+ }
+
+ private void chainWithDisabledTaskDelay() {
if (keepRunning()) {
- workerFuture = executorService.schedule(worker, delayBetweenTasks, timeUnit);
+ workerFuture = executorService.schedule(worker, DELAY_BETWEEN_DISABLED_TASKS, timeUnit);
}
addCallback();
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
index 57f6c225f7a..660dcc9a5ed 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -60,6 +60,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.sonar.ce.taskprocessor.CeWorker.Result.DISABLED;
import static org.sonar.ce.taskprocessor.CeWorker.Result.NO_TASK;
import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
@@ -76,12 +77,13 @@ public class CeProcessingSchedulerImplTest {
private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
+ private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
@Test
- public void polls_without_delay_when_CeWorkerCallable_returns_true() throws Exception {
+ public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception {
when(ceWorker.call())
.thenReturn(TASK_PROCESSED)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -107,7 +109,7 @@ public class CeProcessingSchedulerImplTest {
}
@Test
- public void polls_with_regular_delay_when_CeWorkerCallable_returns_false() throws Exception {
+ public void polls_with_regular_delay_when_CeWorkerCallable_returns_NO_TASK() throws Exception {
when(ceWorker.call())
.thenReturn(NO_TASK)
.thenThrow(ERROR_TO_INTERRUPT_CHAINING);
@@ -120,6 +122,19 @@ public class CeProcessingSchedulerImplTest {
}
@Test
+ public void polls_with_extended_delay_when_CeWorkerCallable_returns_DISABLED() throws Exception {
+ when(ceWorker.call())
+ .thenReturn(DISABLED)
+ .thenThrow(ERROR_TO_INTERRUPT_CHAINING);
+
+ startSchedulingAndRun();
+
+ assertThat(processingExecutorService.getSchedulerCalls()).containsExactly(
+ regularDelayedPoll,
+ extendedDelayedPoll);
+ }
+
+ @Test
public void startScheduling_schedules_CeWorkerCallable_at_fixed_rate_run_head_of_queue() throws Exception {
when(ceWorker.call())
.thenReturn(TASK_PROCESSED)