import org.sonar.process.cluster.hz.HazelcastMember;
import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
-import static java.util.Arrays.asList;
import static org.sonar.process.ProcessProperties.Property.CLUSTER_HZ_HOSTS;
import static org.sonar.process.ProcessProperties.Property.CLUSTER_KUBERNETES;
import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HOST;
boolean isRunningOnKubernetes = props.valueAsBoolean(CLUSTER_KUBERNETES.getKey(), Boolean.parseBoolean(CLUSTER_KUBERNETES.getDefaultValue()));
HazelcastMemberBuilder builder = new HazelcastMemberBuilder(isRunningOnKubernetes ? KUBERNETES : TCP_IP)
.setNetworkInterface(props.nonNullValue(CLUSTER_NODE_HOST.getKey()))
- .setMembers(asList(props.nonNullValue(CLUSTER_HZ_HOSTS.getKey()).split(",")))
+ .setMembers(props.nonNullValue(CLUSTER_HZ_HOSTS.getKey()))
.setNodeName(props.nonNullValue(CLUSTER_NODE_NAME.getKey()))
.setPort(Integer.parseInt(props.nonNullValue(CLUSTER_NODE_HZ_PORT.getKey())))
.setProcessId(ProcessId.APP);
if (process == null) {
return;
}
- if (!isEsClientStartable()) {
+ if (!isEsOperational()) {
if (firstWaitingEsLog.getAndSet(false)) {
LOG.info("Waiting for Elasticsearch to be up and running");
}
private void tryToStartCe() throws InterruptedException {
ManagedProcessHandler process = processesById.get(ProcessId.COMPUTE_ENGINE);
- if (process != null && appState.isOperational(ProcessId.WEB_SERVER, true) && isEsClientStartable()) {
+ if (process != null && appState.isOperational(ProcessId.WEB_SERVER, true) && isEsOperational()) {
tryToStartProcess(process, commandFactory::createCeCommand);
}
}
- private boolean isEsClientStartable() {
+ private boolean isEsOperational() {
boolean requireLocalEs = ClusterSettings.isLocalElasticsearchEnabled(settings);
return appState.isOperational(ProcessId.ELASTICSEARCH, requireLocalEs);
}
package org.sonar.application.cluster;
import com.hazelcast.cluster.Member;
+import com.hazelcast.cluster.MembershipAdapter;
import com.hazelcast.cluster.MembershipEvent;
-import com.hazelcast.cluster.MembershipListener;
+import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryListener;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.cp.IAtomicReference;
-import com.hazelcast.map.MapEvent;
import com.hazelcast.replicatedmap.ReplicatedMap;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
if (processId.equals(ProcessId.ELASTICSEARCH)) {
- return isElasticSearchAvailable();
+ boolean operational = isElasticSearchOperational();
+ if (!operational) {
+ asyncWaitForEsToBecomeOperational();
+ }
+ return operational;
}
for (Map.Entry<ClusterProcess, Boolean> entry : operationalProcesses.entrySet()) {
}
}
- private boolean isElasticSearchAvailable() {
- boolean available = esConnector.getClusterHealthStatus()
+ private boolean isElasticSearchOperational() {
+ return esConnector.getClusterHealthStatus()
.filter(t -> ClusterHealthStatus.GREEN.equals(t) || ClusterHealthStatus.YELLOW.equals(t))
.isPresent();
- if (!available) {
- asyncWaitForEsToBecomeOperational();
- }
- return available;
}
private void asyncWaitForEsToBecomeOperational() {
@Override
public void run() {
- if (isElasticSearchAvailable()) {
- listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
- esPoolingThreadRunning.set(false);
- return;
- }
+ while (true) {
+ if (isElasticSearchOperational()) {
+ esPoolingThreadRunning.set(false);
+ listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
+ return;
+ }
- LOGGER.info("Waiting for ElasticSearch to be up and running");
- do {
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
- if (isElasticSearchAvailable()) {
- listeners.forEach(l -> l.onAppStateOperational(ProcessId.ELASTICSEARCH));
- esPoolingThreadRunning.set(false);
- return;
- }
- } while (true);
+ }
}
}
- private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> {
+ private class OperationalProcessListener extends EntryAdapter<ClusterProcess, Boolean> {
@Override
public void entryAdded(EntryEvent<ClusterProcess, Boolean> event) {
if (event.getValue()) {
}
}
- @Override
- public void entryRemoved(EntryEvent<ClusterProcess, Boolean> event) {
- // Ignore it
- }
-
@Override
public void entryUpdated(EntryEvent<ClusterProcess, Boolean> event) {
if (event.getValue()) {
listeners.forEach(appStateListener -> appStateListener.onAppStateOperational(event.getKey().getProcessId()));
}
}
-
- @Override
- public void entryEvicted(EntryEvent<ClusterProcess, Boolean> event) {
- // Ignore it
- }
-
- @Override
- public void mapCleared(MapEvent event) {
- // Ignore it
- }
-
- @Override
- public void mapEvicted(MapEvent event) {
- // Ignore it
- }
-
- @Override
- public void entryExpired(EntryEvent<ClusterProcess, Boolean> event) {
- // Ignore it
- }
}
- private class NodeDisconnectedListener implements MembershipListener {
- @Override
- public void memberAdded(MembershipEvent membershipEvent) {
- // Nothing to do
- }
-
+ private class NodeDisconnectedListener extends MembershipAdapter {
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
removeOperationalProcess(membershipEvent.getMember().getUuid());
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
-import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.sonar.application.AppStateListener;
import org.sonar.process.cluster.hz.JoinConfigurationType;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
public class ClusterAppStateImplTest {
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
@Rule
public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
public void tryToLockWebLeader_returns_true_only_for_the_first_call() {
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(),
mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class))) {
- assertThat(underTest.tryToLockWebLeader()).isEqualTo(true);
- assertThat(underTest.tryToLockWebLeader()).isEqualTo(false);
+ assertThat(underTest.tryToLockWebLeader()).isTrue();
+ assertThat(underTest.tryToLockWebLeader()).isFalse();
}
}
underTest.setOperational(ProcessId.ELASTICSEARCH);
verify(listener, timeout(20_000)).onAppStateOperational(ProcessId.ELASTICSEARCH);
- assertThat(underTest.isOperational(ProcessId.ELASTICSEARCH, true)).isEqualTo(true);
- assertThat(underTest.isOperational(ProcessId.APP, true)).isEqualTo(false);
- assertThat(underTest.isOperational(ProcessId.WEB_SERVER, true)).isEqualTo(false);
- assertThat(underTest.isOperational(ProcessId.COMPUTE_ENGINE, true)).isEqualTo(false);
+ assertThat(underTest.isOperational(ProcessId.ELASTICSEARCH, true)).isTrue();
+ assertThat(underTest.isOperational(ProcessId.APP, true)).isFalse();
+ assertThat(underTest.isOperational(ProcessId.WEB_SERVER, true)).isFalse();
+ assertThat(underTest.isOperational(ProcessId.COMPUTE_ENGINE, true)).isFalse();
}
}
@Test
public void reset_always_throws_ISE() {
try (ClusterAppStateImpl underTest = createClusterAppState()) {
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("state reset is not supported in cluster mode");
-
- underTest.reset();
+ assertThatThrownBy(underTest::reset)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("state reset is not supported in cluster mode");
}
}
// Register first version
underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111");
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("The local version 6.7.0.9999 is not the same as the cluster 6.6.0.1111");
-
// Registering a second different version must trigger an exception
- underTest.registerSonarQubeVersion("6.7.0.9999");
+ assertThatThrownBy(() -> underTest.registerSonarQubeVersion("6.7.0.9999"))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("The local version 6.7.0.9999 is not the same as the cluster 6.6.0.1111");
}
}
// Register first version
underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName");
- expectedException.expect(MessageException.class);
- expectedException.expectMessage("This node has a cluster name [badClusterName], which does not match [goodClusterName] from the cluster");
-
// Registering a second different cluster name must trigger an exception
- underTest.registerClusterName("badClusterName");
+ assertThatThrownBy(() -> underTest.registerClusterName("badClusterName"))
+ .isInstanceOf(MessageException.class)
+ .hasMessage("This node has a cluster name [badClusterName], which does not match [goodClusterName] from the cluster");
}
}
.setProcessId(ProcessId.COMPUTE_ENGINE)
.setNodeName("bar")
.setPort(NetworkUtilsImpl.INSTANCE.getNextLoopbackAvailablePort())
+ .setMembers(loopback.getHostAddress())
.setNetworkInterface(loopback.getHostAddress())
.build();
}
import com.hazelcast.config.MemberAttributeConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.core.Hazelcast;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.sonar.process.ProcessId;
import org.sonar.process.cluster.hz.HazelcastMember.Attribute;
+import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
+import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HZ_PORT;
import static org.sonar.process.cluster.hz.JoinConfigurationType.KUBERNETES;
-import static org.sonar.process.cluster.hz.JoinConfigurationType.TCP_IP;
public class HazelcastMemberBuilder {
private String nodeName;
private int port;
private ProcessId processId;
private String networkInterface;
- private final MembersResolver membersResolver;
- private final List<String> members = new ArrayList<>();
+ private String members;
private final JoinConfigurationType type;
public HazelcastMemberBuilder(JoinConfigurationType type) {
this.type = type;
- this.membersResolver = TCP_IP.equals(type) ? new TcpIpMembersResolver() : new NopMembersResolver();
}
public HazelcastMemberBuilder setNodeName(String s) {
/**
* Adds references to cluster members
*/
- public HazelcastMemberBuilder setMembers(Collection<String> members) {
- this.members.addAll(members);
+ public HazelcastMemberBuilder setMembers(String members) {
+ this.members = members;
return this;
}
joinConfig.getAwsConfig().setEnabled(false);
joinConfig.getMulticastConfig().setEnabled(false);
- List<String> resolvedNodes = membersResolver.resolveMembers(this.members);
if (KUBERNETES.equals(type)) {
joinConfig.getKubernetesConfig().setEnabled(true)
- .setProperty("service-dns", requireNonNull(resolvedNodes.get(0), "Service DNS is missing"))
- .setProperty("service-port", "9003");
+ .setProperty("service-dns", requireNonNull(members, "Service DNS is missing"))
+ .setProperty("service-port", CLUSTER_NODE_HZ_PORT.getDefaultValue());
} else {
+ List<String> addressesWithDefaultPorts = Stream.of(this.members.split(","))
+ .filter(host -> !host.isBlank())
+ .map(String::trim)
+ .map(HazelcastMemberBuilder::applyDefaultPortToHost)
+ .collect(Collectors.toList());
joinConfig.getTcpIpConfig().setEnabled(true);
- joinConfig.getTcpIpConfig().setMembers(requireNonNull(resolvedNodes, "Members are missing"));
+ joinConfig.getTcpIpConfig().setMembers(requireNonNull(addressesWithDefaultPorts, "Members are missing"));
}
// We are not using the partition group of Hazelcast, so disabling it
return new HazelcastMemberImpl(Hazelcast.newHazelcastInstance(config));
}
+ private static String applyDefaultPortToHost(String host) {
+ return host.contains(":") ? host : format("%s:%s", host, CLUSTER_NODE_HZ_PORT.getDefaultValue());
+ }
+
}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2021 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process.cluster.hz;
-
-import java.util.List;
-
-interface MembersResolver {
- List<String> resolveMembers(List<String> membersToResolve);
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2021 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process.cluster.hz;
-
-import java.util.List;
-
-class NopMembersResolver implements MembersResolver {
- @Override
- public List<String> resolveMembers(List<String> membersToResolve) {
- return membersToResolve;
- }
-
-}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2021 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.process.cluster.hz;
-
-import com.hazelcast.internal.util.AddressUtil;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.sonar.api.utils.log.Logger;
-import org.sonar.api.utils.log.Loggers;
-
-import static java.lang.String.format;
-import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HZ_PORT;
-
-class TcpIpMembersResolver implements MembersResolver {
- private static final Logger LOG = Loggers.get(TcpIpMembersResolver.class);
-
- @Override
- public List<String> resolveMembers(List<String> membersToResolve) {
- return membersToResolve.stream().map(this::extractMembers).flatMap(Collection::stream).collect(Collectors.toList());
- }
-
- private List<String> extractMembers(String host) {
- LOG.debug("Trying to add host: " + host);
- String hostStripped = host.split(":")[0];
- if (AddressUtil.isIpAddress(hostStripped)) {
- LOG.debug("Found ip based host config for host: " + host);
- return Collections.singletonList(host.contains(":") ? host : format("%s:%s", host, CLUSTER_NODE_HZ_PORT.getDefaultValue()));
- } else {
- List<String> membersToAdd = new ArrayList<>();
- for (String memberIp : getAllByName(hostStripped)) {
- String prefix = memberIp.split("/")[1];
- LOG.debug("Found IP for: " + hostStripped + " : " + prefix);
- String memberPort = host.contains(":") ? host.split(":")[1] : CLUSTER_NODE_HZ_PORT.getDefaultValue();
- String member = prefix + ":" + memberPort;
- membersToAdd.add(member);
- }
- return membersToAdd;
- }
- }
-
- private List<String> getAllByName(String hostname) {
- LOG.debug("Trying to resolve Hostname: " + hostname);
- try {
- return Arrays.stream(InetAddress.getAllByName(hostname)).map(InetAddress::toString).collect(Collectors.toList());
- } catch (UnknownHostException e) {
- LOG.error("Host could not be found: " + e.getMessage());
- }
- return new ArrayList<>();
- }
-
-}
@Test
public void build_tcp_ip_member_hostaddress() {
HazelcastMember member = new HazelcastMemberBuilder(JoinConfigurationType.TCP_IP)
- .setMembers(Collections.singletonList(loopback.getHostAddress()))
+ .setMembers(loopback.getHostAddress())
.setProcessId(ProcessId.COMPUTE_ENGINE)
.setNodeName("bar")
.setPort(NetworkUtilsImpl.INSTANCE.getNextLoopbackAvailablePort())
@Test
public void build_tcp_ip_member_hostname() {
HazelcastMember member = new HazelcastMemberBuilder(JoinConfigurationType.TCP_IP)
- .setMembers(Collections.singletonList(loopback.getHostName()))
+ .setMembers(loopback.getHostName())
.setProcessId(ProcessId.COMPUTE_ENGINE)
.setNodeName("bar")
.setPort(NetworkUtilsImpl.INSTANCE.getNextLoopbackAvailablePort())
@Test
public void build_kubernetes_member() {
HazelcastMember member = new HazelcastMemberBuilder(JoinConfigurationType.KUBERNETES)
- .setMembers(Collections.singletonList(loopback.getHostAddress()))
+ .setMembers(loopback.getHostAddress())
.setProcessId(ProcessId.COMPUTE_ENGINE)
.setNodeName("bar")
.setPort(NetworkUtilsImpl.INSTANCE.getNextLoopbackAvailablePort())
.setNodeName("name" + port)
.setPort(port)
.setNetworkInterface(loopback.getHostAddress())
- .setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.toList()))
+ .setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.joining(",")))
.build();
}