]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-5409 - Using JMS for sonar-process with MBean for monitoring
authorStephane Gamard <stephane.gamard@searchbox.com>
Tue, 15 Jul 2014 13:27:39 +0000 (15:27 +0200)
committerStephane Gamard <stephane.gamard@searchbox.com>
Tue, 15 Jul 2014 20:54:53 +0000 (22:54 +0200)
server/sonar-process/src/main/java/org/sonar/process/Process.java
server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java [new file with mode: 0644]
server/sonar-process/src/test/java/org/sonar/process/ProcessTest.java

index 667776f86e9bff6d18bfc608d68fa03665a16e31..b04b2ec0eff7d4e05672231b63957bec0c8f673f 100644 (file)
@@ -23,24 +23,28 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
 
-public abstract class Process implements Runnable {
+public abstract class Process implements ProcessMXBean {
 
   public static final String NAME_PROPERTY = "pName";
-  public static final String HEARTBEAT_PROPERTY = "pPort";
+  public static final String PORT_PROPERTY = "pPort";
 
   public static final String MISSING_NAME_ARGUMENT = "Missing Name argument";
   public static final String MISSING_PORT_ARGUMENT = "Missing Port argument";
 
+  private final Thread monitoringThread;
+  private static final long MAX_ALLOWED_TIME = 3000L;
 
   private final static Logger LOGGER = LoggerFactory.getLogger(Process.class);
 
-  protected Long heartBeatInterval = 1000L;
-  protected Thread monitor;
+  protected Long lastPing;
 
   final String name;
   final Integer port;
@@ -52,7 +56,7 @@ public abstract class Process implements Runnable {
     // Loading all Properties from file
     this.props = props;
     this.name = props.of(NAME_PROPERTY, null);
-    this.port = props.intOf(HEARTBEAT_PROPERTY);
+    this.port = props.intOf(PORT_PROPERTY);
 
 
     // Testing required properties
@@ -60,21 +64,34 @@ public abstract class Process implements Runnable {
       throw new IllegalStateException(MISSING_NAME_ARGUMENT);
     }
 
-    if (this.port == null) {
-      throw new IllegalStateException(MISSING_PORT_ARGUMENT);
+    if (this.port != null) {
+      this.monitoringThread = new Thread(new Monitor(this));
+    } else {
+      this.monitoringThread = null;
     }
 
-    // Adding a shutdown hook
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        shutdown();
-      }
-    });
+    MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    try {
+      mbeanServer.registerMBean(this, this.getObjectName());
+    } catch (InstanceAlreadyExistsException e) {
+      throw new IllegalStateException("Process already exists in current JVM", e);
+    } catch (MBeanRegistrationException e) {
+      throw new IllegalStateException("Could not register process as MBean", e);
+    } catch (NotCompliantMBeanException e) {
+      throw new IllegalStateException("Process is not a compliant MBean", e);
+    }
+  }
+
+  public ObjectName getObjectName() {
+    try {
+      return new ObjectName("org.sonar", "name", name);
+    } catch (MalformedObjectNameException e) {
+      throw new IllegalStateException("Cannot create ObjectName for " + name, e);
+    }
+  }
 
-    //Starting monitoring thread
-    this.monitor = new Thread(this);
-    this.monitor.start();
+  public void ping() {
+    this.lastPing = System.currentTimeMillis();
   }
 
   public abstract void onStart();
