diff options
Diffstat (limited to 'server')
9 files changed, 432 insertions, 17 deletions
diff --git a/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java b/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java index 72580deebc8..2e9100bac07 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java +++ b/server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java @@ -23,6 +23,7 @@ import com.google.common.net.HostAndPort; import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; +import org.sonar.application.cluster.AppNodesClusterHostsConsistency; import org.sonar.application.cluster.ClusterAppStateImpl; import org.sonar.application.config.AppSettings; import org.sonar.application.config.ClusterSettings; @@ -37,12 +38,11 @@ import static java.util.Arrays.asList; import static org.sonar.process.ProcessProperties.Property.CLUSTER_HZ_HOSTS; import static org.sonar.process.ProcessProperties.Property.CLUSTER_NAME; import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HOST; -import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_NAME; import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_HZ_PORT; +import static org.sonar.process.ProcessProperties.Property.CLUSTER_NODE_NAME; import static org.sonar.process.ProcessProperties.Property.CLUSTER_SEARCH_HOSTS; public class AppStateFactory { - private final AppSettings settings; public AppStateFactory(AppSettings settings) { @@ -53,7 +53,8 @@ public class AppStateFactory { if (ClusterSettings.shouldStartHazelcast(settings)) { EsConnector esConnector = createEsConnector(settings.getProps()); HazelcastMember hzMember = createHzMember(settings.getProps()); - return new ClusterAppStateImpl(settings, hzMember, esConnector); + AppNodesClusterHostsConsistency appNodesClusterHostsConsistency = AppNodesClusterHostsConsistency.setInstance(hzMember, settings); + return new ClusterAppStateImpl(settings, hzMember, esConnector, appNodesClusterHostsConsistency); } return new AppStateImpl(); } diff --git a/server/sonar-main/src/main/java/org/sonar/application/cluster/AppNodesClusterHostsConsistency.java b/server/sonar-main/src/main/java/org/sonar/application/cluster/AppNodesClusterHostsConsistency.java new file mode 100644 index 00000000000..dcf1f2a11d9 --- /dev/null +++ b/server/sonar-main/src/main/java/org/sonar/application/cluster/AppNodesClusterHostsConsistency.java @@ -0,0 +1,135 @@ +/* + * SonarQube + * Copyright (C) 2009-2019 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.application.cluster; + +import com.google.common.annotations.VisibleForTesting; +import com.hazelcast.cluster.memberselector.MemberSelectors; +import com.hazelcast.core.Member; +import com.hazelcast.core.MemberSelector; +import com.hazelcast.nio.Address; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import javax.annotation.CheckForNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonar.application.config.AppSettings; +import org.sonar.process.ProcessId; +import org.sonar.process.cluster.hz.DistributedCallback; +import org.sonar.process.cluster.hz.HazelcastMember; +import org.sonar.process.cluster.hz.HazelcastMemberSelectors; + +import static com.google.common.base.Preconditions.checkState; +import static com.hazelcast.cluster.memberselector.MemberSelectors.NON_LOCAL_MEMBER_SELECTOR; +import static org.sonar.process.ProcessProperties.Property.CLUSTER_HZ_HOSTS; + +public class AppNodesClusterHostsConsistency { + private static final Logger LOG = LoggerFactory.getLogger(AppNodesClusterHostsConsistency.class); + + private static final AtomicReference<AppNodesClusterHostsConsistency> INSTANCE = new AtomicReference<>(); + + private final AppSettings settings; + private final HazelcastMember hzMember; + private final Consumer<String> logger; + + private AppNodesClusterHostsConsistency(HazelcastMember hzMember, AppSettings settings, Consumer<String> logger) { + this.hzMember = hzMember; + this.settings = settings; + this.logger = logger; + } + + public static AppNodesClusterHostsConsistency setInstance(HazelcastMember hzMember, AppSettings settings) { + return setInstance(hzMember, settings, LOG::warn); + } + + @VisibleForTesting + public static AppNodesClusterHostsConsistency setInstance(HazelcastMember hzMember, AppSettings settings, Consumer<String> logger) { + AppNodesClusterHostsConsistency instance = new AppNodesClusterHostsConsistency(hzMember, settings, logger); + checkState(INSTANCE.compareAndSet(null, instance), "Instance is already set"); + return instance; + } + + @VisibleForTesting + @CheckForNull + protected static AppNodesClusterHostsConsistency clearInstance() { + return INSTANCE.getAndSet(null); + } + + public void check() { + try { + MemberSelector selector = MemberSelectors.and(NON_LOCAL_MEMBER_SELECTOR, HazelcastMemberSelectors.selectorForProcessIds(ProcessId.APP)); + hzMember.callAsync(AppNodesClusterHostsConsistency::getConfiguredClusterHosts, selector, new Callback()); + } catch (RejectedExecutionException e) { + // no other node in the cluster yet, ignore + } + } + + private class Callback implements DistributedCallback<List<String>> { + @Override + public void onComplete(Map<Member, List<String>> hostsPerMember) { + List<String> currentConfiguredHosts = getConfiguredClusterHosts(); + + boolean anyDifference = hostsPerMember.values().stream() + .filter(v -> !v.isEmpty()) + .anyMatch(hosts -> currentConfiguredHosts.size() != hosts.size() || !currentConfiguredHosts.containsAll(hosts)); + + if (anyDifference) { + StringBuilder builder = new StringBuilder().append("The configuration of the current node doesn't match the list of hosts configured in " + + "the application nodes that have already joined the cluster:\n"); + for (Map.Entry<Member, List<String>> e : hostsPerMember.entrySet()) { + if (e.getValue().isEmpty()) { + continue; + } + builder.append(toString(e.getKey().getAddress())); + builder.append(" : "); + builder.append(e.getValue()); + if (e.getKey().localMember()) { + builder.append(" (current)"); + } + builder.append("\n"); + } + builder.append("Make sure the configuration is consistent among all application nodes before you restart any node"); + logger.accept(builder.toString()); + } + } + + private String toString(Address address) { + return address.getHost() + ":" + address.getPort(); + } + } + + private static List<String> getConfiguredClusterHosts() { + try { + AppNodesClusterHostsConsistency instance = INSTANCE.get(); + if (instance != null) { + return Arrays.asList(instance.settings.getProps().nonNullValue(CLUSTER_HZ_HOSTS.getKey()).split(",")); + } + return Collections.emptyList(); + } catch (Exception e) { + LOG.error("Failed to get configured cluster nodes", e); + return Collections.emptyList(); + } + } + +} diff --git a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java index efc2eafaf1d..bf5f215790b 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java +++ b/server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java @@ -68,14 +68,14 @@ public class ClusterAppStateImpl implements ClusterAppState { private final EsConnector esConnector; private HealthStateSharing healthStateSharing = null; - public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember, EsConnector esConnector) { + public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember, EsConnector esConnector, AppNodesClusterHostsConsistency appNodesClusterHostsConsistency) { this.hzMember = hzMember; // Get or create the replicated map operationalProcesses = (ReplicatedMap) hzMember.getReplicatedMap(OPERATIONAL_PROCESSES); operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener()); nodeDisconnectedListenerUUID = hzMember.getCluster().addMembershipListener(new NodeDisconnectedListener()); - + appNodesClusterHostsConsistency.check(); if (ClusterSettings.isLocalElasticsearchEnabled(settings)) { this.healthStateSharing = new HealthStateSharingImpl(hzMember, new SearchNodeHealthProvider(settings.getProps(), this, NetworkUtilsImpl.INSTANCE)); this.healthStateSharing.start(); diff --git a/server/sonar-main/src/main/java/org/sonar/application/config/AppSettingsImpl.java b/server/sonar-main/src/main/java/org/sonar/application/config/AppSettingsImpl.java index 329c625dc85..8ea91186222 100644 --- a/server/sonar-main/src/main/java/org/sonar/application/config/AppSettingsImpl.java +++ b/server/sonar-main/src/main/java/org/sonar/application/config/AppSettingsImpl.java @@ -24,7 +24,7 @@ import org.sonar.process.Props; public class AppSettingsImpl implements AppSettings { - private Props props; + private volatile Props props; AppSettingsImpl(Props props) { this.props = props; diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/AppNodesClusterHostsConsistencyTest.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/AppNodesClusterHostsConsistencyTest.java new file mode 100644 index 00000000000..3999997a7eb --- /dev/null +++ b/server/sonar-main/src/test/java/org/sonar/application/cluster/AppNodesClusterHostsConsistencyTest.java @@ -0,0 +1,205 @@ +/* + * SonarQube + * Copyright (C) 2009-2019 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.application.cluster; + +import com.hazelcast.core.Cluster; +import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.Member; +import com.hazelcast.core.MemberSelector; +import com.hazelcast.nio.Address; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.function.Consumer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.sonar.application.config.TestAppSettings; +import org.sonar.process.cluster.hz.DistributedAnswer; +import org.sonar.process.cluster.hz.DistributedCall; +import org.sonar.process.cluster.hz.DistributedCallback; +import org.sonar.process.cluster.hz.HazelcastMember; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; +import static org.sonar.process.ProcessProperties.Property.CLUSTER_HZ_HOSTS; + +public class AppNodesClusterHostsConsistencyTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private TestAppSettings settings = new TestAppSettings(); + private Consumer<String> logger = mock(Consumer.class); + + @Before + public void setUp() { + AppNodesClusterHostsConsistency.clearInstance(); + } + + @Test + public void log_warning_if_configured_hosts_are_not_consistent() throws UnknownHostException { + Map<Member, List<String>> hostsPerMember = new LinkedHashMap<>(); + Member m1 = newLocalHostMember(1, true); + Member m2 = newLocalHostMember(2); + Member m3 = newLocalHostMember(3); + + hostsPerMember.put(m1, Arrays.asList("1.1.1.1:1000", "1.1.1.1:2000", "1.1.1.2:1000")); + hostsPerMember.put(m2, Arrays.asList("1.1.1.1:1000", "1.1.1.1:2000")); + hostsPerMember.put(m3, Arrays.asList("1.1.1.1:1000", "1.1.1.2:1000")); + + settings.set(CLUSTER_HZ_HOSTS.getKey(), "1.1.1.1:1000,1.1.1.1:2000,1.1.1.2:1000"); + + TestHazelcastMember member = new TestHazelcastMember(hostsPerMember); + AppNodesClusterHostsConsistency underTest = AppNodesClusterHostsConsistency.setInstance(member, settings, logger); + underTest.check(); + + verify(logger).accept("The configuration of the current node doesn't match the list of hosts configured in the application nodes that have already joined the cluster:\n" + + m1.getAddress().getHost() + ":" + m1.getAddress().getPort() + " : [1.1.1.1:1000, 1.1.1.1:2000, 1.1.1.2:1000] (current)\n" + + m2.getAddress().getHost() + ":" + m2.getAddress().getPort() + " : [1.1.1.1:1000, 1.1.1.1:2000]\n" + + m3.getAddress().getHost() + ":" + m3.getAddress().getPort() + " : [1.1.1.1:1000, 1.1.1.2:1000]\n" + + "Make sure the configuration is consistent among all application nodes before you restart any node"); + verifyNoMoreInteractions(logger); + } + + @Test + public void dont_log_if_configured_hosts_are_consistent() throws UnknownHostException { + Map<Member, List<String>> hostsPerMember = new LinkedHashMap<>(); + Member m1 = newLocalHostMember(1); + Member m2 = newLocalHostMember(2); + Member m3 = newLocalHostMember(3); + + hostsPerMember.put(m1, Arrays.asList("1.1.1.1:1000", "1.1.1.1:2000", "1.1.1.2:1000")); + hostsPerMember.put(m2, Arrays.asList("1.1.1.1:1000", "1.1.1.1:2000", "1.1.1.2:1000")); + hostsPerMember.put(m3, Arrays.asList("1.1.1.1:1000", "1.1.1.1:2000", "1.1.1.2:1000")); + + settings.set(CLUSTER_HZ_HOSTS.getKey(), "1.1.1.1:1000,1.1.1.1:2000,1.1.1.2:1000"); + + TestHazelcastMember member = new TestHazelcastMember(hostsPerMember); + AppNodesClusterHostsConsistency underTest = AppNodesClusterHostsConsistency.setInstance(member, settings, logger); + underTest.check(); + + verifyZeroInteractions(logger); + } + + @Test + public void setInstance_fails_with_ISE_when_called_twice_with_same_arguments() { + HazelcastMember hzMember = mock(HazelcastMember.class); + + AppNodesClusterHostsConsistency.setInstance(hzMember, settings); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Instance is already set"); + + AppNodesClusterHostsConsistency.setInstance(hzMember, settings); + } + + @Test + public void setInstance_fails_with_ISE_when_called_twice_with_other_arguments() { + HazelcastMember hzMember1 = mock(HazelcastMember.class); + HazelcastMember hzMember2 = mock(HazelcastMember.class); + + AppNodesClusterHostsConsistency.setInstance(hzMember1, settings); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Instance is already set"); + + AppNodesClusterHostsConsistency.setInstance(hzMember2, new TestAppSettings()); + } + + private Member newLocalHostMember(int port) throws UnknownHostException { + return newLocalHostMember(port, false); + } + + private Member newLocalHostMember(int port, boolean localMember) throws UnknownHostException { + Member member = mock(Member.class); + when (member.localMember()).thenReturn(localMember); + Address address1 = new Address(InetAddress.getLocalHost(), port); + when(member.getAddress()).thenReturn(address1); + return member; + } + + private class TestHazelcastMember implements HazelcastMember { + private final Map<Member, List<String>> hostsPerMember; + + private TestHazelcastMember(Map<Member, List<String>> hostsPerMember) { + this.hostsPerMember = hostsPerMember; + } + + @Override + public <E> IAtomicReference<E> getAtomicReference(String name) { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public <K, V> Map<K, V> getReplicatedMap(String name) { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public String getUuid() { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public Set<String> getMemberUuids() { + throw new IllegalStateException("not expected to be called"); + + } + + @Override + public Lock getLock(String name) { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public long getClusterTime() { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public Cluster getCluster() { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) { + throw new IllegalStateException("not expected to be called"); + } + + @Override + public <T> void callAsync(DistributedCall<T> callable, MemberSelector memberSelector, DistributedCallback<T> callback) { + callback.onComplete((Map<Member, T>) hostsPerMember); + } + + @Override + public void close() { + + } + } +} diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java index 9e8e7d74996..5a44eb6fa85 100644 --- a/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java +++ b/server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java @@ -52,7 +52,8 @@ public class ClusterAppStateImplTest { @Test public void tryToLockWebLeader_returns_true_only_for_the_first_call() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), + mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class))) { assertThat(underTest.tryToLockWebLeader()).isEqualTo(true); assertThat(underTest.tryToLockWebLeader()).isEqualTo(false); } @@ -61,7 +62,7 @@ public class ClusterAppStateImplTest { @Test public void test_listeners() { AppStateListener listener = mock(AppStateListener.class); - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = createClusterAppState()) { underTest.addListener(listener); underTest.setOperational(ProcessId.ELASTICSEARCH); @@ -75,9 +76,18 @@ public class ClusterAppStateImplTest { } @Test + public void constructor_checks_appNodesClusterHostsConsistency() { + AppNodesClusterHostsConsistency clusterHostsConsistency = mock(AppNodesClusterHostsConsistency.class); + try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), + mock(EsConnector.class), clusterHostsConsistency)) { + verify(clusterHostsConsistency).check(); + } + } + + @Test public void registerSonarQubeVersion_publishes_version_on_first_call() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = createClusterAppState()) { underTest.registerSonarQubeVersion("6.4.1.5"); assertThat(underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).get()) @@ -87,7 +97,7 @@ public class ClusterAppStateImplTest { @Test public void registerClusterName_publishes_clusterName_on_first_call() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = createClusterAppState()) { underTest.registerClusterName("foo"); assertThat(underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).get()) @@ -97,7 +107,7 @@ public class ClusterAppStateImplTest { @Test public void reset_always_throws_ISE() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = createClusterAppState()) { expectedException.expect(IllegalStateException.class); expectedException.expectMessage("state reset is not supported in cluster mode"); @@ -108,7 +118,7 @@ public class ClusterAppStateImplTest { @Test public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() { // Now launch an instance that try to be part of the hzInstance cluster - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = createClusterAppState()) { // Register first version underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111"); @@ -122,7 +132,7 @@ public class ClusterAppStateImplTest { @Test public void registerClusterName_throws_MessageException_if_clusterName_is_different() { - try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class))) { + try (ClusterAppStateImpl underTest = createClusterAppState()) { // Register first version underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName"); @@ -134,6 +144,10 @@ public class ClusterAppStateImplTest { } } + private ClusterAppStateImpl createClusterAppState() { + return new ClusterAppStateImpl(new TestAppSettings(), newHzMember(), mock(EsConnector.class), mock(AppNodesClusterHostsConsistency.class)); + } + private static HazelcastMember newHzMember() { // use loopback for support of offline builds InetAddress loopback = InetAddress.getLoopbackAddress(); diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCallback.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCallback.java new file mode 100644 index 00000000000..edf1159a3c9 --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCallback.java @@ -0,0 +1,28 @@ +/* + * SonarQube + * Copyright (C) 2009-2019 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.core.Member; +import java.util.Map; + +@FunctionalInterface +public interface DistributedCallback<T> { + void onComplete(Map<Member, T> response); +} diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java index 951b6a73574..a880da32de2 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java @@ -82,16 +82,29 @@ public interface HazelcastMember extends AutoCloseable { /** * Runs a distributed query on a set of Hazelcast members. * - * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes - * that are not available in classpath of target members. + * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes + * that are not available in classpath of target members. * @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors} * for utilities. - * @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then - * the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer} + * @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then + * the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer} + * @throws java.util.concurrent.RejectedExecutionException if no member is selected */ <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) throws InterruptedException; + /** + * Runs asynchronously a distributed query on a set of Hazelcast members. + * + * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes + * that are not available in classpath of target members. + * @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors} + * for utilities. + * @param callback will be called once we get all responses. + * @throws java.util.concurrent.RejectedExecutionException if no member is selected + */ + <T> void callAsync(DistributedCall<T> callable, MemberSelector memberSelector, DistributedCallback<T> callback); + @Override void close(); } diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java index 0d7588f8bb0..a52afe342d9 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java @@ -26,6 +26,7 @@ import com.hazelcast.core.IAtomicReference; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.Member; import com.hazelcast.core.MemberSelector; +import com.hazelcast.core.MultiExecutionCallback; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -106,6 +107,24 @@ class HazelcastMemberImpl implements HazelcastMember { } @Override + public <T> void callAsync(DistributedCall<T> callable, MemberSelector memberSelector, DistributedCallback<T> callback) { + IExecutorService executor = hzInstance.getExecutorService("default"); + + // callback doesn't handle failures, so we need to make sure the callable won't fail! + executor.submitToMembers(callable, memberSelector, new MultiExecutionCallback() { + @Override + public void onResponse(Member member, Object value) { + // nothing to do when each node responds + } + + @Override + public void onComplete(Map<Member, Object> values) { + callback.onComplete((Map<Member, T>) values); + } + }); + } + + @Override public void close() { try { hzInstance.shutdown(); |