// Logger for this service, named after MonitorService so log output is attributed to the right class.
private static final Logger LOGGER = LoggerFactory.getLogger(MonitorService.class);
+ // Maximum time, in milliseconds, a process may go without checking in before the heartbeat check fails.
+ private static final long MAX_ELAPSED_TIME = 10000L;
+
// Socket the monitor service listens on; its local port is logged by the constructor.
final DatagramSocket socket;
// Registered processes, keyed by process name (see register()).
final Map<String, ProcessWrapper> processes;
// Most recent recorded check-in time (System.currentTimeMillis()) of each process, keyed by process name.
final Map<String, Long> processesPing;
public MonitorService(DatagramSocket socket) {
- LOGGER.info("Monitor listening on socket:{}", socket.getLocalPort());
+ LOGGER.info("Monitor service is listening on socket:{}", socket.getLocalPort());
this.socket = socket;
processes = new HashMap<String, ProcessWrapper>();
processesPing = new HashMap<String, Long>();
/**
 * Registers a process with this monitor.
 *
 * <p>The process is stored in {@code processes} under its name, and the current time is
 * recorded in {@code processesPing} under the same name.
 *
 * @param process the process to register; its {@code getName()} value is used as the key
 *                in both maps
 */
public void register(ProcessWrapper process) {
this.processes.put(process.getName(), process);
// Seed the check-in timestamp at registration time so the process has an entry in
// processesPing from the start; the heartbeat check compares these timestamps
// against MAX_ELAPSED_TIME.
+ this.processesPing.put(process.getName(), System.currentTimeMillis());
}
@Override
break;
}
}
- LOGGER.info("Some app has not been pinging");
+ LOGGER.error("Not all process have checked-in. Aborting node");
for (ProcessWrapper process : processes.values()) {
process.shutdown();
}
//check that no process wrapper thread has been interrupted
for (Thread thread : processes.values()) {
if (thread.isInterrupted()) {
+ LOGGER.error("process {} has been interrupted. Aborting node",
+ thread.getName());
return false;
}
}
//check that all heartbeats are OK
- for (Long ping : processesPing.values()) {
- if ((now - ping) > 5000) {
+ for (Map.Entry<String, Long> processPing : processesPing.entrySet()) {
+ if ((now - processPing.getValue()) > MAX_ELAPSED_TIME) {
+ LOGGER.error("process {} has not checked-in since {}ms. Aborting node",
+ processPing.getKey(), (now - processPing.getValue()));
return false;
+ } else {
+ LOGGER.debug("process {} has last checked-in {}ms ago.",
+ processPing.getKey(), (now - processPing.getValue()));
}
}
return true;