aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java49
-rw-r--r--server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java26
2 files changed, 73 insertions, 2 deletions
diff --git a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java
index b2f7ab578e6..bcc81f2e3fb 100644
--- a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java
+++ b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java
@@ -34,6 +34,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,10 +64,12 @@ public class ClusterAppStateImpl implements ClusterAppState {
private final HazelcastMember hzMember;
private final List<AppStateListener> listeners = new ArrayList<>();
private final Map<ProcessId, Boolean> operationalLocalProcesses = new EnumMap<>(ProcessId.class);
+ private final AtomicBoolean esPoolingThreadRunning = new AtomicBoolean(false);
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final UUID operationalProcessListenerUUID;
private final UUID nodeDisconnectedListenerUUID;
private final EsConnector esConnector;
+
private HealthStateSharing healthStateSharing = null;
public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember, EsConnector esConnector, AppNodesClusterHostsConsistency appNodesClusterHostsConsistency) {
@@ -200,9 +204,52 @@ public class ClusterAppStateImpl implements ClusterAppState {
}
private boolean isElasticSearchAvailable() {
- return esConnector.getClusterHealthStatus()
+ boolean available = esConnector.getClusterHealthStatus()
.filter(t -> ClusterHealthStatus.GREEN.equals(t) || ClusterHealthStatus.YELLOW.equals(t))
.isPresent();
+ if (!available) {
+ asyncWaitForEsToBecomeOperational();
+ }
+ return available;
+ }
+
+ private void asyncWaitForEsToBecomeOperational() {
+ if (esPoolingThreadRunning.compareAndSet(false, true)) {
+ Thread thread = new EsPoolingThread();
+ thread.start();
+ }
+ }
+
+ private class EsPoolingThread extends Thread {
+ private EsPoolingThread() {
+ super("es-state-pooling");
+ this.setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ if (isElasticSearchAvailable()) {
+ listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+ esPoolingThreadRunning.set(false);
+ return;
+ }
+
+ LOGGER.info("Waiting for ElasticSearch to be up and running");
+ do {
+ try {
+ Thread.sleep(5_000);
+ } catch (InterruptedException e) {
+ esPoolingThreadRunning.set(false);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (isElasticSearchAvailable()) {
+ listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+ esPoolingThreadRunning.set(false);
+ return;
+ }
+ } while (true);
+ }
}
private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> {
diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java
index 16bbb81773b..16dce4d4bc7 100644
--- a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java
+++ b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java
@@ -21,6 +21,7 @@ package org.sonar.application.cluster;
import java.net.InetAddress;
import java.util.Optional;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
@@ -41,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME;
import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION;
@@ -78,6 +80,24 @@ public class ClusterAppStateImplTest {
}
@Test
+ public void check_if_elasticsearch_is_operational_on_cluster() {
+ AppStateListener listener = mock(AppStateListener.class);
+ EsConnector esConnectorMock = mock(EsConnector.class);
+ when(esConnectorMock.getClusterHealthStatus())
+ .thenReturn(Optional.empty())
+ .thenReturn(Optional.of(ClusterHealthStatus.RED))
+ .thenReturn(Optional.of(ClusterHealthStatus.GREEN));
+ try (ClusterAppStateImpl underTest = createClusterAppState(esConnectorMock)) {
+ underTest.addListener(listener);
+
+ underTest.isOperational(ProcessId.ELASTICSEARCH, false);
+
+ //wait until undergoing thread marks ES as operational
+ verify(listener, timeout(20_000)).onAppStateOperational(ProcessId.ELASTICSEARCH);
+ }
+ }
+
+ @Test
public void constructor_checks_appNodesClusterHostsConsistency() {
AppNodesClusterHostsConsistency clusterHostsConsistency = mock(AppNodesClusterHostsConsistency.class);
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(),
@@ -164,7 +184,11 @@ public class ClusterAppStateImplTest {
}
private ClusterAppStateImpl createClusterAppState() {
- return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class));
+ return createClusterAppState(mock(EsConnector.class));
+ }
+
+ private ClusterAppStateImpl createClusterAppState(EsConnector esConnector) {
+ return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), esConnector, mock(AppNodesClusterHostsConsistency.class));
}
private static HazelcastMember newHzMember() {