diff options
author | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2016-01-07 18:20:00 +0100 |
---|---|---|
committer | Sébastien Lesaint <sebastien.lesaint@sonarsource.com> | 2016-01-13 13:42:43 +0100 |
commit | 1d49769ae87ac8a2a553815bf2bfc9bf17a85f8f (patch) | |
tree | 41a20b8882d1b69f0b48ae7adea9b0b4ab877558 /server/sonar-process-monitor | |
parent | 10ddbf28d97a7b1626706efdf2cb88893b34ed4a (diff) | |
download | sonarqube-1d49769ae87ac8a2a553815bf2bfc9bf17a85f8f.tar.gz sonarqube-1d49769ae87ac8a2a553815bf2bfc9bf17a85f8f.zip |
SONAR-7168 rework threads and restart implementation
Diffstat (limited to 'server/sonar-process-monitor')
5 files changed, 193 insertions, 350 deletions
diff --git a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/Monitor.java b/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/Monitor.java index 2655bb01349..98498c0e17e 100644 --- a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/Monitor.java +++ b/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/Monitor.java @@ -20,9 +20,12 @@ package org.sonar.process.monitor; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import javax.annotation.CheckForNull; +import javax.annotation.Nullable; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonar.process.Lifecycle; import org.sonar.process.Lifecycle.State; @@ -30,24 +33,26 @@ import org.sonar.process.ProcessCommands; import org.sonar.process.SystemExit; public class Monitor { + + private static final Logger LOG = LoggerFactory.getLogger(Monitor.class); private static final Timeouts TIMEOUTS = new Timeouts(); + private static final long WATCH_DELAY_MS = 500L; + + private static int restartorInstanceCounter = 0; - private final List<ProcessRef> processes = new CopyOnWriteArrayList<>(); private final JavaProcessLauncher launcher; private final SystemExit systemExit; - private Thread shutdownHook = new Thread(new MonitorShutdownHook(), "Monitor Shutdown Hook"); + private final Thread shutdownHook = new Thread(new MonitorShutdownHook(), "Monitor Shutdown Hook"); - // used by awaitStop() to block until all processes are shutdown - private List<WatcherThread> watcherThreads = new CopyOnWriteArrayList<>(); + private final List<WatcherThread> watcherThreads = new CopyOnWriteArrayList<>(); + private final Lifecycle lifecycle = new Lifecycle(); + private final TerminatorThread terminator = new TerminatorThread(); + private final RestartRequestWatcherThread restartWatcher = new RestartRequestWatcherThread(); @CheckForNull private List<JavaCommand> javaCommands; @CheckForNull - private Lifecycle lifecycle; - @CheckForNull - private RestartRequestWatcherThread restartWatcher; - @CheckForNull - private TerminatorThread terminator; + private RestartorThread restartor; static int nextProcessId = 1; Monitor(JavaProcessLauncher launcher, SystemExit exit) { @@ -70,43 +75,37 @@ public class Monitor { throw new IllegalArgumentException("At least one command is required"); } - if (lifecycle != null) { + if (lifecycle.getState() != State.INIT) { throw new IllegalStateException("Can not start multiple times"); } // intercepts CTRL-C Runtime.getRuntime().addShutdownHook(shutdownHook); + this.restartWatcher.start(); this.javaCommands = commands; - start(); - } - - private void start() { - resetState(); - List<ProcessRef> processRefs = startAndMonitorProcesses(); - startWatchingForRestartRequests(processRefs); + startProcesses(); } - private void resetState() { - this.lifecycle = new Lifecycle(); - lifecycle.tryToMoveTo(State.STARTING); - this.watcherThreads.clear(); + private void startProcesses() { + // do no start any child process if not in state INIT or RESTARTING (a stop could be in progress too) + if (lifecycle.tryToMoveTo(State.STARTING)) { + startAndMonitorProcesses(); + stopIfAnyProcessDidNotStart(); + } } - private List<ProcessRef> startAndMonitorProcesses() { - List<ProcessRef> processRefs = new ArrayList<>(javaCommands.size()); + private void startAndMonitorProcesses() { for (JavaCommand command : javaCommands) { try { ProcessRef processRef = launcher.launch(command); monitor(processRef); - processRefs.add(processRef); } catch (RuntimeException e) { // fail to start or to monitor stop(); throw e; } } - return processRefs; } private void monitor(ProcessRef processRef) { @@ -115,19 +114,14 @@ public class Monitor { watcherThread.start(); watcherThreads.add(watcherThread); - processes.add(processRef); - // wait for process to be ready (accept requests or so on) processRef.waitForReady(); - LoggerFactory.getLogger(getClass()).info(String.format("%s is up", processRef)); + LOG.info("{} is up", processRef); } - private void startWatchingForRestartRequests(List<ProcessRef> processRefs) { - if (lifecycle.tryToMoveTo(State.STARTED)) { - stopRestartWatcher(); - startRestartWatcher(processRefs); - } else { + private void stopIfAnyProcessDidNotStart() { + if (!lifecycle.tryToMoveTo(State.STARTED)) { // stopping or stopped during startup, for instance : // 1. A is started // 2. B starts @@ -138,60 +132,43 @@ public class Monitor { } } - private void stopRestartWatcher() { - if (this.restartWatcher != null) { - this.restartWatcher.stopWatching(); - try { - this.restartWatcher.join(); - } catch (InterruptedException e) { - // failed to cleanly stop (very unlikely), ignore and proceed - } - } - } - - private void startRestartWatcher(List<ProcessRef> processRefs) { - this.restartWatcher = new RestartRequestWatcherThread(this, processRefs); - this.restartWatcher.start(); - } - /** * Blocks until all processes are terminated */ public void awaitTermination() { - while (awaitTerminationImpl()) { - LoggerFactory.getLogger(RestartRequestWatcherThread.class).info("Restarting SQ..."); - start(); + while (awaitChildProcessesTermination()) { + trace("await termination of restartor..."); + awaitTermination(restartor); } - stopRestartWatcher(); + cleanAfterTermination(); } boolean waitForOneRestart() { - boolean restartRequested = awaitTerminationImpl(); + boolean restartRequested = awaitChildProcessesTermination(); if (restartRequested) { - start(); + awaitTermination(restartor); } return restartRequested; } - private boolean awaitTerminationImpl() { - for (WatcherThread watcherThread : watcherThreads) { - while (watcherThread.isAlive()) { - try { - watcherThread.join(); - } catch (InterruptedException ignored) { - // ignore, stop blocking - } - } + private boolean awaitChildProcessesTermination() { + trace("await termination of child processes..."); + List<WatcherThread> watcherThreadsCopy = new ArrayList<>(this.watcherThreads); + for (WatcherThread watcherThread : watcherThreadsCopy) { + awaitTermination(watcherThread); } - return hasRestartBeenRequested(); + trace("all child processes done"); + return hasRestartBeenRequested(watcherThreadsCopy); } - private boolean hasRestartBeenRequested() { + private static boolean hasRestartBeenRequested(List<WatcherThread> watcherThreads) { for (WatcherThread watcherThread : watcherThreads) { if (watcherThread.isAskedForRestart()) { + trace("one child process requested restart"); return true; } } + trace("no child process requested restart"); return false; } @@ -199,19 +176,30 @@ public class Monitor { * Blocks until all processes are terminated. */ public void stop() { + trace("start stop async..."); stopAsync(); - try { - terminator.join(); - } catch (InterruptedException ignored) { - // stop blocking and exiting - } - // safeguard if TerminatorThread is buggy - lifecycle.tryToMoveTo(State.STOPPED); - // cleanly stop restart watcher - stopRestartWatcher(); + trace("await termination of terminator..."); + awaitTermination(terminator); + cleanAfterTermination(); + trace("exit..."); systemExit.exit(0); } + private void cleanAfterTermination() { + trace("go to STOPPED..."); + // safeguard if TerminatorThread is buggy and stop restartWatcher + lifecycle.tryToMoveTo(State.STOPPED); + trace("await termination of restartWatcher..."); + // wait for restartWatcher to cleanly stop + awaitTermination(restartWatcher); + trace("restartWatcher done"); + // removing shutdown hook to avoid called stop() unnecessarily unless already in shutdownHook + if (!systemExit.isInShutdownHook()) { + trace("removing shutdown hook..."); + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + } + /** * Asks for processes termination and returns without blocking until termination. * However, if a termination request is already under way (it's not supposed to happen, but, technically, it can occur), @@ -219,17 +207,117 @@ public class Monitor { */ public void stopAsync() { if (lifecycle.tryToMoveTo(State.STOPPING)) { - if (terminator != null) { + terminator.start(); + } + } + + public void restartAsync() { + if (lifecycle.tryToMoveTo(State.RESTARTING)) { + restartor = new RestartorThread(); + restartor.start(); + } + } + + /** + * Runs every time a restart request is detected. + */ + private class RestartorThread extends Thread { + + private RestartorThread() { + super("Restartor " + (restartorInstanceCounter++)); + } + + @Override + public void run() { + stopProcesses(); + startProcesses(); + } + } + + /** + * Runs only once + */ + private class TerminatorThread extends Thread { + + private TerminatorThread() { + super("Terminator"); + } + + @Override + public void run() { + stopProcesses(); + } + } + + /** + * Watches for any child process requesting a restart of all children processes. + * It runs once and as long as {@link #lifecycle} hasn't reached {@link Lifecycle.State#STOPPED} and holds its checks + * when {@link #lifecycle} is not in state {@link Lifecycle.State#STARTED} to avoid taking the same request into account + * twice. + */ + public class RestartRequestWatcherThread extends Thread { + public RestartRequestWatcherThread() { + super("Restart watcher"); + } + + @Override + public void run() { + while (lifecycle.getState() != Lifecycle.State.STOPPED) { + if (lifecycle.getState() == Lifecycle.State.STARTED && didAnyProcessRequestRestart()) { + restartAsync(); + } try { - terminator.join(); - } catch (InterruptedException e) { - // stop waiting for thread to complete and continue with creating a new one + Thread.sleep(WATCH_DELAY_MS); + } catch (InterruptedException ignored) { + // keep watching } } - terminator = new TerminatorThread(TIMEOUTS); - terminator.setProcesses(processes); - terminator.start(); } + + private boolean didAnyProcessRequestRestart() { + for (WatcherThread watcherThread : watcherThreads) { + ProcessRef processRef = watcherThread.getProcessRef(); + if (processRef.getCommands().askedForRestart()) { + LOG.info("Process [{}] requested restart", processRef.getKey()); + return true; + } + } + return false; + } + + } + + private void stopProcesses() { + ArrayList<WatcherThread> watcherThreads = new ArrayList<>(this.watcherThreads); + // create a copy and reverse it to terminate in reverse order of startup (dependency order) + Collections.reverse(watcherThreads); + + for (WatcherThread watcherThread : watcherThreads) { + ProcessRef ref = watcherThread.getProcessRef(); + if (!ref.isStopped()) { + LOG.info("{} is stopping", ref); + ref.askForGracefulAsyncStop(); + + long killAt = System.currentTimeMillis() + TIMEOUTS.getTerminationTimeout(); + while (!ref.isStopped() && System.currentTimeMillis() < killAt) { + try { + Thread.sleep(10L); + } catch (InterruptedException e) { + // stop asking for graceful stops, Monitor will hardly kill all processes + break; + } + } + if (!ref.isStopped()) { + LOG.info("{} failed to stop in a timely fashion. Killing it.", ref); + } + ref.stop(); + LOG.info("{} is stopped", ref); + } + } + + // all processes are stopped, no need to keep references to these WatcherThread anymore + trace("all processes stopped, clean list of watcherThreads..."); + this.watcherThreads.clear(); } public State getState() { @@ -240,19 +328,34 @@ public class Monitor { return shutdownHook; } - public void restartAsync() { - stopAsync(); - } - private class MonitorShutdownHook implements Runnable { @Override public void run() { systemExit.setInShutdownHook(); + trace("calling stop from MonitorShutdownHook..."); // blocks until everything is corrected terminated stop(); } } + private static void awaitTermination(@Nullable Thread t) { + if (t == null) { + return; + } + + while (t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + // ignore and stop blocking + } + } + } + + private static void trace(String s) { + System.err.println("APP: " + s); + } + public static int getNextProcessId() { if (nextProcessId >= ProcessCommands.MAX_PROCESSES) { throw new IllegalStateException("The maximum number of processes launched has been reached " + ProcessCommands.MAX_PROCESSES); diff --git a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/RestartRequestWatcherThread.java b/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/RestartRequestWatcherThread.java deleted file mode 100644 index 394b477cb38..00000000000 --- a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/RestartRequestWatcherThread.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * SonarQube :: Process Monitor - * Copyright (C) 2009-2016 SonarSource SA - * mailto:contact AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.process.monitor; - -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; - -public class RestartRequestWatcherThread extends Thread { - private static final Logger LOG = LoggerFactory.getLogger(RestartRequestWatcherThread.class); - private static int instanceCounter = 0; - - private final Monitor monitor; - private final List<ProcessRef> processes; - private final long delayMs; - - private boolean watching = true; - - public RestartRequestWatcherThread(Monitor monitor, List<ProcessRef> processes) { - this(monitor, processes, 500); - } - - public RestartRequestWatcherThread(Monitor monitor, List<ProcessRef> processes, long delayMs) { - super("Restart watcher " + (instanceCounter++)); - this.monitor = requireNonNull(monitor, "monitor can not be null"); - this.processes = requireNonNull(processes, "processes can not be null"); - this.delayMs = delayMs; - } - - @Override - public void run() { - while (watching) { - for (ProcessRef processCommands : processes) { - if (processCommands.getCommands().askedForRestart()) { - LOG.info("Process [{}] requested restart", processCommands.getKey()); - monitor.restartAsync(); - watching = false; - } else { - try { - Thread.sleep(delayMs); - } catch (InterruptedException ignored) { - // keep watching - } - } - } - } - } - - public void stopWatching() { - this.watching = false; - } - - public boolean isWatching() { - return watching; - } -} diff --git a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/TerminatorThread.java b/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/TerminatorThread.java deleted file mode 100644 index ab0a08df27f..00000000000 --- a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/TerminatorThread.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * SonarQube :: Process Monitor - * Copyright (C) 2009-2016 SonarSource SA - * mailto:contact AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.process.monitor; - -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.List; - -/** - * Terminates all monitored processes. Tries to gracefully terminate each process, - * then kill if timeout expires. Ping monitoring is disabled so process auto kills (self graceful termination, else self kill) - * if it does not receive the termination request. - */ -class TerminatorThread extends Thread { - private static int instanceCounter = 0; - - private final Timeouts timeouts; - private List<ProcessRef> processes = Collections.emptyList(); - - TerminatorThread(Timeouts timeouts) { - super("Terminator " + (instanceCounter++)); - this.timeouts = timeouts; - } - - /** - * To be called before {@link #run()} - */ - void setProcesses(List<ProcessRef> l) { - this.processes = l; - } - - @Override - public void run() { - // terminate in reverse order of startup (dependency order) - for (int index = processes.size() - 1; index >= 0; index--) { - ProcessRef ref = processes.get(index); - if (!ref.isStopped()) { - LoggerFactory.getLogger(getClass()).info(String.format("%s is stopping", ref)); - ref.askForGracefulAsyncStop(); - - long killAt = System.currentTimeMillis() + timeouts.getTerminationTimeout(); - while (!ref.isStopped() && System.currentTimeMillis() < killAt) { - try { - Thread.sleep(100L); - } catch (InterruptedException e) { - // stop asking for graceful stops, Monitor will hardly kill all processes - return; - } - } - if (!ref.isStopped()) { - LoggerFactory.getLogger(getClass()).info(String.format("%s failed to stop in a timely fashion. Killing it.", ref)); - } - ref.stop(); - LoggerFactory.getLogger(getClass()).info(String.format("%s is stopped", ref)); - } - } - } -} diff --git a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/WatcherThread.java b/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/WatcherThread.java index d6e6da49651..86bf067b6f1 100644 --- a/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/WatcherThread.java +++ b/server/sonar-process-monitor/src/main/java/org/sonar/process/monitor/WatcherThread.java @@ -64,6 +64,10 @@ class WatcherThread extends Thread { } } + public ProcessRef getProcessRef() { + return processRef; + } + public boolean isAskedForRestart() { return askedForRestart; } diff --git a/server/sonar-process-monitor/src/test/java/org/sonar/process/monitor/RestartRequestWatcherThreadTest.java b/server/sonar-process-monitor/src/test/java/org/sonar/process/monitor/RestartRequestWatcherThreadTest.java deleted file mode 100644 index c87f0bae734..00000000000 --- a/server/sonar-process-monitor/src/test/java/org/sonar/process/monitor/RestartRequestWatcherThreadTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * SonarQube :: Process Monitor - * Copyright (C) 2009-2016 SonarSource SA - * mailto:contact AT sonarsource DOT com - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ -package org.sonar.process.monitor; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Random; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.sonar.process.DefaultProcessCommands; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -public class RestartRequestWatcherThreadTest { - private static final long TEST_DELAYS_MS = 5L; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - private Monitor monitor = mock(Monitor.class); - - @Test - public void constructor_throws_NPE_if_monitor_arg_is_null() { - expectedException.expect(NullPointerException.class); - expectedException.expectMessage("monitor can not be null"); - - new RestartRequestWatcherThread(null, Collections.<ProcessRef>emptyList()); - } - - @Test - public void constructor_throws_NPE_if_processes_arg_is_null() { - expectedException.expect(NullPointerException.class); - expectedException.expectMessage("processes can not be null"); - - new RestartRequestWatcherThread(monitor, null); - } - - @Test - public void each_RestartRequestWatcherThread_instance_get_a_unique_thread_name() { - assertThat(newSingleProcessRefRestartWatcher().getName()) - .isNotEqualTo(newSingleProcessRefRestartWatcher().getName()); - } - - @Test - public void does_not_stop_watching_when_no_processRef_requests_restart() throws Exception { - RestartRequestWatcherThread underTest = newSingleProcessRefRestartWatcher(); - - underTest.start(); - - Thread.sleep(200L); - - assertThat(underTest.isWatching()).isTrue(); - assertThat(underTest.isAlive()).isTrue(); - } - - @Test(timeout = 500L) - public void stops_watching_when_any_processRef_requests_restart() throws Exception { - ProcessRef processRef1 = newProcessRef(1); - ProcessRef processRef2 = newProcessRef(2); - RestartRequestWatcherThread underTest = newSingleProcessRefRestartWatcher(processRef1, processRef2); - - underTest.start(); - - Thread.sleep(123L); - - if (new Random().nextInt() % 2 == 1) { - processRef1.getCommands().askForRestart(); - } else { - processRef2.getCommands().askForRestart(); - } - - underTest.join(); - - assertThat(underTest.isWatching()).isFalse(); - verify(monitor).restartAsync(); - } - - private RestartRequestWatcherThread newSingleProcessRefRestartWatcher(ProcessRef... processRefs) { - return new RestartRequestWatcherThread(monitor, Arrays.asList(processRefs), TEST_DELAYS_MS); - } - - private ProcessRef newProcessRef(int id) { - try { - return new ProcessRef(String.valueOf(id), new DefaultProcessCommands(temp.newFolder(), id), mock(Process.class), mock(StreamGobbler.class)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} |