diff options
author | Stephane Gamard <stephane.gamard@searchbox.com> | 2014-07-15 15:27:39 +0200 |
---|---|---|
committer | Stephane Gamard <stephane.gamard@searchbox.com> | 2014-07-15 22:54:53 +0200 |
commit | f0d760a99ba5e1d52150cab26b59f3a68b48764f (patch) | |
tree | 1f19f5281c9af6a6eb23e5a29e2ba9dd8a1a40a5 /server/sonar-process/src | |
parent | b12b17b1c540e6327b5adf1b0dd5dfd11934b788 (diff) | |
download | sonarqube-f0d760a99ba5e1d52150cab26b59f3a68b48764f.tar.gz sonarqube-f0d760a99ba5e1d52150cab26b59f3a68b48764f.zip |
SONAR-5409 - Using JMS for sonar-process with MBean for monitoring
Diffstat (limited to 'server/sonar-process/src')
3 files changed, 185 insertions, 94 deletions
diff --git a/server/sonar-process/src/main/java/org/sonar/process/Process.java b/server/sonar-process/src/main/java/org/sonar/process/Process.java index 667776f86e9..b04b2ec0eff 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/Process.java +++ b/server/sonar-process/src/main/java/org/sonar/process/Process.java @@ -23,24 +23,28 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; -public abstract class Process implements Runnable { +public abstract class Process implements ProcessMXBean { public static final String NAME_PROPERTY = "pName"; - public static final String HEARTBEAT_PROPERTY = "pPort"; + public static final String PORT_PROPERTY = "pPort"; public static final String MISSING_NAME_ARGUMENT = "Missing Name argument"; public static final String MISSING_PORT_ARGUMENT = "Missing Port argument"; + private final Thread monitoringThread; + private static final long MAX_ALLOWED_TIME = 3000L; private final static Logger LOGGER = LoggerFactory.getLogger(Process.class); - protected Long heartBeatInterval = 1000L; - protected Thread monitor; + protected Long lastPing; final String name; final Integer port; @@ -52,7 +56,7 @@ public abstract class Process implements Runnable { // Loading all Properties from file this.props = props; this.name = props.of(NAME_PROPERTY, null); - this.port = props.intOf(HEARTBEAT_PROPERTY); + this.port = props.intOf(PORT_PROPERTY); // Testing required properties @@ -60,21 +64,34 @@ public abstract class Process implements Runnable { throw new IllegalStateException(MISSING_NAME_ARGUMENT); } - if (this.port == null) { - throw new IllegalStateException(MISSING_PORT_ARGUMENT); + if (this.port != null) { + this.monitoringThread = new Thread(new Monitor(this)); + } else { + this.monitoringThread = null; } - // Adding a shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - shutdown(); - } - }); + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + try { + mbeanServer.registerMBean(this, this.getObjectName()); + } catch (InstanceAlreadyExistsException e) { + throw new IllegalStateException("Process already exists in current JVM", e); + } catch (MBeanRegistrationException e) { + throw new IllegalStateException("Could not register process as MBean", e); + } catch (NotCompliantMBeanException e) { + throw new IllegalStateException("Process is not a compliant MBean", e); + } + } + + public ObjectName getObjectName() { + try { + return new ObjectName("org.sonar", "name", name); + } catch (MalformedObjectNameException e) { + throw new IllegalStateException("Cannot create ObjectName for " + name, e); + } + } - //Starting monitoring thread - this.monitor = new Thread(this); - this.monitor.start(); + public void ping() { + this.lastPing = System.currentTimeMillis(); } public abstract void onStart(); @@ -82,38 +99,48 @@ public abstract class Process implements Runnable { public abstract void onStop(); public final void start() { - LOGGER.info("Process[{}]::start", name); - onStart(); + LOGGER.info("Process[{}]::start START", name); + if (monitoringThread != null) { + this.lastPing = System.currentTimeMillis(); + monitoringThread.start(); + } + this.onStart(); + LOGGER.info("Process[{}]::start END", name); } - public final void shutdown() { - LOGGER.info("Process[{}]::shutdown", name); - this.monitor.interrupt(); - this.monitor = null; + public final void stop() { + LOGGER.info("Process[{}]::shutdown START", name); + if (monitoringThread != null) { + monitoringThread.interrupt(); + } this.onStop(); + LOGGER.info("Process[{}]::shutdown END", name); } - @Override - public void run() { - LOGGER.info("Process[{}]::heartbeat({}) START", name, port); - try { - byte[] data = name.getBytes(); - DatagramPacket pack = - new DatagramPacket(data, data.length, InetAddress.getLocalHost(), port); - while (!Thread.currentThread().isInterrupted()) { - LOGGER.trace("{} process ping mother-ship", name); - DatagramSocket ds = new DatagramSocket(); - ds.send(pack); - ds.close(); + + private class Monitor implements Runnable { + + final Process process; + + private Monitor(Process process) { + this.process = process; + } + + @Override + public void run() { + while (monitoringThread != null && !monitoringThread.isInterrupted()) { + long time = System.currentTimeMillis(); + LOGGER.info("Process[{}]::Monitor::run - last checked-in is {}ms", name, time - lastPing); + if (time - lastPing > MAX_ALLOWED_TIME) { + process.stop(); + break; + } try { - Thread.sleep(heartBeatInterval); + Thread.sleep(1000); } catch (InterruptedException e) { - break; + e.printStackTrace(); } } - } catch (IOException e) { - throw new IllegalStateException("Heartbeat Thread for " + name + " could not communicate to socket", e); } - LOGGER.warn("Process[{}]::heartbeat OVER", name); } }
\ No newline at end of file diff --git a/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java b/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java new file mode 100644 index 00000000000..86c94eb6914 --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java @@ -0,0 +1,10 @@ +package org.sonar.process; + +public interface ProcessMXBean { + + boolean isReady(); + + void ping(); + + void stop(); +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java b/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java index 7a12b08b0f4..a94e53cde64 100644 --- a/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java +++ b/server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java @@ -23,88 +23,142 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.MalformedURLException; -import java.net.SocketException; -import java.net.URISyntaxException; +import javax.management.JMX; +import javax.management.MBeanServer; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.ServerSocket; import java.util.Properties; +import static junit.framework.TestCase.fail; import static org.fest.assertions.Assertions.assertThat; public class ProcessTest { - DatagramSocket socket; + int freePort; @Before - public void setup() throws SocketException { - this.socket = new DatagramSocket(0); + public void setup() throws IOException { + ServerSocket socket = new ServerSocket(0); + freePort = socket.getLocalPort(); + socket.close(); } + Process process; + @After - public void tearDown() { - this.socket.close(); + public void tearDown() throws Exception { + if (process != null) { + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + mbeanServer.unregisterMBean(process.getObjectName()); + } } @Test - public void fail_missing_properties() throws MalformedURLException, URISyntaxException { + public void should_register_mbean() throws Exception { + + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + Properties properties = new Properties(); - try { - testProcess(properties); - } catch (Exception e) { - assertThat(e.getMessage()).isEqualTo(Process.MISSING_NAME_ARGUMENT); - } + properties.setProperty(Process.NAME_PROPERTY, "TEST"); + Props props = Props.create(properties); + process = new TestProcess(props); + + // 0 Can have a valid ObjectName + assertThat(process.getObjectName()).isNotNull(); - properties.setProperty(Process.NAME_PROPERTY, "test"); + // 1 assert that process MBean is registered + assertThat(mbeanServer.isRegistered(process.getObjectName())).isTrue(); + + // 2 assert that we cannot make another Process in the same JVM try { - testProcess(properties); - } catch (Exception e) { - assertThat(e.getMessage()).isEqualTo(Process.MISSING_PORT_ARGUMENT); + process = new TestProcess(null); + fail(); + } catch (IllegalStateException e) { + assertThat(e.getMessage()).isEqualTo("Process already exists in current JVM"); } - - properties.setProperty(Process.HEARTBEAT_PROPERTY, Integer.toString(socket.getLocalPort())); - assertThat(testProcess(properties)).isNotNull(); } @Test - public void heart_beats() { + public void should_stop() throws Exception { Properties properties = new Properties(); - properties.setProperty(Process.NAME_PROPERTY, "test"); - properties.setProperty(Process.HEARTBEAT_PROPERTY, Integer.toString(socket.getLocalPort())); - Process test = testProcess(properties); - - assertThat(test).isNotNull(); - - int ping = 0; - int loop = 0; - while (loop < 3) { - DatagramPacket packet = new DatagramPacket(new byte[1024], 1024); - try { - socket.setSoTimeout(2000); - socket.receive(packet); - ping++; - } catch (Exception e) { - // Do nothing + properties.setProperty(Process.NAME_PROPERTY, "TEST"); + Props props = Props.create(properties); + process = new TestProcess(props); + + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + final ProcessMXBean processMXBean = JMX.newMBeanProxy(mbeanServer, process.getObjectName(), ProcessMXBean.class); + + Thread procThread = new Thread(new Runnable() { + @Override + public void run() { + process.start(); } - loop++; - } + }); + + // 0. Process is loaded but not ready yet. + assertThat(processMXBean.isReady()).isFalse(); + + // 1. Pretend the process has started + procThread.start(); + Thread.sleep(200); + assertThat(procThread.isAlive()).isTrue(); + assertThat(processMXBean.isReady()).isTrue(); - assertThat(ping).isEqualTo(loop); + // 2. Stop the process through Management + processMXBean.stop(); + Thread.sleep(200); + assertThat(procThread.isAlive()).isFalse(); } - private Process testProcess(Properties properties) { - return new Process(Props.create(properties)) { + @Test(timeout = 5000L) + public void should_stop_by_itself() throws Exception { + Properties properties = new Properties(); + properties.setProperty(Process.NAME_PROPERTY, "TEST"); + properties.setProperty(Process.PORT_PROPERTY, Integer.toString(freePort)); + Props props = Props.create(properties); + process = new TestProcess(props); + process.start(); + + } + + public static class TestProcess extends Process { + + private boolean ready = false; + private boolean running = false; + + public TestProcess(Props props) { + super(props); + running = true; + } - public void onStart() { + @Override + public void onStart() { + ready = true; + while (running) { try { - Thread.sleep(10000L); + Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } + } - public void onStop() { - } - }; + @Override + public void onStop() { + running = false; + } + + @Override + public boolean isReady() { + return ready; + } + + public static void main(String... args) { + System.out.println("Starting child process"); + Props props = Props.create(System.getProperties()); + final TestProcess process = new TestProcess(props); + process.start(); + } } }
\ No newline at end of file |