import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.application.config.AppSettings;
+import org.sonar.process.NodeType;
import org.sonar.process.ProcessProperties;
/**
private final List<String> hosts;
private final List<String> networkInterfaces;
private final String name;
+ private final NodeType nodeType;
ClusterProperties(AppSettings appSettings) {
port = appSettings.getProps().valueAsInt(ProcessProperties.CLUSTER_PORT);
hosts = extractHosts(
appSettings.getProps().value(ProcessProperties.CLUSTER_HOSTS, "")
);
+ nodeType = NodeType.parse(appSettings.getProps().value(ProcessProperties.CLUSTER_NODE_TYPE));
}
int getPort() {
return enabled;
}
+ public NodeType getNodeType() {
+ return nodeType;
+ }
+
List<String> getHosts() {
return hosts;
}
import com.hazelcast.core.ILock;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
import com.hazelcast.core.ReplicatedMap;
import com.hazelcast.nio.Address;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.application.AppStateListener;
+import org.sonar.process.NodeType;
import org.sonar.process.ProcessId;
import static java.util.stream.Collectors.toList;
import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;
import static org.sonar.process.cluster.ClusterObjectKeys.HOSTNAME;
import static org.sonar.process.cluster.ClusterObjectKeys.LEADER;
+import static org.sonar.process.cluster.ClusterObjectKeys.NODE_TYPE;
import static org.sonar.process.cluster.ClusterObjectKeys.OPERATIONAL_PROCESSES;
import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;
public class HazelcastCluster implements AutoCloseable {
- private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastCluster.class);
+ private static Logger LOGGER = LoggerFactory.getLogger(HazelcastCluster.class);
private final List<AppStateListener> listeners = new ArrayList<>();
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final String operationalProcessListenerUUID;
private final String clientListenerUUID;
+ private final String nodeDisconnectedListenerUUID;
protected final HazelcastInstance hzInstance;
operationalProcesses = hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES);
operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
clientListenerUUID = hzInstance.getClientService().addClientListener(new ConnectedClientListener());
+ nodeDisconnectedListenerUUID = hzInstance.getCluster().addMembershipListener(new NodeDisconnectedListener());
}
String getLocalUUID() {
// Removing listeners
operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
hzInstance.getClientService().removeClientListener(clientListenerUUID);
+ hzInstance.getCluster().removeMembershipListener(nodeDisconnectedListenerUUID);
// Removing the operationalProcess from the replicated map
operationalProcesses.keySet().forEach(
.setProperty("hazelcast.logging.type", "slf4j");
// Trying to resolve the hostname
- hzConfig.getMemberAttributeConfig().setStringAttribute(HOSTNAME, getHostName());
+ hzConfig.getMemberAttributeConfig()
+ .setStringAttribute(HOSTNAME, getHostName());
+ hzConfig.getMemberAttributeConfig()
+ .setStringAttribute(NODE_TYPE, clusterProperties.getNodeType().getValue());
// We are not using the partition group of Hazelcast, so disabling it
hzConfig.getPartitionGroupConfig().setEnabled(false);
hzInstance.getSet(CLIENT_UUIDS).remove(client.getUuid());
}
}
+
+ private class NodeDisconnectedListener implements MembershipListener {
+ @Override
+ public void memberAdded(MembershipEvent membershipEvent) {
+ // Nothing to do
+ }
+
+ @Override
+ public void memberRemoved(MembershipEvent membershipEvent) {
+ removeOperationalProcess(membershipEvent.getMember().getUuid());
+ if (membershipEvent.getMembers().stream()
+ .noneMatch(this::isAppNode)) {
+ purgeSharedMemoryForAppNodes();
+ }
+ }
+
+ @Override
+ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+ // Nothing to do
+ }
+
+ private boolean isAppNode(Member member) {
+ return NodeType.APPLICATION.getValue().equals(member.getStringAttribute(NODE_TYPE));
+ }
+
+ private void removeOperationalProcess(String uuid) {
+ for (ClusterProcess clusterProcess : operationalProcesses.keySet()) {
+ if (clusterProcess.getNodeUuid().equals(uuid)) {
+ LOGGER.debug("Set node process off for [{}:{}] : ", clusterProcess.getNodeUuid(), clusterProcess.getProcessId());
+ hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES).put(clusterProcess, Boolean.FALSE);
+ }
+ }
+ }
+
+ private void purgeSharedMemoryForAppNodes() {
+ LOGGER.info("No more application nodes, clearing cluster information about application nodes.");
+ hzInstance.getAtomicReference(LEADER).clear();
+ hzInstance.getAtomicReference(SONARQUBE_VERSION).clear();
+ }
+ }
}
* The key of the hostname attribute of a member
*/
public static final String HOSTNAME = "HOSTNAME";
+
+ /**
+ * The role of the sonar-application inside the SonarQube cluster
+ * {@link org.sonar.process.NodeType}
+ */
+ public static final String NODE_TYPE = "NODE_TYPE";
/**
* The key of atomic reference holding the SonarQube version of the cluster
*/
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.sonarqube.tests.ce.CeWorkersTest;
-import org.sonarqube.tests.cluster.DataCenterEditionTest;
import org.sonarqube.tests.qualityProfile.ActiveRuleEsResilienceTest;
import org.sonarqube.tests.qualityProfile.BuiltInQualityProfilesNotificationTest;
import org.sonarqube.tests.rule.RuleEsResilienceTest;
TelemetryUploadTest.class,
TelemetryOptOutTest.class,
// ce
- CeWorkersTest.class,
- DataCenterEditionTest.class
+ CeWorkersTest.class
})
public class Category5Suite {
package org.sonarqube.tests.cluster;
+import com.sonar.orchestrator.db.Database;
+import com.sonar.orchestrator.db.DatabaseClient;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
+import static org.junit.Assert.fail;
import static org.sonarqube.tests.cluster.Cluster.NodeType.APPLICATION;
import static org.sonarqube.tests.cluster.Cluster.NodeType.SEARCH;
public class DataCenterEditionTest {
+ @Rule
+ public TestRule timeout = new DisableOnDebug(Timeout.builder()
+ .withLookingForStuckThread(true)
+ .withTimeout(5, TimeUnit.MINUTES)
+ .build());
+
@Test
public void launch() throws ExecutionException, InterruptedException {
DataCenterEdition dce = new DataCenterEdition();
dce.stop();
}
+ @Test
+ public void upgrade_application_nodes_without_stopping_search_nodes_must_work() throws ExecutionException, InterruptedException, SQLException {
+ DataCenterEdition dce = new DataCenterEdition();
+ Cluster cluster = dce.getCluster();
+ dce.start();
+
+ // Stop all Application nodes
+ cluster.stopAll(n -> n.getType() == APPLICATION);
+
+ // Drop the schema
+ Database database = cluster.getNodes().get(0).getOrchestrator().getDatabase();
+ dropAndCreate(database.getClient());
+ assertDatabaseDropped(database);
+
+ // Start all Application nodes
+ cluster.startAll(n -> n.getType() == APPLICATION);
+
+ // We are expecting a new leader to be elected which will recreate the database
+ assertDatabaseInitialized(database);
+
+ dce.stop();
+ }
+
+ private void assertDatabaseInitialized(Database database) {
+ assertThat(countRowsOfMigration(database)).isGreaterThan(0);
+ }
+
+ private int countRowsOfMigration(Database database) {
+ return database.countSql("select count(*) from schema_migrations");
+ }
+
+ private void assertDatabaseDropped(Database database) {
+ try {
+ countRowsOfMigration(database);
+ fail("Table 'schema_migrations' has not been dropped");
+ } catch (Exception e) {
+ // we expect the table to not exist
+ }
+ }
+
private static boolean isPortBound(boolean loopback, @Nullable Integer port) {
if (port == null) {
return false;
return true;
}
}
+
+ private static void dropAndCreate(DatabaseClient databaseClient) throws SQLException {
+ try (Connection connection = databaseClient.openRootConnection()) {
+ executeDdl(connection, databaseClient.getDropDdl());
+ executeDdl(connection, databaseClient.getCreateDdl());
+ }
+ }
+
+ private static void executeDdl(Connection connection, String... ddls) throws SQLException {
+ try (Statement stmt = connection.createStatement()) {
+ for (String ddl : ddls) {
+ stmt.executeUpdate(ddl);
+ }
+ }
+ }
}