diff options
author | Eric Hartmann <hartmann.eric@gmail.com> | 2017-08-30 17:27:05 +0200 |
---|---|---|
committer | Simon Brandhof <simon.brandhof@sonarsource.com> | 2017-09-05 14:24:13 +0200 |
commit | 0f551cb0f2bbbdc9319b49fe495288eed8432fab (patch) | |
tree | 2643d483c6ff3ebe53c8b5ea05b3feb6a347fc85 /server/sonar-ce | |
parent | a228919e41902d87b3d4d73892ed8e849375fb85 (diff) | |
download | sonarqube-0f551cb0f2bbbdc9319b49fe495288eed8432fab.tar.gz sonarqube-0f551cb0f2bbbdc9319b49fe495288eed8432fab.zip |
SONAR-9715 Implement a delay for finishing task in Compute Engine
Diffstat (limited to 'server/sonar-ce')
8 files changed, 101 insertions, 25 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java b/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java index 129c65ef502..05815aed2b5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/app/CeServer.java @@ -171,8 +171,9 @@ public class CeServer implements Monitored { try { Thread.sleep(CHECK_FOR_STOP_DELAY); } catch (InterruptedException e) { - // ignore the interruption itself, check the flag - Thread.currentThread().interrupt(); + // ignore the interruption itself + // Do not propagate the isInterrupted flag with : Thread.currentThread().interrupt(); + // It will break the shutdown of ComputeEngineContainerImpl#stop() } } attemptShutdown(); @@ -180,10 +181,11 @@ public class CeServer implements Monitored { private void attemptShutdown() { try { - LOG.info("Compute Engine shutting down..."); + LOG.info("Compute Engine is stopping..."); computeEngine.shutdown(); + LOG.info("Compute Engine is stopped"); } catch (Throwable e) { - LOG.error("Compute Engine shutdown failed", e); + LOG.error("Compute Engine failed to stop", e); } finally { // release thread waiting for CeServer stopAwait(); @@ -207,8 +209,9 @@ public class CeServer implements Monitored { if (t != null) { t.interrupt(); try { - t.join(1000); + t.join(1_000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Ignored } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java index d89b83febaf..ca967e1b59d 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java @@ -53,6 +53,7 @@ import org.sonar.ce.platform.ComputeEngineExtensionInstaller; import org.sonar.ce.queue.CeQueueCleaner; import org.sonar.ce.queue.PurgeCeActivities; import org.sonar.ce.settings.ProjectConfigurationFactory; +import org.sonar.ce.taskprocessor.CeProcessingScheduler; import org.sonar.ce.taskprocessor.CeTaskProcessorModule; import org.sonar.ce.user.CeUserSession; import org.sonar.core.component.DefaultResourceTypes; @@ -220,6 +221,11 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { @Override public ComputeEngineContainer stop() { + if (level4 != null) { + // try to graceful stop in-progress tasks + CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class); + ceProcessingScheduler.stopScheduling(); + } this.level1.stopComponents(); return this; } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java index 831684d2e8c..f2a327fe720 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingScheduler.java @@ -23,4 +23,5 @@ public interface CeProcessingScheduler { void startScheduling(); + void stopScheduling(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java index 458f527f307..4792bd27ed4 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImpl.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.CheckForNull; import javax.annotation.Nullable; -import org.picocontainer.Startable; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.configuration.CeConfiguration; @@ -35,7 +34,7 @@ import org.sonar.ce.configuration.CeConfiguration; import static com.google.common.util.concurrent.Futures.addCallback; import static java.util.concurrent.TimeUnit.MILLISECONDS; -public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startable { +public class CeProcessingSchedulerImpl implements CeProcessingScheduler { private static final Logger LOG = Loggers.get(CeProcessingSchedulerImpl.class); private static final long DELAY_BETWEEN_DISABLED_TASKS = 30 * 1000L; // 30 seconds @@ -43,12 +42,15 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab private final long delayBetweenEnabledTasks; private final TimeUnit timeUnit; private final ChainingCallback[] chainingCallbacks; + private final EnabledCeWorkerController ceWorkerController; public CeProcessingSchedulerImpl(CeConfiguration ceConfiguration, - CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory) { + CeProcessingSchedulerExecutorService processingExecutorService, CeWorkerFactory ceCeWorkerFactory, + EnabledCeWorkerController ceWorkerController) { this.executorService = processingExecutorService; this.delayBetweenEnabledTasks = ceConfiguration.getQueuePollingDelay(); + this.ceWorkerController = ceWorkerController; this.timeUnit = MILLISECONDS; int threadWorkerCount = ceConfiguration.getWorkerMaxCount(); @@ -60,11 +62,6 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab } @Override - public void start() { - // nothing to do at component startup, startScheduling will be called by CeQueueInitializer - } - - @Override public void startScheduling() { for (ChainingCallback chainingCallback : chainingCallbacks) { ListenableScheduledFuture<CeWorker.Result> future = executorService.schedule(chainingCallback.worker, delayBetweenEnabledTasks, timeUnit); @@ -72,10 +69,37 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab } } + /** + * This method is stopping all the workers giving them a delay before killing them. + */ @Override - public void stop() { + public void stopScheduling() { + LOG.debug("Stopping compute engine"); + // Requesting all workers to stop + for (ChainingCallback chainingCallback : chainingCallbacks) { + chainingCallback.stop(false); + } + + // Workers have 40s to gracefully stop processing tasks + long until = System.currentTimeMillis() + 40_000L; + LOG.info("Waiting for workers to finish in-progress tasks"); + while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) { + try { + Thread.sleep(200L); + } catch (InterruptedException e) { + LOG.debug("Graceful stop period has been interrupted", e); + Thread.currentThread().interrupt(); + break; + } + } + + if (ceWorkerController.hasAtLeastOneProcessingWorker()) { + LOG.info("Some in-progress tasks did not finish in due time. Tasks will be stopped."); + } + + // Interrupting the tasks for (ChainingCallback chainingCallback : chainingCallbacks) { - chainingCallback.stop(); + chainingCallback.stop(true); } } @@ -149,10 +173,10 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler, Startab return keepRunning.get(); } - public void stop() { + public void stop(boolean interrupt) { this.keepRunning.set(false); if (workerFuture != null) { - workerFuture.cancel(false); + workerFuture.cancel(interrupt); } } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java index d52bed437f9..f8e6807cfa5 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerImpl.java @@ -79,7 +79,7 @@ public class CeWorkerImpl implements CeWorker { return NO_TASK; } - try { + try (EnabledCeWorkerController.ProcessingRecorderHook processing = enabledCeWorkerController.registerProcessingFor(this)) { executeTask(ceTask.get()); } catch (Exception e) { LOG.error(format("An error occurred while executing task with uuid '%s'", ceTask.get().getUuid()), e); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java index 37c6c59c0ef..5634a20e455 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerController.java @@ -24,6 +24,9 @@ package org.sonar.ce.taskprocessor; * task to process. */ public interface EnabledCeWorkerController { + interface ProcessingRecorderHook extends AutoCloseable { + } + /** * Requests the {@link EnabledCeWorkerController} to refresh its state, if it has any. */ @@ -33,4 +36,13 @@ public interface EnabledCeWorkerController { * Returns {@code true} if the specified {@link CeWorker} is enabled */ boolean isEnabled(CeWorker ceWorker); + + ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker); + + /** + * Whether at least one worker is being processed a task or not. + * Returns {@code false} when all workers are waiting for tasks + * or are being stopped. + */ + boolean hasAtLeastOneProcessingWorker(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java index 7bb905a3db5..500eb671be9 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/EnabledCeWorkerControllerImpl.java @@ -19,14 +19,20 @@ */ package org.sonar.ce.taskprocessor; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.configuration.CeConfiguration; public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController { + private final ConcurrentHashMap<CeWorker, Status> map = new ConcurrentHashMap<>(); private final CeConfiguration ceConfiguration; private final AtomicInteger workerCount; + enum Status { + PROCESSING, PAUSED + } + public EnabledCeWorkerControllerImpl(CeConfiguration ceConfiguration) { this.ceConfiguration = ceConfiguration; this.workerCount = new AtomicInteger(ceConfiguration.getWorkerCount()); @@ -46,6 +52,16 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController logEnabledWorkerCount(); } + @Override + public ProcessingRecorderHook registerProcessingFor(CeWorker ceWorker) { + return new ProcessingRecorderHookImpl(ceWorker); + } + + @Override + public boolean hasAtLeastOneProcessingWorker() { + return map.entrySet().stream().anyMatch(e -> e.getValue() == Status.PROCESSING); + } + /** * Returns {@code true} if {@link CeWorker#getOrdinal() worker ordinal} is strictly less than * {@link CeConfiguration#getWorkerCount()}. @@ -56,4 +72,18 @@ public class EnabledCeWorkerControllerImpl implements EnabledCeWorkerController public boolean isEnabled(CeWorker ceWorker) { return ceWorker.getOrdinal() < workerCount.get(); } + + private class ProcessingRecorderHookImpl implements ProcessingRecorderHook { + private final CeWorker ceWorker; + + private ProcessingRecorderHookImpl(CeWorker ceWorker) { + this.ceWorker = ceWorker; + map.put(this.ceWorker, Status.PROCESSING); + } + + @Override + public void close() throws Exception { + map.put(ceWorker, Status.PAUSED); + } + } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index 660dcc9a5ed..e98fa826962 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -67,20 +67,20 @@ import static org.sonar.ce.taskprocessor.CeWorker.Result.TASK_PROCESSED; public class CeProcessingSchedulerImplTest { private static final Error ERROR_TO_INTERRUPT_CHAINING = new Error("Error should stop scheduling"); - @Rule // due to risks of infinite chaining of tasks/futures, a timeout is required for safety + @Rule public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60)); @Rule public CeConfigurationRule ceConfiguration = new CeConfigurationRule(); - // Required to prevent an infinite loop private CeWorker ceWorker = mock(CeWorker.class); private CeWorkerFactory ceWorkerFactory = new TestCeWorkerFactory(ceWorker); private StubCeProcessingSchedulerExecutorService processingExecutorService = new StubCeProcessingSchedulerExecutorService(); private SchedulerCall regularDelayedPoll = new SchedulerCall(ceWorker, 2000L, MILLISECONDS); private SchedulerCall extendedDelayedPoll = new SchedulerCall(ceWorker, 30000L, MILLISECONDS); private SchedulerCall notDelayedPoll = new SchedulerCall(ceWorker); + private EnabledCeWorkerController ceWorkerController = new EnabledCeWorkerControllerImpl(ceConfiguration); - private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); + private CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController); @Test public void polls_without_delay_when_CeWorkerCallable_returns_TASK_PROCESSED() throws Exception { @@ -164,7 +164,7 @@ public class CeProcessingSchedulerImplTest { } @Test - public void stop_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { + public void stopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { when(ceWorker.call()) .thenReturn(NO_TASK) .thenReturn(TASK_PROCESSED) @@ -188,7 +188,7 @@ public class CeProcessingSchedulerImplTest { } // call stop after second delayed polling if (i == 1) { - underTest.stop(); + underTest.stopScheduling(); } i++; } @@ -219,7 +219,7 @@ public class CeProcessingSchedulerImplTest { when(processingExecutorService.schedule(any(CeWorker.class), any(Long.class), any(TimeUnit.class))).thenReturn(listenableScheduledFuture); CeWorkerFactory ceWorkerFactory = spy(new TestCeWorkerFactory(workers)); - CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory); + CeProcessingSchedulerImpl underTest = new CeProcessingSchedulerImpl(ceConfiguration, processingExecutorService, ceWorkerFactory, ceWorkerController); when(processingExecutorService.schedule(ceWorker, ceConfiguration.getQueuePollingDelay(), MILLISECONDS)) .thenReturn(listenableScheduledFuture); @@ -454,7 +454,7 @@ public class CeProcessingSchedulerImplTest { @Override public void shutdown() { - throw new UnsupportedOperationException("shutdown() not implemented"); + // Nothing to do } @Override |