import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.process.Props;
import static com.google.common.base.Preconditions.checkState;
-import static org.sonar.process.ProcessUtils.awaitTermination;
/**
* The Compute Engine server which starts a daemon thread to run the {@link ComputeEngineImpl} when it's {@link #start()}
private static final String CE_MAIN_THREAD_NAME = "ce-main";
- /**
- * Thread that currently is inside our await() method.
- */
- private AtomicReference<Thread> awaitThread = new AtomicReference<>();
- private volatile boolean stopAwait = false;
+ private CountDownLatch awaitStop = new CountDownLatch(1);
private final ComputeEngine computeEngine;
@Nullable
@Override
public void awaitStop() {
- checkState(awaitThread.compareAndSet(null, Thread.currentThread()), "There can't be more than one thread waiting for the Compute Engine to stop");
checkState(ceMainThread != null, "awaitStop() must not be called before start()");
-
- try {
- while (!stopAwait) {
- try {
- // wait for a quite long time but we will be interrupted if flag changes anyway
- Thread.sleep(10_000);
- } catch (InterruptedException e) {
- // continue and check the flag
- }
+ while (true) {
+ try {
+ awaitStop.await();
+ return;
+ } catch (InterruptedException e) {
+ // abort waiting
}
- } finally {
- awaitThread = null;
}
}
@Override
public void stop() {
- computeEngine.stopProcessing();
- hardStop();
+ if (ceMainThread != null) {
+ ceMainThread.stopIt();
+ awaitStop();
+ }
}
@Override
public void hardStop() {
if (ceMainThread != null) {
- // signal main Thread to stop
- ceMainThread.stopIt();
- awaitTermination(ceMainThread);
+ ceMainThread.stopItNow();
+ awaitStop();
}
}
}
private class CeMainThread extends Thread {
- private static final int CHECK_FOR_STOP_DELAY = 50;
- private volatile boolean stop = false;
+ private final CountDownLatch stopSignal = new CountDownLatch(1);
private volatile boolean started = false;
private volatile boolean operational = false;
+ private volatile boolean hardStop = false;
+ private volatile boolean dontInterrupt = false;
public CeMainThread() {
super(CE_MAIN_THREAD_NAME);
boolean startupSuccessful = attemptStartup();
this.operational = startupSuccessful;
this.started = true;
- if (startupSuccessful) {
- // call below is blocking
- waitForStopSignal();
- } else {
- stopAwait();
+ try {
+ if (startupSuccessful) {
+ try {
+ stopSignal.await();
+ } catch (InterruptedException e) {
+ // don't restore interrupt flag since it would be unset in attemptShutdown anyway
+ }
+
+ attemptShutdown();
+ }
+ } finally {
+ // release thread(s) waiting for CeServer to stop
+ signalAwaitStop();
}
}
private boolean attemptStartup() {
try {
- startup();
+ LOG.info("Compute Engine starting up...");
+ computeEngine.startup();
+ LOG.info("Compute Engine is operational");
return true;
} catch (org.sonar.api.utils.MessageException | org.sonar.process.MessageException e) {
LOG.error("Compute Engine startup failed: " + e.getMessage());
}
}
- private void startup() {
- LOG.info("Compute Engine starting up...");
- computeEngine.startup();
- LOG.info("Compute Engine is operational");
- }
-
- private void waitForStopSignal() {
- while (!stop) {
- try {
- Thread.sleep(CHECK_FOR_STOP_DELAY);
- } catch (InterruptedException e) {
- // ignore the interruption itself
- // Do not propagate the isInterrupted flag with Thread.currentThread().interrupt()
- // It will break the shutdown of ComputeEngineContainerImpl#stop()
- }
- }
- attemptShutdown();
- }
-
private void attemptShutdown() {
try {
LOG.info("Compute Engine is stopping...");
+ if (!hardStop) {
+ computeEngine.stopProcessing();
+ }
+ dontInterrupt = true;
+ // make sure that interrupt flag is unset because we don't want to interrupt shutdown of pico container
+ interrupted();
computeEngine.shutdown();
LOG.info("Compute Engine is stopped");
} catch (Throwable e) {
LOG.error("Compute Engine failed to stop", e);
- } finally {
- // release thread waiting for CeServer
- stopAwait();
}
}
}
public void stopIt() {
- // stop looping indefinitely
- this.stop = true;
- // interrupt current thread in case its waiting for WebServer
- // TODO is the waiting during startup or shutdown? this will most likely cause the shutdown to always fail to finish cleanly
- interrupt();
+ stopSignal.countDown();
}
- private void stopAwait() {
- stopAwait = true;
- Thread t = awaitThread.get();
- if (t != null) {
- t.interrupt();
- try {
- t.join(1_000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // Ignored
- }
+ public void stopItNow() {
+ hardStop = true;
+ stopSignal.countDown();
+ // interrupt current thread unless it's already performing shutdown
+ if (!dontInterrupt) {
+ interrupt();
}
}
+
+ private void signalAwaitStop() {
+ awaitStop.countDown();
+ }
}
}
*/
package org.sonar.ce.app;
-import com.google.common.base.MoreObjects;
import java.util.concurrent.CountDownLatch;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
assertThat(Thread.activeCount()).isSameAs(activeCount);
}
+ @Test
+ public void awaitStop_throws_ISE_if_called_before_start() {
+ CeServer ceServer = newCeServer();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("awaitStop() must not be called before start()");
+
+ ceServer.awaitStop();
+ }
+
@Test
public void start_starts_a_new_Thread() {
int activeCount = Thread.activeCount();
assertThat(ceServer.getStatus()).isEqualTo(Monitored.Status.OPERATIONAL);
}
- @Test
- public void awaitStop_throws_ISE_if_called_before_start() {
- CeServer ceServer = newCeServer();
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("awaitStop() must not be called before start()");
-
- ceServer.awaitStop();
- }
-
- @Test
- public void awaitStop_throws_ISE_if_called_twice() {
- final CeServer ceServer = newCeServer();
- ExceptionCatcherWaitingThread waitingThread1 = new ExceptionCatcherWaitingThread(ceServer);
- ExceptionCatcherWaitingThread waitingThread2 = new ExceptionCatcherWaitingThread(ceServer);
-
- ceServer.start();
-
- waitingThread1.start();
- waitingThread2.start();
-
- while (waitingThread1.isAlive() && waitingThread2.isAlive()) {
- // wait for either thread to stop because ceServer.awaitStop() failed with an exception
- // if none stops, the test will fail with timeout
- }
-
- Exception exception = MoreObjects.firstNonNull(waitingThread1.getException(), waitingThread2.getException());
- assertThat(exception)
- .isInstanceOf(IllegalStateException.class)
- .hasMessage("There can't be more than one thread waiting for the Compute Engine to stop");
-
- assertThat(waitingThread1.getException() != null && waitingThread2.getException() != null).isFalse();
- }
-
@Test
public void awaitStop_keeps_blocking_calling_thread_even_if_calling_thread_is_interrupted_but_until_stop_is_called() throws Exception {
final CeServer ceServer = newCeServer();
}
}
- private static class ExceptionCatcherWaitingThread extends Thread {
- private final CeServer ceServer;
- @CheckForNull
- private Exception exception = null;
-
- public ExceptionCatcherWaitingThread(CeServer ceServer) {
- this.ceServer = ceServer;
- }
-
- @Override
- public void run() {
- try {
- ceServer.awaitStop();
- } catch (Exception e) {
- this.exception = e;
- }
- }
-
- @CheckForNull
- public Exception getException() {
- return exception;
- }
- }
-
private enum DoNothingComputeEngine implements ComputeEngine {
INSTANCE;
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2019 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process;
-
-import javax.annotation.Nullable;
-
-public class ProcessUtils {
-
- private ProcessUtils() {
- // only static stuff
- }
-
- public static void awaitTermination(@Nullable Thread t) {
- if (t == null || Thread.currentThread() == t) {
- return;
- }
-
- while (t.isAlive()) {
- try {
- t.join();
- } catch (InterruptedException e) {
- // ignore, keep on waiting for t to stop
- Thread.currentThread().interrupt();
- }
- }
- }
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2019 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.DisableOnDebug;
-import org.junit.rules.TestRule;
-import org.junit.rules.Timeout;
-import org.sonar.test.TestUtils;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.sonar.process.ProcessUtils.awaitTermination;
-
-public class ProcessUtilsTest {
-
- @Rule
- public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
-
- @Test
- public void private_constructor() {
- assertThat(TestUtils.hasOnlyPrivateConstructors(ProcessUtils.class)).isTrue();
- }
-
- @Test
- public void awaitTermination_does_not_fail_on_null_Thread_argument() {
- awaitTermination(null);
- }
-
- @Test
- public void awaitTermination_does_not_wait_on_currentThread() {
- awaitTermination(Thread.currentThread());
- }
-
- @Test
- public void awaitTermination_ignores_interrupted_exception_of_current_thread() throws InterruptedException {
- final EverRunningThread runningThread = new EverRunningThread();
- final Thread safeJoiner = new Thread(() -> awaitTermination(runningThread));
- final Thread simpleJoiner = new Thread(() -> {
- try {
- runningThread.join();
- } catch (InterruptedException e) {
- System.err.println("runningThread interruption detected in SimpleJoiner");
- }
- });
- runningThread.start();
- safeJoiner.start();
- simpleJoiner.start();
-
- // interrupt safeJoiner _before simpleJoiner to work around some arbitrary sleep delay_ which should not stop watching
- safeJoiner.interrupt();
-
- // interrupting simpleJoiner which should stop
- simpleJoiner.interrupt();
-
- while (simpleJoiner.isAlive()) {
- // wait for simpleJoiner to stop
- }
-
- // safeJoiner must still be alive
- assertThat(safeJoiner.isAlive()).isTrue();
-
- // stop runningThread
- runningThread.stopIt();
-
- while (runningThread.isAlive()) {
- // wait for runningThread to stop
- }
-
- // wait for safeJoiner to stop because runningThread has stopped, if it doesn't, the test will fail with a timeout
- safeJoiner.join();
- }
-
- private static class EverRunningThread extends Thread {
- private volatile boolean stop = false;
-
- @Override
- public void run() {
- while (!stop) {
- // infinite loop!
- }
- }
-
- public void stopIt() {
- this.stop = true;
- }
- }
-
-}