]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-9802 ability to execute distributed calls
authorSimon Brandhof <simon.brandhof@sonarsource.com>
Mon, 18 Sep 2017 13:38:49 +0000 (15:38 +0200)
committerSimon Brandhof <simon.brandhof@sonarsource.com>
Tue, 26 Sep 2017 21:49:37 +0000 (23:49 +0200)
See HazelcastMember#call(DistributedCall, ...)

15 files changed:
server/sonar-main/src/main/java/org/sonar/application/AppStateFactory.java
server/sonar-main/src/main/java/org/sonar/application/SchedulerImpl.java
server/sonar-main/src/main/java/org/sonar/application/cluster/ClusterAppStateImpl.java
server/sonar-main/src/test/java/org/sonar/application/cluster/ClusterAppStateImplTest.java
server/sonar-main/src/test/java/org/sonar/application/cluster/HazelcastTesting.java [deleted file]
server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java [new file with mode: 0644]
server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java [new file with mode: 0644]
server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMember.java
server/sonar-process/src/main/java/org/sonar/process/cluster/hz/HazelcastMemberImpl.java
server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java [new file with mode: 0644]
server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java [new file with mode: 0644]
server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java [new file with mode: 0644]
server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java [new file with mode: 0644]
server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java [new file with mode: 0644]
server/sonar-server/src/main/java/org/sonar/server/cluster/StartableHazelcastMember.java

