import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
private static final long PING_DELAY_MS = 3000L;
private long pingDelayMs = PING_DELAY_MS;
- private volatile List<ProcessWrapper> processes;
+ private final List<ProcessWrapper> processes = new CopyOnWriteArrayList<ProcessWrapper>();
private final ScheduledFuture<?> watch;
private final ScheduledExecutorService monitorExecutionService;
*/
public Monitor() {
super("Process Monitor");
- processes = new ArrayList<ProcessWrapper>();
monitorExecutionService = Executors.newScheduledThreadPool(1);
watch = monitorExecutionService.scheduleAtFixedRate(new ProcessWatch(), 0L, getPingDelayMs(), TimeUnit.MILLISECONDS);
}
/**
* Registers and monitors process. Note that process is probably not ready yet.
*/
- public void registerProcess(ProcessWrapper process) throws InterruptedException {
- LOGGER.info("Registering process[{}] for monitoring.", process.getName());
- synchronized (processes) {
- processes.add(process);
- }
+ public void monitor(ProcessWrapper process) throws InterruptedException {
+ LOGGER.info("Monitoring process[{}]", process.getName());
// starts a monitoring thread
process.start();
+ processes.add(process);
}
/**
try {
boolean ok = true;
while (isRunning && ok) {
- synchronized (processes) {
- LOGGER.debug("Monitoring {} processes.", processes.size());
- for (ProcessWrapper process : processes) {
- if (!ProcessUtils.isAlive(process.process())) {
- LOGGER.info("{} is down, stopping all other processes", process.getName());
- ok = false;
- interrupt();
- }
+ LOGGER.debug("Monitoring {} processes.", processes.size());
+ for (ProcessWrapper process : processes) {
+ if (!ProcessUtils.isAlive(process.process())) {
+ LOGGER.info("{} is down, stopping all other processes", process.getName());
+ ok = false;
+ interrupt();
}
}
- if (ok) {
- Thread.sleep(PING_DELAY_MS);
- }
+ }
+ if (ok) {
+ Thread.sleep(PING_DELAY_MS);
}
} catch (InterruptedException e) {
LOGGER.debug("Monitoring thread is interrupted");
this.join();
}
} catch (InterruptedException e) {
- //Expected to be interrupted :)
+ // Expected to be interrupted :)
}
}
}
.addClasspath("./lib/common/*")
.addClasspath("./lib/search/*");
if (elasticsearch.execute()) {
- monitor.registerProcess(elasticsearch);
+ monitor.monitor(elasticsearch);
if (elasticsearch.waitForReady()) {
logger.info("search server is up");
server.addClasspath(driverPath);
}
if (server.execute()) {
- monitor.registerProcess(server);
+ monitor.monitor(server);
if (server.waitForReady()) {
success = true;
logger.info("web server is up");