Browse Source

SONAR-8986 add CeDistributedInformation

tags/6.4-RC1
Sébastien Lesaint 7 years ago
parent
commit
e4d3426880
25 changed files with 1243 additions and 36 deletions
  1. 4
    0
      server/sonar-ce/pom.xml
  2. 37
    0
      server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java
  3. 74
    0
      server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java
  4. 49
    0
      server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java
  5. 69
    0
      server/sonar-ce/src/main/java/org/sonar/ce/cluster/HazelcastClientWrapper.java
  6. 117
    0
      server/sonar-ce/src/main/java/org/sonar/ce/cluster/HazelcastClientWrapperImpl.java
  7. 23
    0
      server/sonar-ce/src/main/java/org/sonar/ce/cluster/package-info.java
  8. 15
    0
      server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java
  9. 6
    1
      server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java
  10. 115
    0
      server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java
  11. 70
    0
      server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java
  12. 288
    0
      server/sonar-ce/src/test/java/org/sonar/ce/cluster/HazelcastClientWrapperImplTest.java
  13. 95
    0
      server/sonar-ce/src/test/java/org/sonar/ce/cluster/HazelcastTestHelper.java
  14. 66
    13
      server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java
  15. 2
    1
      server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java
  16. 7
    0
      server/sonar-process-monitor/pom.xml
  17. 4
    0
      server/sonar-process-monitor/src/main/java/org/sonar/application/cluster/AppStateClusterImpl.java
  18. 33
    10
      server/sonar-process-monitor/src/main/java/org/sonar/application/cluster/HazelcastCluster.java
  19. 1
    1
      server/sonar-process-monitor/src/test/java/org/sonar/application/cluster/AppStateClusterImplTest.java
  20. 83
    6
      server/sonar-process-monitor/src/test/java/org/sonar/application/cluster/HazelcastClusterTest.java
  21. 0
    2
      server/sonar-process-monitor/src/test/java/org/sonar/application/cluster/HazelcastTestHelper.java
  22. 4
    0
      server/sonar-process/src/main/java/org/sonar/process/ProcessProperties.java
  23. 56
    0
      server/sonar-process/src/main/java/org/sonar/process/cluster/ClusterObjectKeys.java
  24. 23
    0
      server/sonar-process/src/main/java/org/sonar/process/cluster/package-info.java
  25. 2
    2
      sonar-application/pom.xml

+ 4
- 0
server/sonar-ce/pom.xml View File

@@ -30,6 +30,10 @@
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-client</artifactId>
</dependency>

<!-- unit tests -->
<dependency>

+ 37
- 0
server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java View File

@@ -0,0 +1,37 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce;

import java.util.Set;

/**
* CeDistributedInformation is the interface to be implemented in order
* to implement information shared by all CE nodes
*/
public interface CeDistributedInformation {
Set<String> getWorkerUUIDs();

/**
* This method must be called once the workers of the current Compute Engine node
* are up so that they are shared with other Compute Engine nodes
*/
void broadcastWorkerUUIDs();
}

+ 74
- 0
server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java View File

@@ -0,0 +1,74 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce;

import java.util.Map;
import java.util.Set;
import org.picocontainer.Startable;
import org.sonar.ce.cluster.HazelcastClientWrapper;
import org.sonar.ce.taskprocessor.CeWorkerFactory;

import static org.sonar.core.util.stream.MoreCollectors.toSet;
import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS;

/**
* Provide the set of worker's UUID in a clustered SonarQube instance
*/
public class CeDistributedInformationImpl implements CeDistributedInformation, Startable {
private final HazelcastClientWrapper hazelcastClientWrapper;
private final CeWorkerFactory ceCeWorkerFactory;

public CeDistributedInformationImpl(HazelcastClientWrapper hazelcastClientWrapper, CeWorkerFactory ceCeWorkerFactory) {
this.hazelcastClientWrapper = hazelcastClientWrapper;
this.ceCeWorkerFactory = ceCeWorkerFactory;
}

@Override
public Set<String> getWorkerUUIDs() {
Set<String> connectedWorkerUUIDs = hazelcastClientWrapper.getConnectedClients();

return getClusteredWorkerUUIDs().entrySet().stream()
.filter(e -> connectedWorkerUUIDs.contains(e.getKey()))
.map(Map.Entry::getValue)
.flatMap(Set::stream)
.collect(toSet());
}

@Override
public void broadcastWorkerUUIDs() {
getClusteredWorkerUUIDs().put(hazelcastClientWrapper.getClientUUID(), ceCeWorkerFactory.getWorkerUUIDs());
}

@Override
public void start() {
// Nothing to do here
}

@Override
public void stop() {
// Removing the worker UUIDs
getClusteredWorkerUUIDs().remove(hazelcastClientWrapper.getClientUUID());
}

private Map<String, Set<String>> getClusteredWorkerUUIDs() {
return hazelcastClientWrapper.getReplicatedMap(WORKER_UUIDS);
}
}

+ 49
- 0
server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java View File

@@ -0,0 +1,49 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce;

import java.util.Set;
import org.sonar.ce.taskprocessor.CeWorkerFactory;

import static com.google.common.base.Preconditions.checkState;

/**
* Provide the set of worker's UUID in a non clustered SonarQube instance
*/
public class StandaloneCeDistributedInformation implements CeDistributedInformation {
private final CeWorkerFactory ceCeWorkerFactory;
private Set<String> workerUUIDs;

public StandaloneCeDistributedInformation(CeWorkerFactory ceCeWorkerFactory) {
this.ceCeWorkerFactory = ceCeWorkerFactory;
}

@Override
public Set<String> getWorkerUUIDs() {
checkState(workerUUIDs != null, "Invalid call, broadcastWorkerUUIDs() must be called first.");
return workerUUIDs;
}

@Override
public void broadcastWorkerUUIDs() {
workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
}
}

+ 69
- 0
server/sonar-ce/src/main/java/org/sonar/ce/cluster/HazelcastClientWrapper.java View File

@@ -0,0 +1,69 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce.cluster;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;