@@ -82,38 +99,48 @@ public abstract class Process implements Runnable {
   public abstract void onStop();
 
   public final void start() {
-    LOGGER.info("Process[{}]::start", name);
-    onStart();
+    LOGGER.info("Process[{}]::start START", name);
+    if (monitoringThread != null) {
+      this.lastPing = System.currentTimeMillis();
+      monitoringThread.start();
+    }
+    this.onStart();
+    LOGGER.info("Process[{}]::start END", name);
   }
 
-  public final void shutdown() {
-    LOGGER.info("Process[{}]::shutdown", name);
-    this.monitor.interrupt();
-    this.monitor = null;
+  public final void stop() {
+    LOGGER.info("Process[{}]::shutdown START", name);
+    if (monitoringThread != null) {
+      monitoringThread.interrupt();
+    }
     this.onStop();
+    LOGGER.info("Process[{}]::shutdown END", name);
   }
 
-  @Override
-  public void run() {
-    LOGGER.info("Process[{}]::heartbeat({}) START", name, port);
-    try {
-      byte[] data = name.getBytes();
-      DatagramPacket pack =
-        new DatagramPacket(data, data.length, InetAddress.getLocalHost(), port);
-      while (!Thread.currentThread().isInterrupted()) {
-        LOGGER.trace("{} process ping mother-ship", name);
-        DatagramSocket ds = new DatagramSocket();
-        ds.send(pack);
-        ds.close();
+
+  private class Monitor implements Runnable {
+
+    final Process process;
+
+    private Monitor(Process process) {
+      this.process = process;
+    }
+
+    @Override
+    public void run() {
+      while (monitoringThread != null && !monitoringThread.isInterrupted()) {
+        long time = System.currentTimeMillis();
+        LOGGER.info("Process[{}]::Monitor::run - last checked-in is {}ms", name, time - lastPing);
+        if (time - lastPing > MAX_ALLOWED_TIME) {
+          process.stop();
+          break;
+        }
         try {
-          Thread.sleep(heartBeatInterval);
+          Thread.sleep(1000);
         } catch (InterruptedException e) {
-          break;
+          e.printStackTrace();
         }
       }
-    } catch (IOException e) {
-      throw new IllegalStateException("Heartbeat Thread for " + name + " could not communicate to socket", e);
     }
-    LOGGER.warn("Process[{}]::heartbeat OVER", name);
   }
 }
\ No newline at end of file
diff --git a/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java b/server/sonar-process/src/main/java/org/sonar/process/ProcessMXBean.java
new file mode 100644 (file)
index 0000000..86c94eb
--- /dev/null
@@ -0,0 +1,10 @@
+package org.sonar.process;
+
+public interface ProcessMXBean {
+
+  boolean isReady();
+
+  void ping();
+
+  void stop();
+}
index 7a12b08b0f4de2dce79d17d8e7bc740d4f1e60b4..a94e53cde641ccaca826c7f04ea4eb698c7f983e 100644 (file)
@@ -23,88 +23,142 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.MalformedURLException;
-import java.net.SocketException;
-import java.net.URISyntaxException;
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
 import java.util.Properties;
 
+import static junit.framework.TestCase.fail;
 import static org.fest.assertions.Assertions.assertThat;
 
 public class ProcessTest {
 
-  DatagramSocket socket;
+  int freePort;
 
   @Before
-  public void setup() throws SocketException {
-    this.socket = new DatagramSocket(0);
+  public void setup() throws IOException {
+    ServerSocket socket = new ServerSocket(0);
+    freePort = socket.getLocalPort();
+    socket.close();
   }
 
+  Process process;
+
   @After
-  public void tearDown() {
-    this.socket.close();
+  public void tearDown() throws Exception {
+    if (process != null) {
+      MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+      mbeanServer.unregisterMBean(process.getObjectName());
+    }
   }
 
   @Test
-  public void fail_missing_properties() throws MalformedURLException, URISyntaxException {
+  public void should_register_mbean() throws Exception {
+
+    MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
     Properties properties = new Properties();
-    try {
-      testProcess(properties);
-    } catch (Exception e) {
-      assertThat(e.getMessage()).isEqualTo(Process.MISSING_NAME_ARGUMENT);
-    }
+    properties.setProperty(Process.NAME_PROPERTY, "TEST");
+    Props props = Props.create(properties);
+    process = new TestProcess(props);
+
+    // 0 Can have a valid ObjectName
+    assertThat(process.getObjectName()).isNotNull();
 
-    properties.setProperty(Process.NAME_PROPERTY, "test");
+    // 1 assert that process MBean is registered
+    assertThat(mbeanServer.isRegistered(process.getObjectName())).isTrue();
+
+    // 2 assert that we cannot make another Process in the same JVM
     try {
-      testProcess(properties);
-    } catch (Exception e) {
-      assertThat(e.getMessage()).isEqualTo(Process.MISSING_PORT_ARGUMENT);
+      process = new TestProcess(null);
+      fail();
+    } catch (IllegalStateException e) {
+      assertThat(e.getMessage()).isEqualTo("Process already exists in current JVM");
     }
-
-    properties.setProperty(Process.HEARTBEAT_PROPERTY, Integer.toString(socket.getLocalPort()));
-    assertThat(testProcess(properties)).isNotNull();
   }
 
   @Test
-  public void heart_beats() {
+  public void should_stop() throws Exception {
     Properties properties = new Properties();
-    properties.setProperty(Process.NAME_PROPERTY, "test");
-    properties.setProperty(Process.HEARTBEAT_PROPERTY, Integer.toString(socket.getLocalPort()));
-    Process test = testProcess(properties);
-
-    assertThat(test).isNotNull();
-
-    int ping = 0;
-    int loop = 0;
-    while (loop < 3) {
-      DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
-      try {
-        socket.setSoTimeout(2000);
-        socket.receive(packet);
-        ping++;
-      } catch (Exception e) {
-        // Do nothing
+    properties.setProperty(Process.NAME_PROPERTY, "TEST");
+    Props props = Props.create(properties);
+    process = new TestProcess(props);
+
+    MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    final ProcessMXBean processMXBean = JMX.newMBeanProxy(mbeanServer, process.getObjectName(), ProcessMXBean.class);
+
+    Thread procThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        process.start();
       }
-      loop++;
-    }
+    });
+
+    // 0. Process is loaded but not ready yet.
+    assertThat(processMXBean.isReady()).isFalse();
+
+    // 1. Pretend the process has started
+    procThread.start();
+    Thread.sleep(200);
+    assertThat(procThread.isAlive()).isTrue();
+    assertThat(processMXBean.isReady()).isTrue();
 
-    assertThat(ping).isEqualTo(loop);
+    // 2. Stop the process through Management
+    processMXBean.stop();
+    Thread.sleep(200);
+    assertThat(procThread.isAlive()).isFalse();
   }
 
-  private Process testProcess(Properties properties) {
-    return new Process(Props.create(properties)) {
+  @Test(timeout = 5000L)
+  public void should_stop_by_itself() throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(Process.NAME_PROPERTY, "TEST");
+    properties.setProperty(Process.PORT_PROPERTY, Integer.toString(freePort));
+    Props props = Props.create(properties);
+    process = new TestProcess(props);
+    process.start();
+
+  }
+
+  public static class TestProcess extends Process {
+
+    private boolean ready = false;
+    private boolean running = false;
+
+    public TestProcess(Props props) {
+      super(props);
+      running = true;
+    }
 
-      public void onStart() {
+    @Override
+    public void onStart() {
+      ready = true;
+      while (running) {
         try {
-          Thread.sleep(10000L);
+          Thread.sleep(200);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
+    }
 
-      public void onStop() {
-      }
-    };
+    @Override
+    public void onStop() {
+      running = false;
+    }
+
+    @Override
+    public boolean isReady() {
+      return ready;
+    }
+
+    public static void main(String... args) {
+      System.out.println("Starting child process");
+      Props props = Props.create(System.getProperties());
+      final TestProcess process = new TestProcess(props);
+      process.start();
+    }
   }
 }
\ No newline at end of file