diff options
2 files changed, 73 insertions, 2 deletions
diff --git a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java index b2f7ab578e6..bcc81f2e3fb 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java +++ b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,10 +64,12 @@ public class ClusterAppStateImpl implements ClusterAppState { private final HazelcastMember hzMember; private final List<AppStateListener> listeners = new ArrayList<>(); private final Map<ProcessId, Boolean> operationalLocalProcesses = new EnumMap<>(ProcessId.class); + private final AtomicBoolean esPoolingThreadRunning = new AtomicBoolean(false); private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses; private final UUID operationalProcessListenerUUID; private final UUID nodeDisconnectedListenerUUID; private final EsConnector esConnector; + private HealthStateSharing healthStateSharing = null; public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember, EsConnector esConnector, AppNodesClusterHostsConsistency appNodesClusterHostsConsistency) { @@ -200,9 +204,52 @@ public class ClusterAppStateImpl implements ClusterAppState { } private boolean isElasticSearchAvailable() { - return esConnector.getClusterHealthStatus() + boolean available = esConnector.getClusterHealthStatus() .filter(t -> ClusterHealthStatus.GREEN.equals(t) || ClusterHealthStatus.YELLOW.equals(t)) .isPresent(); + if (!available) { + asyncWaitForEsToBecomeOperational(); + } + return available; + } + + private void asyncWaitForEsToBecomeOperational() { + if (esPoolingThreadRunning.compareAndSet(false, true)) { + Thread thread = new EsPoolingThread(); + thread.start(); + } + } + + private class EsPoolingThread extends Thread { + private EsPoolingThread() { + super("es-state-pooling"); + this.setDaemon(true); + } + + @Override + public void run() { + if (isElasticSearchAvailable()) { + listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH)); + esPoolingThreadRunning.set(false); + return; + } + + LOGGER.info("Waiting for ElasticSearch to be up and running"); + do { + try { + Thread.sleep(5_000); + } catch (InterruptedException e) { + esPoolingThreadRunning.set(false); + Thread.currentThread().interrupt(); + return; + } + if (isElasticSearchAvailable()) { + listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH)); + esPoolingThreadRunning.set(false); + return; + } + } while (true); + } } private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> { diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java index 16bbb81773b..16dce4d4bc7 100644 --- a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java +++ b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java @@ -21,6 +21,7 @@ package org.sonar.application.cluster; import java.net.InetAddress; import java.util.Optional; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.junit.Rule; import org.junit.Test; import org.junit.rules.DisableOnDebug; @@ -41,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME; import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION; @@ -78,6 +80,24 @@ public class ClusterAppStateImplTest { } @Test + public void check_if_elasticsearch_is_operational_on_cluster() { + AppStateListener listener = mock(AppStateListener.class); + EsConnector esConnectorMock = mock(EsConnector.class); + when(esConnectorMock.getClusterHealthStatus()) + .thenReturn(Optional.empty()) + .thenReturn(Optional.of(ClusterHealthStatus.RED)) + .thenReturn(Optional.of(ClusterHealthStatus.GREEN)); + try (ClusterAppStateImpl underTest = createClusterAppState(esConnectorMock)) { + underTest.addListener(listener); + + underTest.isOperational(ProcessId.ELASTICSEARCH, false); + + //wait until undergoing thread marks ES as operational + verify(listener, timeout(20_000)).onAppStateOperational(ProcessId.ELASTICSEARCH); + } + } + + @Test public void constructor_checks_appNodesClusterHostsConsistency() { AppNodesClusterHostsConsistency clusterHostsConsistency = mock(AppNodesClusterHostsConsistency.class); try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), @@ -164,7 +184,11 @@ public class ClusterAppStateImplTest { } private ClusterAppStateImpl createClusterAppState() { - return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class)); + return createClusterAppState(mock(EsConnector.class)); + } + + private ClusterAppStateImpl createClusterAppState(EsConnector esConnector) { + return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), esConnector, mock(AppNodesClusterHostsConsistency.class)); } private static HazelcastMember newHzMember() { |