import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final HazelcastMember hzMember;
private final List<AppStateListener> listeners = new ArrayList<>();
private final Map<ProcessId, Boolean> operationalLocalProcesses = new EnumMap<>(ProcessId.class);
+ private final AtomicBoolean esPoolingThreadRunning = new AtomicBoolean(false);
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final UUID operationalProcessListenerUUID;
private final UUID nodeDisconnectedListenerUUID;
private final EsConnector esConnector;
+
private HealthStateSharing healthStateSharing = null;
public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember, EsConnector esConnector, AppNodesClusterHostsConsistency appNodesClusterHostsConsistency) {
}
private boolean isElasticSearchAvailable() {
- return esConnector.getClusterHealthStatus()
+ boolean available = esConnector.getClusterHealthStatus()
.filter(t -> ClusterHealthStatus.GREEN.equals(t) || ClusterHealthStatus.YELLOW.equals(t))
.isPresent();
+ if (!available) {
+ asyncWaitForEsToBecomeOperational();
+ }
+ return available;
+ }
+
+ private void asyncWaitForEsToBecomeOperational() {
+ if (esPoolingThreadRunning.compareAndSet(false, true)) {
+ Thread thread = new EsPoolingThread();
+ thread.start();
+ }
+ }
+
+ private class EsPoolingThread extends Thread {
+ private EsPoolingThread() {
+ super("es-state-pooling");
+ this.setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ if (isElasticSearchAvailable()) {
+ listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+ esPoolingThreadRunning.set(false);
+ return;
+ }
+
+ LOGGER.info("Waiting for ElasticSearch to be up and running");
+ do {
+ try {
+ Thread.sleep(5_000);
+ } catch (InterruptedException e) {
+ esPoolingThreadRunning.set(false);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (isElasticSearchAvailable()) {
+ listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+ esPoolingThreadRunning.set(false);
+ return;
+ }
+ } while (true);
+ }
}
private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> {
import java.net.InetAddress;
import java.util.Optional;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME;
import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION;
}
}
+ @Test
+ public void check_if_elasticsearch_is_operational_on_cluster() {
+ AppStateListener listener = mock(AppStateListener.class);
+ EsConnector esConnectorMock = mock(EsConnector.class);
+ when(esConnectorMock.getClusterHealthStatus())
+ .thenReturn(Optional.empty())
+ .thenReturn(Optional.of(ClusterHealthStatus.RED))
+ .thenReturn(Optional.of(ClusterHealthStatus.GREEN));
+ try (ClusterAppStateImpl underTest = createClusterAppState(esConnectorMock)) {
+ underTest.addListener(listener);
+
+ underTest.isOperational(ProcessId.ELASTICSEARCH, false);
+
+ //wait until undergoing thread marks ES as operational
+ verify(listener, timeout(20_000)).onAppStateOperational(ProcessId.ELASTICSEARCH);
+ }
+ }
+
@Test
public void constructor_checks_appNodesClusterHostsConsistency() {
AppNodesClusterHostsConsistency clusterHostsConsistency = mock(AppNodesClusterHostsConsistency.class);
}
private ClusterAppStateImpl createClusterAppState() {
- return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class));
+ return createClusterAppState(mock(EsConnector.class));
+ }
+
+ private ClusterAppStateImpl createClusterAppState(EsConnector esConnector) {
+ return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), esConnector, mock(AppNodesClusterHostsConsistency.class));
}
private static HazelcastMember newHzMember() {