index a2b9cbd7d700358f792498d918f56bab11115d86..0498f2e41005bbc3d1d91e3f5a927d781c1218cf 100644 (file)
@@ -42,7 +42,7 @@ public class AppStateFactory {
   public AppState create() {
     if (ClusterSettings.isClusterEnabled(settings)) {
       HazelcastMember hzMember = createHzMember(settings.getProps());
-      return new ClusterAppStateImpl(hzMember);
+      return new ClusterAppStateImpl(settings, hzMember);
     }
     return new AppStateImpl();
   }
index 4862cfa89dea607a53c374178901151c8cd62f1f..8589e29414a9edc3b41002669af3346d11592abc 100644 (file)
@@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.sonar.application.cluster.ClusterAppState;
-import org.sonar.application.cluster.health.SearchNodeHealthProvider;
 import org.sonar.application.command.CommandFactory;
 import org.sonar.application.command.EsCommand;
 import org.sonar.application.command.JavaCommand;
@@ -41,10 +39,7 @@ import org.sonar.application.process.ProcessLauncher;
 import org.sonar.application.process.ProcessLifecycleListener;
 import org.sonar.application.process.ProcessMonitor;
 import org.sonar.application.process.SQProcess;
-import org.sonar.process.NetworkUtils;
 import org.sonar.process.ProcessId;
-import org.sonar.application.cluster.health.HealthStateSharing;
-import org.sonar.application.cluster.health.HealthStateSharingImpl;
 
 public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLifecycleListener, AppStateListener {
 
@@ -65,7 +60,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi
   private final AtomicInteger stopCountDown = new AtomicInteger(0);
   private StopperThread stopperThread;
   private RestarterThread restarterThread;
-  private HealthStateSharing healthStateSharing;
   private long processWatcherDelayMs = SQProcess.DEFAULT_WATCHER_DELAY_MS;
 
   public SchedulerImpl(AppSettings settings, AppReloader appReloader, CommandFactory commandFactory,
@@ -105,7 +99,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi
   }
 
   private void tryToStartAll() {
-    tryToStartHealthStateSharing();
     tryToStartEs();
     tryToStartWeb();
     tryToStartCe();
@@ -144,18 +137,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi
     }
   }
 
-  private void tryToStartHealthStateSharing() {
-    if (healthStateSharing == null
-      && appState instanceof ClusterAppState
-      && ClusterSettings.isLocalElasticsearchEnabled(settings)) {
-      ClusterAppState clusterAppState = (ClusterAppState) appState;
-      this.healthStateSharing = new HealthStateSharingImpl(
-        clusterAppState.getHazelcastMember(),
-        new SearchNodeHealthProvider(settings.getProps(), clusterAppState, NetworkUtils.INSTANCE));
-      this.healthStateSharing.start();
-    }
-  }
-
   private boolean isEsClientStartable() {
     boolean requireLocalEs = ClusterSettings.isLocalElasticsearchEnabled(settings);
     return appState.isOperational(ProcessId.ELASTICSEARCH, requireLocalEs);
@@ -190,7 +171,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi
     stopProcess(ProcessId.COMPUTE_ENGINE);
     stopProcess(ProcessId.WEB_SERVER);
     stopProcess(ProcessId.ELASTICSEARCH);
-    stopHealthStateSharing();
   }
 
   /**
@@ -204,12 +184,6 @@ public class SchedulerImpl implements Scheduler, ProcessEventListener, ProcessLi
     }
   }
 
-  private void stopHealthStateSharing() {
-    if (healthStateSharing != null) {
-      healthStateSharing.stop();
-    }
-  }
-
   /**
    * Blocks until all processes are stopped. Pending restart, if
    * any, is disabled.
index 80940e96b38949ca5af6767f5c9d511fa8da6cee..3c23a5501eb99c17968c6c71991010658b5eb7e8 100644 (file)
@@ -39,7 +39,13 @@ import java.util.concurrent.locks.Lock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonar.application.AppStateListener;
+import org.sonar.application.cluster.health.HealthStateSharing;
+import org.sonar.application.cluster.health.HealthStateSharingImpl;
+import org.sonar.application.cluster.health.SearchNodeHealthProvider;
+import org.sonar.application.config.AppSettings;
+import org.sonar.application.config.ClusterSettings;
 import org.sonar.process.MessageException;
+import org.sonar.process.NetworkUtils;
 import org.sonar.process.ProcessId;
 import org.sonar.process.cluster.NodeType;
 import org.sonar.process.cluster.hz.HazelcastMember;
@@ -60,14 +66,20 @@ public class ClusterAppStateImpl implements ClusterAppState {
   private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
   private final String operationalProcessListenerUUID;
   private final String nodeDisconnectedListenerUUID;
+  private HealthStateSharing healthStateSharing = null;
 
-  public ClusterAppStateImpl(HazelcastMember hzMember) {
+  public ClusterAppStateImpl(AppSettings settings, HazelcastMember hzMember) {
     this.hzMember = hzMember;
 
     // Get or create the replicated map
     operationalProcesses = (ReplicatedMap) hzMember.getReplicatedMap(OPERATIONAL_PROCESSES);
     operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
     nodeDisconnectedListenerUUID = hzMember.getCluster().addMembershipListener(new NodeDisconnectedListener());
+
+    if (ClusterSettings.isLocalElasticsearchEnabled(settings)) {
+      this.healthStateSharing = new HealthStateSharingImpl(hzMember, new SearchNodeHealthProvider(settings.getProps(), this, NetworkUtils.INSTANCE));
+      this.healthStateSharing.start();
+    }
   }
 
   @Override
@@ -184,6 +196,9 @@ public class ClusterAppStateImpl implements ClusterAppState {
   @Override
   public void close() {
     if (hzMember != null) {
+      if (healthStateSharing != null) {
+        healthStateSharing.stop();
+      }
       try {
         // Removing listeners
         operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
index 72e117924a2bbb442a8ecd9a7d3ff40172ff8a92..bf93a7b88c5b2619df334b3d12cd2b68f3a61913 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.sonar.application.cluster;
 
+import java.net.InetAddress;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.DisableOnDebug;
@@ -26,14 +27,18 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 import org.sonar.application.AppStateListener;
+import org.sonar.application.config.TestAppSettings;
 import org.sonar.process.MessageException;
+import org.sonar.process.NetworkUtils;
 import org.sonar.process.ProcessId;
+import org.sonar.process.cluster.NodeType;
+import org.sonar.process.cluster.hz.HazelcastMember;
+import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
-import static org.sonar.application.cluster.HazelcastTesting.newHzMember;
 import static org.sonar.process.cluster.hz.HazelcastObjects.CLUSTER_NAME;
 import static org.sonar.process.cluster.hz.HazelcastObjects.SONARQUBE_VERSION;
 
@@ -47,7 +52,7 @@ public class ClusterAppStateImplTest {
 
   @Test
   public void tryToLockWebLeader_returns_true_only_for_the_first_call() throws Exception {
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       assertThat(underTest.tryToLockWebLeader()).isEqualTo(true);
       assertThat(underTest.tryToLockWebLeader()).isEqualTo(false);
     }
@@ -56,7 +61,7 @@ public class ClusterAppStateImplTest {
   @Test
   public void test_listeners() throws InterruptedException {
     AppStateListener listener = mock(AppStateListener.class);
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       underTest.addListener(listener);
 
       underTest.setOperational(ProcessId.ELASTICSEARCH);
@@ -72,7 +77,7 @@ public class ClusterAppStateImplTest {
   @Test
   public void registerSonarQubeVersion_publishes_version_on_first_call() {
 
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       underTest.registerSonarQubeVersion("6.4.1.5");
 
       assertThat(underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).get())
@@ -82,7 +87,7 @@ public class ClusterAppStateImplTest {
 
   @Test
   public void registerClusterName_publishes_clusterName_on_first_call() {
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       underTest.registerClusterName("foo");
 
       assertThat(underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).get())
@@ -92,7 +97,7 @@ public class ClusterAppStateImplTest {
 
   @Test
   public void reset_always_throws_ISE() {
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       expectedException.expect(IllegalStateException.class);
       expectedException.expectMessage("state reset is not supported in cluster mode");
 
@@ -103,7 +108,7 @@ public class ClusterAppStateImplTest {
   @Test
   public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception {
     // Now launch an instance that try to be part of the hzInstance cluster
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       // Register first version
       underTest.getHazelcastMember().getAtomicReference(SONARQUBE_VERSION).set("6.6.0.1111");
 
@@ -117,7 +122,7 @@ public class ClusterAppStateImplTest {
 
   @Test
   public void registerClusterName_throws_MessageException_if_clusterName_is_different() throws Exception {
-    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(newHzMember())) {
+    try (ClusterAppStateImpl underTest = new ClusterAppStateImpl(new TestAppSettings(), newHzMember())) {
       // Register first version
       underTest.getHazelcastMember().getAtomicReference(CLUSTER_NAME).set("goodClusterName");
 
@@ -128,4 +133,18 @@ public class ClusterAppStateImplTest {
       underTest.registerClusterName("badClusterName");
     }
   }
+
+  private static HazelcastMember newHzMember() {
+    // use loopback for support of offline builds
+    InetAddress loopback = InetAddress.getLoopbackAddress();
+
+    return new HazelcastMemberBuilder()
+      .setNodeType(NodeType.APPLICATION)
+      .setProcessId(ProcessId.COMPUTE_ENGINE)
+      .setClusterName("foo")
+      .setNodeName("bar")
+      .setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback))
+      .setNetworkInterface(loopback.getHostAddress())
+      .build();
+  }
 }
diff --git a/server/sonar-main/src/test/java/org/sonar/application/cluster/HazelcastTesting.java b/server/sonar-main/src/test/java/org/sonar/application/cluster/HazelcastTesting.java
deleted file mode 100644 (file)
index cf8fea2..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * SonarQube
- * Copyright (C) 2009-2017 SonarSource SA
- * mailto:info AT sonarsource DOT com
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-package org.sonar.application.cluster;
-
-import java.net.InetAddress;
-import org.sonar.process.NetworkUtils;
-import org.sonar.process.ProcessId;
-import org.sonar.process.cluster.NodeType;
-import org.sonar.process.cluster.hz.HazelcastMember;
-import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
-
-public class HazelcastTesting {
-
-  private HazelcastTesting() {
-    // do not instantiate
-  }
-
-  public static HazelcastMember newHzMember() {
-    // use loopback for support of offline builds
-    InetAddress loopback = InetAddress.getLoopbackAddress();
-
-    return new HazelcastMemberBuilder()
-      .setNodeType(NodeType.APPLICATION)
-      .setProcessId(ProcessId.COMPUTE_ENGINE)
-      .setClusterName("foo")
-      .setNodeName("bar")
-      .setPort(NetworkUtils.INSTANCE.getNextAvailablePort(loopback))
-      .setNetworkInterface(loopback.getHostAddress())
-      .build();
-  }
-}
diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedAnswer.java
new file mode 100644 (file)
index 0000000..25c60a3
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import com.hazelcast.core.Member;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Answer of {@link DistributedCall}, aggregating the answers from
+ * all the target members.
+ */
+public class DistributedAnswer<T> {
+
+  private final Map<Member, T> answers = new HashMap<>();
+  private final Set<Member> timedOutMembers = new HashSet<>();
+  private final Map<Member, Exception> failedMembers = new HashMap<>();
+
+  public Optional<T> getAnswer(Member member) {
+    return Optional.ofNullable(answers.get(member));
+  }
+
+  public boolean hasTimedOut(Member member) {
+    return timedOutMembers.contains(member);
+  }
+
+  public Optional<Exception> getFailed(Member member) {
+    return Optional.ofNullable(failedMembers.get(member));
+  }
+
+  public Collection<Member> getMembers() {
+    List<Member> members = new ArrayList<>();
+    members.addAll(answers.keySet());
+    members.addAll(timedOutMembers);
+    members.addAll(failedMembers.keySet());
+    return members;
+  }
+
+  void setAnswer(Member member, T answer) {
+    this.answers.put(member, answer);
+  }
+
+  void setTimedOut(Member member) {
+    this.timedOutMembers.add(member);
+  }
+
+  void setFailed(Member member, Exception e) {
+    failedMembers.put(member, e);
+  }
+}
diff --git a/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java b/server/sonar-process/src/main/java/org/sonar/process/cluster/hz/DistributedCall.java
new file mode 100644 (file)
index 0000000..c55668e
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+public interface DistributedCall<T> extends Callable<T>, Serializable {
+}
index 7521fc9c4ea5fb10533dd8ac3a27b58f835f9bcb..0a70741d02067bcb5a2cde7fe21d68e8aa490102 100644 (file)
@@ -21,6 +21,7 @@ package org.sonar.process.cluster.hz;
 
 import com.hazelcast.core.Cluster;
 import com.hazelcast.core.IAtomicReference;
+import com.hazelcast.core.MemberSelector;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -98,6 +99,19 @@ public interface HazelcastMember extends AutoCloseable {
 
   Cluster getCluster();
 
+  /**
+   * Runs a distributed query on a set of Hazelcast members.
+   *
+   * @param callable the query that is executed on all target members. Be careful of classloader, don't use classes
+   *                 that are not available in classpath of target members.
+   * @param memberSelector the subset of members to target. See {@link com.hazelcast.cluster.memberselector.MemberSelectors}
+   *                       for utilities.
+   * @param timeoutMs the total timeout to get responses from all target members, in milliseconds. If timeout is reached, then
+   *                the members that didn't answer on time are marked as timed-out in {@link DistributedAnswer}
+   */
+  <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs)
+    throws InterruptedException;
+
   @Override
   void close();
 }
