aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
authorEric Hartmann <hartmann.eric@gmail.com>2017-08-30 17:27:05 +0200
committerSimon Brandhof <simon.brandhof@sonarsource.com>2017-09-05 14:24:13 +0200
commit0f551cb0f2bbbdc9319b49fe495288eed8432fab (patch)
tree2643d483c6ff3ebe53c8b5ea05b3feb6a347fc85 /server/sonar-ce
parenta228919e41902d87b3d4d73892ed8e849375fb85 (diff)
downloadsonarqube-0f551cb0f2bbbdc9319b49fe495288eed8432fab.tar.gz
sonarqube-0f551cb0f2bbbdc9319b49fe495288eed8432fab.zip
SONAR-9715 Implement a delay for finishing task in Compute Engine
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java13
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java6
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java1
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java48
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java2
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java12
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java30
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java14
8 files changed, 101 insertions, 25 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java b/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java
index 129c65ef502..05815aed2b5 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java
@@ -171,8 +171,9 @@ public class CeServer implements Monitored {
try {
Thread.sleep(CHECK_FOR_STOP_DELAY);
} catch (InterruptedException e) {
- // ignore the interruption itself, check the flag
- Thread.currentThread().interrupt();
+ // ignore the interruption itself
+ // Do not propagate the isInterrupted flag with : Thread.currentThread().interrupt();
+ // It will break the shutdown of ComputeEngineContainerImpl#stop()
}
}
attemptShutdown();
@@ -180,10 +181,11 @@ public class CeServer implements Monitored {
private void attemptShutdown() {
try {
- LOG.info("Compute Engine shutting down...");
+ LOG.info("Compute Engine is stopping...");
computeEngine.shutdown();
+ LOG.info("Compute Engine is stopped");
} catch (Throwable e) {
- LOG.error("Compute Engine shutdown failed", e);
+ LOG.error("Compute Engine failed to stop", e);
} finally {
// release thread waiting for CeServer
stopAwait();
@@ -207,8 +209,9 @@ public class CeServer implements Monitored {
if (t != null) {
t.interrupt();
try {
- t.join(1000);
+ t.join(1_000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// Ignored
}
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
index d89b83febaf..ca967e1b59d 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
@@ -53,6 +53,7 @@ import org.sonar.ce.platform.ComputeEngineExtensionInstaller;
import org.sonar.ce.queue.CeQueueCleaner;
import org.sonar.ce.queue.PurgeCeActivities;
import org.sonar.ce.settings.ProjectConfigurationFactory;
+import org.sonar.ce.taskprocessor.CeProcessingScheduler;
import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.ce.user.CeUserSession;
import org.sonar.core.component.DefaultResourceTypes;
@@ -220,6 +221,11 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {
@Override
public ComputeEngineContainer stop() {
+ if (level4 != null) {
+ // try to graceful stop in-progress tasks
+ CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class);
+ ceProcessingScheduler.stopScheduling();
+ }
this.level1.stopComponents();
return this;
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java
index 831684d2e8c..f2a327fe720 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java
@@ -23,4 +23,5 @@ public interface CeProcessingScheduler {
void startScheduling();
+ void stopScheduling();
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
index 458f527f307..4792bd27ed4 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
-import org.picocontainer.Startable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
@@ -35,7 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration;
import static com.google.common.util.concurrent.Futures.addCallback;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable {
+public class CeProcessingSchedulerImpl implements CeProcessingScheduler {
private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class);
private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds
@@ -43,12 +42,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
private final long delayBetweenEnabledTasks;
private final TimeUnit timeUnit;
private final ChainingCallback[] chainingCallbacks;
+ private final EnabledCeWorkerController ceWorkerController;
public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration,
- CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) {
+ CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory,
+ EnabledCeWorkerController ceWorkerController) {
this.executorService = processingExecutorService;
this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay();
+ this.ceWorkerController = ceWorkerController;
this.timeUnit = MILLISECONDS;
int threadWorkerCount = ceConfiguration.getWorkerMaxCount();
@@ -60,11 +62,6 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
}
@Override
- public void start() {
- // nothing to do at component startup, startScheduling will be called by CeQueueInitializer
- }
-
- @Override
public void startScheduling() {
for (ChainingCallback chainingCallback : chainingCallbacks) {
ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit);
@@ -72,10 +69,37 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
}
}
+ /**
+ * This method is stopping all the workers giving them a delay before killing them.
+ */
@Override
- public void stop() {
+ public void stopScheduling() {
+ LOG.debug("Stopping compute engine");
+ // Requesting all workers to stop
+ for (ChainingCallback chainingCallback : chainingCallbacks) {
+ chainingCallback.stop(false);
+ }
+
+ // Workers have 40s to gracefully stop processing tasks
+ long until = System.currentTimeMillis() + 40_000L;
+ LOG.info("Waiting for workers to finish in-progress tasks");
+ while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) {
+ try {
+ Thread.sleep(200L);
+ } catch (InterruptedException e) {
+ LOG.debug("Graceful stop period has been interrupted", e);
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ if (ceWorkerController.hasAtLeastOneProcessingWorker()) {
+ LOG.info("Some in-progress tasks did not finish in due time. Tasks will be stopped.");
+ }
+
+ // Interrupting the tasks
for (ChainingCallback chainingCallback : chainingCallbacks) {
- chainingCallback.stop();
+ chainingCallback.stop(true);
}
}
@@ -149,10 +173,10 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab
return keepRunning.get();
}
- public void stop() {
+ public void stop(boolean interrupt) {
this.keepRunning.set(false);
if (workerFuture != null) {
- workerFuture.cancel(false);
+ workerFuture.cancel(interrupt);
}
}
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
index d52bed437f9..f8e6807cfa5 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java
@@ -79,7 +79,7 @@ public class CeWorkerImpl implements CeWorker {
return NO_TASK;
}
- try {
+ try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) {
executeTask(ceTask.get());
} catch (Exception e) {
LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e);
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java
index 37c6c59c0ef..5634a20e455 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java
@@ -24,6 +24,9 @@ package org.sonar.ce.taskprocessor;
* task to process.
*/
public interface EnabledCeWorkerController {
+ interface ProcessingRecorderHook extends AutoCloseable {
+ }
+
/**
* Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any.
*/
@@ -33,4 +36,13 @@ public interface EnabledCeWorkerController {
* Returns {@code true} if the specified {@link CeWorker} is enabled
*/
boolean isEnabled(CeWorker ceWorker);
+
+ ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker);
+
+ /**
+ * Whether at least one worker is being processed a task or not.
+ * Returns {@code false} when all workers are waiting for tasks
+ * or are being stopped.
+ */
+ boolean hasAtLeastOneProcessingWorker();
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java
index 7bb905a3db5..500eb671be9 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java
@@ -19,14 +19,20 @@
*/
package org.sonar.ce.taskprocessor;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.configuration.CeConfiguration;
public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController {
+ private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>();
private final CeConfiguration ceConfiguration;
private final AtomicInteger workerCount;
+ enum Status {
+ PROCESSING, PAUSED
+ }
+
public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) {
this.ceConfiguration = ceConfiguration;
this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount());
@@ -46,6 +52,16 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController
logEnabledWorkerCount();
}
+ @Override
+ public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) {
+ return new ProcessingRecorderHookImpl(ceWorker);
+ }
+
+ @Override
+ public boolean hasAtLeastOneProcessingWorker() {
+ return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING);
+ }
+
/**
* Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than
* {@link CeConfiguration#getWorkerCount()}.
@@ -56,4 +72,18 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController
public boolean isEnabled(CeWorker ceWorker) {
return ceWorker.getOrdinal() < workerCount.get();
}
+
+ private class ProcessingRecorderHookImpl implements ProcessingRecorderHook {
+ private final CeWorker ceWorker;
+
+ private ProcessingRecorderHookImpl(CeWorker ceWorker) {
+ this.ceWorker = ceWorker;
+ map.put(this.ceWorker, Status.PROCESSING);
+ }
+
+ @Override
+ public void close() throws Exception {
+ map.put(ceWorker, Status.PAUSED);
+ }
+ }
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
index 660dcc9a5ed..e98fa826962 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
@@ -67,20 +67,20 @@ import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED;
public class CeProcessingSchedulerImplTest {
private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling");
- @Rule
// due to risks of infinite chaining of tasks/futures, a timeout is required for safety
+ @Rule
public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
@Rule
public CeConfigurationRule ceConfiguration = new CeConfigurationRule();
- // Required to prevent an infinite loop
private CeWorker ceWorker = mock(CeWorker.class);
private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker);
private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService();
private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS);
private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS);
private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker);
+ private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration);
- private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+ private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
@Test
public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception {
@@ -164,7 +164,7 @@ public class CeProcessingSchedulerImplTest {
}
@Test
- public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
+ public void stopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception {
when(ceWorker.call())
.thenReturn(NO_TASK)
.thenReturn(TASK_PROCESSED)
@@ -188,7 +188,7 @@ public class CeProcessingSchedulerImplTest {
}
// call stop after second delayed polling
if (i == 1) {
- underTest.stop();
+ underTest.stopScheduling();
}
i++;
}
@@ -219,7 +219,7 @@ public class CeProcessingSchedulerImplTest {
when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture);
CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers));
- CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory);
+ CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController);
when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS))
.thenReturn(listenableScheduledFuture);
@@ -454,7 +454,7 @@ public class CeProcessingSchedulerImplTest {
@Override
public void shutdown() {
- throw new UnsupportedOperationException("shutdown() not implemented");
+ // Nothing to do
}
@Override