/**
* The interface Hazelcast client wrapper.
*/
public interface HazelcastClientWrapper {
/**
* Gets the set shared by the cluster and identified by name
*/
<E> Set<E> getSet(String name);

/**
* Gets the list shared by the cluster and identified by name
*/
<E> List<E> getList(String name);

/**
* Gets the map shared by the cluster and identified by name
*/
<K, V> Map<K, V> getMap(String name);

/**
* Gets the replicated map shared by the cluster and identified by name
*/
<K,V> Map<K,V> getReplicatedMap(String name);

/**
* Retrieve the local UUID
*/
String getClientUUID();

/**
* Retrieve the Set of connected clients.
* The client is only CE for the time being
*
* @return the connected clients
*/
Set<String> getConnectedClients();

/**
* Gets lock among the cluster, identified by name
*/
Lock getLock(String name);
}

+ 117
- 0
server/sonar-ce/src/main/java/org/sonar/ce/cluster/HazelcastClientWrapperImpl.java View File

@@ -0,0 +1,117 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import org.picocontainer.Startable;
import org.sonar.api.config.Settings;
import org.sonar.process.ProcessProperties;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;

/**
* This class will connect as a Hazelcast client to the local instance of Hazelcluster
*/
public class HazelcastClientWrapperImpl implements Startable, HazelcastClientWrapper {

private final ClientConfig hzConfig;

@VisibleForTesting
protected HazelcastInstance hzInstance;

public HazelcastClientWrapperImpl(Settings settings) {
boolean clusterEnabled = settings.getBoolean(ProcessProperties.CLUSTER_ENABLED);
String clusterName = settings.getString(ProcessProperties.CLUSTER_NAME);
String clusterLocalEndPoint = settings.getString(ProcessProperties.CLUSTER_LOCALENDPOINT);

checkState(clusterEnabled, "Cluster is not enabled");
checkState(isNotEmpty(clusterLocalEndPoint), "LocalEndPoint have not been set");
checkState(isNotEmpty(clusterName), "sonar.cluster.name is missing");

hzConfig = new ClientConfig();
hzConfig.getGroupConfig().setName(clusterName);
hzConfig.getNetworkConfig().addAddress(clusterLocalEndPoint);

// Tweak HazelCast configuration
hzConfig
// Increase the number of tries
.setProperty("hazelcast.tcp.join.port.try.count", "10")
// Don't phone home
.setProperty("hazelcast.phone.home.enabled", "false")
// Use slf4j for logging
.setProperty("hazelcast.logging.type", "slf4j");
}

@Override
public <E> Set<E> getSet(String name) {
return hzInstance.getSet(name);
}

@Override
public <E> List<E> getList(String name) {
return hzInstance.getList(name);
}

@Override
public <K, V> Map<K, V> getMap(String name) {
return hzInstance.getMap(name);
}

@Override
public <K,V> Map<K,V> getReplicatedMap(String name) {
return hzInstance.getReplicatedMap(name);
}

@Override
public String getClientUUID() {
return hzInstance.getLocalEndpoint().getUuid();
}

@Override
public Set<String> getConnectedClients() {
return hzInstance.getSet(CLIENT_UUIDS);
}

@Override
public Lock getLock(String name) {
return hzInstance.getLock(name);
}

@Override
public void start() {
this.hzInstance = HazelcastClient.newHazelcastClient(hzConfig);
}

@Override
public void stop() {
// Shutdown Hazelcast properly
hzInstance.shutdown();
}
}

+ 23
- 0
server/sonar-ce/src/main/java/org/sonar/ce/cluster/package-info.java View File

@@ -0,0 +1,23 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
@ParametersAreNonnullByDefault
package org.sonar.ce.cluster;

import javax.annotation.ParametersAreNonnullByDefault;

+ 15
- 0
server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineContainerImpl.java View File

