]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-5409 - Using JMX to connect to wrapped Process
authorStephane Gamard <stephane.gamard@searchbox.com>
Tue, 15 Jul 2014 17:03:52 +0000 (19:03 +0200)
committerStephane Gamard <stephane.gamard@searchbox.com>
Tue, 15 Jul 2014 20:54:53 +0000 (22:54 +0200)
server/sonar-process/src/main/java/org/sonar/process/MonitorService.java [deleted file]
server/sonar-process/src/main/java/org/sonar/process/ProcessWrapper.java
server/sonar-process/src/test/java/org/sonar/process/MonitorServiceTest.java [deleted file]
server/sonar-process/src/test/java/org/sonar/process/ProcessWrapperTest.java [new file with mode: 0644]

diff --git a/server/sonar-process/src/main/java/org/sonar/process/MonitorService.java b/server/sonar-process/src/main/java/org/sonar/process/MonitorService.java
deleted file mode 100644 (file)
index b4f0a07..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.process;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MonitorService extends Thread {
-
-  private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWrapper.class);
-
-  private final static Long MAX_ELAPSED_TIME = 10000L;
-
-  final DatagramSocket socket;
-  final Map<String, ProcessWrapper> processes;
-  final Map<String, Long> processesPing;
-
-  public MonitorService(DatagramSocket socket) {
-    LOGGER.info("Monitor service is listening on socket:{}", socket.getLocalPort());
-    this.socket = socket;
-    processes = new HashMap<String, ProcessWrapper>();
-    processesPing = new HashMap<String, Long>();
-  }
-
-  public void register(ProcessWrapper process) {
-    this.processes.put(process.getName(), process);
-    this.processesPing.put(process.getName(), System.currentTimeMillis());
-  }
-
-  @Override
-  public void run() {
-    LOGGER.info("Starting monitoring for {} processes.", processes.size());
-    long time;
-    while (!Thread.currentThread().isInterrupted()) {
-      DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
-      time = System.currentTimeMillis();
-      try {
-        socket.setSoTimeout(200);
-        socket.receive(packet);
-        String message = new String(packet.getData()).trim();
-        processesPing.put(message, time);
-      } catch (Exception e) {
-        ; // To not do anything.
-      }
-      if (!checkAllProcessPing(time)) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-
-  private boolean checkAllProcessPing(long now) {
-
-    //check that all thread wrapper are running
-    for (Thread thread : processes.values()) {
-      if (thread.isInterrupted()) {
-        LOGGER.error("process {} has been interrupted. Aborting node",
-          thread.getName());
-        return false;
-      }
-    }
-
-    //check that all heartbeats are OK
-    for (Map.Entry<String, Long> processPing : processesPing.entrySet()) {
-      if ((now - processPing.getValue()) > MAX_ELAPSED_TIME) {
-        LOGGER.error("process {} has not checked-in since {}ms. Aborting node",
-          processPing.getKey(), (now - processPing.getValue()));
-        return false;
-      } else {
-        LOGGER.debug("process {} has last checked-in {}ms ago.",
-          processPing.getKey(), (now - processPing.getValue()));
-      }
-    }
-    return true;
-  }
-
-  public Integer getMonitoringPort() {
-    return socket.getLocalPort();
-  }
-}
index aa7dfe3fccf9cd118740c06e3eeef1a3c30e5631..f1997f2d09a7c5f1193f0d8b254064500b00725a 100644 (file)
  */
 package org.sonar.process;
 
+import com.google.common.io.Closeables;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
 import java.util.Map;
 
-public class ProcessWrapper extends Thread {
+public class ProcessWrapper {
 
   private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWrapper.class);
 
   final int port;
+  final String name;
   final String className;
   final String[] classPath;
   final Map<String, String> properties;
-  java.lang.Process process;
 
-  public ProcessWrapper(String className, String[] classPath, Map<String, String> properties, String name, Integer port) {
-    super(name);
+  ProcessMXBean processMXBean;
+  MBeanServerConnection mBeanServer;
+
+  final java.lang.Process process;
+  private StreamGobbler errorGobbler;
+  private StreamGobbler outputGobbler;
+
+
+  public ProcessWrapper(String className, Map<String, String> properties, final String name, Integer port, String... classPath) {
     LOGGER.info("Creating Process for '{}' with monitoring port: {}", name, port);
+    this.name = name;
     this.port = port;
     this.className = className;
     this.classPath = classPath;
     this.properties = properties;
+
+    this.process = executeProcess();
+
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          process.waitFor();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        } finally {
+          waitUntilFinish(outputGobbler);
+          waitUntilFinish(errorGobbler);
+        }
+        LOGGER.warn("Process '{}' Unexpectedly finished. Node should shutdown.", name);
+      }
+    }).start();
+
+    // Waiting for the Child VM to start and for JMX to be available
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    JMXServiceURL jmxUrl = null;
+
+    try {
+      String protocol = "rmi";
+      String path = "/jndi/rmi://" + InetAddress.getLocalHost().getHostName() + ":" + port + "/jmxrmi";
+      jmxUrl = new JMXServiceURL(protocol, InetAddress.getLocalHost().getHostName(), port, path);
+      JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null);
+      mBeanServer = jmxConnector.getMBeanServerConnection();
+      processMXBean = JMX.newMBeanProxy(mBeanServer, Process.objectNameFor(name), ProcessMXBean.class);
+    } catch (MalformedURLException e) {
+      throw new IllegalStateException("JMXUrl '" + jmxUrl + "'is not valid", e);
+    } catch (UnknownHostException e) {
+      throw new IllegalStateException("Could not get hostname", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("Could not connect to JMX service", e);
+    }
+
+    //TODO Register Scheduled timer to ping the Mbean
+  }
+
+  public boolean isReady() {
+    return processMXBean.isReady();
+  }
+
+  public void stop() {
+    processMXBean.stop();
   }
 
