From 13af1134fa44603308e5e40c5b3d20c626d38119 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Wed, 5 Dec 2018 17:05:52 +0100 Subject: [PATCH] SONARCLOUD-192 add (enabled) worker uuids to CE MBean --- .../ce/CeDistributedInformationImpl.java | 5 +- .../StandaloneCeDistributedInformation.java | 5 +- .../org/sonar/ce/monitoring/CeTasksMBean.java | 6 ++ .../sonar/ce/monitoring/CeTasksMBeanImpl.java | 33 ++++++++- .../ce/taskprocessor/CeWorkerFactory.java | 7 +- .../ce/taskprocessor/CeWorkerFactoryImpl.java | 17 ++--- .../ce/CeDistributedInformationImplTest.java | 19 +++-- ...tandaloneCeDistributedInformationTest.java | 23 +++--- .../ce/monitoring/CeTasksMBeanImplTest.java | 71 ++++++++++++++++++- .../CeProcessingSchedulerImplTest.java | 2 +- .../CeWorkerFactoryImplTest.java | 15 ++-- 11 files changed, 162 insertions(+), 41 deletions(-) diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java index 9880abf5956..bfc2eff67b0 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; import org.picocontainer.Startable; +import org.sonar.ce.taskprocessor.CeWorker; import org.sonar.ce.taskprocessor.CeWorkerFactory; import org.sonar.process.cluster.hz.HazelcastMember; import org.sonar.process.cluster.hz.HazelcastObjects; @@ -64,7 +65,9 @@ public class CeDistributedInformationImpl implements CeDistributedInformation, S @Override public void broadcastWorkerUUIDs() { - getClusteredWorkerUUIDs().put(hazelcastMember.getUuid(), ceCeWorkerFactory.getWorkerUUIDs()); + Set workers = ceCeWorkerFactory.getWorkers(); + Set workerUuids = workers.stream().map(CeWorker::getUUID).collect(toSet(workers.size())); + getClusteredWorkerUUIDs().put(hazelcastMember.getUuid(), workerUuids); } @Override diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java b/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java index 4063ac676a9..c60b44d0027 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java @@ -23,9 +23,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import org.sonar.ce.taskprocessor.CeWorker; import org.sonar.ce.taskprocessor.CeWorkerFactory; import static com.google.common.base.Preconditions.checkState; +import static org.sonar.core.util.stream.MoreCollectors.toSet; /** * Provide the set of worker's UUID in a non clustered SonarQube instance @@ -48,7 +50,8 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat @Override public void broadcastWorkerUUIDs() { - workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs(); + Set workers = ceCeWorkerFactory.getWorkers(); + workerUUIDs = workers.stream().map(CeWorker::getUUID).collect(toSet(workers.size())); } /** diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBean.java b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBean.java index 0c0df6deb2a..df5be4ad7cb 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBean.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBean.java @@ -19,6 +19,8 @@ */ package org.sonar.ce.monitoring; +import java.util.List; + public interface CeTasksMBean { String OBJECT_NAME = "SonarQube:name=ComputeEngineTasks"; @@ -57,4 +59,8 @@ public interface CeTasksMBean { * Configured number of Workers. */ int getWorkerCount(); + + List getWorkerUuids(); + + List getEnabledWorkerUuids(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java index ed62ab4bb21..08cd1464c45 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java @@ -19,19 +19,29 @@ */ package org.sonar.ce.monitoring; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.picocontainer.Startable; +import org.sonar.ce.configuration.CeConfiguration; +import org.sonar.ce.taskprocessor.CeWorker; +import org.sonar.ce.taskprocessor.CeWorkerFactory; +import org.sonar.ce.taskprocessor.EnabledCeWorkerController; import org.sonar.process.Jmx; import org.sonar.process.systeminfo.SystemInfoSection; import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo; -import org.sonar.ce.configuration.CeConfiguration; public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSection { private final CEQueueStatus queueStatus; private final CeConfiguration ceConfiguration; + private final CeWorkerFactory ceWorkerFactory; + private final EnabledCeWorkerController enabledCeWorkerController; - public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration) { + public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) { this.queueStatus = queueStatus; this.ceConfiguration = ceConfiguration; + this.ceWorkerFactory = ceWorkerFactory; + this.enabledCeWorkerController = enabledCeWorkerController; } @Override @@ -82,6 +92,25 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect return ceConfiguration.getWorkerCount(); } + @Override + public List getWorkerUuids() { + Set workers = ceWorkerFactory.getWorkers(); + return workers.stream() + .map(CeWorker::getUUID) + .sorted() + .collect(Collectors.toList()); + } + + @Override + public List getEnabledWorkerUuids() { + Set workers = ceWorkerFactory.getWorkers(); + return workers.stream() + .filter(enabledCeWorkerController::isEnabled) + .map(CeWorker::getUUID) + .sorted() + .collect(Collectors.toList()); + } + @Override public ProtobufSystemInfo.Section toProtobuf() { ProtobufSystemInfo.Section.Builder builder = ProtobufSystemInfo.Section.newBuilder(); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java index 7ba1e5713c1..a51ff406c4b 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java @@ -27,14 +27,15 @@ import java.util.Set; public interface CeWorkerFactory { /** * Create a new CeWorker object with the specified ordinal. - * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}. + * Each {@link CeWorker} returned by this method will have a different UUID from the others. + * All returned {@link CeWorker} will be returned by {@link #getWorkers()}. * * @return the CeWorker */ CeWorker create(int ordinal); /** - * @return the UUIDs of each {@link CeWorker} object returned by {@link #create}. + * @return each {@link CeWorker} object returned by {@link #create}. */ - Set getWorkerUUIDs(); + Set getWorkers(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java index 8257efe686c..5ddff907822 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java @@ -19,20 +19,20 @@ */ package org.sonar.ce.taskprocessor; -import java.util.HashSet; +import java.util.Collections; import java.util.Set; +import java.util.stream.Stream; import org.sonar.ce.queue.InternalCeQueue; import org.sonar.core.util.UuidFactory; - -import static com.google.common.collect.ImmutableSet.copyOf; +import org.sonar.core.util.stream.MoreCollectors; public class CeWorkerFactoryImpl implements CeWorkerFactory { private final UuidFactory uuidFactory; - private final Set ceWorkerUUIDs = new HashSet<>(); private final InternalCeQueue queue; private final CeTaskProcessorRepository taskProcessorRepository; private final EnabledCeWorkerController enabledCeWorkerController; private final CeWorker.ExecutionListener[] executionListeners; + private Set ceWorkers = Collections.emptySet(); /** * Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container. @@ -55,12 +55,13 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory { @Override public CeWorker create(int ordinal) { String uuid = uuidFactory.create(); - ceWorkerUUIDs.add(uuid); - return new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners); + CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners); + ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1)); + return ceWorker; } @Override - public Set getWorkerUUIDs() { - return copyOf(ceWorkerUUIDs); + public Set getWorkers() { + return ceWorkers; } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java index b020cc74600..67ebc8c76b6 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java @@ -25,7 +25,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Test; +import org.sonar.ce.taskprocessor.CeWorker; import org.sonar.ce.taskprocessor.CeWorkerFactory; import org.sonar.process.cluster.hz.HazelcastMember; @@ -42,8 +45,7 @@ public class CeDistributedInformationImplTest { private Map workerMap = ImmutableMap.of( clientUUID1, ImmutableSet.of("1", "2"), clientUUID2, ImmutableSet.of("3"), - clientUUID3, ImmutableSet.of("4", "5", "6") - ); + clientUUID3, ImmutableSet.of("4", "5", "6")); private HazelcastMember hzClientWrapper = mock(HazelcastMember.class); @@ -79,14 +81,18 @@ public class CeDistributedInformationImplTest { when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap); CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class); - when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(ImmutableSet.of("a10", "a11")); + Set ceWorkers = Stream.of("a10", "a11").map(uuid -> { + CeWorker res = mock(CeWorker.class); + when(res.getUUID()).thenReturn(uuid); + return res; + }).collect(Collectors.toSet()); + when(ceWorkerFactory.getWorkers()).thenReturn(ceWorkers); CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, ceWorkerFactory); try { ceDistributedInformation.broadcastWorkerUUIDs(); assertThat(modifiableWorkerMap).containsExactly( - entry(clientUUID1, ImmutableSet.of("a10", "a11")) - ); + entry(clientUUID1, ImmutableSet.of("a10", "a11"))); } finally { ceDistributedInformation.stop(); } @@ -109,7 +115,6 @@ public class CeDistributedInformationImplTest { ceDistributedInformation.stop(); assertThat(modifiableWorkerMap).containsExactly( entry(clientUUID2, ImmutableSet.of("3")), - entry(clientUUID3, ImmutableSet.of("4", "5", "6")) - ); + entry(clientUUID3, ImmutableSet.of("4", "5", "6"))); } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java index 62d4f641095..13bfa348fe2 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java @@ -20,14 +20,16 @@ package org.sonar.ce; import com.google.common.collect.ImmutableSet; +import java.util.Arrays; import java.util.Random; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.sonar.ce.taskprocessor.CeWorker; import org.sonar.ce.taskprocessor.CeWorkerFactory; import static org.assertj.core.api.Assertions.assertThat; @@ -46,25 +48,30 @@ public class StandaloneCeDistributedInformationTest { StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory); ceCluster.broadcastWorkerUUIDs(); - verify(ceWorkerFactory).getWorkerUUIDs(); + + verify(ceWorkerFactory).getWorkers(); } @Test public void getWorkerUUIDs_must_be_retrieved_from_ceworkerfactory() { CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class); - Set workerUUIDs = ImmutableSet.of("1", "2", "3"); - when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs); + CeWorker[] ceWorkers = IntStream.range(0, new Random().nextInt(10)) + .mapToObj(i -> { + CeWorker ceWorker = mock(CeWorker.class); + when(ceWorker.getUUID()).thenReturn("uuid_" + i); + return ceWorker; + }) + .toArray(CeWorker[]::new); + when(ceWorkerFactory.getWorkers()).thenReturn(ImmutableSet.copyOf(ceWorkers)); StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory); ceCluster.broadcastWorkerUUIDs(); - assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(workerUUIDs); + assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(Arrays.stream(ceWorkers).map(CeWorker::getUUID).collect(Collectors.toSet())); } @Test - public void when_broadcastWorkerUUIDs_is_not_called_getWorkerUUIDs_is_null() { + public void getWorkerUUIDs_throws_ISE_if_broadcastWorkerUUIDs_has_not_been_called_before() { CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class); - Set workerUUIDs = ImmutableSet.of("1", "2", "3"); - when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs); StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory); expectedException.expect(IllegalStateException.class); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java index cada6ab59aa..18cc1c0462a 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java @@ -19,16 +19,30 @@ */ package org.sonar.ce.monitoring; +import com.google.common.collect.ImmutableSet; import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import javax.annotation.CheckForNull; import javax.management.InstanceNotFoundException; import javax.management.ObjectInstance; import javax.management.ObjectName; +import org.apache.commons.lang.RandomStringUtils; import org.junit.Test; import org.sonar.ce.configuration.CeConfiguration; +import org.sonar.ce.taskprocessor.CeWorker; +import org.sonar.ce.taskprocessor.CeWorkerFactory; +import org.sonar.ce.taskprocessor.EnabledCeWorkerController; +import org.sonar.core.util.stream.MoreCollectors; import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class CeTasksMBeanImplTest { private static final long PENDING_COUNT = 2; @@ -38,8 +52,17 @@ public class CeTasksMBeanImplTest { private static final long PROCESSING_TIME = 987; private static final int WORKER_MAX_COUNT = 666; private static final int WORKER_COUNT = 56; - - private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration()); + private static final Set WORKERS = IntStream.range(0, 2 + new Random().nextInt(10)) + .mapToObj(i -> RandomStringUtils.randomAlphabetic(15)) + .map(uuid -> { + CeWorker res = mock(CeWorker.class); + when(res.getUUID()).thenReturn(uuid); + return res; + }) + .collect(MoreCollectors.toSet()); + + private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class); + private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController); @Test public void register_and_unregister() throws Exception { @@ -84,12 +107,44 @@ public class CeTasksMBeanImplTest { assertThat(underTest.getWorkerMaxCount()).isEqualTo(WORKER_MAX_COUNT); } + @Test + public void getWorkerUuids_returns_ordered_list_of_uuids_of_worker_from_CeWorkerFactory_instance() { + List workerUuids = underTest.getWorkerUuids(); + + assertThat(workerUuids).isEqualTo(WORKERS.stream().map(CeWorker::getUUID).sorted().collect(Collectors.toList())); + // ImmutableSet can not be serialized + assertThat(workerUuids).isNotInstanceOf(ImmutableSet.class); + } + + @Test + public void getEnabledWorkerUuids_returns_ordered_list_of_uuids_of_worker_from_CeWorkerFactory_instance_filtered_on_enabled_ones() { + int enabledWorkerCount = new Random().nextInt(WORKERS.size()); + int i = 0; + CeWorker[] enabledWorkers = new CeWorker[enabledWorkerCount]; + for (CeWorker worker : WORKERS) { + if (i < enabledWorkerCount) { + enabledWorkers[i] = worker; + when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true); + } else { + when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false); + } + i++; + } + + List enabledWorkerUuids = underTest.getEnabledWorkerUuids(); + + assertThat(enabledWorkerUuids).isEqualTo(Stream.of(enabledWorkers).map(CeWorker::getUUID).sorted().collect(Collectors.toList())); + // ImmutableSet can not be serialized + assertThat(enabledWorkerUuids).isNotInstanceOf(ImmutableSet.class); + } + @Test public void export_system_info() { ProtobufSystemInfo.Section section = underTest.toProtobuf(); assertThat(section.getName()).isEqualTo("Compute Engine Tasks"); assertThat(section.getAttributesCount()).isEqualTo(8); } + private static class DumbCEQueueStatus implements CEQueueStatus { @Override @@ -175,4 +230,16 @@ public class CeTasksMBeanImplTest { } } + + private static class DumbCeWorkerFactory implements CeWorkerFactory { + @Override + public CeWorker create(int ordinal) { + throw new UnsupportedOperationException("create should not be called"); + } + + @Override + public Set getWorkers() { + return WORKERS; + } + } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java index 096d31fae3f..9c1f68683e1 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java @@ -257,7 +257,7 @@ public class CeProcessingSchedulerImplTest { } @Override - public Set getWorkerUUIDs() { + public Set getWorkers() { return emptySet(); } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java index b61edd3b1d6..4f0603ef59f 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java @@ -91,18 +91,17 @@ public class CeWorkerFactoryImplTest { } @Test - public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() { - assertThat(underTest.getWorkerUUIDs()).isEmpty(); + public void getWorkers_returns_empty_if_create_has_not_been_called_before() { + assertThat(underTest.getWorkers()).isEmpty(); } @Test - public void CeWorkerFactory_must_returns_the_uuids_of_worker() { - Set ceWorkerUUIDs = new HashSet<>(); - - for (int i = 0; i < 10; i++) { - ceWorkerUUIDs.add(underTest.create(i).getUUID()); + public void CeWorkerFactory_must_returns_the_workers_returned_by_created() { + Set expected = new HashSet<>(); + for (int i = 0; i < 1 + new Random().nextInt(10); i++) { + expected.add(underTest.create(i)); } - assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs); + assertThat(underTest.getWorkers()).isEqualTo(expected); } } -- 2.39.5