@@ -30,6 +30,17 @@ public interface ComputeEngine { | |||
void startup(); | |||
/** | |||
* Terminates all CE workers and blocks until then. | |||
* | |||
* @throws IllegalStateException if {@link #startup()} has never been called | |||
* @throws IllegalStateException if called after {@link #shutdown()} | |||
* @throws IllegalStateException when called more than once | |||
*/ | |||
void stopProcessing(); | |||
/** | |||
* Terminates all CE workers and stops the Compute Engine releasing all resources, and blocks until then. | |||
* | |||
* @throws IllegalStateException if {@link #startup()} has never been called | |||
* @throws IllegalStateException when called more than once | |||
*/ |
@@ -48,22 +48,33 @@ public class ComputeEngineImpl implements ComputeEngine, ComputeEngineStatus { | |||
} | |||
} | |||
@Override | |||
public void stopProcessing() { | |||
checkState(this.status.ordinal() >= Status.STARTED.ordinal(), "stopProcessing() must not be called before startup()"); | |||
checkState(this.status.ordinal() <= Status.STOPPING.ordinal(), "stopProcessing() can not be called after shutdown()"); | |||
checkState(this.status.ordinal() <= Status.STOPPING_WORKERS.ordinal(), "stopProcessing() can not be called multiple times"); | |||
try { | |||
this.status = Status.STOPPING_WORKERS; | |||
this.computeEngineContainer.stopWorkers(); | |||
} finally { | |||
this.status = Status.WORKERS_STOPPED; | |||
} | |||
} | |||
@Override | |||
public void shutdown() { | |||
checkStateAsShutdown(this.status); | |||
checkState(this.status.ordinal() >= Status.STARTED.ordinal(), "shutdown() must not be called before startup()"); | |||
checkState(this.status.ordinal() <= Status.STOPPING.ordinal(), "shutdown() can not be called multiple times"); | |||
try { | |||
this.status = Status.HARD_STOPPING; | |||
this.status = Status.STOPPING; | |||
this.computeEngineContainer.stop(); | |||
} finally { | |||
this.status = Status.STOPPED; | |||
} | |||
} | |||
private static void checkStateAsShutdown(Status currentStatus) { | |||
checkState(currentStatus.ordinal() >= Status.STARTED.ordinal(), "shutdown() must not be called before startup()"); | |||
checkState(currentStatus.ordinal() <= Status.HARD_STOPPING.ordinal(), "shutdown() can not be called multiple times"); | |||
} | |||
@Override | |||
public Status getStatus() { |
@@ -106,7 +106,7 @@ public class CeServer implements Monitored { | |||
@Override | |||
public void stop() { | |||
// FIXME MMF-1673 implement graceful stop in CE: eg. call computeEngine.stopProcessing(); | |||
computeEngine.stopProcessing(); | |||
hardStop(); | |||
} | |||
@@ -26,5 +26,7 @@ public interface ComputeEngineContainer { | |||
ComputeEngineContainer start(Props props); | |||
ComputeEngineContainer stopWorkers(); | |||
ComputeEngineContainer stop(); | |||
} |
@@ -250,11 +250,21 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer { | |||
} | |||
@Override | |||
public ComputeEngineContainer stop() { | |||
public ComputeEngineContainer stopWorkers() { | |||
if (level4 != null) { | |||
// try to graceful stop in-progress tasks | |||
CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class); | |||
ceProcessingScheduler.stopScheduling(); | |||
ceProcessingScheduler.gracefulStopScheduling(); | |||
} | |||
return this; | |||
} | |||
@Override | |||
public ComputeEngineContainer stop() { | |||
if (level4 != null) { | |||
// try to graceful but quick stop in-progress tasks | |||
CeProcessingScheduler ceProcessingScheduler = level4.getComponentByType(CeProcessingScheduler.class); | |||
ceProcessingScheduler.hardStopScheduling(); | |||
} | |||
this.level1.stopComponents(); | |||
return this; |
@@ -30,6 +30,6 @@ public interface ComputeEngineStatus { | |||
Status getStatus(); | |||
enum Status { | |||
INIT, STARTING, STARTED, HARD_STOPPING, STOPPED | |||
INIT, STARTING, STARTED, STOPPING_WORKERS, WORKERS_STOPPED, STOPPING, STOPPED | |||
} | |||
} |
@@ -23,5 +23,7 @@ public interface CeProcessingScheduler { | |||
void startScheduling(); | |||
void stopScheduling(); | |||
void gracefulStopScheduling(); | |||
void hardStopScheduling(); | |||
} |
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.FutureCallback; | |||
import com.google.common.util.concurrent.Futures; | |||
import com.google.common.util.concurrent.ListenableFuture; | |||
import com.google.common.util.concurrent.ListenableScheduledFuture; | |||
import java.util.Arrays; | |||
import java.util.concurrent.TimeUnit; | |||
import javax.annotation.CheckForNull; | |||
import javax.annotation.Nullable; | |||
@@ -72,41 +73,77 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler { | |||
} | |||
/** | |||
* This method is stopping all the workers giving them a delay before killing them. | |||
* This method is stopping all the workers and giving them a very large delay before killing them. | |||
* <p> | |||
* It supports being interrupted (eg. by a hard stop). | |||
*/ | |||
@Override | |||
public void stopScheduling() { | |||
LOG.debug("Stopping compute engine"); | |||
// Requesting all workers to stop | |||
for (ChainingCallback chainingCallback : chainingCallbacks) { | |||
chainingCallback.stop(false); | |||
public void gracefulStopScheduling() { | |||
LOG.info("Gracefully stopping workers..."); | |||
requestAllWorkersToStop(); | |||
try { | |||
waitForInProgressWorkersToFinish(gracefulStopTimeoutInMs); | |||
if (ceWorkerController.hasAtLeastOneProcessingWorker()) { | |||
LOG.info("Graceful stop period ended but some in-progress task did not finish. Tasks will be interrupted."); | |||
} | |||
interruptAllWorkers(); | |||
} catch (InterruptedException e) { | |||
LOG.debug("Graceful stop was interrupted"); | |||
Thread.currentThread().interrupt(); | |||
} | |||
} | |||
/** | |||
* This method is stopping all the workers and hardly giving them a delay before killing them. | |||
* <p> | |||
* If interrupted, it will interrupt any worker still in-progress before returning. | |||
*/ | |||
public void hardStopScheduling() { | |||
// nothing to do if graceful stop went through | |||
if (Arrays.stream(chainingCallbacks).allMatch(ChainingCallback::isInterrupted)) { | |||
return; | |||
} | |||
// Workers have 40s to gracefully stop processing tasks | |||
long until = System.currentTimeMillis() + gracefulStopTimeoutInMs; | |||
LOG.info("Waiting for workers to finish in-progress tasks"); | |||
while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) { | |||
try { | |||
Thread.sleep(200L); | |||
} catch (InterruptedException e) { | |||
LOG.debug("Graceful stop period has been interrupted: {}", e); | |||
Thread.currentThread().interrupt(); | |||
break; | |||
} | |||
LOG.info("Hard stopping workers..."); | |||
requestAllWorkersToStop(); | |||
try { | |||
waitForInProgressWorkersToFinish(350); | |||
} catch (InterruptedException e) { | |||
LOG.debug("Grace period of hard stop has been interrupted: {}", e); | |||
Thread.currentThread().interrupt(); | |||
} | |||
if (ceWorkerController.hasAtLeastOneProcessingWorker()) { | |||
LOG.info("Some in-progress tasks did not finish in due time. Tasks will be stopped."); | |||
LOG.info("Some in-progress tasks are getting killed."); | |||
} | |||
// Interrupting the tasks | |||
for (ChainingCallback chainingCallback : chainingCallbacks) { | |||
chainingCallback.stop(true); | |||
interruptAllWorkers(); | |||
} | |||
private void interruptAllWorkers() { | |||
// Interrupting the tasks | |||
Arrays.stream(chainingCallbacks).forEach(t -> t.stop(true)); | |||
} | |||
private void waitForInProgressWorkersToFinish(long shutdownTimeoutInMs) throws InterruptedException { | |||
// Workers have some time to complete their in progress tasks | |||
long until = System.currentTimeMillis() + shutdownTimeoutInMs; | |||
LOG.debug("Waiting for workers to finish in-progress tasks for at most {}ms", shutdownTimeoutInMs); | |||
while (System.currentTimeMillis() < until && ceWorkerController.hasAtLeastOneProcessingWorker()) { | |||
Thread.sleep(200L); | |||
} | |||
} | |||
private void requestAllWorkersToStop() { | |||
// Requesting all workers to stop | |||
Arrays.stream(chainingCallbacks).forEach(t -> t.stop(false)); | |||
} | |||
private class ChainingCallback implements FutureCallback<CeWorker.Result> { | |||
private volatile boolean keepRunning = true; | |||
private volatile boolean interrupted = false; | |||
private final CeWorker worker; | |||
@CheckForNull | |||
@@ -170,8 +207,13 @@ public class CeProcessingSchedulerImpl implements CeProcessingScheduler { | |||
public void stop(boolean interrupt) { | |||
keepRunning = false; | |||
if (workerFuture != null) { | |||
interrupted = true; | |||
workerFuture.cancel(interrupt); | |||
} | |||
} | |||
public boolean isInterrupted() { | |||
return interrupted; | |||
} | |||
} | |||
} |
@@ -44,6 +44,36 @@ public class ComputeEngineImplTest { | |||
underTest.startup(); | |||
} | |||
@Test | |||
public void stopProcessing_throws_ISE_if_startup_was_not_called_before() { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("stopProcessing() must not be called before startup()"); | |||
underTest.stopProcessing(); | |||
} | |||
@Test | |||
public void stopProcessing_throws_ISE_if_called_after_shutdown() { | |||
underTest.startup(); | |||
underTest.shutdown(); | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("stopProcessing() can not be called after shutdown()"); | |||
underTest.stopProcessing(); | |||
} | |||
@Test | |||
public void stopProcessing_throws_ISE_if_called_twice() { | |||
underTest.startup(); | |||
underTest.stopProcessing(); | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("stopProcessing() can not be called multiple times"); | |||
underTest.stopProcessing(); | |||
} | |||
@Test | |||
public void shutdown_throws_ISE_if_startup_was_not_called_before() { | |||
expectedException.expect(IllegalStateException.class); | |||
@@ -73,6 +103,11 @@ public class ComputeEngineImplTest { | |||
return this; | |||
} | |||
@Override | |||
public ComputeEngineContainer stopWorkers() { | |||
return this; | |||
} | |||
@Override | |||
public ComputeEngineContainer stop() { | |||
return this; |
@@ -20,7 +20,6 @@ | |||
package org.sonar.ce.app; | |||
import com.google.common.base.MoreObjects; | |||
import java.io.IOException; | |||
import java.util.concurrent.CountDownLatch; | |||
import javax.annotation.CheckForNull; | |||
import javax.annotation.Nullable; | |||
@@ -64,7 +63,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void constructor_does_not_start_a_new_Thread() throws IOException { | |||
public void constructor_does_not_start_a_new_Thread() { | |||
int activeCount = Thread.activeCount(); | |||
newCeServer(); | |||
@@ -73,7 +72,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void start_starts_a_new_Thread() throws IOException { | |||
public void start_starts_a_new_Thread() { | |||
int activeCount = Thread.activeCount(); | |||
newCeServer().start(); | |||
@@ -82,7 +81,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void start_throws_ISE_when_called_twice() throws IOException { | |||
public void start_throws_ISE_when_called_twice() { | |||
CeServer ceServer = newCeServer(); | |||
ceServer.start(); | |||
@@ -94,7 +93,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void getStatus_throws_ISE_when_called_before_start() throws IOException { | |||
public void getStatus_throws_ISE_when_called_before_start() { | |||
CeServer ceServer = newCeServer(); | |||
expectedException.expect(IllegalStateException.class); | |||
@@ -104,7 +103,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void getStatus_does_not_return_OPERATIONAL_until_ComputeEngine_startup_returns() throws IOException { | |||
public void getStatus_does_not_return_OPERATIONAL_until_ComputeEngine_startup_returns() { | |||
BlockingStartupComputeEngine computeEngine = new BlockingStartupComputeEngine(null); | |||
CeServer ceServer = newCeServer(computeEngine); | |||
@@ -122,7 +121,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void getStatus_returns_OPERATIONAL_when_ComputeEngine_startup_throws_any_Exception_or_Error() throws IOException { | |||
public void getStatus_returns_OPERATIONAL_when_ComputeEngine_startup_throws_any_Exception_or_Error() { | |||
Throwable startupException = new Throwable("Faking failing ComputeEngine#startup()"); | |||
BlockingStartupComputeEngine computeEngine = new BlockingStartupComputeEngine(startupException); | |||
@@ -142,7 +141,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void awaitStop_throws_ISE_if_called_before_start() throws IOException { | |||
public void awaitStop_throws_ISE_if_called_before_start() { | |||
CeServer ceServer = newCeServer(); | |||
expectedException.expect(IllegalStateException.class); | |||
@@ -152,7 +151,7 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void awaitStop_throws_ISE_if_called_twice() throws IOException { | |||
public void awaitStop_throws_ISE_if_called_twice() { | |||
final CeServer ceServer = newCeServer(); | |||
ExceptionCatcherWaitingThread waitingThread1 = new ExceptionCatcherWaitingThread(ceServer); | |||
ExceptionCatcherWaitingThread waitingThread2 = new ExceptionCatcherWaitingThread(ceServer); | |||
@@ -197,16 +196,21 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void awaitStop_unblocks_when_waiting_for_ComputeEngine_startup_fails() throws IOException { | |||
public void awaitStop_unblocks_when_waiting_for_ComputeEngine_startup_fails() { | |||
CeServer ceServer = newCeServer(new ComputeEngine() { | |||
@Override | |||
public void startup() { | |||
throw new Error("Faking ComputeEngine.startup() failing"); | |||
} | |||
@Override | |||
public void stopProcessing() { | |||
throw new UnsupportedOperationException("stopProcessing() should never be called in this test"); | |||
} | |||
@Override | |||
public void shutdown() { | |||
throw new UnsupportedOperationException("shutdown() should never be called in this context"); | |||
throw new UnsupportedOperationException("shutdown() should never be called in this test"); | |||
} | |||
}); | |||
@@ -216,13 +220,18 @@ public class CeServerTest { | |||
} | |||
@Test | |||
public void stop_releases_thread_in_awaitStop_even_when_ComputeEngine_shutdown_fails() throws InterruptedException, IOException { | |||
public void stop_releases_thread_in_awaitStop_even_when_ComputeEngine_shutdown_fails() throws InterruptedException { | |||
final CeServer ceServer = newCeServer(new ComputeEngine() { | |||
@Override | |||
public void startup() { | |||
// nothing to do at startup | |||
} | |||
@Override | |||
public void stopProcessing() { | |||
throw new UnsupportedOperationException("stopProcessing should not be called in this test"); | |||
} | |||
@Override | |||
public void shutdown() { | |||
throw new Error("Faking ComputeEngine.shutdown() failing"); | |||
@@ -238,7 +247,7 @@ public class CeServerTest { | |||
waitingThread.join(); | |||
} | |||
private CeServer newCeServer() throws IOException { | |||
private CeServer newCeServer() { | |||
return newCeServer(DoNothingComputeEngine.INSTANCE); | |||
} | |||
@@ -281,6 +290,11 @@ public class CeServerTest { | |||
} | |||
} | |||
@Override | |||
public void stopProcessing() { | |||
// do nothing | |||
} | |||
@Override | |||
public void shutdown() { | |||
// do nothing | |||
@@ -323,6 +337,11 @@ public class CeServerTest { | |||
// do nothing | |||
} | |||
@Override | |||
public void stopProcessing() { | |||
// do nothing | |||
} | |||
@Override | |||
public void shutdown() { | |||
// do nothing |
@@ -62,7 +62,7 @@ import static org.mockito.Mockito.verify; | |||
import static org.mockito.Mockito.verifyZeroInteractions; | |||
import static org.mockito.Mockito.when; | |||
import static org.sonar.ce.container.ComputeEngineStatus.Status.STARTED; | |||
import static org.sonar.ce.container.ComputeEngineStatus.Status.HARD_STOPPING; | |||
import static org.sonar.ce.container.ComputeEngineStatus.Status.STOPPING; | |||
public class InternalCeQueueImplTest { | |||
@@ -407,7 +407,7 @@ public class InternalCeQueueImplTest { | |||
@Test | |||
public void peek_nothing_if_application_status_stopping() { | |||
submit(CeTaskTypes.REPORT, newProjectDto("PROJECT_1")); | |||
when(computeEngineStatus.getStatus()).thenReturn(HARD_STOPPING); | |||
when(computeEngineStatus.getStatus()).thenReturn(STOPPING); | |||
Optional<CeTask> peek = underTest.peek(WORKER_UUID_1); | |||
assertThat(peek.isPresent()).isFalse(); |
@@ -163,6 +163,44 @@ public class CeProcessingSchedulerImplTest { | |||
regularDelayedPoll); | |||
} | |||
@Test | |||
public void gracefulStopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { | |||
when(ceWorker.call()) | |||
.thenReturn(NO_TASK) | |||
.thenReturn(TASK_PROCESSED) | |||
.thenReturn(NO_TASK) | |||
.thenReturn(NO_TASK) | |||
.thenReturn(NO_TASK) | |||
.thenReturn(NO_TASK) | |||
.thenReturn(NO_TASK) | |||
.thenThrow(ERROR_TO_INTERRUPT_CHAINING); | |||
underTest.startScheduling(); | |||
int cancelledTaskFutureCount = 0; | |||
int i = 0; | |||
while (processingExecutorService.futures.peek() != null) { | |||
Future<?> future = processingExecutorService.futures.poll(); | |||
if (future.isCancelled()) { | |||
cancelledTaskFutureCount++; | |||
} else { | |||
future.get(); | |||
} | |||
// call for graceful after second delayed polling | |||
if (i == 1) { | |||
underTest.gracefulStopScheduling(); | |||
} | |||
i++; | |||
} | |||
assertThat(cancelledTaskFutureCount).isEqualTo(1); | |||
assertThat(processingExecutorService.getSchedulerCalls()).containsExactly( | |||
regularDelayedPoll, | |||
regularDelayedPoll, | |||
notDelayedPoll, | |||
regularDelayedPoll); | |||
} | |||
@Test | |||
public void stopScheduling_cancels_next_polling_and_does_not_add_any_new_one() throws Exception { | |||
when(ceWorker.call()) | |||
@@ -188,7 +226,7 @@ public class CeProcessingSchedulerImplTest { | |||
} | |||
// call stop after second delayed polling | |||
if (i == 1) { | |||
underTest.stopScheduling(); | |||
underTest.hardStopScheduling(); | |||
} | |||
i++; | |||
} |