index 606c58928003777807cf54a9965e7ce0a3a5aabe..118b4e6ee1d40ca3e8e6392dd1e3887acda0209a 100644 (file)
@@ -23,10 +23,16 @@ import com.hazelcast.core.Cluster;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.core.IAtomicReference;
+import com.hazelcast.core.IExecutorService;
 import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberSelector;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 import org.slf4j.LoggerFactory;
@@ -89,6 +95,32 @@ class HazelcastMemberImpl implements HazelcastMember {
     return hzInstance.getCluster();
   }
 
+  @Override
+  public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs)
+    throws InterruptedException {
+
+    IExecutorService executor = hzInstance.getExecutorService("default");
+    Map<Member, Future<T>> futures = executor.submitToMembers(callable, memberSelector);
+    try {
+      DistributedAnswer<T> distributedAnswer = new DistributedAnswer<>();
+      long maxTime = System.currentTimeMillis() + timeoutMs;
+      for (Map.Entry<Member, Future<T>> entry : futures.entrySet()) {
+        long remainingMs = Math.max(maxTime - System.currentTimeMillis(), 5L);
+        try {
+          T answer = entry.getValue().get(remainingMs, TimeUnit.MILLISECONDS);
+          distributedAnswer.setAnswer(entry.getKey(), answer);
+        } catch (ExecutionException e) {
+          distributedAnswer.setFailed(entry.getKey(), e);
+        } catch (TimeoutException e) {
+          distributedAnswer.setTimedOut(entry.getKey());
+        }
+      }
+      return distributedAnswer;
+    } finally {
+      futures.values().forEach(f -> f.cancel(true));
+    }
+  }
+
   @Override
   public void close() {
     try {
diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/DistributedAnswerTest.java
new file mode 100644 (file)
index 0000000..6c9d827
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import com.hazelcast.core.Member;
+import java.io.IOException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class DistributedAnswerTest {
+
+  private Member member = newMember("member1");
+  private DistributedAnswer underTest = new DistributedAnswer();
+
+  @Test
+  public void test_call_with_unknown_member() {
+    assertThat(underTest.getAnswer(member)).isEmpty();
+    assertThat(underTest.hasTimedOut(member)).isFalse();
+    assertThat(underTest.getFailed(member)).isEmpty();
+  }
+
+  @Test
+  public void test_setAnswer() {
+    underTest.setAnswer(member, "foo");
+
+    assertThat(underTest.getAnswer(member)).hasValue("foo");
+    assertThat(underTest.hasTimedOut(member)).isFalse();
+  }
+
+  @Test
+  public void test_setTimedOut() {
+    underTest.setTimedOut(member);
+
+    assertThat(underTest.getAnswer(member)).isEmpty();
+    assertThat(underTest.hasTimedOut(member)).isTrue();
+  }
+
+  @Test
+  public void test_setFailed() {
+    IOException e = new IOException();
+    underTest.setFailed(member, e);
+
+    assertThat(underTest.getFailed(member)).hasValue(e);
+  }
+
+  @Test
+  public void member_can_be_referenced_multiple_times() {
+    underTest.setTimedOut(member);
+    underTest.setAnswer(member, "foo");
+    IOException exception = new IOException();
+    underTest.setFailed(member, exception);
+
+    assertThat(underTest.hasTimedOut(member)).isTrue();
+    assertThat(underTest.getAnswer(member)).hasValue("foo");
+    assertThat(underTest.getFailed(member)).hasValue(exception);
+  }
+
+  private static Member newMember(String uuid) {
+    Member member = mock(Member.class);
+    when(member.getUuid()).thenReturn(uuid);
+    return member;
+  }
+}
diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/FailedDistributedCall.java
new file mode 100644 (file)
index 0000000..03a13d6
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FailedDistributedCall implements DistributedCall<Long> {
+  static final AtomicLong COUNTER = new AtomicLong();
+
+  @Override
+  public Long call() throws Exception {
+    long value = COUNTER.getAndIncrement();
+    if (value == 1L) {
+      // only the second call fails
+      throw new IOException("BOOM");
+    }
+    return value;
+  }
+}
diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/HazelcastMemberImplTest.java
new file mode 100644 (file)
index 0000000..7208791
--- /dev/null
@@ -0,0 +1,146 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import com.hazelcast.cluster.memberselector.MemberSelectors;
+import com.hazelcast.core.Member;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.sonar.process.NetworkUtils;
+import org.sonar.process.ProcessId;
+import org.sonar.process.cluster.NodeType;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HazelcastMemberImplTest {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+  @Rule
+  public TestRule safeguardTimeout = new DisableOnDebug(Timeout.seconds(60));
+
+  // use loopback for support of offline builds
+  private static InetAddress loopback = InetAddress.getLoopbackAddress();
+  private static HazelcastMember member1;
+  private static HazelcastMember member2;
+  private static HazelcastMember member3;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    int port1 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback);
+    int port2 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback);
+    int port3 = NetworkUtils.INSTANCE.getNextAvailablePort(loopback);
+    member1 = newHzMember(port1, port2, port3);
+    member2 = newHzMember(port2, port1, port3);
+    member3 = newHzMember(port3, port1, port2);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    member1.close();
+    member2.close();
+    member3.close();
+  }
+
+  @Test
+  public void call_executes_query_on_members() throws Exception {
+    SuccessfulDistributedCall.COUNTER.set(0L);
+    DistributedCall<Long> call = new SuccessfulDistributedCall();
+
+    DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 30_000L);
+
+    assertThat(answer.getMembers()).extracting(Member::getUuid).containsOnlyOnce(member1.getUuid(), member2.getUuid(), member3.getUuid());
+    assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 1L, 2L);
+  }
+
+  @Test
+  public void timed_out_calls_do_not_break_other_answers() throws InterruptedException {
+    // member 1 and 3 success, member 2 times-out
+    TimedOutDistributedCall.COUNTER.set(0L);
+    DistributedCall call = new TimedOutDistributedCall();
+    DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L);
+
+    assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L);
+
+    assertThat(extractTimeOuts(answer)).containsExactlyInAnyOrder(false, false, true);
+  }
+
+  @Test
+  public void failed_calls_do_not_break_other_answers() throws InterruptedException {
+    // member 1 and 3 success, member 2 fails
+    FailedDistributedCall.COUNTER.set(0L);
+    DistributedCall call = new FailedDistributedCall();
+    DistributedAnswer<Long> answer = member1.call(call, MemberSelectors.DATA_MEMBER_SELECTOR, 2_000L);
+
+    // 2 successful answers
+    assertThat(extractAnswers(answer)).containsOnlyOnce(0L, 2L);
+
+    // 1 failure
+    List<Exception> failures = extractFailures(answer);
+    assertThat(failures).hasSize(1);
+    assertThat(failures.get(0)).hasMessageContaining("BOOM");
+  }
+
+  private static HazelcastMember newHzMember(int port, int... otherPorts) {
+    return new HazelcastMemberBuilder()
+      .setNodeType(NodeType.APPLICATION)
+      .setProcessId(ProcessId.COMPUTE_ENGINE)
+      .setClusterName("foo")
+      .setNodeName("name" + port)
+      .setPort(port)
+      .setNetworkInterface(loopback.getHostAddress())
+      .setMembers(Arrays.stream(otherPorts).mapToObj(p -> loopback.getHostAddress() + ":" + p).collect(Collectors.toList()))
+      .build();
+  }
+
+  private static Set<Long> extractAnswers(DistributedAnswer<Long> answer) {
+    return answer.getMembers().stream()
+      .map(answer::getAnswer)
+      .filter(Optional::isPresent)
+      .map(Optional::get)
+      .collect(Collectors.toSet());
+  }
+
+  private static List<Exception> extractFailures(DistributedAnswer<Long> answer) {
+    return answer.getMembers().stream()
+      .map(answer::getFailed)
+      .filter(Optional::isPresent)
+      .map(Optional::get)
+      .collect(Collectors.toList());
+  }
+
+  private static List<Boolean> extractTimeOuts(DistributedAnswer<Long> answer) {
+    return answer.getMembers().stream()
+      .map(answer::hasTimedOut)
+      .collect(Collectors.toList());
+  }
+}
diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/SuccessfulDistributedCall.java
new file mode 100644 (file)
index 0000000..5159656
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SuccessfulDistributedCall implements DistributedCall<Long> {
+  static final AtomicLong COUNTER = new AtomicLong();
+
+  @Override
+  public Long call() throws Exception {
+    return COUNTER.getAndIncrement();
+  }
+}
diff --git a/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java b/server/sonar-process/src/test/java/org/sonar/process/cluster/hz/TimedOutDistributedCall.java
new file mode 100644 (file)
index 0000000..05136ae
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+package org.sonar.process.cluster.hz;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TimedOutDistributedCall implements DistributedCall<Long> {
+  static final AtomicLong COUNTER = new AtomicLong();
+
+  @Override
+  public Long call() throws Exception {
+    long value = COUNTER.getAndIncrement();
+    if (value == 1L) {
+      // only the second call times out
+      Thread.sleep(30_000L);
+    }
+    return value;
+  }
+}
index f9347fd55bffe25981f7a06282b2a2bd21f85820..e746c21435dc26b765ce0cac2b6d1ba60988c01a 100644 (file)
@@ -21,6 +21,7 @@ package org.sonar.server.cluster;
 
 import com.hazelcast.core.Cluster;
 import com.hazelcast.core.IAtomicReference;
+import com.hazelcast.core.MemberSelector;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.List;
@@ -33,6 +34,8 @@ import org.sonar.process.NetworkUtils;
 import org.sonar.process.ProcessId;
 import org.sonar.process.ProcessProperties;
 import org.sonar.process.cluster.NodeType;
+import org.sonar.process.cluster.hz.DistributedAnswer;
+import org.sonar.process.cluster.hz.DistributedCall;
 import org.sonar.process.cluster.hz.HazelcastMember;
 import org.sonar.process.cluster.hz.HazelcastMemberBuilder;
 
@@ -110,6 +113,12 @@ public class StartableHazelcastMember implements HazelcastMember, Startable {
     return nonNullMember().getCluster();
   }
 
+  @Override
+  public <T> DistributedAnswer<T> call(DistributedCall<T> callable, MemberSelector memberSelector, long timeoutMs)
+    throws InterruptedException {
+    return nonNullMember().call(callable, memberSelector, timeoutMs);
+  }
+
   private HazelcastMember nonNullMember() {
     return requireNonNull(member, "Hazelcast member not started");
   }