@@ -44,6 +44,7 @@ import org.sonar.ce.CeHttpModule;
import org.sonar.ce.CeQueueModule;
import org.sonar.ce.CeTaskCommonsModule;
import org.sonar.ce.cleaning.CeCleaningModule;
import org.sonar.ce.cluster.HazelcastClientWrapperImpl;
import org.sonar.ce.db.ReadOnlyPropertiesDao;
import org.sonar.ce.log.CeProcessLogging;
import org.sonar.ce.platform.ComputeEngineExtensionInstaller;
@@ -52,6 +53,8 @@ import org.sonar.ce.queue.PurgeCeActivities;
import org.sonar.ce.settings.ProjectSettingsFactory;
import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.ce.user.CeUserSession;
import org.sonar.ce.CeDistributedInformationImpl;
import org.sonar.ce.StandaloneCeDistributedInformation;
import org.sonar.core.component.DefaultResourceTypes;
import org.sonar.core.config.CorePropertyDefinitions;
import org.sonar.core.i18n.DefaultI18n;
@@ -174,6 +177,18 @@ public class ComputeEngineContainerImpl implements ComputeEngineContainer {

this.level4 = level3.createChild();
this.level4.add(level4Components());

// TODO refactoring levelXComponents()
if (props.valueAsBoolean("sonar.cluster.enabled")) {
this.level4.add(
HazelcastClientWrapperImpl.class,
CeDistributedInformationImpl.class
);
} else {
this.level4.add(
StandaloneCeDistributedInformation.class
);
}
configureFromModules(this.level4);
ServerExtensionInstaller extensionInstaller = this.level4.getComponentByType(ServerExtensionInstaller.class);
extensionInstaller.installExtensions(this.level4);

+ 6
- 1
server/sonar-ce/src/main/java/org/sonar/ce/queue/CeQueueInitializer.java View File

@@ -24,6 +24,7 @@ import org.sonar.api.platform.Server;
import org.sonar.api.platform.ServerStartHandler;
import org.sonar.ce.cleaning.CeCleaningScheduler;
import org.sonar.ce.taskprocessor.CeProcessingScheduler;
import org.sonar.ce.CeDistributedInformation;

/**
* Cleans-up the queue, initializes JMX counters then schedule
@@ -35,11 +36,14 @@ public class CeQueueInitializer implements ServerStartHandler {

private final CeProcessingScheduler processingScheduler;
private final CeCleaningScheduler cleaningScheduler;
private final CeDistributedInformation ceDistributedInformation;
private boolean done = false;

public CeQueueInitializer(CeProcessingScheduler processingScheduler, CeCleaningScheduler cleaningScheduler) {
public CeQueueInitializer(CeProcessingScheduler processingScheduler, CeCleaningScheduler cleaningScheduler,
CeDistributedInformation ceDistributedInformation) {
this.processingScheduler = processingScheduler;
this.cleaningScheduler = cleaningScheduler;
this.ceDistributedInformation = ceDistributedInformation;
}

@Override
@@ -51,6 +55,7 @@ public class CeQueueInitializer implements ServerStartHandler {
}

private void initCe() {
ceDistributedInformation.broadcastWorkerUUIDs();
processingScheduler.startScheduling();
cleaningScheduler.startScheduling();
}

+ 115
- 0
server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java View File

@@ -0,0 +1,115 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.ce;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.Test;
import org.sonar.ce.cluster.HazelcastClientWrapperImpl;
import org.sonar.ce.taskprocessor.CeWorkerFactory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.MapEntry.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS;

public class CeDistributedInformationImplTest {
private String clientUUID1 = "1";
private String clientUUID2 = "2";
private String clientUUID3 = "3";
private Map workerMap = ImmutableMap.of(
clientUUID1, ImmutableSet.of("1", "2"),
clientUUID2, ImmutableSet.of("3"),
clientUUID3, ImmutableSet.of("4", "5", "6")
);

private HazelcastClientWrapperImpl hzClientWrapper = mock(HazelcastClientWrapperImpl.class);

@Test
public void getWorkerUUIDs_returns_union_of_workers_uuids_of_local_and_cluster_worker_uuids() {
when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
when(hzClientWrapper.getConnectedClients()).thenReturn(ImmutableSet.of(clientUUID1, clientUUID2, clientUUID3));
when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(workerMap);

CeDistributedInformation ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, mock(CeWorkerFactory.class));
assertThat(ceDistributedInformation.getWorkerUUIDs()).containsExactly("1", "2", "3", "4", "5", "6");
}

@Test
public void getWorkerUUIDs_must_filter_absent_client() {
when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
when(hzClientWrapper.getConnectedClients()).thenReturn(ImmutableSet.of(clientUUID1, clientUUID2));
when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(workerMap);

CeDistributedInformation ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, mock(CeWorkerFactory.class));
assertThat(ceDistributedInformation.getWorkerUUIDs()).containsExactly("1", "2", "3");
}

@Test
public void broadcastWorkerUUIDs_adds_local_workerUUIDs_to_shared_map_under_key_of_localendpoint_uuid() {
Set<String> connectedClients = new HashSet<>();
Map modifiableWorkerMap = new HashMap<>();
connectedClients.add(clientUUID1);
connectedClients.add(clientUUID2);

when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
when(hzClientWrapper.getConnectedClients()).thenReturn(connectedClients);
when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap);

CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(ImmutableSet.of("a10", "a11"));
CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, ceWorkerFactory);

try {
ceDistributedInformation.broadcastWorkerUUIDs();
assertThat(modifiableWorkerMap).containsExactly(
entry(clientUUID1, ImmutableSet.of("a10", "a11"))
);
} finally {
ceDistributedInformation.stop();
}
}

@Test
public void stop_must_remove_local_workerUUIDs() {
Set<String> connectedClients = new HashSet<>();
connectedClients.add(clientUUID1);
connectedClients.add(clientUUID2);
connectedClients.add(clientUUID3);
Map modifiableWorkerMap = new HashMap();
modifiableWorkerMap.putAll(workerMap);

when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
when(hzClientWrapper.getConnectedClients()).thenReturn(connectedClients);
when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap);

CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, mock(CeWorkerFactory.class));
ceDistributedInformation.stop();
assertThat(modifiableWorkerMap).containsExactly(
entry(clientUUID2, ImmutableSet.of("3")),
entry(clientUUID3, ImmutableSet.of("4", "5", "6"))
);
}
}

+ 70
- 0
server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java View File

@@ -0,0 +1,70 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.ce;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.sonar.ce.taskprocessor.CeWorkerFactory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class StandaloneCeDistributedInformationTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void broadcastWorkerUUIDs_must_retrieve_from_ceworkerfactory() {
CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);

ceCluster.broadcastWorkerUUIDs();
verify(ceWorkerFactory).getWorkerUUIDs();
}

@Test
public void getWorkerUUIDs_must_be_retrieved_from_ceworkerfactory() {
CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);

ceCluster.broadcastWorkerUUIDs();
assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(workerUUIDs);
}

@Test
public void when_broadcastWorkerUUIDs_is_not_called_getWorkerUUIDs_is_null() {
CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Invalid call, broadcastWorkerUUIDs() must be called first.");

ceCluster.getWorkerUUIDs();
}
}

+ 288
- 0
server/sonar-ce/src/test/java/org/sonar/ce/cluster/HazelcastClientWrapperImplTest.java View File

@@ -0,0 +1,288 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce.cluster;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.HazelcastClientProxy;
import com.hazelcast.core.HazelcastInstance;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.slf4j.LoggerFactory;
import org.sonar.api.config.MapSettings;
import org.sonar.api.config.PropertyDefinitions;
import org.sonar.api.config.Settings;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.MapEntry.entry;

public class HazelcastClientWrapperImplTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Rule
public TestRule safeGuard = new DisableOnDebug(Timeout.seconds(10));

private static HazelcastInstance hzCluster;
private static HazelcastClientWrapperImpl hzClient;

@BeforeClass
public static void setupHazelcastClusterAndHazelcastClient() {
int port = NetworkUtils.getNextAvailablePort(InetAddress.getLoopbackAddress());
hzCluster = HazelcastTestHelper.createHazelcastCluster("cluster_with_client", port);

Settings settings = createClusterSettings("cluster_with_client", "localhost:" + port);
hzClient = new HazelcastClientWrapperImpl(settings);
}

@AfterClass
public static void stopHazelcastClusterAndHazelcastClient() {
try {
hzClient.stop();
} catch (Exception e) {
// Ignore it
}
try {
hzCluster.shutdown();
} catch (Exception e) {
// Ignore it
}
}

@Test
public void start_throws_ISE_if_LOCALENDPOINT_is_incorrect() {
Settings settings = createClusterSettings("sonarqube", "\u4563\u1432\u1564");
HazelcastClientWrapperImpl hzClient = new HazelcastClientWrapperImpl(settings);

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Unable to connect to any address in the config! The following addresses were tried:");

hzClient.start();
}

@Test
public void constructor_throws_ISE_if_LOCALENDPOINT_is_empty() {
Settings settings = createClusterSettings("sonarqube", "");

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("LocalEndPoint have not been set");

new HazelcastClientWrapperImpl(settings);
}

@Test
public void constructor_throws_ISE_if_CLUSTER_ENABLED_is_false() {
Settings settings = createClusterSettings("sonarqube", "localhost:9003");
settings.setProperty(ProcessProperties.CLUSTER_ENABLED, false);

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Cluster is not enabled");

new HazelcastClientWrapperImpl(settings);
}

@Test
public void constructor_throws_ISE_if_missing_CLUSTER_ENABLED() {
Settings settings = createClusterSettings("sonarqube", "localhost:9003");
settings.removeProperty(ProcessProperties.CLUSTER_ENABLED);

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Cluster is not enabled");

new HazelcastClientWrapperImpl(settings);
}

@Test
public void constructor_throws_ISE_if_missing_CLUSTER_NAME() {
Settings settings = createClusterSettings("sonarqube", "localhost:9003");
settings.removeProperty(ProcessProperties.CLUSTER_NAME);

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("sonar.cluster.name is missing");

new HazelcastClientWrapperImpl(settings);
}

@Test
public void constructor_throws_ISE_if_missing_CLUSTER_LOCALENDPOINT() {
Settings settings = createClusterSettings("sonarqube", "localhost:9003");
settings.removeProperty(ProcessProperties.CLUSTER_LOCALENDPOINT);

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("LocalEndPoint have not been set");

new HazelcastClientWrapperImpl(settings);
}

@Test
public void client_must_connect_to_hazelcast() {
int port = NetworkUtils.getNextAvailablePort(InetAddress.getLoopbackAddress());
// Launch a fake Hazelcast instance
HazelcastInstance hzInstance = HazelcastTestHelper.createHazelcastCluster("client_must_connect_to_hazelcast", port);
Settings settings = createClusterSettings("client_must_connect_to_hazelcast", "localhost:" + port);

HazelcastClientWrapperImpl hazelcastClientWrapperImpl = new HazelcastClientWrapperImpl(settings);
try {
hazelcastClientWrapperImpl.start();
assertThat(hazelcastClientWrapperImpl.getConnectedClients()).hasSize(1);
assertThat(hazelcastClientWrapperImpl.getClientUUID()).isNotEmpty();
} finally {
hazelcastClientWrapperImpl.stop();
}
}

@Test
public void client_must_be_able_to_set_ReplicatedMap_objects() throws InterruptedException {
try {
hzClient.start();

Set<String> setTest = new HashSet<>();
setTest.addAll(
Arrays.asList(RandomStringUtils.randomAlphanumeric(10), RandomStringUtils.randomAlphanumeric(10))
);
Map<String, Set<String>> replicatedMap = hzClient.getReplicatedMap("TEST1");
replicatedMap.put("KEY1", ImmutableSet.copyOf(setTest));
assertThat(hzCluster.getReplicatedMap("TEST1"))
.containsOnlyKeys("KEY1");
assertThat(hzCluster.getReplicatedMap("TEST1").get("KEY1"))
.isEqualTo(setTest);
} finally {
hzClient.stop();
}
}

@Test
public void client_must_be_able_to_retrieve_Set_objects() {
try {
hzClient.start();

// Set
Set<String> setTest = new HashSet<>();
setTest.addAll(Arrays.asList("8", "9"));
hzCluster.getSet("TEST1").addAll(setTest);
assertThat(hzClient.getSet("TEST1")).containsAll(setTest);
} finally {
hzClient.stop();
}
}

@Test
public void client_must_be_able_to_retrieve_List_objects() {
try {
hzClient.start();

// List
List<String> listTest = Arrays.asList("1", "2");
hzCluster.getList("TEST2").addAll(listTest);
assertThat(hzClient.getList("TEST2")).containsAll(listTest);
} finally {
hzClient.stop();
}
}

@Test
public void client_must_be_able_to_retrieve_Map_objects() {
try {
hzClient.start();

Map mapTest = new HashMap<>();
mapTest.put("a", Arrays.asList("123", "456"));
hzCluster.getMap("TEST3").putAll(mapTest);
assertThat(hzClient.getMap("TEST3")).containsExactly(
entry("a", Arrays.asList("123", "456"))
);
} finally {
hzClient.stop();
}
}

@Test
public void configuration_tweaks_of_hazelcast_must_be_present() {
try {
hzClient.start();
HazelcastClientInstanceImpl realClient = ((HazelcastClientProxy) hzClient.hzInstance).client;
assertThat(realClient.getClientConfig().getProperty("hazelcast.tcp.join.port.try.count")).isEqualTo("10");
assertThat(realClient.getClientConfig().getProperty("hazelcast.phone.home.enabled")).isEqualTo("false");
assertThat(realClient.getClientConfig().getProperty("hazelcast.logging.type")).isEqualTo("slf4j");
} finally {
hzClient.stop();
}
}

@Test
public void hazelcast_client_must_log_through_sl4fj() {
MemoryAppender<ILoggingEvent> memoryAppender = new MemoryAppender<>();
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
lc.reset();
memoryAppender.setContext(lc);
memoryAppender.start();
lc.getLogger("com.hazelcast").addAppender(memoryAppender);

try {
hzClient.start();
} finally {
hzClient.stop();
memoryAppender.stop();
}
assertThat(memoryAppender.events).isNotEmpty();
memoryAppender.events.stream().forEach(
e -> assertThat(e.getLoggerName()).startsWith("com.hazelcast")
);
}

private static Settings createClusterSettings(String name, String localEndPoint) {
Properties properties = new Properties();
properties.setProperty(ProcessProperties.CLUSTER_NAME, name);
properties.setProperty(ProcessProperties.CLUSTER_LOCALENDPOINT, localEndPoint);
properties.setProperty(ProcessProperties.CLUSTER_ENABLED, "true");
return new MapSettings(new PropertyDefinitions()).addProperties(properties);
}

private class MemoryAppender<E> extends AppenderBase<E> {
private final List<E> events = new ArrayList();

@Override
protected void append(E eventObject) {
events.add(eventObject);
}
}
}

+ 95
- 0
server/sonar-ce/src/test/java/org/sonar/ce/cluster/HazelcastTestHelper.java View File

@@ -0,0 +1,95 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.ce.cluster;

import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import java.net.InetAddress;

import static org.sonar.process.NetworkUtils.getHostName;
import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;

public class HazelcastTestHelper {

public static HazelcastInstance createHazelcastCluster(String clusterName, int port) {
Config hzConfig = new Config();
hzConfig.getGroupConfig().setName(clusterName);

// Configure the network instance
NetworkConfig netConfig = hzConfig.getNetworkConfig();
netConfig
.setPort(port)
.setReuseAddress(true);

netConfig.getInterfaces()
.setEnabled(true)
.addInterface(InetAddress.getLoopbackAddress().getHostAddress());

// Only allowing TCP/IP configuration
JoinConfig joinConfig = netConfig.getJoin();
joinConfig.getAwsConfig().setEnabled(false);
joinConfig.getMulticastConfig().setEnabled(false);
joinConfig.getTcpIpConfig().setEnabled(true);

// Tweak HazelCast configuration
hzConfig
// Increase the number of tries
.setProperty("hazelcast.tcp.join.port.try.count", "10")
// Don't bind on all interfaces
.setProperty("hazelcast.socket.bind.any", "false")
// Don't phone home
.setProperty("hazelcast.phone.home.enabled", "false")
// Use slf4j for logging
.setProperty("hazelcast.logging.type", "slf4j");

// Trying to resolve the hostname
hzConfig.getMemberAttributeConfig().setStringAttribute("HOSTNAME", getHostName());

// We are not using the partition group of Hazelcast, so disabling it
hzConfig.getPartitionGroupConfig().setEnabled(false);
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
hzInstance.getClientService().addClientListener(new ConnectedClientListener(hzInstance));
return hzInstance;
}

private static class ConnectedClientListener implements ClientListener {
private final HazelcastInstance hzInstance;

private ConnectedClientListener(HazelcastInstance hzInstance) {
this.hzInstance = hzInstance;
}

@Override
public void clientConnected(Client client) {
hzInstance.getSet(CLIENT_UUIDS).add(client.getUuid());
}

@Override
public void clientDisconnected(Client client) {
hzInstance.getSet(CLIENT_UUIDS).remove(client.getUuid());
}
}
}

+ 66
- 13
server/sonar-ce/src/test/java/org/sonar/ce/container/ComputeEngineContainerImplTest.java View File

@@ -19,21 +19,30 @@
*/
package org.sonar.ce.container;

import com.hazelcast.core.HazelcastInstance;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.picocontainer.ComponentAdapter;
import org.picocontainer.MutablePicoContainer;
import org.sonar.api.CoreProperties;
import org.sonar.api.database.DatabaseProperties;
import org.sonar.api.utils.DateUtils;
import org.sonar.api.utils.System2;
import org.sonar.ce.cluster.HazelcastClientWrapperImpl;
import org.sonar.ce.cluster.HazelcastTestHelper;
import org.sonar.ce.CeDistributedInformationImpl;
import org.sonar.ce.StandaloneCeDistributedInformation;
import org.sonar.db.DbTester;
import org.sonar.db.property.PropertyDto;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.ProcessProperties;
import org.sonar.process.Props;
@@ -49,6 +58,7 @@ import static org.sonar.process.ProcessProperties.PATH_TEMP;
public class ComputeEngineContainerImplTest {
private static final int CONTAINER_ITSELF = 1;
private static final int COMPONENTS_IN_LEVEL_1_AT_CONSTRUCTION = CONTAINER_ITSELF + 1;
private static final String CLUSTER_NAME = "test";

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -63,19 +73,36 @@ public class ComputeEngineContainerImplTest {
}

@Test
public void real_start() throws IOException {
Properties properties = ProcessProperties.defaults();
File homeDir = tempFolder.newFolder();
File dataDir = new File(homeDir, "data");
File tmpDir = new File(homeDir, "tmp");
properties.setProperty(PATH_HOME, homeDir.getAbsolutePath());
properties.setProperty(PATH_DATA, dataDir.getAbsolutePath());
properties.setProperty(PATH_TEMP, tmpDir.getAbsolutePath());
properties.setProperty(PROPERTY_PROCESS_INDEX, valueOf(ProcessId.COMPUTE_ENGINE.getIpcIndex()));
properties.setProperty(PROPERTY_SHARED_PATH, tmpDir.getAbsolutePath());
properties.setProperty(DatabaseProperties.PROP_URL, ((BasicDataSource) dbTester.database().getDataSource()).getUrl());
properties.setProperty(DatabaseProperties.PROP_USER, "sonar");
properties.setProperty(DatabaseProperties.PROP_PASSWORD, "sonar");
public void real_start_with_cluster() throws IOException {
int port = NetworkUtils.getNextAvailablePort(InetAddress.getLoopbackAddress());
HazelcastInstance hzInstance = HazelcastTestHelper.createHazelcastCluster(CLUSTER_NAME, port);

Properties properties = getProperties();
properties.setProperty(ProcessProperties.CLUSTER_ENABLED, "true");
properties.setProperty(ProcessProperties.CLUSTER_LOCALENDPOINT, String.format("%s:%d", hzInstance.getCluster().getLocalMember().getAddress().getHost(), port));
properties.setProperty(ProcessProperties.CLUSTER_NAME, CLUSTER_NAME);

// required persisted properties
insertProperty(CoreProperties.SERVER_ID, "a_startup_id");
insertProperty(CoreProperties.SERVER_STARTTIME, DateUtils.formatDateTime(new Date()));

underTest
.start(new Props(properties));

MutablePicoContainer picoContainer = underTest.getComponentContainer().getPicoContainer();
assertThat(
picoContainer.getComponentAdapters().stream()
.map(ComponentAdapter::getComponentImplementation)
.collect(Collectors.toList())
).contains((Class) HazelcastClientWrapperImpl.class,
(Class) CeDistributedInformationImpl.class
);
underTest.stop();
}

@Test
public void real_start_without_cluster() throws IOException {
Properties properties = getProperties();

// required persisted properties
insertProperty(CoreProperties.SERVER_ID, "a_startup_id");
@@ -96,6 +123,7 @@ public class ComputeEngineContainerImplTest {
+ 4 // content of ProjectAnalysisTaskModule
+ 4 // content of CeTaskProcessorModule
+ 3 // CeCleaningModule + its content
+ 1 // CeDistributedInformation
);
assertThat(picoContainer.getParent().getComponentAdapters()).hasSize(
CONTAINER_ITSELF
@@ -113,6 +141,15 @@ public class ComputeEngineContainerImplTest {
+ 3 // content of EsSearchModule
+ 57 // content of CorePropertyDefinitions
);
assertThat(
picoContainer.getComponentAdapters().stream()
.map(ComponentAdapter::getComponentImplementation)
.collect(Collectors.toList())
).doesNotContain((Class) HazelcastClientWrapperImpl.class,
(Class) CeDistributedInformationImpl.class
).contains(
(Class) StandaloneCeDistributedInformation.class
);
assertThat(picoContainer.getParent().getParent().getParent().getParent()).isNull();
underTest.stop();

@@ -121,6 +158,22 @@ public class ComputeEngineContainerImplTest {
assertThat(picoContainer.getLifecycleState().isDisposed()).isTrue();
}

private Properties getProperties() throws IOException {
Properties properties = ProcessProperties.defaults();
File homeDir = tempFolder.newFolder();
File dataDir = new File(homeDir, "data");
File tmpDir = new File(homeDir, "tmp");
properties.setProperty(PATH_HOME, homeDir.getAbsolutePath());
properties.setProperty(PATH_DATA, dataDir.getAbsolutePath());
properties.setProperty(PATH_TEMP, tmpDir.getAbsolutePath());
properties.setProperty(PROPERTY_PROCESS_INDEX, valueOf(ProcessId.COMPUTE_ENGINE.getIpcIndex()));
properties.setProperty(PROPERTY_SHARED_PATH, tmpDir.getAbsolutePath());
properties.setProperty(DatabaseProperties.PROP_URL, ((BasicDataSource) dbTester.database().getDataSource()).getUrl());
properties.setProperty(DatabaseProperties.PROP_USER, "sonar");
properties.setProperty(DatabaseProperties.PROP_PASSWORD, "sonar");
return properties;
}

private void insertProperty(String key, String value) {
PropertyDto dto = new PropertyDto().setKey(key).setValue(value);
dbTester.getDbClient().propertiesDao().saveProperty(dbTester.getSession(), dto);

+ 2
- 1
server/sonar-ce/src/test/java/org/sonar/ce/queue/CeQueueInitializerTest.java View File

@@ -23,6 +23,7 @@ import java.io.IOException;
import org.junit.Test;
import org.sonar.api.platform.Server;
import org.sonar.ce.cleaning.CeCleaningScheduler;
import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.taskprocessor.CeProcessingScheduler;

import static org.mockito.Mockito.mock;
@@ -35,7 +36,7 @@ public class CeQueueInitializerTest {
private Server server = mock(Server.class);
private CeProcessingScheduler processingScheduler = mock(CeProcessingScheduler.class);
private CeCleaningScheduler cleaningScheduler = mock(CeCleaningScheduler.class);
private CeQueueInitializer underTest = new CeQueueInitializer(processingScheduler, cleaningScheduler);
private CeQueueInitializer underTest = new CeQueueInitializer(processingScheduler, cleaningScheduler, mock(CeDistributedInformation.class));

@Test
public void clean_queue_then_start_scheduler_of_workers() throws IOException {

+ 7
- 0
server/sonar-process-monitor/pom.xml View File

@@ -40,6 +40,13 @@
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<!--
Required by our usage of Guava for clustering : CeWorkerFactoryImpl.getClusteredWorkerUUIDs()
-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>

+ 4
- 0
server/sonar-process-monitor/src/main/java/org/sonar/application/cluster/AppStateClusterImpl.java View File

@@ -31,6 +31,7 @@ import org.sonar.application.AppState;
import org.sonar.application.AppStateListener;
import org.sonar.application.config.AppSettings;
import org.sonar.process.ProcessId;
import org.sonar.process.ProcessProperties;

public class AppStateClusterImpl implements AppState {
private static Logger LOGGER = LoggerFactory.getLogger(AppStateClusterImpl.class);
@@ -47,6 +48,9 @@ public class AppStateClusterImpl implements AppState {
}

hazelcastCluster = HazelcastCluster.create(clusterProperties);
// Add the local endpoint to be used by processes
appSettings.getProps().set(ProcessProperties.CLUSTER_LOCALENDPOINT, hazelcastCluster.getLocalEndPoint());
appSettings.getProps().set(ProcessProperties.CLUSTER_MEMBERUUID, hazelcastCluster.getLocalUUID());

String members = hazelcastCluster.getMembers().stream().collect(Collectors.joining(","));
LOGGER.info("Joined the cluster [{}] that contains the following hosts : [{}]", hazelcastCluster.getName(), members);

+ 33
- 10
server/sonar-process-monitor/src/main/java/org/sonar/application/cluster/HazelcastCluster.java View File

@@ -23,6 +23,8 @@ package org.sonar.application.cluster;
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientListener;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
@@ -32,6 +34,7 @@ import com.hazelcast.core.ILock;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.core.ReplicatedMap;
import com.hazelcast.nio.Address;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,18 +44,19 @@ import org.sonar.process.ProcessId;

import static java.util.stream.Collectors.toList;
import static org.sonar.process.NetworkUtils.getHostName;
import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;
import static org.sonar.process.cluster.ClusterObjectKeys.HOSTNAME;
import static org.sonar.process.cluster.ClusterObjectKeys.LEADER;
import static org.sonar.process.cluster.ClusterObjectKeys.OPERATIONAL_PROCESSES;
import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;

public class HazelcastCluster implements AutoCloseable {
static final String OPERATIONAL_PROCESSES = "OPERATIONAL_PROCESSES";
static final String LEADER = "LEADER";
static final String HOSTNAME = "HOSTNAME";
static final String SONARQUBE_VERSION = "SONARQUBE_VERSION";

private final List<AppStateListener> listeners = new ArrayList<>();
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final String operationalProcessListenerUUID;
private final String clientListenerUUID;

final HazelcastInstance hzInstance;
protected final HazelcastInstance hzInstance;

private HazelcastCluster(Config hzConfig) {
// Create the Hazelcast instance
@@ -61,9 +65,10 @@ public class HazelcastCluster implements AutoCloseable {
// Get or create the replicated map
operationalProcesses = hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES);
operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
clientListenerUUID = hzInstance.getClientService().addClientListener(new ConnectedClientListener());
}

String getLocalUuid() {
String getLocalUUID() {
return hzInstance.getLocalEndpoint().getUuid();
}

@@ -92,7 +97,7 @@ public class HazelcastCluster implements AutoCloseable {
}

void setOperational(ProcessId processId) {
operationalProcesses.put(new ClusterProcess(getLocalUuid(), processId), Boolean.TRUE);
operationalProcesses.put(new ClusterProcess(getLocalUUID(), processId), Boolean.TRUE);
}

boolean tryToLockWebLeader() {
@@ -102,7 +107,7 @@ public class HazelcastCluster implements AutoCloseable {
lock.lock();
try {
if (leader.get() == null) {
leader.set(getLocalUuid());
leader.set(getLocalUUID());
return true;
} else {
return false;
@@ -142,11 +147,12 @@ public class HazelcastCluster implements AutoCloseable {
if (hzInstance != null) {
// Removing listeners
operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
hzInstance.getClientService().removeClientListener(clientListenerUUID);

// Removing the operationalProcess from the replicated map
operationalProcesses.keySet().forEach(
clusterNodeProcess -> {
if (clusterNodeProcess.getNodeUuid().equals(getLocalUuid())) {
if (clusterNodeProcess.getNodeUuid().equals(getLocalUUID())) {
operationalProcesses.remove(clusterNodeProcess);
}
});
@@ -209,6 +215,11 @@ public class HazelcastCluster implements AutoCloseable {
return Optional.empty();
}

String getLocalEndPoint() {
Address localAddress = hzInstance.getCluster().getLocalMember().getAddress();
return String.format("%s:%d", localAddress.getHost(), localAddress.getPort());
}

private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> {
@Override
public void entryAdded(EntryEvent<ClusterProcess, Boolean> event) {
@@ -244,4 +255,16 @@ public class HazelcastCluster implements AutoCloseable {
// Ignore it
}
}

private class ConnectedClientListener implements ClientListener {
@Override
public void clientConnected(Client client) {
hzInstance.getSet(CLIENT_UUIDS).add(client.getUuid());
}

@Override
public void clientDisconnected(Client client) {
hzInstance.getSet(CLIENT_UUIDS).remove(client.getUuid());
}
}
}

+ 1
- 1
server/sonar-process-monitor/src/test/java/org/sonar/application/cluster/AppStateClusterImplTest.java View File

@@ -39,9 +39,9 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.sonar.application.cluster.HazelcastCluster.SONARQUBE_VERSION;
import static org.sonar.application.cluster.HazelcastTestHelper.createHazelcastClient;
import static org.sonar.application.cluster.HazelcastTestHelper.newClusterSettings;
import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;

public class AppStateClusterImplTest {


+ 83
- 6
server/sonar-process-monitor/src/test/java/org/sonar/application/cluster/HazelcastClusterTest.java View File

@@ -19,34 +19,45 @@
*/
package org.sonar.application.cluster;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import com.hazelcast.core.ReplicatedMap;
import java.net.InetAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.slf4j.LoggerFactory;
import org.sonar.application.AppStateListener;
import org.sonar.application.config.TestAppSettings;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.ProcessProperties;
import org.sonar.process.cluster.ClusterObjectKeys;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.sonar.application.cluster.HazelcastCluster.LEADER;
import static org.sonar.application.cluster.HazelcastCluster.OPERATIONAL_PROCESSES;
import static org.sonar.application.cluster.HazelcastCluster.SONARQUBE_VERSION;
import static org.sonar.application.cluster.HazelcastTestHelper.createHazelcastClient;
import static org.sonar.application.cluster.HazelcastTestHelper.newClusterSettings;
import static org.sonar.process.ProcessProperties.CLUSTER_NAME;
import static org.sonar.process.cluster.ClusterObjectKeys.LEADER;
import static org.sonar.process.cluster.ClusterObjectKeys.OPERATIONAL_PROCESSES;
import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;

public class HazelcastClusterTest {
@Rule
@@ -114,7 +125,7 @@ public class HazelcastClusterTest {
HazelcastInstance hzInstance = createHazelcastClient(hzCluster);
ReplicatedMap<ClusterProcess, Boolean> operationalProcesses = hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES);
assertThat(operationalProcesses)
.containsExactly(new AbstractMap.SimpleEntry<>(new ClusterProcess(hzCluster.getLocalUuid(), ProcessId.ELASTICSEARCH), Boolean.TRUE));
.containsExactly(new AbstractMap.SimpleEntry<>(new ClusterProcess(hzCluster.getLocalUUID(), ProcessId.ELASTICSEARCH), Boolean.TRUE));
}
}

@@ -128,11 +139,40 @@ public class HazelcastClusterTest {
}
}

@Test
public void cluster_must_keep_a_list_of_clients() throws InterruptedException {
TestAppSettings testAppSettings = newClusterSettings();
testAppSettings.set(CLUSTER_NAME, "a_cluster_");
ClusterProperties clusterProperties = new ClusterProperties(testAppSettings);
try (HazelcastCluster hzCluster = HazelcastCluster.create(clusterProperties)) {
assertThat(hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS)).isEmpty();
HazelcastInstance hzClient = HazelcastTestHelper.createHazelcastClient(hzCluster);
assertThat(hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS)).containsExactly(hzClient.getLocalEndpoint().getUuid());

CountDownLatch latch = new CountDownLatch(1);
hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS).addItemListener(new ItemListener<Object>() {
@Override
public void itemAdded(ItemEvent<Object> item) {
}

@Override
public void itemRemoved(ItemEvent<Object> item) {
latch.countDown();
}
}, false);

hzClient.shutdown();
latch.await(1, TimeUnit.SECONDS);

assertThat(hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS)).isEmpty();
}
}

@Test
public void localUUID_must_not_be_empty() {
ClusterProperties clusterProperties = new ClusterProperties(newClusterSettings());
try (HazelcastCluster hzCluster = HazelcastCluster.create(clusterProperties)) {
assertThat(hzCluster.getLocalUuid()).isNotEmpty();
assertThat(hzCluster.getLocalUUID()).isNotEmpty();
}
}

@@ -170,7 +210,6 @@ public class HazelcastClusterTest {
}
}


@Test
public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception {
ClusterProperties clusterProperties = new ClusterProperties(newClusterSettings());
@@ -215,4 +254,42 @@ public class HazelcastClusterTest {
hzInstance.shutdown();
}
}

@Test
public void hazelcast_must_log_through_sl4fj() {
MemoryAppender<ILoggingEvent> memoryAppender = new MemoryAppender<>();
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
lc.reset();
memoryAppender.setContext(lc);
memoryAppender.start();
lc.getLogger("com.hazelcast").addAppender(memoryAppender);

try (AppStateClusterImpl appStateCluster = new AppStateClusterImpl(newClusterSettings())) {
}

assertThat(memoryAppender.events).isNotEmpty();
memoryAppender.events.stream().forEach(
e -> assertThat(e.getLoggerName()).startsWith("com.hazelcast")
);
}

private class MemoryAppender<E> extends AppenderBase<E> {
private final List<E> events = new ArrayList();

@Override
protected void append(E eventObject) {
events.add(eventObject);
}
}


@Test
public void configuration_tweaks_of_hazelcast_must_be_present() {
try (HazelcastCluster hzCluster = HazelcastCluster.create(new ClusterProperties(newClusterSettings()))) {
assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.tcp.join.port.try.count")).isEqualTo("10");
assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.phone.home.enabled")).isEqualTo("false");
assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.logging.type")).isEqualTo("slf4j");
assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.socket.bind.any")).isEqualTo("false");
}
}
}

+ 0
- 2
server/sonar-process-monitor/src/test/java/org/sonar/application/cluster/HazelcastTestHelper.java View File

@@ -46,12 +46,10 @@ public class HazelcastTestHelper {
return createHazelcastClient(appStateCluster.getHazelcastCluster());
}


static TestAppSettings newClusterSettings() {
TestAppSettings settings = new TestAppSettings();
settings.set(ProcessProperties.CLUSTER_ENABLED, "true");
settings.set(ProcessProperties.CLUSTER_NAME, "sonarqube");
return settings;
}

}

+ 4
- 0
server/sonar-process/src/main/java/org/sonar/process/ProcessProperties.java View File

@@ -40,6 +40,10 @@ public class ProcessProperties {
public static final String CLUSTER_NAME = "sonar.cluster.name";
public static final String HAZELCAST_LOG_LEVEL = "sonar.log.level.app.hazelcast";
public static final String CLUSTER_WEB_LEADER = "sonar.cluster.web.startupLeader";
// Internal property used by sonar-application to share the local endpoint of Hazelcast
public static final String CLUSTER_LOCALENDPOINT = "sonar.cluster.hazelcast.localEndPoint";
// Internal property used by sonar-application to share the local UUID of the Hazelcast member
public static final String CLUSTER_MEMBERUUID = "sonar.cluster.hazelcast.memberUUID";

public static final String JDBC_URL = "sonar.jdbc.url";
public static final String JDBC_DRIVER_PATH = "sonar.jdbc.driverPath";

+ 56
- 0
server/sonar-process/src/main/java/org/sonar/process/cluster/ClusterObjectKeys.java View File

@@ -0,0 +1,56 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package org.sonar.process.cluster;

/**
* This class holds all object keys accessible via Hazelcast
*/
public final class ClusterObjectKeys {

private ClusterObjectKeys() {
// Holder for clustered objects
}

/**
* The key of replicated map that hold all operational processes
*/
public static final String OPERATIONAL_PROCESSES = "OPERATIONAL_PROCESSES";
/**
* The key of atomic reference holding the leader UUID
*/
public static final String LEADER = "LEADER";
/**
* The key of the hostname attribute of a member
*/
public static final String HOSTNAME = "HOSTNAME";
/**
* The key of atomic reference holding the SonarQube version of the cluster
*/
public static final String SONARQUBE_VERSION = "SONARQUBE_VERSION";
/**
* The key of the Set holding the UUIDs of clients
*/
public static final String CLIENT_UUIDS = "CLIENT_UUIDS";
/**
* The key of replicated map holding the CeWorker UUIDs
*/
public static final String WORKER_UUIDS = "WORKER_UUIDS";
}

+ 23
- 0
server/sonar-process/src/main/java/org/sonar/process/cluster/package-info.java View File

@@ -0,0 +1,23 @@
/*
* SonarQube
* Copyright (C) 2009-2017 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
@ParametersAreNonnullByDefault
package org.sonar.process.cluster;

import javax.annotation.ParametersAreNonnullByDefault;

+ 2
- 2
sonar-application/pom.xml View File

@@ -236,8 +236,8 @@
<configuration>
<rules>
<requireFilesSize>
<minsize>126000000</minsize>
<maxsize>134000000</maxsize>
<minsize>128000000</minsize>
<maxsize>136000000</maxsize>
<files>
<file>${project.build.directory}/sonarqube-${project.version}.zip</file>
</files>

Loading…
Cancel
Save