diff options
author | Stephane Gamard <stephane.gamard@searchbox.com> | 2014-07-18 10:48:39 +0200 |
---|---|---|
committer | Stephane Gamard <stephane.gamard@searchbox.com> | 2014-07-18 11:28:00 +0200 |
commit | 76e842ab44f6bb0059be659e6c057af98dcf61a7 (patch) | |
tree | bfcf30e28ca316beb4e0cafa384d0791d2d75db9 /server/sonar-process | |
parent | 70dabb8a6d7261185e3f9895361e1ef72559fdbd (diff) | |
download | sonarqube-76e842ab44f6bb0059be659e6c057af98dcf61a7.tar.gz sonarqube-76e842ab44f6bb0059be659e6c057af98dcf61a7.zip |
SONAR-5408 - two-way ping for wrapper & process with isReady and JMX retry
Diffstat (limited to 'server/sonar-process')
5 files changed, 224 insertions, 86 deletions
diff --git a/server/sonar-process/src/main/java/org/sonar/process/Monitor.java b/server/sonar-process/src/main/java/org/sonar/process/Monitor.java new file mode 100644 index 00000000000..9e2465f9648 --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/Monitor.java @@ -0,0 +1,112 @@ +/* + * SonarQube, open source software quality management tool. + * Copyright (C) 2008-2014 SonarSource + * mailto:contact AT sonarsource DOT com + * + * SonarQube is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * SonarQube is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class Monitor extends Thread { + + private final static Logger LOGGER = LoggerFactory.getLogger(Monitor.class); + + private volatile List<ProcessWrapper> processes; + private volatile Map<String, Long> pings; + + private ScheduledFuture<?> watch; + private final ScheduledExecutorService monitor; + + public Monitor() { + processes = new ArrayList<ProcessWrapper>(); + pings = new HashMap<String, Long>(); + monitor = Executors.newScheduledThreadPool(1); + watch = monitor.scheduleWithFixedDelay(new ProcessWatch(), 0, 3, TimeUnit.SECONDS); + } + + public void registerProcess(ProcessWrapper processWrapper) { + LOGGER.trace("Monitor::registerProcess() START"); + processes.add(processWrapper); + pings.put(processWrapper.getName(), System.currentTimeMillis()); + processWrapper.start(); + for(int i=0; i<10; i++){ + if(processWrapper.getProcessMXBean() == null + || !processWrapper.getProcessMXBean().isReady()){ + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new IllegalStateException("Could not register process in Monitor", e); + } + } + } + LOGGER.trace("Monitor::registerProcess() END"); + } + + private class ProcessWatch implements Runnable { + public void run() { + LOGGER.trace("Monitor::ProcessWatch PINGING for map: {}", processes); + for (ProcessWrapper process : processes) { + try { + long time = process.getProcessMXBean().ping(); + LOGGER.info("Monitor::ProcessWatch PINGED '{}'", process.getName()); + pings.put(process.getName(), time); + } catch (Exception e) { + LOGGER.error("Error while pinging {}", process.getName(), e); + } + } + } + } + + private boolean processIsValid(ProcessWrapper process) { + long now = System.currentTimeMillis(); + LOGGER.debug("Monitor::processIsValid() -- Time since last ping for '{}': {}ms", + process.getName(), (now - pings.get(process.getName()))); + return (now - pings.get(process.getName())) < 5000L; + } + + public void run() { + LOGGER.trace("Monitor::run() START"); + boolean everythingOK = true; + while (everythingOK) { + for(ProcessWrapper process: processes){ + if(!processIsValid(process)){ + LOGGER.warn("Monitor::run() -- Process '{}' is not valid. Exiting monitor", process.getName()); + everythingOK = false; + break; + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + watch.cancel(true); + monitor.shutdownNow(); + LOGGER.trace("Monitor::run() END"); + } +} diff --git a/server/sonar-process/src/main/java/org/sonar/process/Process.java b/server/sonar-process/src/main/java/org/sonar/process/Process.java index abcdd75dfb0..44d7b4f028a 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/Process.java +++ b/server/sonar-process/src/main/java/org/sonar/process/Process.java @@ -54,6 +54,7 @@ public abstract class Process implements ProcessMXBean { final Integer port; final protected Props props; + final private Thread shutdownHook; private static final long MAX_ALLOWED_TIME = 3000L; private ScheduledFuture<?> pingTask = null; @@ -61,16 +62,16 @@ public abstract class Process implements ProcessMXBean { final Runnable breakOnMissingPing = new Runnable() { public void run() { long time = System.currentTimeMillis(); - LOGGER.info("last check-in was {}ms ago.", time - lastPing); + LOGGER.debug("last check-in was {}ms ago.", time - lastPing); if (time - lastPing > MAX_ALLOWED_TIME) { LOGGER.warn("Did not get a check-in since {}ms. Initiate shutdown", time - lastPing); - stop(); + Runtime.getRuntime().removeShutdownHook(shutdownHook); + shutdown(); } } }; public Process(Props props) { - validateSonarHome(props); @@ -96,12 +97,13 @@ public abstract class Process implements ProcessMXBean { throw new IllegalStateException("Process is not a compliant MBean", e); } - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + shutdownHook = new Thread(new Runnable() { @Override public void run() { - Process.this.stop(); + Process.this.shutdown(); } - })); + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); } public ObjectName getObjectName() { @@ -116,32 +118,45 @@ public abstract class Process implements ProcessMXBean { } } - public void ping() { + public long ping() { this.lastPing = System.currentTimeMillis(); + return lastPing; } public abstract void onStart(); - public abstract void onStop(); + public abstract void onTerminate(); public final void start() { - LOGGER.info("Process[{}]::start START", name); + LOGGER.trace("Process[{}]::start() START", name); if (this.port != null) { lastPing = System.currentTimeMillis(); pingTask = monitor.scheduleWithFixedDelay(breakOnMissingPing, 5, 5, TimeUnit.SECONDS); } this.onStart(); - LOGGER.info("Process[{}]::start END", name); + LOGGER.trace("Process[{}]::start() END", name); + } + + public final void terminate() { + LOGGER.trace("Process[{}]::stop() START", name); + Runtime.getRuntime().removeShutdownHook(shutdownHook); + new Thread(new Runnable() { + @Override + public void run() { + shutdown(); + } + }).start(); + LOGGER.trace("Process[{}]::stop() END", name); } - public final void stop() { - LOGGER.info("Process[{}]::shutdown START", name); + private void shutdown(){ + LOGGER.trace("Process[{}]::shutdown() START", name); + this.onTerminate(); if (pingTask != null) { pingTask.cancel(true); } monitor.shutdownNow(); - this.onStop(); - LOGGER.info("Process[{}]::shutdown END", name); + LOGGER.trace("Process[{}]::shutdown() END", name); } private void validateSonarHome(Props props) { diff --git a/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java b/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java index 51ae80c8626..c0ba7c9e033 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java +++ b/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java @@ -23,7 +23,7 @@ public interface ProcessMXBean { boolean isReady(); - void ping(); + long ping(); - void stop(); + void terminate(); } diff --git a/server/sonar-process/src/main/java/org/sonar/process/ProcessWrapper.java b/server/sonar-process/src/main/java/org/sonar/process/ProcessWrapper.java index 42fcd51de85..eeb10b72bf3 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/ProcessWrapper.java +++ b/server/sonar-process/src/main/java/org/sonar/process/ProcessWrapper.java @@ -22,6 +22,7 @@ package org.sonar.process; import com.google.common.io.Closeables; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,107 +40,95 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.UnknownHostException; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -public class ProcessWrapper { +public class ProcessWrapper extends Thread { private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWrapper.class); final int port; - final String name; final String workDir; final String className; final String[] classPath; final Map<String, String> properties; - ProcessMXBean processMXBean; - MBeanServerConnection mBeanServer; - final java.lang.Process process; + private volatile Thread processThread; + private StreamGobbler errorGobbler; private StreamGobbler outputGobbler; - - private ScheduledFuture<?> pingTask = null; - final ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1); - final Runnable pinging = new Runnable() { - public void run() { - processMXBean.ping(); - } - }; - - + final ProcessMXBean processMXBean; public ProcessWrapper(String workDir, String className, Map<String, String> properties, final String name, String... classPath) { + super(name); this.port = NetworkUtils.freePort(); LOGGER.info("Creating Process for '{}' with workDir: '{}' and monitoring port: {}", name, workDir, port); this.workDir = workDir; - this.name = name; this.className = className; this.classPath = classPath; this.properties = properties; + processThread = this; this.process = executeProcess(); - new Thread(new Runnable() { - @Override - public void run() { - try { - process.waitFor(); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - waitUntilFinish(outputGobbler); - waitUntilFinish(errorGobbler); - } - LOGGER.warn("Process '{}' Unexpectedly finished. Node should shutdown.", name); - } - }).start(); + processMXBean = waitForJMX(name, port); + } - // Waiting for the Child VM to start and for JMX to be available - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + public ProcessMXBean getProcessMXBean() { + return processMXBean; + } - JMXServiceURL jmxUrl = null; + private ProcessMXBean waitForJMX(String name, Integer port){ - try { - String protocol = "rmi"; - String path = "/jndi/rmi://" + InetAddress.getLocalHost().getHostName() + ":" + port + "/jmxrmi"; - jmxUrl = new JMXServiceURL(protocol, InetAddress.getLocalHost().getHostName(), port, path); - JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null); - mBeanServer = jmxConnector.getMBeanServerConnection(); - processMXBean = JMX.newMBeanProxy(mBeanServer, Process.objectNameFor(name), ProcessMXBean.class); - } catch (MalformedURLException e) { - throw new IllegalStateException("JMXUrl '" + jmxUrl + "'is not valid", e); - } catch (UnknownHostException e) { - throw new IllegalStateException("Could not get hostname", e); - } catch (IOException e) { - throw new IllegalStateException("Could not connect to JMX service", e); + Exception exception = null; + for(int i=0; i< 10; i++) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new IllegalStateException("Could not connect to JMX server", e); + } + LOGGER.info("Try #{} to connect to JMX server for process '{}'", i, name); + try { + String protocol = "rmi"; + String path = "/jndi/rmi://" + InetAddress.getLocalHost().getHostName() + ":" + port + "/jmxrmi"; + JMXServiceURL jmxUrl = new JMXServiceURL(protocol, InetAddress.getLocalHost().getHostName(), port, path); + JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null); + MBeanServerConnection mBeanServer = jmxConnector.getMBeanServerConnection(); + ProcessMXBean bean = JMX.newMBeanProxy(mBeanServer, Process.objectNameFor(name), ProcessMXBean.class); + LOGGER.info("ProcessWrapper::waitForJMX -- Connected to JMX Server with URL: {}",jmxUrl.toString()); + return bean; + } catch (MalformedURLException e) { + throw new IllegalStateException("JMXUrl is not valid", e); + } catch (UnknownHostException e) { + throw new IllegalStateException("Could not get hostname", e); + } catch (IOException e) { + exception = e; + } } - - pingTask = monitor.scheduleWithFixedDelay(pinging, 0, 3, TimeUnit.SECONDS); + throw new IllegalStateException("Could not connect to JMX service", exception); } - public boolean isReady() { - return processMXBean.isReady(); + public boolean isReady(){ + return processMXBean != null && processMXBean.isReady(); } - public void stop() { - pingTask.cancel(true); - processMXBean.stop(); - } - - public String getName() { - return name; + public void run() { + LOGGER.trace("ProcessWrapper::run() START"); + try { + process.waitFor(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + waitUntilFinish(outputGobbler); + waitUntilFinish(errorGobbler); + closeStreams(process); + } + ProcessWrapper.this.processThread = null; + LOGGER.trace("ProcessWrapper::run() END"); } public java.lang.Process executeProcess() { + LOGGER.info("ProcessWrapper::executeProcess() START"); ProcessBuilder processBuilder = new ProcessBuilder("java", "-Dcom.sun.management.jmxremote", @@ -156,24 +145,33 @@ public class ProcessWrapper { //check that working directory exists. File workDirectory = new File(workDir); - if(!workDirectory.exists()) { + if (!workDirectory.exists()) { throw new IllegalStateException("Work directory does not exist."); } else { processBuilder.directory(FileUtils.getFile(workDir)); } try { + LOGGER.debug("ProcessWrapper::executeProcess() -- Starting process with command '{}'", + StringUtils.join(processBuilder.command()," ")); java.lang.Process process = processBuilder.start(); + LOGGER.debug("ProcessWrapper::executeProcess() -- Process started: {}",process.toString()); errorGobbler = new StreamGobbler(process.getErrorStream(), this.getName() + "-ERROR"); outputGobbler = new StreamGobbler(process.getInputStream(), this.getName()); outputGobbler.start(); errorGobbler.start(); + LOGGER.trace("ProcessWrapper::executeProcess() END"); return process; } catch (IOException e) { throw new IllegalStateException("Io Exception in ProcessWrapper", e); } + } + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this); + } private void closeStreams(java.lang.Process process) { if (process != null) { @@ -183,7 +181,7 @@ public class ProcessWrapper { } } - private void waitUntilFinish(StreamGobbler thread) { + private void waitUntilFinish(Thread thread) { if (thread != null) { try { thread.join(); @@ -193,13 +191,26 @@ public class ProcessWrapper { } } + public void terminate() { + if(this.processMXBean != null) { + this.processMXBean.terminate(); + waitUntilFinish(this); + } else { + process.destroy(); + } + } + + public Object getThread() { + return this.processThread; + } + private static class StreamGobbler extends Thread { private final InputStream is; private volatile Exception exception; private final String pName; StreamGobbler(InputStream is, String name) { - super("ProcessStreamGobbler"); + super(name+"_ProcessStreamGobbler"); this.is = is; this.pName = name; } diff --git a/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java b/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java index 10fc44187d1..d7b390654d4 100644 --- a/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java +++ b/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java @@ -126,7 +126,7 @@ public class ProcessTest { assertThat(processMXBean.isReady()).isTrue(); // 2. Stop the process through Management - processMXBean.stop(); + processMXBean.terminate(); procThread.join(); } @@ -164,7 +164,7 @@ public class ProcessTest { } @Override - public void onStop() { + public void onTerminate() { running = false; } |