diff options
author | Simon Brandhof <simon.brandhof@sonarsource.com> | 2017-09-18 15:38:49 +0200 |
---|---|---|
committer | Simon Brandhof <simon.brandhof@sonarsource.com> | 2017-09-26 23:49:37 +0200 |
commit | df538630ea2ae47b725414257f65a8a9e28722bb (patch) | |
tree | 6d30b4a2e45fd54a115775b11c625f1cdb24bd68 /server/sonar-process/src | |
parent | e4c401f8bc9100fec7fd28bb93469d3de199f5f7 (diff) | |
download | sonarqube-df538630ea2ae47b725414257f65a8a9e28722bb.tar.gz sonarqube-df538630ea2ae47b725414257f65a8a9e28722bb.zip |
SONAR-9802 ability to execute distributed calls
See HazelcastMember#call(DistributedCall, ...)
Diffstat (limited to 'server/sonar-process/src')
9 files changed, 479 insertions, 0 deletions
diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java new file mode 100644 index 00000000000..25c60a3c712 --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java @@ -0,0 +1,73 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.core.Member; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Answer of {@link DistributedCall}, aggregating the answers from + * all the target members. + */ +public class DistributedAnswer<T> { + + private final Map<Member, T> answers = new HashMap<>(); + private final Set<Member> timedOutMembers = new HashSet<>(); + private final Map<Member, Exception> failedMembers = new HashMap<>(); + + public Optional<T> getAnswer(Member member) { + return Optional.ofNullable(answers.get(member)); + } + + public boolean hasTimedOut(Member member) { + return timedOutMembers.contains(member); + } + + public Optional<Exception> getFailed(Member member) { + return Optional.ofNullable(failedMembers.get(member)); + } + + public Collection<Member> getMembers() { + List<Member> members = new ArrayList<>(); + members.addAll(answers.keySet()); + members.addAll(timedOutMembers); + members.addAll(failedMembers.keySet()); + return members; + } + + void setAnswer(Member member, T answer) { + this.answers.put(member, answer); + } + + void setTimedOut(Member member) { + this.timedOutMembers.add(member); + } + + void setFailed(Member member, Exception e) { + failedMembers.put(member, e); + } +} diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java new file mode 100644 index 00000000000..c55668e7c93 --- /dev/null +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java @@ -0,0 +1,26 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.io.Serializable; +import java.util.concurrent.Callable; + +public interface DistributedCall<T> extends Callable<T>, Serializable { +} diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java index 7521fc9c4ea..0a70741d020 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java @@ -21,6 +21,7 @@ package org.sonar.process.cluster.hz; import com.hazelcast.core.Cluster; import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.MemberSelector; import java.util.List; import java.util.Map; import java.util.Set; @@ -98,6 +99,19 @@ public interface HazelcastMember extends AutoCloseable { Cluster getCluster(); + /** + * Runs a distributed query on a set of Hazelcast members. + * + * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes + * that are not available in classpath of target members. + * @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors} + * for utilities. + * @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then + * the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer} + */ + <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) + throws InterruptedException; + @Override void close(); } diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java index 606c5892800..118b4e6ee1d 100644 --- a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java +++ b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java @@ -23,10 +23,16 @@ import com.hazelcast.core.Cluster; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.core.IAtomicReference; +import com.hazelcast.core.IExecutorService; import com.hazelcast.core.Member; +import com.hazelcast.core.MemberSelector; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.slf4j.LoggerFactory; @@ -90,6 +96,32 @@ class HazelcastMemberImpl implements HazelcastMember { } @Override + public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs) + throws InterruptedException { + + IExecutorService executor = hzInstance.getExecutorService("default"); + Map<Member, Future<T>> futures = executor.submitToMembers(callable, memberSelector); + try { + DistributedAnswer<T> distributedAnswer = new DistributedAnswer<>(); + long maxTime = System.currentTimeMillis() + timeoutMs; + for (Map.Entry<Member, Future<T>> entry : futures.entrySet()) { + long remainingMs = Math.max(maxTime - System.currentTimeMillis(), 5L); + try { + T answer = entry.getValue().get(remainingMs, TimeUnit.MILLISECONDS); + distributedAnswer.setAnswer(entry.getKey(), answer); + } catch (ExecutionException e) { + distributedAnswer.setFailed(entry.getKey(), e); + } catch (TimeoutException e) { + distributedAnswer.setTimedOut(entry.getKey()); + } + } + return distributedAnswer; + } finally { + futures.values().forEach(f -> f.cancel(true)); + } + } + + @Override public void close() { try { hzInstance.shutdown(); diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java new file mode 100644 index 00000000000..6c9d8270a19 --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java @@ -0,0 +1,84 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.core.Member; +import java.io.IOException; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class DistributedAnswerTest { + + private Member member = newMember("member1"); + private DistributedAnswer underTest = new DistributedAnswer(); + + @Test + public void test_call_with_unknown_member() { + assertThat(underTest.getAnswer(member)).isEmpty(); + assertThat(underTest.hasTimedOut(member)).isFalse(); + assertThat(underTest.getFailed(member)).isEmpty(); + } + + @Test + public void test_setAnswer() { + underTest.setAnswer(member, "foo"); + + assertThat(underTest.getAnswer(member)).hasValue("foo"); + assertThat(underTest.hasTimedOut(member)).isFalse(); + } + + @Test + public void test_setTimedOut() { + underTest.setTimedOut(member); + + assertThat(underTest.getAnswer(member)).isEmpty(); + assertThat(underTest.hasTimedOut(member)).isTrue(); + } + + @Test + public void test_setFailed() { + IOException e = new IOException(); + underTest.setFailed(member, e); + + assertThat(underTest.getFailed(member)).hasValue(e); + } + + @Test + public void member_can_be_referenced_multiple_times() { + underTest.setTimedOut(member); + underTest.setAnswer(member, "foo"); + IOException exception = new IOException(); + underTest.setFailed(member, exception); + + assertThat(underTest.hasTimedOut(member)).isTrue(); + assertThat(underTest.getAnswer(member)).hasValue("foo"); + assertThat(underTest.getFailed(member)).hasValue(exception); + } + + private static Member newMember(String uuid) { + Member member = mock(Member.class); + when(member.getUuid()).thenReturn(uuid); + return member; + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java new file mode 100644 index 00000000000..03a13d6fda5 --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java @@ -0,0 +1,37 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +public class FailedDistributedCall implements DistributedCall<Long> { + static final AtomicLong COUNTER = new AtomicLong(); + + @Override + public Long call() throws Exception { + long value = COUNTER.getAndIncrement(); + if (value == 1L) { + // only the second call fails + throw new IOException("BOOM"); + } + return value; + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java new file mode 100644 index 00000000000..72087914a8e --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java @@ -0,0 +1,146 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import com.hazelcast.cluster.memberselector.MemberSelectors; +import com.hazelcast.core.Member; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.sonar.process.NetworkUtils; +import org.sonar.process.ProcessId; +import org.sonar.process.cluster.NodeType; + +import static org.assertj.core.api.Assertions.assertThat; + +public class HazelcastMemberImplTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule + public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60)); + + // use loopback for support of offline builds + private static InetAddress loopback = InetAddress.getLoopbackAddress(); + private static HazelcastMember member1; + private static HazelcastMember member2; + private static HazelcastMember member3; + + @BeforeClass + public static void setUp() throws Exception { + int port1 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); + int port2 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); + int port3 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback); + member1 = newHzMember(port1, port2, port3); + member2 = newHzMember(port2, port1, port3); + member3 = newHzMember(port3, port1, port2); + } + + @AfterClass + public static void tearDown() throws Exception { + member1.close(); + member2.close(); + member3.close(); + } + + @Test + public void call_executes_query_on_members() throws Exception { + SuccessfulDistributedCall.COUNTER.set(0L); + DistributedCall<Long> call = new SuccessfulDistributedCall(); + + DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 30_000L); + + assertThat(answer.getMembers()).extracting(Member::getUuid).containsOnlyOnce(member1.getUuid(), member2.getUuid(), member3.getUuid()); + assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 1L, 2L); + } + + @Test + public void timed_out_calls_do_not_break_other_answers() throws InterruptedException { + // member 1 and 3 success, member 2 times-out + TimedOutDistributedCall.COUNTER.set(0L); + DistributedCall call = new TimedOutDistributedCall(); + DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L); + + assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L); + + assertThat(extractTimeOuts(answer)).containsExactlyInAnyOrder(false, false, true); + } + + @Test + public void failed_calls_do_not_break_other_answers() throws InterruptedException { + // member 1 and 3 success, member 2 fails + FailedDistributedCall.COUNTER.set(0L); + DistributedCall call = new FailedDistributedCall(); + DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L); + + // 2 successful answers + assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L); + + // 1 failure + List<Exception> failures = extractFailures(answer); + assertThat(failures).hasSize(1); + assertThat(failures.get(0)).hasMessageContaining("BOOM"); + } + + private static HazelcastMember newHzMember(int port, int... otherPorts) { + return new HazelcastMemberBuilder() + .setNodeType(NodeType.APPLICATION) + .setProcessId(ProcessId.COMPUTE_ENGINE) + .setClusterName("foo") + .setNodeName("name" + port) + .setPort(port) + .setNetworkInterface(loopback.getHostAddress()) + .setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.toList())) + .build(); + } + + private static Set<Long> extractAnswers(DistributedAnswer<Long> answer) { + return answer.getMembers().stream() + .map(answer::getAnswer) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toSet()); + } + + private static List<Exception> extractFailures(DistributedAnswer<Long> answer) { + return answer.getMembers().stream() + .map(answer::getFailed) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + } + + private static List<Boolean> extractTimeOuts(DistributedAnswer<Long> answer) { + return answer.getMembers().stream() + .map(answer::hasTimedOut) + .collect(Collectors.toList()); + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java new file mode 100644 index 00000000000..515965617b9 --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java @@ -0,0 +1,31 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.util.concurrent.atomic.AtomicLong; + +public class SuccessfulDistributedCall implements DistributedCall<Long> { + static final AtomicLong COUNTER = new AtomicLong(); + + @Override + public Long call() throws Exception { + return COUNTER.getAndIncrement(); + } +} diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java new file mode 100644 index 00000000000..05136aea2c6 --- /dev/null +++ b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java @@ -0,0 +1,36 @@ +/* + * SonarQube + * Copyright (C) 2009-2017 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.process.cluster.hz; + +import java.util.concurrent.atomic.AtomicLong; + +public class TimedOutDistributedCall implements DistributedCall<Long> { + static final AtomicLong COUNTER = new AtomicLong(); + + @Override + public Long call() throws Exception { + long value = COUNTER.getAndIncrement(); + if (value == 1L) { + // only the second call times out + Thread.sleep(30_000L); + } + return value; + } +} |