+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MonitorService extends Thread {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWrapper.class);
-
- private final static Long MAX_ELAPSED_TIME = 10000L;
-
- final DatagramSocket socket;
- final Map<String, ProcessWrapper> processes;
- final Map<String, Long> processesPing;
-
- public MonitorService(DatagramSocket socket) {
- LOGGER.info("Monitor service is listening on socket:{}", socket.getLocalPort());
- this.socket = socket;
- processes = new HashMap<String, ProcessWrapper>();
- processesPing = new HashMap<String, Long>();
- }
-
- public void register(ProcessWrapper process) {
- this.processes.put(process.getName(), process);
- this.processesPing.put(process.getName(), System.currentTimeMillis());
- }
-
- @Override
- public void run() {
- LOGGER.info("Starting monitoring for {} processes.", processes.size());
- long time;
- while (!Thread.currentThread().isInterrupted()) {
- DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
- time = System.currentTimeMillis();
- try {
- socket.setSoTimeout(200);
- socket.receive(packet);
- String message = new String(packet.getData()).trim();
- processesPing.put(message, time);
- } catch (Exception e) {
- ; // To not do anything.
- }
- if (!checkAllProcessPing(time)) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
-
- private boolean checkAllProcessPing(long now) {
-
- //check that all thread wrapper are running
- for (Thread thread : processes.values()) {
- if (thread.isInterrupted()) {
- LOGGER.error("process {} has been interrupted. Aborting node",
- thread.getName());
- return false;
- }
- }
-
- //check that all heartbeats are OK
- for (Map.Entry<String, Long> processPing : processesPing.entrySet()) {
- if ((now - processPing.getValue()) > MAX_ELAPSED_TIME) {
- LOGGER.error("process {} has not checked-in since {}ms. Aborting node",
- processPing.getKey(), (now - processPing.getValue()));
- return false;
- } else {
- LOGGER.debug("process {} has last checked-in {}ms ago.",
- processPing.getKey(), (now - processPing.getValue()));
- }
- }
- return true;
- }
-
- public Integer getMonitoringPort() {
- return socket.getLocalPort();
- }
-}
*/
package org.sonar.process;
+import com.google.common.io.Closeables;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.JMX;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
import java.util.Map;
-public class ProcessWrapper extends Thread {
+public class ProcessWrapper {
private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWrapper.class);
final int port;
+ final String name;
final String className;
final String[] classPath;
final Map<String, String> properties;
- java.lang.Process process;
- public ProcessWrapper(String className, String[] classPath, Map<String, String> properties, String name, Integer port) {
- super(name);
+ ProcessMXBean processMXBean;
+ MBeanServerConnection mBeanServer;
+
+ final java.lang.Process process;
+ private StreamGobbler errorGobbler;
+ private StreamGobbler outputGobbler;
+
+
+ public ProcessWrapper(String className, Map<String, String> properties, final String name, Integer port, String... classPath) {
LOGGER.info("Creating Process for '{}' with monitoring port: {}", name, port);
+ this.name = name;
this.port = port;
this.className = className;
this.classPath = classPath;
this.properties = properties;
+
+ this.process = executeProcess();
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ process.waitFor();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ waitUntilFinish(outputGobbler);
+ waitUntilFinish(errorGobbler);
+ }
+ LOGGER.warn("Process '{}' Unexpectedly finished. Node should shutdown.", name);
+ }
+ }).start();
+
+ // Waiting for the Child VM to start and for JMX to be available
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ JMXServiceURL jmxUrl = null;
+
+ try {
+ String protocol = "rmi";
+ String path = "/jndi/rmi://" + InetAddress.getLocalHost().getHostName() + ":" + port + "/jmxrmi";
+ jmxUrl = new JMXServiceURL(protocol, InetAddress.getLocalHost().getHostName(), port, path);
+ JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null);
+ mBeanServer = jmxConnector.getMBeanServerConnection();
+ processMXBean = JMX.newMBeanProxy(mBeanServer, Process.objectNameFor(name), ProcessMXBean.class);
+ } catch (MalformedURLException e) {
+ throw new IllegalStateException("JMXUrl '" + jmxUrl + "'is not valid", e);
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException("Could not get hostname", e);
+ } catch (IOException e) {
+ throw new IllegalStateException("Could not connect to JMX service", e);
+ }
+
+ //TODO Register Scheduled timer to ping the Mbean
+ }
+
+ public boolean isReady() {
+ return processMXBean.isReady();
+ }
+
+ public void stop() {
+ processMXBean.stop();
}
- public void run() {
+ public void ping() {
+ processMXBean.ping();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public ProcessMXBean getProcessMXBean() {
+ return processMXBean;
+ }
+
+ public java.lang.Process executeProcess() {
ProcessBuilder processBuilder =
- new ProcessBuilder("java", "-cp",
+ new ProcessBuilder("java",
+ "-Dcom.sun.management.jmxremote",
+ "-Dcom.sun.management.jmxremote.port=" + port,
+ "-Dcom.sun.management.jmxremote.authenticate=false",
+ "-Dcom.sun.management.jmxremote.ssl=false",
+ "-cp",
StringUtils.join(classPath, ":"),
className);
processBuilder.environment().putAll(properties);
processBuilder.environment().put(Process.NAME_PROPERTY, this.getName());
- processBuilder.environment().put(Process.HEARTBEAT_PROPERTY, Integer.toString(port));
-
+ processBuilder.environment().put(Process.PORT_PROPERTY, Integer.toString(port));
+ System.out.println("processBuilder.toString(); = " + processBuilder.toString());
try {
- process = processBuilder.start();
- StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream(), this.getName()+"-ERROR");
- StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream(), this.getName());
+ java.lang.Process process = processBuilder.start();
+ errorGobbler = new StreamGobbler(process.getErrorStream(), this.getName() + "-ERROR");
+ outputGobbler = new StreamGobbler(process.getInputStream(), this.getName());
outputGobbler.start();
errorGobbler.start();
- while (!currentThread().isInterrupted()) {
- process.wait(100);
- }
+ return process;
} catch (IOException e) {
throw new IllegalStateException("Io Exception in ProcessWrapper", e);
- } catch (InterruptedException e) {
- LOGGER.warn("Process has been shutdown");
}
}
- public void shutdown() {
- LOGGER.info("Shutting down '{}'", this.getName());
- this.interrupt();
-// try {
-// process.destroy();
-// } catch (InterruptedException e) {
-// //TODO do kill only in last resort...
-// process.destroy();
-// } finally {
- process = null;
-// }
+
+ private void closeStreams(java.lang.Process process) {
+ if (process != null) {
+ Closeables.closeQuietly(process.getInputStream());
+ Closeables.closeQuietly(process.getOutputStream());
+ Closeables.closeQuietly(process.getErrorStream());
+ }
}
- private class StreamGobbler extends Thread {
- InputStream is;
- String type;
+ private void waitUntilFinish(StreamGobbler thread) {
+ if (thread != null) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("InterruptedException while waiting finish of " + thread.toString(), e);
+ }
+ }
+ }
- private StreamGobbler(InputStream is, String type) {
+ private static class StreamGobbler extends Thread {
+ private final InputStream is;
+ private volatile Exception exception;
+ private final String pName;
+
+ StreamGobbler(InputStream is, String name) {
+ super("ProcessStreamGobbler");
this.is = is;
- this.type = type;
+ this.pName = name;
}
@Override
public void run() {
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
try {
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- String line = null;
- while ((line = br.readLine()) != null)
- System.out.println(type + " > " + line);
+ String line;
+ while ((line = br.readLine()) != null) {
+ LOGGER.info(pName + " > " + line);
+ }
} catch (IOException ioe) {
- ioe.printStackTrace();
+ exception = ioe;
+
+ } finally {
+ Closeables.closeQuietly(br);
+ Closeables.closeQuietly(isr);
}
}
+
+ public Exception getException() {
+ return exception;
+ }
}
}
+++ /dev/null
-/*
- * SonarQube, open source software quality management tool.
- * Copyright (C) 2008-2014 SonarSource
- * mailto:contact AT sonarsource DOT com
- *
- * SonarQube is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * SonarQube is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.DatagramSocket;
-
-public class MonitorServiceTest {
-
-
- private DatagramSocket socket;
-
- @Before
- public void setUp() throws Exception {
- socket = new DatagramSocket(0);
- }
-
- @After
- public void tearDown() throws Exception {
- if (socket != null && !socket.isClosed()) {
- socket.close();
- }
- }
-
- @Test
- public void should_build() {
- MonitorService monitor = new MonitorService(socket);
- }
-
-// class LongProcessWrapper extends ProcessWrapper {
-//
-// LongProcessWrapper(String name, Integer port) {
-// super(name, port);
-// }
-//
-// @Override
-// public void run() {
-// try {
-// Thread.sleep(10000L);
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-// }
-// }
-}
\ No newline at end of file
--- /dev/null
+package org.sonar.process;
+
+import com.sun.tools.attach.VirtualMachine;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.management.MalformedObjectNameException;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.ServerSocket;
+import java.util.Collections;
+
+import static org.fest.assertions.Assertions.assertThat;
+
+public class ProcessWrapperTest {
+
+ int freePort;
+
+ @Before
+ public void setup() throws IOException {
+ ServerSocket socket = new ServerSocket(0);
+ freePort = socket.getLocalPort();
+ socket.close();
+ }
+
+ @Test
+ @Ignore("Not a good idea to assert on # of VMs")
+ public void process_should_run() throws IOException, MalformedObjectNameException, InterruptedException {
+
+ int VMcount = VirtualMachine.list().size();
+ RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+ ProcessWrapper wrapper = wrapper = new ProcessWrapper(ProcessTest.TestProcess.class.getName(),
+ Collections.EMPTY_MAP, "TEST", freePort, runtime.getClassPath());
+
+ assertThat(wrapper).isNotNull();
+ assertThat(wrapper.isReady()).isTrue();
+
+ assertThat(VirtualMachine.list().size()).isEqualTo(VMcount + 1);
+
+ wrapper.stop();
+ assertThat(VirtualMachine.list().size()).isEqualTo(VMcount);
+
+ }
+}
\ No newline at end of file