public AppState create() {
if (ClusterSettings.isClusterEnabled(settings)) {
HazelcastMember hzMember = createHzMember(settings.getProps());
- return new ClusterAppStateImpl(hzMember);
+ return new ClusterAppStateImpl(settings, hzMember);
}
return new AppStateImpl();
}
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.sonar.application.cluster.ClusterAppState;
-import org.sonar.application.cluster.health.SearchNodeHealthProvider;
import org.sonar.application.command.CommandFactory;
import org.sonar.application.command.EsCommand;
import org.sonar.application.command.JavaCommand;
import org.sonar.application.process.ProcessLifecycleListener;
import org.sonar.application.process.ProcessMonitor;
import org.sonar.application.process.SQProcess;
-import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
-import org.sonar.application.cluster.health.HealthStateSharing;
-import org.sonar.application.cluster.health.HealthStateSharingImpl;
public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLifecycleListener, AppStateListener {
private final AtomicInteger stopCountDown = new AtomicInteger(0);
private StopperThread stopperThread;
private RestarterThread restarterThread;
- private HealthStateSharing healthStateSharing;
private long processWatcherDelayMs = SQProcess.DEFAULT_WATCHER_DELAY_MS;
public SchedulerImpl(AppSettings settings, AppReloader appReloader, CommandFactory commandFactory,
}
private void tryToStartAll() {
- tryToStartHealthStateSharing();
tryToStartEs();
tryToStartWeb();
tryToStartCe();
}
}
- private void tryToStartHealthStateSharing() {
- if (healthStateSharing == null
- && appState instanceof ClusterAppState
- && ClusterSettings.isLocalElasticsearchEnabled(settings)) {
- ClusterAppState clusterAppState = (ClusterAppState) appState;
- this.healthStateSharing = new HealthStateSharingImpl(
- clusterAppState.getHazelcastMember(),
- new SearchNodeHealthProvider(settings.getProps(), clusterAppState, NetworkUtils.INSTANCE));
- this.healthStateSharing.start();
- }
- }
-
private boolean isEsClientStartable() {
boolean requireLocalEs = ClusterSettings.isLocalElasticsearchEnabled(settings);
return appState.isOperational(ProcessId.ELASTICSEARCH, requireLocalEs);
stopProcess(ProcessId.COMPUTE_ENGINE);
stopProcess(ProcessId.WEB_SERVER);
stopProcess(ProcessId.ELASTICSEARCH);
- stopHealthStateSharing();
}
/**
}
}
- private void stopHealthStateSharing() {
- if (healthStateSharing != null) {
- healthStateSharing.stop();
- }
- }
-
/**
* Blocks until all processes are stopped. Pending restart, if
* any, is disabled.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonar.application.AppStateListener;
+import org.sonar.application.cluster.health.HealthStateSharing;
+import org.sonar.application.cluster.health.HealthStateSharingImpl;
+import org.sonar.application.cluster.health.SearchNodeHealthProvider;
+import org.sonar.application.config.AppSettings;
+import org.sonar.application.config.ClusterSettings;
import org.sonar.process.MessageException;
+import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.cluster.NodeType;
import org.sonar.process.cluster.hz.HazelcastMember;
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final String operationalProcessListenerUUID;
private final String nodeDisconnectedListenerUUID;
+ private HealthStateSharing healthStateSharing = null;
- public ClusterAppStateImpl(HazelcastMember hzMember) {
+ public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember) {
this.hzMember = hzMember;
// Get or create the replicated map
operationalProcesses = (ReplicatedMap) hzMember.getReplicatedMap(OPERATIONAL_PROCESSES);
operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
nodeDisconnectedListenerUUID = hzMember.getCluster().addMembershipListener(new NodeDisconnectedListener());
+
+ if (ClusterSettings.isLocalElasticsearchEnabled(settings)) {
+ this.healthStateSharing = new HealthStateSharingImpl(hzMember, new SearchNodeHealthProvider(settings.getProps(), this, NetworkUtils.INSTANCE));
+ this.healthStateSharing.start();
+ }
}
@Override
@Override
public void close() {
if (hzMember != null) {
+ if (healthStateSharing != null) {
+ healthStateSharing.stop();
+ }
try {
// Removing listeners
operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
*/
package org.sonar.application.cluster;
+import java.net.InetAddress;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.sonar.application.AppStateListener;
+import org.sonar.application.config.TestAppSettings;
import org.sonar.process.MessageException;
+import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
+import org.sonar.process.cluster.NodeType;
+import org.sonar.process.cluster.hz.HazelcastMember;
+import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
-import static org.sonar.application.cluster.HazelcastTesting.newHzMember;
import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME;
import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION;
@Test
public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exception {
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
assertThat(underTest.tryToLockWebLeader()).isEqualTo(true);
assertThat(underTest.tryToLockWebLeader()).isEqualTo(false);
}
@Test
public void test_listeners() throws InterruptedException {
AppStateListener listener = mock(AppStateListener.class);
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
underTest.addListener(listener);
underTest.setOperational(ProcessId.ELASTICSEARCH);
@Test
public void registerSonarQubeVersion_publishes_version_on_first_call() {
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
underTest.registerSonarQubeVersion("6.4.1.5");
assertThat(underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).get())
@Test
public void registerClusterName_publishes_clusterName_on_first_call() {
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
underTest.registerClusterName("foo");
assertThat(underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).get())
@Test
public void reset_always_throws_ISE() {
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("state reset is not supported in cluster mode");
@Test
public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception {
// Now launch an instance that try to be part of the hzInstance cluster
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
// Register first version
underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111");
@Test
public void registerClusterName_throws_MessageException_if_clusterName_is_different() throws Exception {
- try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+ try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
// Register first version
underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName");
underTest.registerClusterName("badClusterName");
}
}
+
+ private static HazelcastMember newHzMember() {
+ // use loopback for support of offline builds
+ InetAddress loopback = InetAddress.getLoopbackAddress();
+
+ return new HazelcastMemberBuilder()
+ .setNodeType(NodeType.APPLICATION)
+ .setProcessId(ProcessId.COMPUTE_ENGINE)
+ .setClusterName("foo")
+ .setNodeName("bar")
+ .setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback))
+ .setNetworkInterface(loopback.getHostAddress())
+ .build();
+ }
}
+++ /dev/null
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-package org.sonar.application.cluster;
-
-import java.net.InetAddress;
-import org.sonar.process.NetworkUtils;
-import org.sonar.process.ProcessId;
-import org.sonar.process.cluster.NodeType;
-import org.sonar.process.cluster.hz.HazelcastMember;
-import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
-
-public class HazelcastTesting {
-
- private HazelcastTesting() {
- // do not instantiate
- }
-
- public static HazelcastMember newHzMember() {
- // use loopback for support of offline builds
- InetAddress loopback = InetAddress.getLoopbackAddress();
-
- return new HazelcastMemberBuilder()
- .setNodeType(NodeType.APPLICATION)
- .setProcessId(ProcessId.COMPUTE_ENGINE)
- .setClusterName("foo")
- .setNodeName("bar")
- .setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback))
- .setNetworkInterface(loopback.getHostAddress())
- .build();
- }
-}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import com.hazelcast.core.Member;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Answer of {@link DistributedCall}, aggregating the answers from
+ * all the target members.
+ */
+public class DistributedAnswer<T> {
+
+ private final Map<Member, T> answers = new HashMap<>();
+ private final Set<Member> timedOutMembers = new HashSet<>();
+ private final Map<Member, Exception> failedMembers = new HashMap<>();
+
+ public Optional<T> getAnswer(Member member) {
+ return Optional.ofNullable(answers.get(member));
+ }
+
+ public boolean hasTimedOut(Member member) {
+ return timedOutMembers.contains(member);
+ }
+
+ public Optional<Exception> getFailed(Member member) {
+ return Optional.ofNullable(failedMembers.get(member));
+ }
+
+ public Collection<Member> getMembers() {
+ List<Member> members = new ArrayList<>();
+ members.addAll(answers.keySet());
+ members.addAll(timedOutMembers);
+ members.addAll(failedMembers.keySet());
+ return members;
+ }
+
+ void setAnswer(Member member, T answer) {
+ this.answers.put(member, answer);
+ }
+
+ void setTimedOut(Member member) {
+ this.timedOutMembers.add(member);
+ }
+
+ void setFailed(Member member, Exception e) {
+ failedMembers.put(member, e);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+public interface DistributedCall<T> extends Callable<T>, Serializable {
+}
import com.hazelcast.core.Cluster;
import com.hazelcast.core.IAtomicReference;
+import com.hazelcast.core.MemberSelector;
import java.util.List;
import java.util.Map;
import java.util.Set;
Cluster getCluster();
+ /**
+ * Runs a distributed query on a set of Hazelcast members.
+ *
+ * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes
+ * that are not available in classpath of target members.
+ * @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors}
+ * for utilities.
+ * @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then
+ * the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer}
+ */
+ <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs)
+ throws InterruptedException;
+
@Override
void close();
}
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IAtomicReference;
+import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberSelector;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;
return hzInstance.getCluster();
}
+ @Override
+ public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs)
+ throws InterruptedException {
+
+ IExecutorService executor = hzInstance.getExecutorService("default");
+ Map<Member, Future<T>> futures = executor.submitToMembers(callable, memberSelector);
+ try {
+ DistributedAnswer<T> distributedAnswer = new DistributedAnswer<>();
+ long maxTime = System.currentTimeMillis() + timeoutMs;
+ for (Map.Entry<Member, Future<T>> entry : futures.entrySet()) {
+ long remainingMs = Math.max(maxTime - System.currentTimeMillis(), 5L);
+ try {
+ T answer = entry.getValue().get(remainingMs, TimeUnit.MILLISECONDS);
+ distributedAnswer.setAnswer(entry.getKey(), answer);
+ } catch (ExecutionException e) {
+ distributedAnswer.setFailed(entry.getKey(), e);
+ } catch (TimeoutException e) {
+ distributedAnswer.setTimedOut(entry.getKey());
+ }
+ }
+ return distributedAnswer;
+ } finally {
+ futures.values().forEach(f -> f.cancel(true));
+ }
+ }
+
@Override
public void close() {
try {
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import com.hazelcast.core.Member;
+import java.io.IOException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class DistributedAnswerTest {
+
+ private Member member = newMember("member1");
+ private DistributedAnswer underTest = new DistributedAnswer();
+
+ @Test
+ public void test_call_with_unknown_member() {
+ assertThat(underTest.getAnswer(member)).isEmpty();
+ assertThat(underTest.hasTimedOut(member)).isFalse();
+ assertThat(underTest.getFailed(member)).isEmpty();
+ }
+
+ @Test
+ public void test_setAnswer() {
+ underTest.setAnswer(member, "foo");
+
+ assertThat(underTest.getAnswer(member)).hasValue("foo");
+ assertThat(underTest.hasTimedOut(member)).isFalse();
+ }
+
+ @Test
+ public void test_setTimedOut() {
+ underTest.setTimedOut(member);
+
+ assertThat(underTest.getAnswer(member)).isEmpty();
+ assertThat(underTest.hasTimedOut(member)).isTrue();
+ }
+
+ @Test
+ public void test_setFailed() {
+ IOException e = new IOException();
+ underTest.setFailed(member, e);
+
+ assertThat(underTest.getFailed(member)).hasValue(e);
+ }
+
+ @Test
+ public void member_can_be_referenced_multiple_times() {
+ underTest.setTimedOut(member);
+ underTest.setAnswer(member, "foo");
+ IOException exception = new IOException();
+ underTest.setFailed(member, exception);
+
+ assertThat(underTest.hasTimedOut(member)).isTrue();
+ assertThat(underTest.getAnswer(member)).hasValue("foo");
+ assertThat(underTest.getFailed(member)).hasValue(exception);
+ }
+
+ private static Member newMember(String uuid) {
+ Member member = mock(Member.class);
+ when(member.getUuid()).thenReturn(uuid);
+ return member;
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FailedDistributedCall implements DistributedCall<Long> {
+ static final AtomicLong COUNTER = new AtomicLong();
+
+ @Override
+ public Long call() throws Exception {
+ long value = COUNTER.getAndIncrement();
+ if (value == 1L) {
+ // only the second call fails
+ throw new IOException("BOOM");
+ }
+ return value;
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import com.hazelcast.cluster.memberselector.MemberSelectors;
+import com.hazelcast.core.Member;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.sonar.process.NetworkUtils;
+import org.sonar.process.ProcessId;
+import org.sonar.process.cluster.NodeType;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HazelcastMemberImplTest {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
+
+ // use loopback for support of offline builds
+ private static InetAddress loopback = InetAddress.getLoopbackAddress();
+ private static HazelcastMember member1;
+ private static HazelcastMember member2;
+ private static HazelcastMember member3;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ int port1 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback);
+ int port2 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback);
+ int port3 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback);
+ member1 = newHzMember(port1, port2, port3);
+ member2 = newHzMember(port2, port1, port3);
+ member3 = newHzMember(port3, port1, port2);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ member1.close();
+ member2.close();
+ member3.close();
+ }
+
+ @Test
+ public void call_executes_query_on_members() throws Exception {
+ SuccessfulDistributedCall.COUNTER.set(0L);
+ DistributedCall<Long> call = new SuccessfulDistributedCall();
+
+ DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 30_000L);
+
+ assertThat(answer.getMembers()).extracting(Member::getUuid).containsOnlyOnce(member1.getUuid(), member2.getUuid(), member3.getUuid());
+ assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 1L, 2L);
+ }
+
+ @Test
+ public void timed_out_calls_do_not_break_other_answers() throws InterruptedException {
+ // member 1 and 3 success, member 2 times-out
+ TimedOutDistributedCall.COUNTER.set(0L);
+ DistributedCall call = new TimedOutDistributedCall();
+ DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L);
+
+ assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L);
+
+ assertThat(extractTimeOuts(answer)).containsExactlyInAnyOrder(false, false, true);
+ }
+
+ @Test
+ public void failed_calls_do_not_break_other_answers() throws InterruptedException {
+ // member 1 and 3 success, member 2 fails
+ FailedDistributedCall.COUNTER.set(0L);
+ DistributedCall call = new FailedDistributedCall();
+ DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L);
+
+ // 2 successful answers
+ assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L);
+
+ // 1 failure
+ List<Exception> failures = extractFailures(answer);
+ assertThat(failures).hasSize(1);
+ assertThat(failures.get(0)).hasMessageContaining("BOOM");
+ }
+
+ private static HazelcastMember newHzMember(int port, int... otherPorts) {
+ return new HazelcastMemberBuilder()
+ .setNodeType(NodeType.APPLICATION)
+ .setProcessId(ProcessId.COMPUTE_ENGINE)
+ .setClusterName("foo")
+ .setNodeName("name" + port)
+ .setPort(port)
+ .setNetworkInterface(loopback.getHostAddress())
+ .setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.toList()))
+ .build();
+ }
+
+ private static Set<Long> extractAnswers(DistributedAnswer<Long> answer) {
+ return answer.getMembers().stream()
+ .map(answer::getAnswer)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toSet());
+ }
+
+ private static List<Exception> extractFailures(DistributedAnswer<Long> answer) {
+ return answer.getMembers().stream()
+ .map(answer::getFailed)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ }
+
+ private static List<Boolean> extractTimeOuts(DistributedAnswer<Long> answer) {
+ return answer.getMembers().stream()
+ .map(answer::hasTimedOut)
+ .collect(Collectors.toList());
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SuccessfulDistributedCall implements DistributedCall<Long> {
+ static final AtomicLong COUNTER = new AtomicLong();
+
+ @Override
+ public Long call() throws Exception {
+ return COUNTER.getAndIncrement();
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TimedOutDistributedCall implements DistributedCall<Long> {
+ static final AtomicLong COUNTER = new AtomicLong();
+
+ @Override
+ public Long call() throws Exception {
+ long value = COUNTER.getAndIncrement();
+ if (value == 1L) {
+ // only the second call times out
+ Thread.sleep(30_000L);
+ }
+ return value;
+ }
+}
import com.hazelcast.core.Cluster;
import com.hazelcast.core.IAtomicReference;
+import com.hazelcast.core.MemberSelector;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import org.sonar.process.ProcessId;
import org.sonar.process.ProcessProperties;
import org.sonar.process.cluster.NodeType;
+import org.sonar.process.cluster.hz.DistributedAnswer;
+import org.sonar.process.cluster.hz.DistributedCall;
import org.sonar.process.cluster.hz.HazelcastMember;
import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
return nonNullMember().getCluster();
}
+ @Override
+ public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs)
+ throws InterruptedException {
+ return nonNullMember().call(callable, memberSelector, timeoutMs);
+ }
+
private HazelcastMember nonNullMember() {
return requireNonNull(member, "Hazelcast member not started");
}