<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast-client</artifactId>
+ </dependency>
<!-- unit tests -->
<dependency>
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce;
+
+import java.util.Set;
+
+/**
+ * CeDistributedInformation is the interface to be implemented in order
+ * to implement information shared by all CE nodes
+ */
+public interface CeDistributedInformation {
+ Set<String> getWorkerUUIDs();
+
+ /**
+ * This method must be called once the workers of the current Compute Engine node
+ * are up so that they are shared with other Compute Engine nodes
+ */
+ void broadcastWorkerUUIDs();
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce;
+
+import java.util.Map;
+import java.util.Set;
+import org.picocontainer.Startable;
+import org.sonar.ce.cluster.HazelcastClientWrapper;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+
+import static org.sonar.core.util.stream.MoreCollectors.toSet;
+import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS;
+
+/**
+ * Provide the set of worker's UUID in a clustered SonarQube instance
+ */
+public class CeDistributedInformationImpl implements CeDistributedInformation, Startable {
+ private final HazelcastClientWrapper hazelcastClientWrapper;
+ private final CeWorkerFactory ceCeWorkerFactory;
+
+ public CeDistributedInformationImpl(HazelcastClientWrapper hazelcastClientWrapper, CeWorkerFactory ceCeWorkerFactory) {
+ this.hazelcastClientWrapper = hazelcastClientWrapper;
+ this.ceCeWorkerFactory = ceCeWorkerFactory;
+ }
+
+ @Override
+ public Set<String> getWorkerUUIDs() {
+ Set<String> connectedWorkerUUIDs = hazelcastClientWrapper.getConnectedClients();
+
+ return getClusteredWorkerUUIDs().entrySet().stream()
+ .filter(e -> connectedWorkerUUIDs.contains(e.getKey()))
+ .map(Map.Entry::getValue)
+ .flatMap(Set::stream)
+ .collect(toSet());
+ }
+
+ @Override
+ public void broadcastWorkerUUIDs() {
+ getClusteredWorkerUUIDs().put(hazelcastClientWrapper.getClientUUID(), ceCeWorkerFactory.getWorkerUUIDs());
+ }
+
+ @Override
+ public void start() {
+ // Nothing to do here
+ }
+
+ @Override
+ public void stop() {
+ // Removing the worker UUIDs
+ getClusteredWorkerUUIDs().remove(hazelcastClientWrapper.getClientUUID());
+ }
+
+ private Map<String, Set<String>> getClusteredWorkerUUIDs() {
+ return hazelcastClientWrapper.getReplicatedMap(WORKER_UUIDS);
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce;
+
+import java.util.Set;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Provide the set of worker's UUID in a non clustered SonarQube instance
+ */
+public class StandaloneCeDistributedInformation implements CeDistributedInformation {
+ private final CeWorkerFactory ceCeWorkerFactory;
+ private Set<String> workerUUIDs;
+
+ public StandaloneCeDistributedInformation(CeWorkerFactory ceCeWorkerFactory) {
+ this.ceCeWorkerFactory = ceCeWorkerFactory;
+ }
+
+ @Override
+ public Set<String> getWorkerUUIDs() {
+ checkState(workerUUIDs != null, "Invalid call, broadcastWorkerUUIDs() must be called first.");
+ return workerUUIDs;
+ }
+
+ @Override
+ public void broadcastWorkerUUIDs() {
+ workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.cluster;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * The interface Hazelcast client wrapper.
+ */
+public interface HazelcastClientWrapper {
+ /**
+ * Gets the set shared by the cluster and identified by name
+ */
+ <E> Set<E> getSet(String name);
+
+ /**
+ * Gets the list shared by the cluster and identified by name
+ */
+ <E> List<E> getList(String name);
+
+ /**
+ * Gets the map shared by the cluster and identified by name
+ */
+ <K, V> Map<K, V> getMap(String name);
+
+ /**
+ * Gets the replicated map shared by the cluster and identified by name
+ */
+ <K,V> Map<K,V> getReplicatedMap(String name);
+
+ /**
+ * Retrieve the local UUID
+ */
+ String getClientUUID();
+
+ /**
+ * Retrieve the Set of connected clients.
+ * The client is only CE for the time being
+ *
+ * @return the connected clients
+ */
+ Set<String> getConnectedClients();
+
+ /**
+ * Gets lock among the cluster, identified by name
+ */
+ Lock getLock(String name);
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.cluster;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.core.HazelcastInstance;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import org.picocontainer.Startable;
+import org.sonar.api.config.Settings;
+import org.sonar.process.ProcessProperties;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;
+
+/**
+ * This class will connect as a Hazelcast client to the local instance of Hazelcluster
+ */
+public class HazelcastClientWrapperImpl implements Startable, HazelcastClientWrapper {
+
+ private final ClientConfig hzConfig;
+
+ @VisibleForTesting
+ protected HazelcastInstance hzInstance;
+
+ public HazelcastClientWrapperImpl(Settings settings) {
+ boolean clusterEnabled = settings.getBoolean(ProcessProperties.CLUSTER_ENABLED);
+ String clusterName = settings.getString(ProcessProperties.CLUSTER_NAME);
+ String clusterLocalEndPoint = settings.getString(ProcessProperties.CLUSTER_LOCALENDPOINT);
+
+ checkState(clusterEnabled, "Cluster is not enabled");
+ checkState(isNotEmpty(clusterLocalEndPoint), "LocalEndPoint have not been set");
+ checkState(isNotEmpty(clusterName), "sonar.cluster.name is missing");
+
+ hzConfig = new ClientConfig();
+ hzConfig.getGroupConfig().setName(clusterName);
+ hzConfig.getNetworkConfig().addAddress(clusterLocalEndPoint);
+
+ // Tweak HazelCast configuration
+ hzConfig
+ // Increase the number of tries
+ .setProperty("hazelcast.tcp.join.port.try.count", "10")
+ // Don't phone home
+ .setProperty("hazelcast.phone.home.enabled", "false")
+ // Use slf4j for logging
+ .setProperty("hazelcast.logging.type", "slf4j");
+ }
+
+ @Override
+ public <E> Set<E> getSet(String name) {
+ return hzInstance.getSet(name);
+ }
+
+ @Override
+ public <E> List<E> getList(String name) {
+ return hzInstance.getList(name);
+ }
+
+ @Override
+ public <K, V> Map<K, V> getMap(String name) {
+ return hzInstance.getMap(name);
+ }
+
+ @Override
+ public <K,V> Map<K,V> getReplicatedMap(String name) {
+ return hzInstance.getReplicatedMap(name);
+ }
+
+ @Override
+ public String getClientUUID() {
+ return hzInstance.getLocalEndpoint().getUuid();
+ }
+
+ @Override
+ public Set<String> getConnectedClients() {
+ return hzInstance.getSet(CLIENT_UUIDS);
+ }
+
+ @Override
+ public Lock getLock(String name) {
+ return hzInstance.getLock(name);
+ }
+
+ @Override
+ public void start() {
+ this.hzInstance = HazelcastClient.newHazelcastClient(hzConfig);
+ }
+
+ @Override
+ public void stop() {
+ // Shutdown Hazelcast properly
+ hzInstance.shutdown();
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+@ParametersAreNonnullByDefault
+package org.sonar.ce.cluster;
+
+import javax.annotation.ParametersAreNonnullByDefault;
import org.sonar.ce.CeQueueModule;
import org.sonar.ce.CeTaskCommonsModule;
import org.sonar.ce.cleaning.CeCleaningModule;
+import org.sonar.ce.cluster.HazelcastClientWrapperImpl;
import org.sonar.ce.db.ReadOnlyPropertiesDao;
import org.sonar.ce.log.CeProcessLogging;
import org.sonar.ce.platform.ComputeEngineExtensionInstaller;
import org.sonar.ce.settings.ProjectSettingsFactory;
import org.sonar.ce.taskprocessor.CeTaskProcessorModule;
import org.sonar.ce.user.CeUserSession;
+import org.sonar.ce.CeDistributedInformationImpl;
+import org.sonar.ce.StandaloneCeDistributedInformation;
import org.sonar.core.component.DefaultResourceTypes;
import org.sonar.core.config.CorePropertyDefinitions;
import org.sonar.core.i18n.DefaultI18n;
this.level4 = level3.createChild();
this.level4.add(level4Components());
+
+ // TODO refactoring levelXComponents()
+ if (props.valueAsBoolean("sonar.cluster.enabled")) {
+ this.level4.add(
+ HazelcastClientWrapperImpl.class,
+ CeDistributedInformationImpl.class
+ );
+ } else {
+ this.level4.add(
+ StandaloneCeDistributedInformation.class
+ );
+ }
configureFromModules(this.level4);
ServerExtensionInstaller extensionInstaller = this.level4.getComponentByType(ServerExtensionInstaller.class);
extensionInstaller.installExtensions(this.level4);
import org.sonar.api.platform.ServerStartHandler;
import org.sonar.ce.cleaning.CeCleaningScheduler;
import org.sonar.ce.taskprocessor.CeProcessingScheduler;
+import org.sonar.ce.CeDistributedInformation;
/**
* Cleans-up the queue, initializes JMX counters then schedule
private final CeProcessingScheduler processingScheduler;
private final CeCleaningScheduler cleaningScheduler;
+ private final CeDistributedInformation ceDistributedInformation;
private boolean done = false;
- public CeQueueInitializer(CeProcessingScheduler processingScheduler, CeCleaningScheduler cleaningScheduler) {
+ public CeQueueInitializer(CeProcessingScheduler processingScheduler, CeCleaningScheduler cleaningScheduler,
+ CeDistributedInformation ceDistributedInformation) {
this.processingScheduler = processingScheduler;
this.cleaningScheduler = cleaningScheduler;
+ this.ceDistributedInformation = ceDistributedInformation;
}
@Override
}
private void initCe() {
+ ceDistributedInformation.broadcastWorkerUUIDs();
processingScheduler.startScheduling();
cleaningScheduler.startScheduling();
}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Test;
+import org.sonar.ce.cluster.HazelcastClientWrapperImpl;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.MapEntry.entry;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS;
+
+public class CeDistributedInformationImplTest {
+ private String clientUUID1 = "1";
+ private String clientUUID2 = "2";
+ private String clientUUID3 = "3";
+ private Map workerMap = ImmutableMap.of(
+ clientUUID1, ImmutableSet.of("1", "2"),
+ clientUUID2, ImmutableSet.of("3"),
+ clientUUID3, ImmutableSet.of("4", "5", "6")
+ );
+
+ private HazelcastClientWrapperImpl hzClientWrapper = mock(HazelcastClientWrapperImpl.class);
+
+ @Test
+ public void getWorkerUUIDs_returns_union_of_workers_uuids_of_local_and_cluster_worker_uuids() {
+ when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
+ when(hzClientWrapper.getConnectedClients()).thenReturn(ImmutableSet.of(clientUUID1, clientUUID2, clientUUID3));
+ when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(workerMap);
+
+ CeDistributedInformation ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, mock(CeWorkerFactory.class));
+ assertThat(ceDistributedInformation.getWorkerUUIDs()).containsExactly("1", "2", "3", "4", "5", "6");
+ }
+
+ @Test
+ public void getWorkerUUIDs_must_filter_absent_client() {
+ when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
+ when(hzClientWrapper.getConnectedClients()).thenReturn(ImmutableSet.of(clientUUID1, clientUUID2));
+ when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(workerMap);
+
+ CeDistributedInformation ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, mock(CeWorkerFactory.class));
+ assertThat(ceDistributedInformation.getWorkerUUIDs()).containsExactly("1", "2", "3");
+ }
+
+ @Test
+ public void broadcastWorkerUUIDs_adds_local_workerUUIDs_to_shared_map_under_key_of_localendpoint_uuid() {
+ Set<String> connectedClients = new HashSet<>();
+ Map modifiableWorkerMap = new HashMap<>();
+ connectedClients.add(clientUUID1);
+ connectedClients.add(clientUUID2);
+
+ when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
+ when(hzClientWrapper.getConnectedClients()).thenReturn(connectedClients);
+ when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap);
+
+ CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
+ when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(ImmutableSet.of("a10", "a11"));
+ CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, ceWorkerFactory);
+
+ try {
+ ceDistributedInformation.broadcastWorkerUUIDs();
+ assertThat(modifiableWorkerMap).containsExactly(
+ entry(clientUUID1, ImmutableSet.of("a10", "a11"))
+ );
+ } finally {
+ ceDistributedInformation.stop();
+ }
+ }
+
+ @Test
+ public void stop_must_remove_local_workerUUIDs() {
+ Set<String> connectedClients = new HashSet<>();
+ connectedClients.add(clientUUID1);
+ connectedClients.add(clientUUID2);
+ connectedClients.add(clientUUID3);
+ Map modifiableWorkerMap = new HashMap();
+ modifiableWorkerMap.putAll(workerMap);
+
+ when(hzClientWrapper.getClientUUID()).thenReturn(clientUUID1);
+ when(hzClientWrapper.getConnectedClients()).thenReturn(connectedClients);
+ when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap);
+
+ CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, mock(CeWorkerFactory.class));
+ ceDistributedInformation.stop();
+ assertThat(modifiableWorkerMap).containsExactly(
+ entry(clientUUID2, ImmutableSet.of("3")),
+ entry(clientUUID3, ImmutableSet.of("4", "5", "6"))
+ );
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+package org.sonar.ce;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class StandaloneCeDistributedInformationTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void broadcastWorkerUUIDs_must_retrieve_from_ceworkerfactory() {
+ CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
+ StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
+
+ ceCluster.broadcastWorkerUUIDs();
+ verify(ceWorkerFactory).getWorkerUUIDs();
+ }
+
+ @Test
+ public void getWorkerUUIDs_must_be_retrieved_from_ceworkerfactory() {
+ CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
+ Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
+ when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
+ StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
+
+ ceCluster.broadcastWorkerUUIDs();
+ assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(workerUUIDs);
+ }
+
+ @Test
+ public void when_broadcastWorkerUUIDs_is_not_called_getWorkerUUIDs_is_null() {
+ CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
+ Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
+ when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
+ StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Invalid call, broadcastWorkerUUIDs() must be called first.");
+
+ ceCluster.getWorkerUUIDs();
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.cluster;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
+import com.hazelcast.client.impl.HazelcastClientProxy;
+import com.hazelcast.core.HazelcastInstance;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.DisableOnDebug;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.slf4j.LoggerFactory;
+import org.sonar.api.config.MapSettings;
+import org.sonar.api.config.PropertyDefinitions;
+import org.sonar.api.config.Settings;
+import org.sonar.process.NetworkUtils;
+import org.sonar.process.ProcessProperties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.MapEntry.entry;
+
+public class HazelcastClientWrapperImplTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Rule
+ public TestRule safeGuard = new DisableOnDebug(Timeout.seconds(10));
+
+ private static HazelcastInstance hzCluster;
+ private static HazelcastClientWrapperImpl hzClient;
+
+ @BeforeClass
+ public static void setupHazelcastClusterAndHazelcastClient() {
+ int port = NetworkUtils.getNextAvailablePort(InetAddress.getLoopbackAddress());
+ hzCluster = HazelcastTestHelper.createHazelcastCluster("cluster_with_client", port);
+
+ Settings settings = createClusterSettings("cluster_with_client", "localhost:" + port);
+ hzClient = new HazelcastClientWrapperImpl(settings);
+ }
+
+ @AfterClass
+ public static void stopHazelcastClusterAndHazelcastClient() {
+ try {
+ hzClient.stop();
+ } catch (Exception e) {
+ // Ignore it
+ }
+ try {
+ hzCluster.shutdown();
+ } catch (Exception e) {
+ // Ignore it
+ }
+ }
+
+ @Test
+ public void start_throws_ISE_if_LOCALENDPOINT_is_incorrect() {
+ Settings settings = createClusterSettings("sonarqube", "\u4563\u1432\u1564");
+ HazelcastClientWrapperImpl hzClient = new HazelcastClientWrapperImpl(settings);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Unable to connect to any address in the config! The following addresses were tried:");
+
+ hzClient.start();
+ }
+
+ @Test
+ public void constructor_throws_ISE_if_LOCALENDPOINT_is_empty() {
+ Settings settings = createClusterSettings("sonarqube", "");
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("LocalEndPoint have not been set");
+
+ new HazelcastClientWrapperImpl(settings);
+ }
+
+ @Test
+ public void constructor_throws_ISE_if_CLUSTER_ENABLED_is_false() {
+ Settings settings = createClusterSettings("sonarqube", "localhost:9003");
+ settings.setProperty(ProcessProperties.CLUSTER_ENABLED, false);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Cluster is not enabled");
+
+ new HazelcastClientWrapperImpl(settings);
+ }
+
+ @Test
+ public void constructor_throws_ISE_if_missing_CLUSTER_ENABLED() {
+ Settings settings = createClusterSettings("sonarqube", "localhost:9003");
+ settings.removeProperty(ProcessProperties.CLUSTER_ENABLED);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Cluster is not enabled");
+
+ new HazelcastClientWrapperImpl(settings);
+ }
+
+ @Test
+ public void constructor_throws_ISE_if_missing_CLUSTER_NAME() {
+ Settings settings = createClusterSettings("sonarqube", "localhost:9003");
+ settings.removeProperty(ProcessProperties.CLUSTER_NAME);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("sonar.cluster.name is missing");
+
+ new HazelcastClientWrapperImpl(settings);
+ }
+
+ @Test
+ public void constructor_throws_ISE_if_missing_CLUSTER_LOCALENDPOINT() {
+ Settings settings = createClusterSettings("sonarqube", "localhost:9003");
+ settings.removeProperty(ProcessProperties.CLUSTER_LOCALENDPOINT);
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("LocalEndPoint have not been set");
+
+ new HazelcastClientWrapperImpl(settings);
+ }
+
+ @Test
+ public void client_must_connect_to_hazelcast() {
+ int port = NetworkUtils.getNextAvailablePort(InetAddress.getLoopbackAddress());
+ // Launch a fake Hazelcast instance
+ HazelcastInstance hzInstance = HazelcastTestHelper.createHazelcastCluster("client_must_connect_to_hazelcast", port);
+ Settings settings = createClusterSettings("client_must_connect_to_hazelcast", "localhost:" + port);
+
+ HazelcastClientWrapperImpl hazelcastClientWrapperImpl = new HazelcastClientWrapperImpl(settings);
+ try {
+ hazelcastClientWrapperImpl.start();
+ assertThat(hazelcastClientWrapperImpl.getConnectedClients()).hasSize(1);
+ assertThat(hazelcastClientWrapperImpl.getClientUUID()).isNotEmpty();
+ } finally {
+ hazelcastClientWrapperImpl.stop();
+ }
+ }
+
+ @Test
+ public void client_must_be_able_to_set_ReplicatedMap_objects() throws InterruptedException {
+ try {
+ hzClient.start();
+
+ Set<String> setTest = new HashSet<>();
+ setTest.addAll(
+ Arrays.asList(RandomStringUtils.randomAlphanumeric(10), RandomStringUtils.randomAlphanumeric(10))
+ );
+ Map<String, Set<String>> replicatedMap = hzClient.getReplicatedMap("TEST1");
+ replicatedMap.put("KEY1", ImmutableSet.copyOf(setTest));
+ assertThat(hzCluster.getReplicatedMap("TEST1"))
+ .containsOnlyKeys("KEY1");
+ assertThat(hzCluster.getReplicatedMap("TEST1").get("KEY1"))
+ .isEqualTo(setTest);
+ } finally {
+ hzClient.stop();
+ }
+ }
+
+ @Test
+ public void client_must_be_able_to_retrieve_Set_objects() {
+ try {
+ hzClient.start();
+
+ // Set
+ Set<String> setTest = new HashSet<>();
+ setTest.addAll(Arrays.asList("8", "9"));
+ hzCluster.getSet("TEST1").addAll(setTest);
+ assertThat(hzClient.getSet("TEST1")).containsAll(setTest);
+ } finally {
+ hzClient.stop();
+ }
+ }
+
+ @Test
+ public void client_must_be_able_to_retrieve_List_objects() {
+ try {
+ hzClient.start();
+
+ // List
+ List<String> listTest = Arrays.asList("1", "2");
+ hzCluster.getList("TEST2").addAll(listTest);
+ assertThat(hzClient.getList("TEST2")).containsAll(listTest);
+ } finally {
+ hzClient.stop();
+ }
+ }
+
+ @Test
+ public void client_must_be_able_to_retrieve_Map_objects() {
+ try {
+ hzClient.start();
+
+ Map mapTest = new HashMap<>();
+ mapTest.put("a", Arrays.asList("123", "456"));
+ hzCluster.getMap("TEST3").putAll(mapTest);
+ assertThat(hzClient.getMap("TEST3")).containsExactly(
+ entry("a", Arrays.asList("123", "456"))
+ );
+ } finally {
+ hzClient.stop();
+ }
+ }
+
+ @Test
+ public void configuration_tweaks_of_hazelcast_must_be_present() {
+ try {
+ hzClient.start();
+ HazelcastClientInstanceImpl realClient = ((HazelcastClientProxy) hzClient.hzInstance).client;
+ assertThat(realClient.getClientConfig().getProperty("hazelcast.tcp.join.port.try.count")).isEqualTo("10");
+ assertThat(realClient.getClientConfig().getProperty("hazelcast.phone.home.enabled")).isEqualTo("false");
+ assertThat(realClient.getClientConfig().getProperty("hazelcast.logging.type")).isEqualTo("slf4j");
+ } finally {
+ hzClient.stop();
+ }
+ }
+
+ @Test
+ public void hazelcast_client_must_log_through_sl4fj() {
+ MemoryAppender<ILoggingEvent> memoryAppender = new MemoryAppender<>();
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ lc.reset();
+ memoryAppender.setContext(lc);
+ memoryAppender.start();
+ lc.getLogger("com.hazelcast").addAppender(memoryAppender);
+
+ try {
+ hzClient.start();
+ } finally {
+ hzClient.stop();
+ memoryAppender.stop();
+ }
+ assertThat(memoryAppender.events).isNotEmpty();
+ memoryAppender.events.stream().forEach(
+ e -> assertThat(e.getLoggerName()).startsWith("com.hazelcast")
+ );
+ }
+
+ private static Settings createClusterSettings(String name, String localEndPoint) {
+ Properties properties = new Properties();
+ properties.setProperty(ProcessProperties.CLUSTER_NAME, name);
+ properties.setProperty(ProcessProperties.CLUSTER_LOCALENDPOINT, localEndPoint);
+ properties.setProperty(ProcessProperties.CLUSTER_ENABLED, "true");
+ return new MapSettings(new PropertyDefinitions()).addProperties(properties);
+ }
+
+ private class MemoryAppender<E> extends AppenderBase<E> {
+ private final List<E> events = new ArrayList();
+
+ @Override
+ protected void append(E eventObject) {
+ events.add(eventObject);
+ }
+ }
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.ce.cluster;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.JoinConfig;
+import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.core.Client;
+import com.hazelcast.core.ClientListener;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import java.net.InetAddress;
+
+import static org.sonar.process.NetworkUtils.getHostName;
+import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;
+
+public class HazelcastTestHelper {
+
+ public static HazelcastInstance createHazelcastCluster(String clusterName, int port) {
+ Config hzConfig = new Config();
+ hzConfig.getGroupConfig().setName(clusterName);
+
+ // Configure the network instance
+ NetworkConfig netConfig = hzConfig.getNetworkConfig();
+ netConfig
+ .setPort(port)
+ .setReuseAddress(true);
+
+ netConfig.getInterfaces()
+ .setEnabled(true)
+ .addInterface(InetAddress.getLoopbackAddress().getHostAddress());
+
+ // Only allowing TCP/IP configuration
+ JoinConfig joinConfig = netConfig.getJoin();
+ joinConfig.getAwsConfig().setEnabled(false);
+ joinConfig.getMulticastConfig().setEnabled(false);
+ joinConfig.getTcpIpConfig().setEnabled(true);
+
+ // Tweak HazelCast configuration
+ hzConfig
+ // Increase the number of tries
+ .setProperty("hazelcast.tcp.join.port.try.count", "10")
+ // Don't bind on all interfaces
+ .setProperty("hazelcast.socket.bind.any", "false")
+ // Don't phone home
+ .setProperty("hazelcast.phone.home.enabled", "false")
+ // Use slf4j for logging
+ .setProperty("hazelcast.logging.type", "slf4j");
+
+ // Trying to resolve the hostname
+ hzConfig.getMemberAttributeConfig().setStringAttribute("HOSTNAME", getHostName());
+
+ // We are not using the partition group of Hazelcast, so disabling it
+ hzConfig.getPartitionGroupConfig().setEnabled(false);
+ HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(hzConfig);
+ hzInstance.getClientService().addClientListener(new ConnectedClientListener(hzInstance));
+ return hzInstance;
+ }
+
+ private static class ConnectedClientListener implements ClientListener {
+ private final HazelcastInstance hzInstance;
+
+ private ConnectedClientListener(HazelcastInstance hzInstance) {
+ this.hzInstance = hzInstance;
+ }
+
+ @Override
+ public void clientConnected(Client client) {
+ hzInstance.getSet(CLIENT_UUIDS).add(client.getUuid());
+ }
+
+ @Override
+ public void clientDisconnected(Client client) {
+ hzInstance.getSet(CLIENT_UUIDS).remove(client.getUuid());
+ }
+ }
+}
*/
package org.sonar.ce.container;
+import com.hazelcast.core.HazelcastInstance;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.Date;
import java.util.Properties;
+import java.util.stream.Collectors;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.picocontainer.ComponentAdapter;
import org.picocontainer.MutablePicoContainer;
import org.sonar.api.CoreProperties;
import org.sonar.api.database.DatabaseProperties;
import org.sonar.api.utils.DateUtils;
import org.sonar.api.utils.System2;
+import org.sonar.ce.cluster.HazelcastClientWrapperImpl;
+import org.sonar.ce.cluster.HazelcastTestHelper;
+import org.sonar.ce.CeDistributedInformationImpl;
+import org.sonar.ce.StandaloneCeDistributedInformation;
import org.sonar.db.DbTester;
import org.sonar.db.property.PropertyDto;
+import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.ProcessProperties;
import org.sonar.process.Props;
public class ComputeEngineContainerImplTest {
private static final int CONTAINER_ITSELF = 1;
private static final int COMPONENTS_IN_LEVEL_1_AT_CONSTRUCTION = CONTAINER_ITSELF + 1;
+ private static final String CLUSTER_NAME = "test";
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
}
@Test
- public void real_start() throws IOException {
- Properties properties = ProcessProperties.defaults();
- File homeDir = tempFolder.newFolder();
- File dataDir = new File(homeDir, "data");
- File tmpDir = new File(homeDir, "tmp");
- properties.setProperty(PATH_HOME, homeDir.getAbsolutePath());
- properties.setProperty(PATH_DATA, dataDir.getAbsolutePath());
- properties.setProperty(PATH_TEMP, tmpDir.getAbsolutePath());
- properties.setProperty(PROPERTY_PROCESS_INDEX, valueOf(ProcessId.COMPUTE_ENGINE.getIpcIndex()));
- properties.setProperty(PROPERTY_SHARED_PATH, tmpDir.getAbsolutePath());
- properties.setProperty(DatabaseProperties.PROP_URL, ((BasicDataSource) dbTester.database().getDataSource()).getUrl());
- properties.setProperty(DatabaseProperties.PROP_USER, "sonar");
- properties.setProperty(DatabaseProperties.PROP_PASSWORD, "sonar");
+ public void real_start_with_cluster() throws IOException {
+ int port = NetworkUtils.getNextAvailablePort(InetAddress.getLoopbackAddress());
+ HazelcastInstance hzInstance = HazelcastTestHelper.createHazelcastCluster(CLUSTER_NAME, port);
+
+ Properties properties = getProperties();
+ properties.setProperty(ProcessProperties.CLUSTER_ENABLED, "true");
+ properties.setProperty(ProcessProperties.CLUSTER_LOCALENDPOINT, String.format("%s:%d", hzInstance.getCluster().getLocalMember().getAddress().getHost(), port));
+ properties.setProperty(ProcessProperties.CLUSTER_NAME, CLUSTER_NAME);
+
+ // required persisted properties
+ insertProperty(CoreProperties.SERVER_ID, "a_startup_id");
+ insertProperty(CoreProperties.SERVER_STARTTIME, DateUtils.formatDateTime(new Date()));
+
+ underTest
+ .start(new Props(properties));
+
+ MutablePicoContainer picoContainer = underTest.getComponentContainer().getPicoContainer();
+ assertThat(
+ picoContainer.getComponentAdapters().stream()
+ .map(ComponentAdapter::getComponentImplementation)
+ .collect(Collectors.toList())
+ ).contains((Class) HazelcastClientWrapperImpl.class,
+ (Class) CeDistributedInformationImpl.class
+ );
+ underTest.stop();
+ }
+
+ @Test
+ public void real_start_without_cluster() throws IOException {
+ Properties properties = getProperties();
// required persisted properties
insertProperty(CoreProperties.SERVER_ID, "a_startup_id");
+ 4 // content of ProjectAnalysisTaskModule
+ 4 // content of CeTaskProcessorModule
+ 3 // CeCleaningModule + its content
+ + 1 // CeDistributedInformation
);
assertThat(picoContainer.getParent().getComponentAdapters()).hasSize(
CONTAINER_ITSELF
+ 3 // content of EsSearchModule
+ 57 // content of CorePropertyDefinitions
);
+ assertThat(
+ picoContainer.getComponentAdapters().stream()
+ .map(ComponentAdapter::getComponentImplementation)
+ .collect(Collectors.toList())
+ ).doesNotContain((Class) HazelcastClientWrapperImpl.class,
+ (Class) CeDistributedInformationImpl.class
+ ).contains(
+ (Class) StandaloneCeDistributedInformation.class
+ );
assertThat(picoContainer.getParent().getParent().getParent().getParent()).isNull();
underTest.stop();
assertThat(picoContainer.getLifecycleState().isDisposed()).isTrue();
}
+ private Properties getProperties() throws IOException {
+ Properties properties = ProcessProperties.defaults();
+ File homeDir = tempFolder.newFolder();
+ File dataDir = new File(homeDir, "data");
+ File tmpDir = new File(homeDir, "tmp");
+ properties.setProperty(PATH_HOME, homeDir.getAbsolutePath());
+ properties.setProperty(PATH_DATA, dataDir.getAbsolutePath());
+ properties.setProperty(PATH_TEMP, tmpDir.getAbsolutePath());
+ properties.setProperty(PROPERTY_PROCESS_INDEX, valueOf(ProcessId.COMPUTE_ENGINE.getIpcIndex()));
+ properties.setProperty(PROPERTY_SHARED_PATH, tmpDir.getAbsolutePath());
+ properties.setProperty(DatabaseProperties.PROP_URL, ((BasicDataSource) dbTester.database().getDataSource()).getUrl());
+ properties.setProperty(DatabaseProperties.PROP_USER, "sonar");
+ properties.setProperty(DatabaseProperties.PROP_PASSWORD, "sonar");
+ return properties;
+ }
+
private void insertProperty(String key, String value) {
PropertyDto dto = new PropertyDto().setKey(key).setValue(value);
dbTester.getDbClient().propertiesDao().saveProperty(dbTester.getSession(), dto);
import org.junit.Test;
import org.sonar.api.platform.Server;
import org.sonar.ce.cleaning.CeCleaningScheduler;
+import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.taskprocessor.CeProcessingScheduler;
import static org.mockito.Mockito.mock;
private Server server = mock(Server.class);
private CeProcessingScheduler processingScheduler = mock(CeProcessingScheduler.class);
private CeCleaningScheduler cleaningScheduler = mock(CeCleaningScheduler.class);
- private CeQueueInitializer underTest = new CeQueueInitializer(processingScheduler, cleaningScheduler);
+ private CeQueueInitializer underTest = new CeQueueInitializer(processingScheduler, cleaningScheduler, mock(CeDistributedInformation.class));
@Test
public void clean_queue_then_start_scheduler_of_workers() throws IOException {
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
+ <!--
+ Required by our usage of Guava for clustering : CeWorkerFactoryImpl.getClusteredWorkerUUIDs()
+ -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
import org.sonar.application.AppStateListener;
import org.sonar.application.config.AppSettings;
import org.sonar.process.ProcessId;
+import org.sonar.process.ProcessProperties;
public class AppStateClusterImpl implements AppState {
private static Logger LOGGER = LoggerFactory.getLogger(AppStateClusterImpl.class);
}
hazelcastCluster = HazelcastCluster.create(clusterProperties);
+ // Add the local endpoint to be used by processes
+ appSettings.getProps().set(ProcessProperties.CLUSTER_LOCALENDPOINT, hazelcastCluster.getLocalEndPoint());
+ appSettings.getProps().set(ProcessProperties.CLUSTER_MEMBERUUID, hazelcastCluster.getLocalUUID());
String members = hazelcastCluster.getMembers().stream().collect(Collectors.joining(","));
LOGGER.info("Joined the cluster [{}] that contains the following hosts : [{}]", hazelcastCluster.getName(), members);
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.NetworkConfig;
+import com.hazelcast.core.Client;
+import com.hazelcast.core.ClientListener;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import com.hazelcast.core.ReplicatedMap;
+import com.hazelcast.nio.Address;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;
import static org.sonar.process.NetworkUtils.getHostName;
+import static org.sonar.process.cluster.ClusterObjectKeys.CLIENT_UUIDS;
+import static org.sonar.process.cluster.ClusterObjectKeys.HOSTNAME;
+import static org.sonar.process.cluster.ClusterObjectKeys.LEADER;
+import static org.sonar.process.cluster.ClusterObjectKeys.OPERATIONAL_PROCESSES;
+import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;
public class HazelcastCluster implements AutoCloseable {
- static final String OPERATIONAL_PROCESSES = "OPERATIONAL_PROCESSES";
- static final String LEADER = "LEADER";
- static final String HOSTNAME = "HOSTNAME";
- static final String SONARQUBE_VERSION = "SONARQUBE_VERSION";
-
private final List<AppStateListener> listeners = new ArrayList<>();
private final ReplicatedMap<ClusterProcess, Boolean> operationalProcesses;
private final String operationalProcessListenerUUID;
+ private final String clientListenerUUID;
- final HazelcastInstance hzInstance;
+ protected final HazelcastInstance hzInstance;
private HazelcastCluster(Config hzConfig) {
// Create the Hazelcast instance
// Get or create the replicated map
operationalProcesses = hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES);
operationalProcessListenerUUID = operationalProcesses.addEntryListener(new OperationalProcessListener());
+ clientListenerUUID = hzInstance.getClientService().addClientListener(new ConnectedClientListener());
}
- String getLocalUuid() {
+ String getLocalUUID() {
return hzInstance.getLocalEndpoint().getUuid();
}
}
void setOperational(ProcessId processId) {
- operationalProcesses.put(new ClusterProcess(getLocalUuid(), processId), Boolean.TRUE);
+ operationalProcesses.put(new ClusterProcess(getLocalUUID(), processId), Boolean.TRUE);
}
boolean tryToLockWebLeader() {
lock.lock();
try {
if (leader.get() == null) {
- leader.set(getLocalUuid());
+ leader.set(getLocalUUID());
return true;
} else {
return false;
if (hzInstance != null) {
// Removing listeners
operationalProcesses.removeEntryListener(operationalProcessListenerUUID);
+ hzInstance.getClientService().removeClientListener(clientListenerUUID);
// Removing the operationalProcess from the replicated map
operationalProcesses.keySet().forEach(
clusterNodeProcess -> {
- if (clusterNodeProcess.getNodeUuid().equals(getLocalUuid())) {
+ if (clusterNodeProcess.getNodeUuid().equals(getLocalUUID())) {
operationalProcesses.remove(clusterNodeProcess);
}
});
return Optional.empty();
}
+ String getLocalEndPoint() {
+ Address localAddress = hzInstance.getCluster().getLocalMember().getAddress();
+ return String.format("%s:%d", localAddress.getHost(), localAddress.getPort());
+ }
+
private class OperationalProcessListener implements EntryListener<ClusterProcess, Boolean> {
@Override
public void entryAdded(EntryEvent<ClusterProcess, Boolean> event) {
// Ignore it
}
}
+
+ private class ConnectedClientListener implements ClientListener {
+ @Override
+ public void clientConnected(Client client) {
+ hzInstance.getSet(CLIENT_UUIDS).add(client.getUuid());
+ }
+
+ @Override
+ public void clientDisconnected(Client client) {
+ hzInstance.getSet(CLIENT_UUIDS).remove(client.getUuid());
+ }
+ }
}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
-import static org.sonar.application.cluster.HazelcastCluster.SONARQUBE_VERSION;
import static org.sonar.application.cluster.HazelcastTestHelper.createHazelcastClient;
import static org.sonar.application.cluster.HazelcastTestHelper.newClusterSettings;
+import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;
public class AppStateClusterImplTest {
*/
package org.sonar.application.cluster;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.AppenderBase;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
import com.hazelcast.core.ReplicatedMap;
import java.net.InetAddress;
import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
+import org.slf4j.LoggerFactory;
import org.sonar.application.AppStateListener;
import org.sonar.application.config.TestAppSettings;
import org.sonar.process.NetworkUtils;
import org.sonar.process.ProcessId;
import org.sonar.process.ProcessProperties;
+import org.sonar.process.cluster.ClusterObjectKeys;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.sonar.application.cluster.HazelcastCluster.LEADER;
-import static org.sonar.application.cluster.HazelcastCluster.OPERATIONAL_PROCESSES;
-import static org.sonar.application.cluster.HazelcastCluster.SONARQUBE_VERSION;
import static org.sonar.application.cluster.HazelcastTestHelper.createHazelcastClient;
import static org.sonar.application.cluster.HazelcastTestHelper.newClusterSettings;
import static org.sonar.process.ProcessProperties.CLUSTER_NAME;
+import static org.sonar.process.cluster.ClusterObjectKeys.LEADER;
+import static org.sonar.process.cluster.ClusterObjectKeys.OPERATIONAL_PROCESSES;
+import static org.sonar.process.cluster.ClusterObjectKeys.SONARQUBE_VERSION;
public class HazelcastClusterTest {
@Rule
HazelcastInstance hzInstance = createHazelcastClient(hzCluster);
ReplicatedMap<ClusterProcess, Boolean> operationalProcesses = hzInstance.getReplicatedMap(OPERATIONAL_PROCESSES);
assertThat(operationalProcesses)
- .containsExactly(new AbstractMap.SimpleEntry<>(new ClusterProcess(hzCluster.getLocalUuid(), ProcessId.ELASTICSEARCH), Boolean.TRUE));
+ .containsExactly(new AbstractMap.SimpleEntry<>(new ClusterProcess(hzCluster.getLocalUUID(), ProcessId.ELASTICSEARCH), Boolean.TRUE));
}
}
}
}
+ @Test
+ public void cluster_must_keep_a_list_of_clients() throws InterruptedException {
+ TestAppSettings testAppSettings = newClusterSettings();
+ testAppSettings.set(CLUSTER_NAME, "a_cluster_");
+ ClusterProperties clusterProperties = new ClusterProperties(testAppSettings);
+ try (HazelcastCluster hzCluster = HazelcastCluster.create(clusterProperties)) {
+ assertThat(hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS)).isEmpty();
+ HazelcastInstance hzClient = HazelcastTestHelper.createHazelcastClient(hzCluster);
+ assertThat(hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS)).containsExactly(hzClient.getLocalEndpoint().getUuid());
+
+ CountDownLatch latch = new CountDownLatch(1);
+ hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS).addItemListener(new ItemListener<Object>() {
+ @Override
+ public void itemAdded(ItemEvent<Object> item) {
+ }
+
+ @Override
+ public void itemRemoved(ItemEvent<Object> item) {
+ latch.countDown();
+ }
+ }, false);
+
+ hzClient.shutdown();
+ latch.await(1, TimeUnit.SECONDS);
+
+ assertThat(hzCluster.hzInstance.getSet(ClusterObjectKeys.CLIENT_UUIDS)).isEmpty();
+ }
+ }
+
@Test
public void localUUID_must_not_be_empty() {
ClusterProperties clusterProperties = new ClusterProperties(newClusterSettings());
try (HazelcastCluster hzCluster = HazelcastCluster.create(clusterProperties)) {
- assertThat(hzCluster.getLocalUuid()).isNotEmpty();
+ assertThat(hzCluster.getLocalUUID()).isNotEmpty();
}
}
}
}
-
@Test
public void registerSonarQubeVersion_throws_ISE_if_initial_version_is_different() throws Exception {
ClusterProperties clusterProperties = new ClusterProperties(newClusterSettings());
hzInstance.shutdown();
}
}
+
+ @Test
+ public void hazelcast_must_log_through_sl4fj() {
+ MemoryAppender<ILoggingEvent> memoryAppender = new MemoryAppender<>();
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ lc.reset();
+ memoryAppender.setContext(lc);
+ memoryAppender.start();
+ lc.getLogger("com.hazelcast").addAppender(memoryAppender);
+
+ try (AppStateClusterImpl appStateCluster = new AppStateClusterImpl(newClusterSettings())) {
+ }
+
+ assertThat(memoryAppender.events).isNotEmpty();
+ memoryAppender.events.stream().forEach(
+ e -> assertThat(e.getLoggerName()).startsWith("com.hazelcast")
+ );
+ }
+
+ private class MemoryAppender<E> extends AppenderBase<E> {
+ private final List<E> events = new ArrayList();
+
+ @Override
+ protected void append(E eventObject) {
+ events.add(eventObject);
+ }
+ }
+
+
+ @Test
+ public void configuration_tweaks_of_hazelcast_must_be_present() {
+ try (HazelcastCluster hzCluster = HazelcastCluster.create(new ClusterProperties(newClusterSettings()))) {
+ assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.tcp.join.port.try.count")).isEqualTo("10");
+ assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.phone.home.enabled")).isEqualTo("false");
+ assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.logging.type")).isEqualTo("slf4j");
+ assertThat(hzCluster.hzInstance.getConfig().getProperty("hazelcast.socket.bind.any")).isEqualTo("false");
+ }
+ }
}
return createHazelcastClient(appStateCluster.getHazelcastCluster());
}
-
static TestAppSettings newClusterSettings() {
TestAppSettings settings = new TestAppSettings();
settings.set(ProcessProperties.CLUSTER_ENABLED, "true");
settings.set(ProcessProperties.CLUSTER_NAME, "sonarqube");
return settings;
}
-
}
public static final String CLUSTER_NAME = "sonar.cluster.name";
public static final String HAZELCAST_LOG_LEVEL = "sonar.log.level.app.hazelcast";
public static final String CLUSTER_WEB_LEADER = "sonar.cluster.web.startupLeader";
+ // Internal property used by sonar-application to share the local endpoint of Hazelcast
+ public static final String CLUSTER_LOCALENDPOINT = "sonar.cluster.hazelcast.localEndPoint";
+ // Internal property used by sonar-application to share the local UUID of the Hazelcast member
+ public static final String CLUSTER_MEMBERUUID = "sonar.cluster.hazelcast.memberUUID";
public static final String JDBC_URL = "sonar.jdbc.url";
public static final String JDBC_DRIVER_PATH = "sonar.jdbc.driverPath";
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package org.sonar.process.cluster;
+
+/**
+ * This class holds all object keys accessible via Hazelcast
+ */
+public final class ClusterObjectKeys {
+
+ private ClusterObjectKeys() {
+ // Holder for clustered objects
+ }
+
+ /**
+ * The key of replicated map that hold all operational processes
+ */
+ public static final String OPERATIONAL_PROCESSES = "OPERATIONAL_PROCESSES";
+ /**
+ * The key of atomic reference holding the leader UUID
+ */
+ public static final String LEADER = "LEADER";
+ /**
+ * The key of the hostname attribute of a member
+ */
+ public static final String HOSTNAME = "HOSTNAME";
+ /**
+ * The key of atomic reference holding the SonarQube version of the cluster
+ */
+ public static final String SONARQUBE_VERSION = "SONARQUBE_VERSION";
+ /**
+ * The key of the Set holding the UUIDs of clients
+ */
+ public static final String CLIENT_UUIDS = "CLIENT_UUIDS";
+ /**
+ * The key of replicated map holding the CeWorker UUIDs
+ */
+ public static final String WORKER_UUIDS = "WORKER_UUIDS";
+}
--- /dev/null
+/*
+ * SonarQube
+ * Copyright (C) 2009-2017 SonarSource SA
+ * mailto:info AT sonarsource DOT com
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+@ParametersAreNonnullByDefault
+package org.sonar.process.cluster;
+
+import javax.annotation.ParametersAreNonnullByDefault;
<configuration>
<rules>
<requireFilesSize>
- <minsize>126000000</minsize>
- <maxsize>134000000</maxsize>
+ <minsize>128000000</minsize>
+ <maxsize>136000000</maxsize>
<files>
<file>${project.build.directory}/sonarqube-${project.version}.zip</file>
</files>