See HazelcastMember#call(DistributedCall, ...)tags/6.6-RC1
@@ -42,7 +42,7 @@ public class AppStateFactory { | |||
public AppState create() { | |||
if (ClusterSettings.isClusterEnabled(settings)) { | |||
HazelcastMember hzMember = createHzMember(settings.getProps()); | |||
return new ClusterAppStateImpl(hzMember); | |||
return new ClusterAppStateImpl(settings, hzMember); | |||
} | |||
return new AppStateImpl(); | |||
} |
@@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; | |||
import java.util.function.Supplier; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
import org.sonar.application.cluster.ClusterAppState; | |||
import org.sonar.application.cluster.health.SearchNodeHealthProvider; | |||
import org.sonar.application.command.CommandFactory; | |||
import org.sonar.application.command.EsCommand; | |||
import org.sonar.application.command.JavaCommand; | |||
@@ -41,10 +39,7 @@ import org.sonar.application.process.ProcessLauncher; | |||
import org.sonar.application.process.ProcessLifecycleListener; | |||
import org.sonar.application.process.ProcessMonitor; | |||
import org.sonar.application.process.SQProcess; | |||
import org.sonar.process.NetworkUtils; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.application.cluster.health.HealthStateSharing; | |||
import org.sonar.application.cluster.health.HealthStateSharingImpl; | |||
public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLifecycleListener, AppStateListener { | |||
@@ -65,7 +60,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
private final AtomicInteger stopCountDown = new AtomicInteger(0); | |||
private StopperThread stopperThread; | |||
private RestarterThread restarterThread; | |||
private HealthStateSharing healthStateSharing; | |||
private long processWatcherDelayMs = SQProcess.DEFAULT_WATCHER_DELAY_MS; | |||
public SchedulerImpl(AppSettings settings, AppReloader appReloader, CommandFactory commandFactory, | |||
@@ -105,7 +99,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
} | |||
private void tryToStartAll() { | |||
tryToStartHealthStateSharing(); | |||
tryToStartEs(); | |||
tryToStartWeb(); | |||
tryToStartCe(); | |||
@@ -144,18 +137,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
} | |||
} | |||
private void tryToStartHealthStateSharing() { | |||
if (healthStateSharing == null | |||
&& appState instanceof ClusterAppState | |||
&& ClusterSettings.isLocalElasticsearchEnabled(settings)) { | |||
ClusterAppState clusterAppState = (ClusterAppState) appState; | |||
this.healthStateSharing = new HealthStateSharingImpl( | |||
clusterAppState.getHazelcastMember(), | |||
new SearchNodeHealthProvider(settings.getProps(), clusterAppState, NetworkUtils.INSTANCE)); | |||
this.healthStateSharing.start(); | |||
} | |||
} | |||
private boolean isEsClientStartable() { | |||
boolean requireLocalEs = ClusterSettings.isLocalElasticsearchEnabled(settings); | |||
return appState.isOperational(ProcessId.ELASTICSEARCH, requireLocalEs); | |||
@@ -190,7 +171,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
stopProcess(ProcessId.COMPUTE_ENGINE); | |||
stopProcess(ProcessId.WEB_SERVER); | |||
stopProcess(ProcessId.ELASTICSEARCH); | |||
stopHealthStateSharing(); | |||
} | |||
/** | |||
@@ -204,12 +184,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi | |||
} | |||
} | |||
private void stopHealthStateSharing() { | |||
if (healthStateSharing != null) { | |||
healthStateSharing.stop(); | |||
} | |||
} | |||
/** | |||
* Blocks until all processes are stopped. Pending restart, if | |||
* any, is disabled. |
@@ -39,7 +39,13 @@ import java.util.concurrent.locks.Lock; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
import org.sonar.application.AppStateListener; | |||
import org.sonar.application.cluster.health.HealthStateSharing; | |||
import org.sonar.application.cluster.health.HealthStateSharingImpl; | |||
import org.sonar.application.cluster.health.SearchNodeHealthProvider; | |||
import org.sonar.application.config.AppSettings; | |||
import org.sonar.application.config.ClusterSettings; | |||
import org.sonar.process.MessageException; | |||
import org.sonar.process.NetworkUtils; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.process.cluster.NodeType; | |||
import org.sonar.process.cluster.hz.HazelcastMember; | |||
@@ -60,14 +66,20 @@ public class ClusterAppStateImpl implements ClusterAppState { | |||
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses; | |||
private final String operationalProcessListenerUUID; | |||
private final String nodeDisconnectedListenerUUID; | |||
private HealthStateSharing healthStateSharing = null; | |||
public ClusterAppStateImpl(HazelcastMember hzMember) { | |||
public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember) { | |||
this.hzMember = hzMember; | |||
// Get or create the replicated map | |||
operationalProcesses = (ReplicatedMap) hzMember.getReplicatedMap(OPERATIONAL_PROCESSES); | |||
operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener()); | |||
nodeDisconnectedListenerUUID = hzMember.getCluster().addMembershipListener(new NodeDisconnectedListener()); | |||
if (ClusterSettings.isLocalElasticsearchEnabled(settings)) { | |||
this.healthStateSharing = new HealthStateSharingImpl(hzMember, new SearchNodeHealthProvider(settings.getProps(), this, NetworkUtils.INSTANCE)); | |||
this.healthStateSharing.start(); | |||
} | |||
} | |||
@Override | |||
@@ -184,6 +196,9 @@ public class ClusterAppStateImpl implements ClusterAppState { | |||
@Override | |||
public void close() { | |||
if (hzMember != null) { | |||
if (healthStateSharing != null) { | |||
healthStateSharing.stop(); | |||
} | |||
try { | |||
// Removing listeners | |||
operationalProcesses.removeEntryListener(operationalProcessListenerUUID); |
@@ -19,6 +19,7 @@ | |||
*/ | |||
package org.sonar.application.cluster; | |||
import java.net.InetAddress; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.DisableOnDebug; | |||
@@ -26,14 +27,18 @@ import org.junit.rules.ExpectedException; | |||
import org.junit.rules.TestRule; | |||
import org.junit.rules.Timeout; | |||
import org.sonar.application.AppStateListener; | |||
import org.sonar.application.config.TestAppSettings; | |||
import org.sonar.process.MessageException; | |||
import org.sonar.process.NetworkUtils; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.process.cluster.NodeType; | |||
import org.sonar.process.cluster.hz.HazelcastMember; | |||
import org.sonar.process.cluster.hz.HazelcastMemberBuilder; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.timeout; | |||
import static org.mockito.Mockito.verify; | |||
import static org.sonar.application.cluster.HazelcastTesting.newHzMember; | |||
import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME; | |||
import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION; | |||
@@ -47,7 +52,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exception { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
assertThat(underTest.tryToLockWebLeader()).isEqualTo(true); | |||
assertThat(underTest.tryToLockWebLeader()).isEqualTo(false); | |||
} | |||
@@ -56,7 +61,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void test_listeners() throws InterruptedException { | |||
AppStateListener listener = mock(AppStateListener.class); | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
underTest.addListener(listener); | |||
underTest.setOperational(ProcessId.ELASTICSEARCH); | |||
@@ -72,7 +77,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void registerSonarQubeVersion_publishes_version_on_first_call() { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
underTest.registerSonarQubeVersion("6.4.1.5"); | |||
assertThat(underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).get()) | |||
@@ -82,7 +87,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void registerClusterName_publishes_clusterName_on_first_call() { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
underTest.registerClusterName("foo"); | |||
assertThat(underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).get()) | |||
@@ -92,7 +97,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void reset_always_throws_ISE() { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
expectedException.expect(IllegalStateException.class); | |||
expectedException.expectMessage("state reset is not supported in cluster mode"); | |||
@@ -103,7 +108,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception { | |||
// Now launch an instance that try to be part of the hzInstance cluster | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
// Register first version | |||
underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111"); | |||
@@ -117,7 +122,7 @@ public class ClusterAppStateImplTest { | |||
@Test | |||
public void registerClusterName_throws_MessageException_if_clusterName_is_different() throws Exception { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) { | |||
try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) { | |||
// Register first version | |||
underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName"); | |||
@@ -128,4 +133,18 @@ public class ClusterAppStateImplTest { | |||
underTest.registerClusterName("badClusterName"); | |||
} | |||
} | |||
private static HazelcastMember newHzMember() { | |||
// use loopback for support of offline builds | |||
InetAddress loopback = InetAddress.getLoopbackAddress(); | |||
return new HazelcastMemberBuilder() | |||
.setNodeType(NodeType.APPLICATION) | |||
.setProcessId(ProcessId.COMPUTE_ENGINE) | |||
.setClusterName("foo") | |||
.setNodeName("bar") | |||
.setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback)) | |||
.setNetworkInterface(loopback.getHostAddress()) | |||
.build(); | |||
} | |||
} |
@@ -1,48 +0,0 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.application.cluster; | |||
import java.net.InetAddress; | |||
import org.sonar.process.NetworkUtils; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.process.cluster.NodeType; | |||
import org.sonar.process.cluster.hz.HazelcastMember; | |||
import org.sonar.process.cluster.hz.HazelcastMemberBuilder; | |||
public class HazelcastTesting { | |||
private HazelcastTesting() { | |||
// do not instantiate | |||
} | |||
public static HazelcastMember newHzMember() { | |||
// use loopback for support of offline builds | |||
InetAddress loopback = InetAddress.getLoopbackAddress(); | |||
return new HazelcastMemberBuilder() | |||
.setNodeType(NodeType.APPLICATION) | |||
.setProcessId(ProcessId.COMPUTE_ENGINE) | |||
.setClusterName("foo") | |||
.setNodeName("bar") | |||
.setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback)) | |||
.setNetworkInterface(loopback.getHostAddress()) | |||
.build(); | |||
} | |||
} |
@@ -0,0 +1,73 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import com.hazelcast.core.Member; | |||
import java.util.ArrayList; | |||
import java.util.Collection; | |||
import java.util.HashMap; | |||
import java.util.HashSet; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Optional; | |||
import java.util.Set; | |||
/** | |||
* Answer of {@link DistributedCall}, aggregating the answers from | |||
* all the target members. | |||
*/ | |||
public class DistributedAnswer<T> { | |||
private final Map<Member, T> answers = new HashMap<>(); | |||
private final Set<Member> timedOutMembers = new HashSet<>(); | |||
private final Map<Member, Exception> failedMembers = new HashMap<>(); | |||
public Optional<T> getAnswer(Member member) { | |||
return Optional.ofNullable(answers.get(member)); | |||
} | |||
public boolean hasTimedOut(Member member) { | |||
return timedOutMembers.contains(member); | |||
} | |||
public Optional<Exception> getFailed(Member member) { | |||
return Optional.ofNullable(failedMembers.get(member)); | |||
} | |||
public Collection<Member> getMembers() { | |||
List<Member> members = new ArrayList<>(); | |||
members.addAll(answers.keySet()); | |||
members.addAll(timedOutMembers); | |||
members.addAll(failedMembers.keySet()); | |||
return members; | |||
} | |||
void setAnswer(Member member, T answer) { | |||
this.answers.put(member, answer); | |||
} | |||
void setTimedOut(Member member) { | |||
this.timedOutMembers.add(member); | |||
} | |||
void setFailed(Member member, Exception e) { | |||
failedMembers.put(member, e); | |||
} | |||
} |
@@ -0,0 +1,26 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import java.io.Serializable; | |||
import java.util.concurrent.Callable; | |||
public interface DistributedCall<T> extends Callable<T>, Serializable { | |||
} |
@@ -21,6 +21,7 @@ package org.sonar.process.cluster.hz; | |||
import com.hazelcast.core.Cluster; | |||
import com.hazelcast.core.IAtomicReference; | |||
import com.hazelcast.core.MemberSelector; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Set; | |||
@@ -98,6 +99,19 @@ public interface HazelcastMember extends AutoCloseable { | |||
Cluster getCluster(); | |||
/** | |||
* Runs a distributed query on a set of Hazelcast members. | |||
* | |||
* @param callable the query that is executed on all target members. Be careful of classloader, don't use classes | |||
* that are not available in classpath of target members. | |||
* @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors} | |||
* for utilities. | |||
* @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then | |||
* the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer} | |||
*/ | |||
<T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) | |||
throws InterruptedException; | |||
@Override | |||
void close(); | |||
} |
@@ -23,10 +23,16 @@ import com.hazelcast.core.Cluster; | |||
import com.hazelcast.core.HazelcastInstance; | |||
import com.hazelcast.core.HazelcastInstanceNotActiveException; | |||
import com.hazelcast.core.IAtomicReference; | |||
import com.hazelcast.core.IExecutorService; | |||
import com.hazelcast.core.Member; | |||
import com.hazelcast.core.MemberSelector; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Set; | |||
import java.util.concurrent.ExecutionException; | |||
import java.util.concurrent.Future; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.concurrent.TimeoutException; | |||
import java.util.concurrent.locks.Lock; | |||
import java.util.stream.Collectors; | |||
import org.slf4j.LoggerFactory; | |||
@@ -89,6 +95,32 @@ class HazelcastMemberImpl implements HazelcastMember { | |||
return hzInstance.getCluster(); | |||
} | |||
@Override | |||
public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) | |||
throws InterruptedException { | |||
IExecutorService executor = hzInstance.getExecutorService("default"); | |||
Map<Member, Future<T>> futures = executor.submitToMembers(callable, memberSelector); | |||
try { | |||
DistributedAnswer<T> distributedAnswer = new DistributedAnswer<>(); | |||
long maxTime = System.currentTimeMillis() + timeoutMs; | |||
for (Map.Entry<Member, Future<T>> entry : futures.entrySet()) { | |||
long remainingMs = Math.max(maxTime - System.currentTimeMillis(), 5L); | |||
try { | |||
T answer = entry.getValue().get(remainingMs, TimeUnit.MILLISECONDS); | |||
distributedAnswer.setAnswer(entry.getKey(), answer); | |||
} catch (ExecutionException e) { | |||
distributedAnswer.setFailed(entry.getKey(), e); | |||
} catch (TimeoutException e) { | |||
distributedAnswer.setTimedOut(entry.getKey()); | |||
} | |||
} | |||
return distributedAnswer; | |||
} finally { | |||
futures.values().forEach(f -> f.cancel(true)); | |||
} | |||
} | |||
@Override | |||
public void close() { | |||
try { |
@@ -0,0 +1,84 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import com.hazelcast.core.Member; | |||
import java.io.IOException; | |||
import org.junit.Test; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.Mockito.mock; | |||
import static org.mockito.Mockito.when; | |||
public class DistributedAnswerTest { | |||
private Member member = newMember("member1"); | |||
private DistributedAnswer underTest = new DistributedAnswer(); | |||
@Test | |||
public void test_call_with_unknown_member() { | |||
assertThat(underTest.getAnswer(member)).isEmpty(); | |||
assertThat(underTest.hasTimedOut(member)).isFalse(); | |||
assertThat(underTest.getFailed(member)).isEmpty(); | |||
} | |||
@Test | |||
public void test_setAnswer() { | |||
underTest.setAnswer(member, "foo"); | |||
assertThat(underTest.getAnswer(member)).hasValue("foo"); | |||
assertThat(underTest.hasTimedOut(member)).isFalse(); | |||
} | |||
@Test | |||
public void test_setTimedOut() { | |||
underTest.setTimedOut(member); | |||
assertThat(underTest.getAnswer(member)).isEmpty(); | |||
assertThat(underTest.hasTimedOut(member)).isTrue(); | |||
} | |||
@Test | |||
public void test_setFailed() { | |||
IOException e = new IOException(); | |||
underTest.setFailed(member, e); | |||
assertThat(underTest.getFailed(member)).hasValue(e); | |||
} | |||
@Test | |||
public void member_can_be_referenced_multiple_times() { | |||
underTest.setTimedOut(member); | |||
underTest.setAnswer(member, "foo"); | |||
IOException exception = new IOException(); | |||
underTest.setFailed(member, exception); | |||
assertThat(underTest.hasTimedOut(member)).isTrue(); | |||
assertThat(underTest.getAnswer(member)).hasValue("foo"); | |||
assertThat(underTest.getFailed(member)).hasValue(exception); | |||
} | |||
private static Member newMember(String uuid) { | |||
Member member = mock(Member.class); | |||
when(member.getUuid()).thenReturn(uuid); | |||
return member; | |||
} | |||
} |
@@ -0,0 +1,37 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import java.io.IOException; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
public class FailedDistributedCall implements DistributedCall<Long> { | |||
static final AtomicLong COUNTER = new AtomicLong(); | |||
@Override | |||
public Long call() throws Exception { | |||
long value = COUNTER.getAndIncrement(); | |||
if (value == 1L) { | |||
// only the second call fails | |||
throw new IOException("BOOM"); | |||
} | |||
return value; | |||
} | |||
} |
@@ -0,0 +1,146 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import com.hazelcast.cluster.memberselector.MemberSelectors; | |||
import com.hazelcast.core.Member; | |||
import java.net.InetAddress; | |||
import java.util.Arrays; | |||
import java.util.List; | |||
import java.util.Optional; | |||
import java.util.Set; | |||
import java.util.stream.Collectors; | |||
import org.junit.AfterClass; | |||
import org.junit.BeforeClass; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.DisableOnDebug; | |||
import org.junit.rules.ExpectedException; | |||
import org.junit.rules.TestRule; | |||
import org.junit.rules.Timeout; | |||
import org.sonar.process.NetworkUtils; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.process.cluster.NodeType; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
public class HazelcastMemberImplTest { | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
@Rule | |||
public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60)); | |||
// use loopback for support of offline builds | |||
private static InetAddress loopback = InetAddress.getLoopbackAddress(); | |||
private static HazelcastMember member1; | |||
private static HazelcastMember member2; | |||
private static HazelcastMember member3; | |||
@BeforeClass | |||
public static void setUp() throws Exception { | |||
int port1 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); | |||
int port2 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); | |||
int port3 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); | |||
member1 = newHzMember(port1, port2, port3); | |||
member2 = newHzMember(port2, port1, port3); | |||
member3 = newHzMember(port3, port1, port2); | |||
} | |||
@AfterClass | |||
public static void tearDown() throws Exception { | |||
member1.close(); | |||
member2.close(); | |||
member3.close(); | |||
} | |||
@Test | |||
public void call_executes_query_on_members() throws Exception { | |||
SuccessfulDistributedCall.COUNTER.set(0L); | |||
DistributedCall<Long> call = new SuccessfulDistributedCall(); | |||
DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 30_000L); | |||
assertThat(answer.getMembers()).extracting(Member::getUuid).containsOnlyOnce(member1.getUuid(), member2.getUuid(), member3.getUuid()); | |||
assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 1L, 2L); | |||
} | |||
@Test | |||
public void timed_out_calls_do_not_break_other_answers() throws InterruptedException { | |||
// member 1 and 3 success, member 2 times-out | |||
TimedOutDistributedCall.COUNTER.set(0L); | |||
DistributedCall call = new TimedOutDistributedCall(); | |||
DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L); | |||
assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L); | |||
assertThat(extractTimeOuts(answer)).containsExactlyInAnyOrder(false, false, true); | |||
} | |||
@Test | |||
public void failed_calls_do_not_break_other_answers() throws InterruptedException { | |||
// member 1 and 3 success, member 2 fails | |||
FailedDistributedCall.COUNTER.set(0L); | |||
DistributedCall call = new FailedDistributedCall(); | |||
DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L); | |||
// 2 successful answers | |||
assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L); | |||
// 1 failure | |||
List<Exception> failures = extractFailures(answer); | |||
assertThat(failures).hasSize(1); | |||
assertThat(failures.get(0)).hasMessageContaining("BOOM"); | |||
} | |||
private static HazelcastMember newHzMember(int port, int... otherPorts) { | |||
return new HazelcastMemberBuilder() | |||
.setNodeType(NodeType.APPLICATION) | |||
.setProcessId(ProcessId.COMPUTE_ENGINE) | |||
.setClusterName("foo") | |||
.setNodeName("name" + port) | |||
.setPort(port) | |||
.setNetworkInterface(loopback.getHostAddress()) | |||
.setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.toList())) | |||
.build(); | |||
} | |||
private static Set<Long> extractAnswers(DistributedAnswer<Long> answer) { | |||
return answer.getMembers().stream() | |||
.map(answer::getAnswer) | |||
.filter(Optional::isPresent) | |||
.map(Optional::get) | |||
.collect(Collectors.toSet()); | |||
} | |||
private static List<Exception> extractFailures(DistributedAnswer<Long> answer) { | |||
return answer.getMembers().stream() | |||
.map(answer::getFailed) | |||
.filter(Optional::isPresent) | |||
.map(Optional::get) | |||
.collect(Collectors.toList()); | |||
} | |||
private static List<Boolean> extractTimeOuts(DistributedAnswer<Long> answer) { | |||
return answer.getMembers().stream() | |||
.map(answer::hasTimedOut) | |||
.collect(Collectors.toList()); | |||
} | |||
} |
@@ -0,0 +1,31 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
public class SuccessfulDistributedCall implements DistributedCall<Long> { | |||
static final AtomicLong COUNTER = new AtomicLong(); | |||
@Override | |||
public Long call() throws Exception { | |||
return COUNTER.getAndIncrement(); | |||
} | |||
} |
@@ -0,0 +1,36 @@ | |||
/* | |||
* SonarQube | |||
* Copyright (C) 2009-2017 SonarSource SA | |||
* mailto:info AT sonarsource DOT com | |||
* | |||
* This program is free software; you can redistribute it and/or | |||
* modify it under the terms of the GNU Lesser General Public | |||
* License as published by the Free Software Foundation; either | |||
* version 3 of the License, or (at your option) any later version. | |||
* | |||
* This program is distributed in the hope that it will be useful, | |||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
* Lesser General Public License for more details. | |||
* | |||
* You should have received a copy of the GNU Lesser General Public License | |||
* along with this program; if not, write to the Free Software Foundation, | |||
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | |||
*/ | |||
package org.sonar.process.cluster.hz; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
public class TimedOutDistributedCall implements DistributedCall<Long> { | |||
static final AtomicLong COUNTER = new AtomicLong(); | |||
@Override | |||
public Long call() throws Exception { | |||
long value = COUNTER.getAndIncrement(); | |||
if (value == 1L) { | |||
// only the second call times out | |||
Thread.sleep(30_000L); | |||
} | |||
return value; | |||
} | |||
} |
@@ -21,6 +21,7 @@ package org.sonar.server.cluster; | |||
import com.hazelcast.core.Cluster; | |||
import com.hazelcast.core.IAtomicReference; | |||
import com.hazelcast.core.MemberSelector; | |||
import java.net.InetAddress; | |||
import java.net.UnknownHostException; | |||
import java.util.List; | |||
@@ -33,6 +34,8 @@ import org.sonar.process.NetworkUtils; | |||
import org.sonar.process.ProcessId; | |||
import org.sonar.process.ProcessProperties; | |||
import org.sonar.process.cluster.NodeType; | |||
import org.sonar.process.cluster.hz.DistributedAnswer; | |||
import org.sonar.process.cluster.hz.DistributedCall; | |||
import org.sonar.process.cluster.hz.HazelcastMember; | |||
import org.sonar.process.cluster.hz.HazelcastMemberBuilder; | |||
@@ -110,6 +113,12 @@ public class StartableHazelcastMember implements HazelcastMember, Startable { | |||
return nonNullMember().getCluster(); | |||
} | |||
@Override | |||
public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) | |||
throws InterruptedException { | |||
return nonNullMember().call(callable, memberSelector, timeoutMs); | |||
} | |||
private HazelcastMember nonNullMember() { | |||
return requireNonNull(member, "Hazelcast member not started"); | |||
} |