-  public void run() {
+  public void ping() {
+    processMXBean.ping();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public ProcessMXBean getProcessMXBean() {
+    return processMXBean;
+  }
+
+  public java.lang.Process executeProcess() {
     ProcessBuilder processBuilder =
-      new ProcessBuilder("java", "-cp",
+      new ProcessBuilder("java",
+        "-Dcom.sun.management.jmxremote",
+        "-Dcom.sun.management.jmxremote.port=" + port,
+        "-Dcom.sun.management.jmxremote.authenticate=false",
+        "-Dcom.sun.management.jmxremote.ssl=false",
+        "-cp",
         StringUtils.join(classPath, ":"),
         className);
     processBuilder.environment().putAll(properties);
     processBuilder.environment().put(Process.NAME_PROPERTY, this.getName());
-    processBuilder.environment().put(Process.HEARTBEAT_PROPERTY, Integer.toString(port));
-
+    processBuilder.environment().put(Process.PORT_PROPERTY, Integer.toString(port));
+    System.out.println("processBuilder.toString(); = " + processBuilder.toString());
     try {
-      process = processBuilder.start();
-      StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream(), this.getName()+"-ERROR");
-      StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream(), this.getName());
+      java.lang.Process process = processBuilder.start();
+      errorGobbler = new StreamGobbler(process.getErrorStream(), this.getName() + "-ERROR");
+      outputGobbler = new StreamGobbler(process.getInputStream(), this.getName());
       outputGobbler.start();
       errorGobbler.start();
-      while (!currentThread().isInterrupted()) {
-        process.wait(100);
-      }
+      return process;
     } catch (IOException e) {
       throw new IllegalStateException("Io Exception in ProcessWrapper", e);
-    } catch (InterruptedException e) {
-      LOGGER.warn("Process has been shutdown");
     }
   }
 
-  public void shutdown() {
-    LOGGER.info("Shutting down '{}'", this.getName());
-    this.interrupt();
-//    try {
-//      process.destroy();
-//    } catch (InterruptedException e) {
-//      //TODO do kill only in last resort...
-//      process.destroy();
-//    } finally {
-      process = null;
-//    }
+
+  private void closeStreams(java.lang.Process process) {
+    if (process != null) {
+      Closeables.closeQuietly(process.getInputStream());
+      Closeables.closeQuietly(process.getOutputStream());
+      Closeables.closeQuietly(process.getErrorStream());
+    }
   }
 
-  private class StreamGobbler extends Thread {
-    InputStream is;
-    String type;
+  private void waitUntilFinish(StreamGobbler thread) {
+    if (thread != null) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        LOGGER.error("InterruptedException while waiting finish of " + thread.toString(), e);
+      }
+    }
+  }
 
-    private StreamGobbler(InputStream is, String type) {
+  private static class StreamGobbler extends Thread {
+    private final InputStream is;
+    private volatile Exception exception;
+    private final String pName;
+
+    StreamGobbler(InputStream is, String name) {
+      super("ProcessStreamGobbler");
       this.is = is;
-      this.type = type;
+      this.pName = name;
     }
 
     @Override
     public void run() {
+      InputStreamReader isr = new InputStreamReader(is);
+      BufferedReader br = new BufferedReader(isr);
       try {
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-        String line = null;
-        while ((line = br.readLine()) != null)
-          System.out.println(type + " > " + line);
+        String line;
+        while ((line = br.readLine()) != null) {
+          LOGGER.info(pName + " > " + line);
+        }
       } catch (IOException ioe) {
-        ioe.printStackTrace();
+        exception = ioe;
+
+      } finally {
+        Closeables.closeQuietly(br);
+        Closeables.closeQuietly(isr);
       }
     }
+
+    public Exception getException() {
+      return exception;
+    }
   }
 }
diff --git a/server/sonar-process/src/test/java/org/sonar/process/MonitorServiceTest.java b/server/sonar-process/src/test/java/org/sonar/process/MonitorServiceTest.java
deleted file mode 100644 (file)
index bf801b1..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.process;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.DatagramSocket;
-
-public class MonitorServiceTest {
-
-
-  private DatagramSocket socket;
-
-  @Before
-  public void setUp() throws Exception {
-    socket = new DatagramSocket(0);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (socket != null && !socket.isClosed()) {
-      socket.close();
-    }
-  }
-
-  @Test
-  public void should_build() {
-    MonitorService monitor = new MonitorService(socket);
-  }
-
-//  class LongProcessWrapper extends ProcessWrapper {
-//
-//    LongProcessWrapper(String name, Integer port) {
-//      super(name, port);
-//    }
-//
-//    @Override
-//    public void run() {
-//      try {
-//        Thread.sleep(10000L);
-//      } catch (InterruptedException e) {
-//        e.printStackTrace();
-//      }
-//    }
-//  }
-}
\ No newline at end of file
diff --git a/server/sonar-process/src/test/java/org/sonar/process/ProcessWrapperTest.java b/server/sonar-process/src/test/java/org/sonar/process/ProcessWrapperTest.java
new file mode 100644 (file)
index 0000000..557527e
--- /dev/null
@@ -0,0 +1,46 @@
+package org.sonar.process;
+
+import com.sun.tools.attach.VirtualMachine;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.management.MalformedObjectNameException;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.ServerSocket;
+import java.util.Collections;
+
+import static org.fest.assertions.Assertions.assertThat;
+
+public class ProcessWrapperTest {
+
+  int freePort;
+
+  @Before
+  public void setup() throws IOException {
+    ServerSocket socket = new ServerSocket(0);
+    freePort = socket.getLocalPort();
+    socket.close();
+  }
+
+  @Test
+  @Ignore("Not a good idea to assert on # of VMs")
+  public void process_should_run() throws IOException, MalformedObjectNameException, InterruptedException {
+
+    int VMcount = VirtualMachine.list().size();
+    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+    ProcessWrapper wrapper = wrapper = new ProcessWrapper(ProcessTest.TestProcess.class.getName(),
+      Collections.EMPTY_MAP, "TEST", freePort, runtime.getClassPath());
+
+    assertThat(wrapper).isNotNull();
+    assertThat(wrapper.isReady()).isTrue();
+
+    assertThat(VirtualMachine.list().size()).isEqualTo(VMcount + 1);
+
+    wrapper.stop();
+    assertThat(VirtualMachine.list().size()).isEqualTo(VMcount);
+
+  }
+}
\ No newline at end of file