]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-15231 Pool ES status in clusters
authorDuarte Meneses <duarte.meneses@sonarsource.com>
Thu, 22 Jul 2021 20:38:33 +0000 (15:38 -0500)
committersonartech <sonartech@sonarsource.com>
Thu, 29 Jul 2021 20:04:50 +0000 (20:04 +0000)
server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java
server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java

index b2f7ab578e6fce988d7cfc132f0e3462012445f9..bcc81f2e3fba8ef213e6c222b49e3f9c37e79cbf 100644 (file)
@@ -34,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,10 +64,12 @@ public class ClusterAppStateImpl implements ClusterAppState {
   private final HazelcastMember hzMember;
   private final List<AppStateListener> listeners = new ArrayList<>();
   private final Map<ProcessId, Boolean> operationalLocalProcesses = new EnumMap<>(ProcessId.class);
+  private final AtomicBoolean esPoolingThreadRunning = new AtomicBoolean(false);
   private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
   private final UUID operationalProcessListenerUUID;
   private final UUID nodeDisconnectedListenerUUID;
   private final EsConnector esConnector;
+
   private HealthStateSharing healthStateSharing = null;
 
   public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember, EsConnector esConnector, AppNodesClusterHostsConsistency appNodesClusterHostsConsistency) {
@@ -200,9 +204,52 @@ public class ClusterAppStateImpl implements ClusterAppState {
   }
 
   private boolean isElasticSearchAvailable() {
-    return esConnector.getClusterHealthStatus()
+    boolean available = esConnector.getClusterHealthStatus()
       .filter(t -> ClusterHealthStatus.GREEN.equals(t) || ClusterHealthStatus.YELLOW.equals(t))
       .isPresent();
+    if (!available) {
+      asyncWaitForEsToBecomeOperational();
+    }
+    return available;
+  }
+
+  private void asyncWaitForEsToBecomeOperational() {
+    if (esPoolingThreadRunning.compareAndSet(false, true)) {
+      Thread thread = new EsPoolingThread();
+      thread.start();
+    }
+  }
+
+  private class EsPoolingThread extends Thread {
+    private EsPoolingThread() {
+      super("es-state-pooling");
+      this.setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      if (isElasticSearchAvailable()) {
+        listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+        esPoolingThreadRunning.set(false);
+        return;
+      }
+
+      LOGGER.info("Waiting for ElasticSearch to be up and running");
+      do {
+        try {
+          Thread.sleep(5_000);
+        } catch (InterruptedException e) {
+          esPoolingThreadRunning.set(false);
+          Thread.currentThread().interrupt();
+          return;
+        }
+        if (isElasticSearchAvailable()) {
+          listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+          esPoolingThreadRunning.set(false);
+          return;
+        }
+      } while (true);
+    }
   }
 
   private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> {
index 16bbb81773b19c38ff36f09800643b88694f5138..16dce4d4bc7f747c19cf42148f0d784fba599389 100644 (file)
@@ -21,6 +21,7 @@ package org.sonar.application.cluster;
 
 import java.net.InetAddress;
 import java.util.Optional;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.DisableOnDebug;
@@ -41,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME;
 import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION;
 
@@ -77,6 +79,24 @@ public class ClusterAppStateImplTest {
     }
   }
 
+  @Test
+  public void check_if_elasticsearch_is_operational_on_cluster() {
+    AppStateListener listener = mock(AppStateListener.class);
+    EsConnector esConnectorMock = mock(EsConnector.class);
+    when(esConnectorMock.getClusterHealthStatus())
+      .thenReturn(Optional.empty())
+      .thenReturn(Optional.of(ClusterHealthStatus.RED))
+      .thenReturn(Optional.of(ClusterHealthStatus.GREEN));
+    try (ClusterAppStateImpl underTest = createClusterAppState(esConnectorMock)) {
+      underTest.addListener(listener);
+
+      underTest.isOperational(ProcessId.ELASTICSEARCH, false);
+
+      //wait until undergoing thread marks ES as operational
+      verify(listener, timeout(20_000)).onAppStateOperational(ProcessId.ELASTICSEARCH);
+    }
+  }
+
   @Test
   public void constructor_checks_appNodesClusterHostsConsistency() {
     AppNodesClusterHostsConsistency clusterHostsConsistency = mock(AppNodesClusterHostsConsistency.class);
@@ -164,7 +184,11 @@ public class ClusterAppStateImplTest {
   }
 
   private ClusterAppStateImpl createClusterAppState() {
-    return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class));
+    return createClusterAppState(mock(EsConnector.class));
+  }
+
+  private ClusterAppStateImpl createClusterAppState(EsConnector esConnector) {
+    return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), esConnector, mock(AppNodesClusterHostsConsistency.class));
   }
 
   private static HazelcastMember newHzMember() {