private File tempDir = null;
+ private int processIndex = -1;
+
public JavaCommand(String key) {
this.key = key;
+ processIndex = KnownJavaCommand.lookIndexFor(key);
+ if (processIndex == -1) {
+ processIndex = Monitor.getNextProcessId();
+ }
}
public String getKey() {
return key;
}
+ public int getProcessIndex() {
+ return processIndex;
+ }
+
public File getWorkDir() {
return workDir;
}
Process process = null;
try {
// cleanup existing monitor files
- ProcessCommands commands = new ProcessCommands(command.getTempDir(), command.getKey());
- commands.prepare();
+ ProcessCommands commands = new ProcessCommands(command.getTempDir(), command.getProcessIndex());
ProcessBuilder processBuilder = create(command);
LoggerFactory.getLogger(getClass()).info("Launch process[{}]: {}",
Properties props = new Properties();
props.putAll(javaCommand.getArguments());
props.setProperty(ProcessEntryPoint.PROPERTY_PROCESS_KEY, javaCommand.getKey());
+ props.setProperty(ProcessEntryPoint.PROPERTY_PROCESS_INDEX, "" + javaCommand.getProcessIndex());
props.setProperty(ProcessEntryPoint.PROPERTY_TERMINATION_TIMEOUT, String.valueOf(timeouts.getTerminationTimeout()));
props.setProperty(ProcessEntryPoint.PROPERTY_SHARED_PATH, javaCommand.getTempDir().getAbsolutePath());
OutputStream out = new FileOutputStream(propertiesFile);
--- /dev/null
+/*
+ * SonarQube, open source software quality management tool.
+ * Copyright (C) 2008-2014 SonarSource
+ * mailto:contact AT sonarsource DOT com
+ *
+ * SonarQube is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * SonarQube is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.monitor;
+
+/**
+ * Created by eric on 20/02/15.
+ */
+public enum KnownJavaCommand {
+ APP("app", 0), WEB("web", 1), ELASTIC_SEARCH("search", 2), UNKNOWN("unknown", -1);
+
+ private String key;
+ private int index;
+
+ KnownJavaCommand(String key, int index) {
+ this.key = key;
+ this.index = index;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public static KnownJavaCommand lookFor(String key) {
+ for (KnownJavaCommand knownJavaCommand : KnownJavaCommand.values()) {
+ if (knownJavaCommand.getKey().equals(key)) {
+ return knownJavaCommand;
+ }
+ }
+ return KnownJavaCommand.UNKNOWN;
+ }
+
+ public static int lookIndexFor(String key) {
+ return lookFor(key).getIndex();
+ }
+
+ public static int getFirstIndexAvailable() {
+ int result = 0;
+ for (KnownJavaCommand knownJavaCommand : KnownJavaCommand.values()) {
+ result = knownJavaCommand.getIndex() >= result ? knownJavaCommand.getIndex() + 1 : result;
+ }
+ return result;
+ }
+}
import org.slf4j.LoggerFactory;
import org.sonar.process.Lifecycle;
import org.sonar.process.Lifecycle.State;
+import org.sonar.process.ProcessCommands;
import org.sonar.process.SystemExit;
import java.util.List;
// used by awaitStop() to block until all processes are shutdown
private final List<WatcherThread> watcherThreads = new CopyOnWriteArrayList<WatcherThread>();
+ static int nextProcessId = KnownJavaCommand.getFirstIndexAvailable();
Monitor(JavaProcessLauncher launcher, SystemExit exit, TerminatorThread terminator) {
this.launcher = launcher;
stop();
}
}
+
+ public static int getNextProcessId() {
+ if (nextProcessId >= ProcessCommands.getMaxProcesses()) {
+ throw new IllegalStateException("The maximum number of processes launched has been reached " + ProcessCommands.getMaxProcesses());
+ }
+ return nextProcessId++;
+ }
}
import org.junit.rules.Timeout;
import org.sonar.process.NetworkUtils;
import org.sonar.process.Lifecycle.State;
+import org.sonar.process.ProcessCommands;
import org.sonar.process.SystemExit;
import java.io.File;
}
}
+ @Test
+ public void test_too_many_processes() {
+ while (Monitor.getNextProcessId() < ProcessCommands.getMaxProcesses() - 1) {}
+ try {
+ newDefaultMonitor();
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessageStartingWith("The maximum number of processes launched has been reached ");
+ } finally {
+ Monitor.nextProcessId = KnownJavaCommand.getFirstIndexAvailable();
+ }
+ }
+
@Test
public void force_stop_if_too_long() throws Exception {
// TODO
*/
package org.sonar.process;
-import org.apache.commons.io.FileUtils;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
/**
* Process inter-communication to :
*/
public class ProcessCommands {
- private final File readyFile, stopFile;
-
- public ProcessCommands(File directory, String processKey) {
+ /**
+ * The ByteBuffer will contains :
+ * <ul>
+ * <li>First byte will contains 0x00 until stop command is issued = 0xFF</li>
+ * <li>Then each 10 bytes will be reserved for each process</li>
+ * </ul>
+ *
+ * Description of ten bytes of each process :
+ * <ul>
+ * <li>First byte will contains the state 0x00 until READY 0x01</li>
+ * <li>The second byte will contains the request for stopping 0x00 or STOP (0xFF)</li>
+ * <li>The next 8 bytes contains a long (System.currentTimeInMillis for ping)</li>
+ * </ul>
+ */
+ final MappedByteBuffer mappedByteBuffer;
+ private static final int MAX_PROCESSES = 50;
+ private static final int BYTE_LENGTH_FOR_ONE_PROCESS = 1 + 1 + 8;
+ private static final int MAX_SHARED_MEMORY = BYTE_LENGTH_FOR_ONE_PROCESS * MAX_PROCESSES; // With this shared memory we can handle up to MAX_PROCESSES processes
+ public static final byte STOP = (byte) 0xFF;
+ public static final byte READY = (byte) 0x01;
+ public static final byte EMPTY = (byte) 0x00;
+
+ private int processNumber;
+
+ public ProcessCommands(File directory, int processNumber) {
+ // processNumber should not excess MAX_PROCESSES and must not be below -1
+ assert processNumber <= MAX_PROCESSES : "Incorrect process number";
+ assert processNumber >= -1 : "Incorrect process number";
+
+ this.processNumber = processNumber;
if (!directory.isDirectory() || !directory.exists()) {
throw new IllegalArgumentException("Not a valid directory: " + directory);
}
- this.readyFile = new File(directory, processKey + ".ready");
- this.stopFile = new File(directory, processKey + ".stop");
- }
-
- // visible for tests
- ProcessCommands(File readyFile, File stopFile) {
- this.readyFile = readyFile;
- this.stopFile = stopFile;
- }
- public void prepare() {
- deleteFile(readyFile);
- deleteFile(stopFile);
- }
-
- public void endWatch() {
- // do not fail if files can't be deleted
- FileUtils.deleteQuietly(readyFile);
- FileUtils.deleteQuietly(stopFile);
+ try {
+ RandomAccessFile sharedMemory = new RandomAccessFile(new File(directory, "sharedmemory"), "rw");
+ mappedByteBuffer = sharedMemory.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, MAX_SHARED_MEMORY);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Unable to create shared memory : ", e);
+ }
}
public boolean isReady() {
- return readyFile.exists();
+ return canBeMonitored() && mappedByteBuffer.get(offset()) == READY;
}
/**
* To be executed by child process to declare that it's ready
*/
public void setReady() {
- createFile(readyFile);
+ if (canBeMonitored()) {
+ mappedByteBuffer.put(offset(), READY);
+ }
+ }
+
+ public void ping() {
+ if (canBeMonitored()) {
+ mappedByteBuffer.putLong(2 + offset(), System.currentTimeMillis());
+ }
+ }
+
+ public long getLastPing() {
+ if (canBeMonitored()) {
+ return mappedByteBuffer.getLong(2 + offset());
+ } else {
+ return -1;
+ }
}
/**
* To be executed by monitor process to ask for child process termination
*/
public void askForStop() {
- createFile(stopFile);
+ mappedByteBuffer.put(offset() + 1, STOP);
}
public boolean askedForStop() {
- return stopFile.exists();
+ return mappedByteBuffer.get(offset() + 1) == STOP;
}
- File getReadyFile() {
- return readyFile;
+ int offset() {
+ return BYTE_LENGTH_FOR_ONE_PROCESS * processNumber;
}
- File getStopFile() {
- return stopFile;
- }
-
- private void createFile(File file) {
- try {
- FileUtils.touch(file);
- } catch (IOException e) {
- throw new IllegalStateException(String.format("Fail to create file %s", file), e);
+ private boolean canBeMonitored() {
+ boolean result = processNumber >= 0 && processNumber < MAX_PROCESSES;
+ if (!result) {
+ LoggerFactory.getLogger(getClass()).info("This process cannot be monitored. Process Id : [{}]", processNumber);
}
+ return result;
}
- private void deleteFile(File file) {
- if (file.exists() && !file.delete()) {
- throw new MessageException(String.format(
- "Fail to delete file %s. Please check that no SonarQube process is alive", file));
- }
+ public static final int getMaxProcesses() {
+ return MAX_PROCESSES;
}
-}
+}
\ No newline at end of file
public class ProcessEntryPoint implements Stoppable {
public static final String PROPERTY_PROCESS_KEY = "process.key";
+ public static final String PROPERTY_PROCESS_INDEX = "process.index";
public static final String PROPERTY_TERMINATION_TIMEOUT = "process.terminationTimeout";
public static final String PROPERTY_SHARED_PATH = "process.sharedDir";
if (!lifecycle.tryToMoveTo(Lifecycle.State.STARTING)) {
throw new IllegalStateException("Already started");
}
- commands.prepare();
monitored = mp;
try {
public static ProcessEntryPoint createForArguments(String[] args) {
Props props = ConfigurationUtils.loadPropsFromCommandLineArgs(args);
ProcessCommands commands = new ProcessCommands(
- props.nonNullValueAsFile(PROPERTY_SHARED_PATH), props.nonNullValue(PROPERTY_PROCESS_KEY));
+ props.nonNullValueAsFile(PROPERTY_SHARED_PATH), Integer.parseInt(props.nonNullValue(PROPERTY_PROCESS_INDEX)));
return new ProcessEntryPoint(props, new SystemExit(), commands);
}
}
@Override
public void run() {
- commands.prepare();
- try {
- while (watching) {
- if (commands.askedForStop()) {
- LoggerFactory.getLogger(getClass()).info("Stopping process");
- stoppable.stopAsync();
+ while (watching) {
+ if (commands.askedForStop()) {
+ LoggerFactory.getLogger(getClass()).info("Stopping process");
+ stoppable.stopAsync();
+ watching = false;
+ } else {
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException ignored) {
watching = false;
- } else {
- try {
- Thread.sleep(delayMs);
- } catch (InterruptedException ignored) {
- watching = false;
- }
}
}
- } finally {
- commands.endWatch();
}
}
LoggerFactory.getLogger(getClass()).error(String.format("Can not stop in %dms", terminationTimeout), e);
}
executor.shutdownNow();
- commands.endWatch();
}
}
import java.io.File;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ProcessCommandsTest {
FileUtils.deleteQuietly(dir);
try {
- new ProcessCommands(dir, "web");
+ new ProcessCommands(dir, 1);
fail();
} catch (IllegalArgumentException e) {
assertThat(e).hasMessage("Not a valid directory: " + dir.getAbsolutePath());
}
@Test
- public void delete_files_on_prepare() throws Exception {
+ public void child_process_update_the_mapped_memory() throws Exception {
File dir = temp.newFolder();
- assertThat(dir).exists();
- FileUtils.touch(new File(dir, "web.ready"));
- FileUtils.touch(new File(dir, "web.stop"));
- ProcessCommands commands = new ProcessCommands(dir, "web");
- commands.prepare();
-
- assertThat(commands.getReadyFile()).doesNotExist();
- assertThat(commands.getStopFile()).doesNotExist();
- }
-
- @Test
- public void fail_to_prepare_if_file_is_locked() throws Exception {
- File readyFile = mock(File.class);
- when(readyFile.exists()).thenReturn(true);
- when(readyFile.delete()).thenReturn(false);
-
- ProcessCommands commands = new ProcessCommands(readyFile, temp.newFile());
- try {
- commands.prepare();
- fail();
- } catch (MessageException e) {
- // ok
- }
- }
-
- @Test
- public void child_process_create_file_when_ready() throws Exception {
- File dir = temp.newFolder();
-
- ProcessCommands commands = new ProcessCommands(dir, "web");
- commands.prepare();
+ ProcessCommands commands = new ProcessCommands(dir, 1);
assertThat(commands.isReady()).isFalse();
- assertThat(commands.getReadyFile()).doesNotExist();
+ assertThat(commands.mappedByteBuffer.get(commands.offset())).isEqualTo(ProcessCommands.EMPTY);
+ assertThat(commands.mappedByteBuffer.getLong(2 + commands.offset())).isEqualTo(0L);
commands.setReady();
assertThat(commands.isReady()).isTrue();
- assertThat(commands.getReadyFile()).exists().isFile();
+ assertThat(commands.mappedByteBuffer.get(commands.offset())).isEqualTo(ProcessCommands.READY);
- commands.endWatch();
- assertThat(commands.getReadyFile()).doesNotExist();
+ long currentTime = System.currentTimeMillis();
+ commands.ping();
+ assertThat(commands.mappedByteBuffer.getLong(2 + commands.offset())).isGreaterThanOrEqualTo(currentTime);
}
@Test
public void ask_for_stop() throws Exception {
File dir = temp.newFolder();
- ProcessCommands commands = new ProcessCommands(dir, "web");
+ ProcessCommands commands = new ProcessCommands(dir, 1);
+ assertThat(commands.mappedByteBuffer.get(commands.offset() + 1)).isNotEqualTo(ProcessCommands.STOP);
assertThat(commands.askedForStop()).isFalse();
- assertThat(commands.getStopFile()).doesNotExist();
commands.askForStop();
assertThat(commands.askedForStop()).isTrue();
- assertThat(commands.getStopFile()).exists().isFile();
- assertThat(commands.getStopFile().getName()).isEqualTo("web.stop");
+ assertThat(commands.mappedByteBuffer.get(commands.offset() + 1)).isEqualTo(ProcessCommands.STOP);
+ }
+
+ @Test
+ public void test_max_processes() throws Exception {
+ File dir = temp.newFolder();
+ try {
+ new ProcessCommands(dir, -2);
+ failBecauseExceptionWasNotThrown(AssertionError.class);
+ } catch (AssertionError e) {
+ assertThat(e).hasMessage("Incorrect process number");
+ }
+ try {
+ new ProcessCommands(dir, ProcessCommands.getMaxProcesses() + 1);
+ failBecauseExceptionWasNotThrown(AssertionError.class);
+ } catch (AssertionError e) {
+ assertThat(e).hasMessage("Incorrect process number");
+ }
}
}
@Test
public void load_properties_from_file() throws Exception {
File propsFile = temp.newFile();
- FileUtils.write(propsFile, "sonar.foo=bar\nprocess.key=web\nprocess.sharedDir=" + temp.newFolder().getAbsolutePath().replaceAll("\\\\", "/"));
+ FileUtils.write(propsFile, "sonar.foo=bar\nprocess.key=web\nprocess.index=1\nprocess.sharedDir=" + temp.newFolder().getAbsolutePath().replaceAll("\\\\", "/"));
ProcessEntryPoint entryPoint = ProcessEntryPoint.createForArguments(new String[] {propsFile.getAbsolutePath()});
assertThat(entryPoint.getProps().value("sonar.foo")).isEqualTo("bar");
stopper.join();
verify(monitored).stop();
- verify(commands).endWatch();
}
@Test(timeout = 3000L)
stopper.join();
verify(monitored).stop();
- // even if stopper was interrupted, stop watching process
- verify(commands).endWatch();
}
}
wrapper.disable_restarts=TRUE
wrapper.ping.timeout=0
wrapper.shutdown.timeout=3000
+wrapper.jvm_exit.timeout=3000
import org.sonar.process.StopWatcher;
import org.sonar.process.Stoppable;
import org.sonar.process.monitor.JavaCommand;
+import org.sonar.process.monitor.KnownJavaCommand;
import org.sonar.process.monitor.Monitor;
import java.io.File;
public void start(Props props) {
if (props.valueAsBoolean(ProcessConstants.ENABLE_STOP_COMMAND, false)) {
- // stop application when file <temp>/app.stop is created
File tempDir = props.nonNullValueAsFile(ProcessConstants.PATH_TEMP);
- ProcessCommands commands = new ProcessCommands(tempDir, "app");
+ ProcessCommands commands = new ProcessCommands(tempDir, KnownJavaCommand.APP.getIndex());
stopWatcher = new StopWatcher(commands, this);
stopWatcher.start();
}