source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9713 Allow to upgrade a cluster without stopping search nodes
author Eric Hartmann <hartmann.eric@gmail.com>
Wed, 16 Aug 2017 14:26:06 +0000 (16:26 +0200)
committer Simon Brandhof <simon.brandhof@sonarsource.com>
Tue, 5 Sep 2017 12:24:13 +0000 (14:24 +0200)
server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterProperties.java
server/sonar-main/src/main/java/org/sonar/application/cluster/HazelcastCluster.java
server/sonar-process/src/main/java/org/sonar/process/cluster/ClusterObjectKeys.java
tests/src/test/java/org/sonarqube/tests/Category5Suite.java
tests/src/test/java/org/sonarqube/tests/cluster/DataCenterEditionTest.java

index f33877f4562fecdbe718aa4d39bcef7f72bd6c6a..aad11cfe302eb19b57259dbd105fcab7b8bb42de 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonar.application.config.AppSettings;
+import org.sonar.process.NodeType;
 import org.sonar.process.ProcessProperties;
 
 /**
@@ -45,6 +46,7 @@ public final class ClusterProperties {
   private final List<String> hosts;
   private final List<String> networkInterfaces;
   private final String name;
+  private final NodeType nodeType;
 
   ClusterProperties(AppSettings appSettings) {
     port = appSettings.getProps().valueAsInt(ProcessProperties.CLUSTER_PORT);
@@ -56,6 +58,7 @@ public final class ClusterProperties {
     hosts = extractHosts(
       appSettings.getProps().value(ProcessProperties.CLUSTER_HOSTS, "")
     );
+    nodeType = NodeType.parse(appSettings.getProps().value(ProcessProperties.CLUSTER_NODE_TYPE));
   }
 
   int getPort() {
@@ -66,6 +69,10 @@ public final class ClusterProperties {
     return enabled;
   }
 
+  public NodeType getNodeType() {
+    return nodeType;
+  }
+
   List<String> getHosts() {
     return hosts;
   }
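
Note: the org.sonar.process.NodeType enum itself is not part of this diff. The sketch below is only the shape implied by the calls used here (NodeType.parse(...) and getValue()) together with the "application"/"search" values that appear elsewhere in this change — an assumption for illustration, not the committed source:

// Hypothetical reconstruction of org.sonar.process.NodeType (not shown in this diff)
public enum NodeType {
  APPLICATION("application"),
  SEARCH("search");

  private final String value;

  NodeType(String value) {
    this.value = value;
  }

  public String getValue() {
    return value;
  }

  // Maps the CLUSTER_NODE_TYPE property value back to an enum constant
  public static NodeType parse(String value) {
    for (NodeType nodeType : values()) {
      if (nodeType.value.equals(value)) {
        return nodeType;
      }
    }
    throw new IllegalArgumentException("Invalid node type: " + value);
  }
}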
index a1e4f77fdba8b80d178bccc31fd9e5447a765a16..d9cbe2c5df64af43721fbf9b697ae7391b713f3e 100644 (file)
@@ -34,6 +34,9 @@ import com.hazelcast.core.IAtomicReference;
 import com.hazelcast.core.ILock;
 import com.hazelcast.core.MapEvent;
 import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
 import com.hazelcast.core.ReplicatedMap;
 import com.hazelcast.nio.Address;
 import java.util.ArrayList;
@@ -43,6 +46,7 @@ import java.util.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonar.application.AppStateListener;
+import org.sonar.process.NodeType;
 import org.sonar.process.ProcessId;
 
 import static java.util.stream.Collectors.toList;
@@ -50,16 +54,18 @@ import static org.sonar.process.NetworkUtils.getHostName;
 import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;
 import static org.sonar.process.cluster.ClusterObjectKeys.HOSTNAME;
 import static org.sonar.process.cluster.ClusterObjectKeys.LEADER;
+import static org.sonar.process.cluster.ClusterObjectKeys.NODE_TYPE;
 import static org.sonar.process.cluster.ClusterObjectKeys.OPERATIONAL_PROCESSES;
 import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;
 
 public class HazelcastCluster implements AutoCloseable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastCluster.class);
+  private static Logger LOGGER = LoggerFactory.getLogger(HazelcastCluster.class);
 
   private final List<AppStateListener> listeners = new ArrayList<>();
   private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
   private final String operationalProcessListenerUUID;
   private final String clientListenerUUID;
+  private final String nodeDisconnectedListenerUUID;
 
   protected final HazelcastInstance hzInstance;
 
@@ -71,6 +77,7 @@ public class HazelcastCluster implements AutoCloseable {
     operationalProcesses = hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES);
     operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
     clientListenerUUID = hzInstance.getClientService().addClientListener(new ConnectedClientListener());
+    nodeDisconnectedListenerUUID = hzInstance.getCluster().addMembershipListener(new NodeDisconnectedListener());
   }
 
   String getLocalUUID() {
@@ -154,6 +161,7 @@ public class HazelcastCluster implements AutoCloseable {
         // Removing listeners
         operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
         hzInstance.getClientService().removeClientListener(clientListenerUUID);
+        hzInstance.getCluster().removeMembershipListener(nodeDisconnectedListenerUUID);
 
         // Removing the operationalProcess from the replicated map
         operationalProcesses.keySet().forEach(
@@ -207,7 +215,10 @@ public class HazelcastCluster implements AutoCloseable {
       .setProperty("hazelcast.logging.type", "slf4j");
 
     // Trying to resolve the hostname
-    hzConfig.getMemberAttributeConfig().setStringAttribute(HOSTNAME, getHostName());
+    hzConfig.getMemberAttributeConfig()
+      .setStringAttribute(HOSTNAME, getHostName());
+    hzConfig.getMemberAttributeConfig()
+      .setStringAttribute(NODE_TYPE, clusterProperties.getNodeType().getValue());
 
     // We are not using the partition group of Hazelcast, so disabling it
     hzConfig.getPartitionGroupConfig().setEnabled(false);
@@ -277,4 +288,44 @@ public class HazelcastCluster implements AutoCloseable {
       hzInstance.getSet(CLIENT_UUIDS).remove(client.getUuid());
     }
   }
+
+  private class NodeDisconnectedListener implements MembershipListener {
+    @Override
+    public void memberAdded(MembershipEvent membershipEvent) {
+      // Nothing to do
+    }
+
+    @Override
+    public void memberRemoved(MembershipEvent membershipEvent) {
+      removeOperationalProcess(membershipEvent.getMember().getUuid());
+      if (membershipEvent.getMembers().stream()
+        .noneMatch(this::isAppNode)) {
+        purgeSharedMemoryForAppNodes();
+      }
+    }
+
+    @Override
+    public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+      // Nothing to do
+    }
+
+    private boolean isAppNode(Member member) {
+      return NodeType.APPLICATION.getValue().equals(member.getStringAttribute(NODE_TYPE));
+    }
+
+    private void removeOperationalProcess(String uuid) {
+      for (ClusterProcess clusterProcess : operationalProcesses.keySet()) {
+        if (clusterProcess.getNodeUuid().equals(uuid)) {
+          LOGGER.debug("Set node process off for [{}:{}] : ", clusterProcess.getNodeUuid(), clusterProcess.getProcessId());
+          hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES).put(clusterProcess, Boolean.FALSE);
+        }
+      }
+    }
+
+    private void purgeSharedMemoryForAppNodes() {
+      LOGGER.info("No more application nodes, clearing cluster information about application nodes.");
+      hzInstance.getAtomicReference(LEADER).clear();
+      hzInstance.getAtomicReference(SONARQUBE_VERSION).clear();
+    }
+  }
 }
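
The NodeDisconnectedListener added above is the core of this change: when a member drops out of the Hazelcast cluster its processes are flagged non-operational, and once no application node is left the shared LEADER and SONARQUBE_VERSION references are cleared, so search nodes can keep running while application nodes are upgraded. Below is a self-contained sketch of the same Hazelcast 3.x membership-listener pattern; the attribute key and values are taken from this commit, while the class name and standalone setup are illustrative only:

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;

public class PurgeOnLastAppNodeGone {
  public static void main(String[] args) {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();

    // addMembershipListener returns an id that can later be passed to removeMembershipListener
    String listenerUuid = hz.getCluster().addMembershipListener(new MembershipListener() {
      @Override
      public void memberAdded(MembershipEvent event) {
        // nothing to do
      }

      @Override
      public void memberRemoved(MembershipEvent event) {
        // getMembers() reflects the cluster after the removal, so the departed member is excluded
        boolean noAppNodeLeft = event.getMembers().stream()
          .noneMatch(m -> "application".equals(m.getStringAttribute("NODE_TYPE")));
        if (noAppNodeLeft) {
          // clear the state that only application nodes own, so the next one starts fresh
          hz.getAtomicReference("LEADER").clear();
          hz.getAtomicReference("SONARQUBE_VERSION").clear();
        }
      }

      @Override
      public void memberAttributeChanged(MemberAttributeEvent event) {
        // nothing to do
      }
    });

    System.out.println("Listening for membership changes, listener id: " + listenerUuid);
  }
}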
index 1f7d9be3eade469e7360254fa38b375d55d30dd7..2306ce7aca2579a8a30d1208d3ac2ed32a9c8b00 100644 (file)
@@ -41,6 +41,12 @@ public final class ClusterObjectKeys {
    * The key of the hostname attribute of a member
    */
   public static final String HOSTNAME = "HOSTNAME";
+
+  /**
+   * The role of the sonar-application inside the SonarQube cluster
+   * {@link org.sonar.process.NodeType}
+   */
+  public static final String NODE_TYPE = "NODE_TYPE";
   /**
    * The key of atomic reference holding the SonarQube version of the cluster
    */
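
NODE_TYPE is written into each member's attribute map at startup (the setStringAttribute call in HazelcastCluster above) and read back through Member.getStringAttribute by the membership listener. A minimal round-trip sketch with the Hazelcast 3.x API, assuming the "application" value; the class name and setup are illustrative only:

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;

public class NodeTypeAttributeExample {
  public static void main(String[] args) {
    Config config = new Config();
    // Tag this member before creating the instance; member attributes are visible cluster-wide
    config.getMemberAttributeConfig().setStringAttribute("NODE_TYPE", "application");

    HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    for (Member member : hz.getCluster().getMembers()) {
      System.out.println(member.getAddress() + " -> " + member.getStringAttribute("NODE_TYPE"));
    }
    hz.shutdown();
  }
}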
index fd12d951546caabcf87b03919216352b81be00e7..19ed5d87d37b35b137f99ced23fe8ff4667bd98b 100644 (file)
@@ -22,7 +22,6 @@ package org.sonarqube.tests;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.sonarqube.tests.ce.CeWorkersTest;
-import org.sonarqube.tests.cluster.DataCenterEditionTest;
 import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
 import org.sonarqube.tests.qualityProfile.BuiltInQualityProfilesNotificationTest;
 import org.sonarqube.tests.rule.RuleEsResilienceTest;
@@ -63,8 +62,7 @@ import org.sonarqube.tests.user.UserEsResilienceTest;
   TelemetryUploadTest.class,
   TelemetryOptOutTest.class,
   // ce
-  CeWorkersTest.class,
-  DataCenterEditionTest.class
+  CeWorkersTest.class
 })
 public class Category5Suite {
 
index 832d6111c82f91cf510af979c9e9ea9505971dd7..2a52d6f7818cea8f825d758ac2aa5d2b7a99022e 100644 (file)
 
 package org.sonarqube.tests.cluster;
 
+import com.sonar.orchestrator.db.Database;
+import com.sonar.orchestrator.db.DatabaseClient;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.tuple;
+import static org.junit.Assert.fail;
 import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
 import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
 
 public class DataCenterEditionTest {
 
+  @Rule
+  public TestRule timeout = new DisableOnDebug(Timeout.builder()
+    .withLookingForStuckThread(true)
+    .withTimeout(5, TimeUnit.MINUTES)
+    .build());
+
   @Test
   public void launch() throws ExecutionException, InterruptedException {
     DataCenterEdition dce = new DataCenterEdition();
@@ -51,6 +68,46 @@ public class DataCenterEditionTest {
     dce.stop();
   }
 
+  @Test
+  public void upgrade_application_nodes_without_stopping_search_nodes_must_work() throws ExecutionException, InterruptedException, SQLException {
+    DataCenterEdition dce = new DataCenterEdition();
+    Cluster cluster = dce.getCluster();
+    dce.start();
+
+    // Stop all Application nodes
+    cluster.stopAll(n -> n.getType() == APPLICATION);
+
+    // Drop the schema
+    Database database = cluster.getNodes().get(0).getOrchestrator().getDatabase();
+    dropAndCreate(database.getClient());
+    assertDatabaseDropped(database);
+
+    // Start all Application nodes
+    cluster.startAll(n -> n.getType() == APPLICATION);
+
+    // We are expecting a new leader to be elected which will recreate the database
+    assertDatabaseInitialized(database);
+
+    dce.stop();
+  }
+
+  private void assertDatabaseInitialized(Database database) {
+    assertThat(countRowsOfMigration(database)).isGreaterThan(0);
+  }
+
+  private int countRowsOfMigration(Database database) {
+    return database.countSql("select count(*) from schema_migrations");
+  }
+
+  private void assertDatabaseDropped(Database database) {
+    try {
+      countRowsOfMigration(database);
+      fail("Table 'schema_migrations' has not been dropped");
+    } catch (Exception e) {
+      // we expect the table to not exist
+    }
+  }
+
   private static boolean isPortBound(boolean loopback, @Nullable Integer port) {
     if (port == null) {
       return false;
@@ -62,4 +119,19 @@ public class DataCenterEditionTest {
       return true;
     }
   }
+
+  private static void dropAndCreate(DatabaseClient databaseClient) throws SQLException {
+    try (Connection connection = databaseClient.openRootConnection()) {
+      executeDdl(connection, databaseClient.getDropDdl());
+      executeDdl(connection, databaseClient.getCreateDdl());
+    }
+  }
+
+  private static void executeDdl(Connection connection, String... ddls) throws SQLException {
+    try (Statement stmt = connection.createStatement()) {
+      for (String ddl : ddls) {
+        stmt.executeUpdate(ddl);
+      }
+    }
+  }
 }