import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.picocontainer.Startable;
+import org.sonar.ce.taskprocessor.CeWorker;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import org.sonar.process.cluster.hz.HazelcastMember;
import org.sonar.process.cluster.hz.HazelcastObjects;
@Override
public void broadcastWorkerUUIDs() {
- getClusteredWorkerUUIDs().put(hazelcastMember.getUuid(), ceCeWorkerFactory.getWorkerUUIDs());
+ Set<CeWorker> workers = ceCeWorkerFactory.getWorkers();
+ Set<String> workerUuids = workers.stream().map(CeWorker::getUUID).collect(toSet(workers.size()));
+ getClusteredWorkerUUIDs().put(hazelcastMember.getUuid(), workerUuids);
}
@Override
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import org.sonar.ce.taskprocessor.CeWorker;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import static com.google.common.base.Preconditions.checkState;
+import static org.sonar.core.util.stream.MoreCollectors.toSet;
/**
* Provide the set of worker's UUID in a non clustered SonarQube instance
@Override
public void broadcastWorkerUUIDs() {
- workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
+ Set<CeWorker> workers = ceCeWorkerFactory.getWorkers();
+ workerUUIDs = workers.stream().map(CeWorker::getUUID).collect(toSet(workers.size()));
}
/**
*/
package org.sonar.ce.monitoring;
+import java.util.List;
+
public interface CeTasksMBean {
String OBJECT_NAME = "SonarQube:name=ComputeEngineTasks";
* Configured number of Workers.
*/
int getWorkerCount();
+
+ List<String> getWorkerUuids();
+
+ List<String> getEnabledWorkerUuids();
}
*/
package org.sonar.ce.monitoring;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.picocontainer.Startable;
+import org.sonar.ce.configuration.CeConfiguration;
+import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
import org.sonar.process.Jmx;
import org.sonar.process.systeminfo.SystemInfoSection;
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
-import org.sonar.ce.configuration.CeConfiguration;
public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSection {
private final CEQueueStatus queueStatus;
private final CeConfiguration ceConfiguration;
+ private final CeWorkerFactory ceWorkerFactory;
+ private final EnabledCeWorkerController enabledCeWorkerController;
- public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration) {
+ public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) {
this.queueStatus = queueStatus;
this.ceConfiguration = ceConfiguration;
+ this.ceWorkerFactory = ceWorkerFactory;
+ this.enabledCeWorkerController = enabledCeWorkerController;
}
@Override
return ceConfiguration.getWorkerCount();
}
+ @Override
+ public List<String> getWorkerUuids() {
+ Set<CeWorker> workers = ceWorkerFactory.getWorkers();
+ return workers.stream()
+ .map(CeWorker::getUUID)
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<String> getEnabledWorkerUuids() {
+ Set<CeWorker> workers = ceWorkerFactory.getWorkers();
+ return workers.stream()
+ .filter(enabledCeWorkerController::isEnabled)
+ .map(CeWorker::getUUID)
+ .sorted()
+ .collect(Collectors.toList());
+ }
+
@Override
public ProtobufSystemInfo.Section toProtobuf() {
ProtobufSystemInfo.Section.Builder builder = ProtobufSystemInfo.Section.newBuilder();
public interface CeWorkerFactory {
/**
* Create a new CeWorker object with the specified ordinal.
- * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}.
+ * Each {@link CeWorker} returned by this method will have a different UUID from the others.
+ * All returned {@link CeWorker} will be returned by {@link #getWorkers()}.
*
* @return the CeWorker
*/
CeWorker create(int ordinal);
/**
- * @return the UUIDs of each {@link CeWorker} object returned by {@link #create}.
+ * @return each {@link CeWorker} object returned by {@link #create}.
*/
- Set<String> getWorkerUUIDs();
+ Set<CeWorker> getWorkers();
}
*/
package org.sonar.ce.taskprocessor;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.Set;
+import java.util.stream.Stream;
import org.sonar.ce.queue.InternalCeQueue;
import org.sonar.core.util.UuidFactory;
-
-import static com.google.common.collect.ImmutableSet.copyOf;
+import org.sonar.core.util.stream.MoreCollectors;
public class CeWorkerFactoryImpl implements CeWorkerFactory {
private final UuidFactory uuidFactory;
- private final Set<String> ceWorkerUUIDs = new HashSet<>();
private final InternalCeQueue queue;
private final CeTaskProcessorRepository taskProcessorRepository;
private final EnabledCeWorkerController enabledCeWorkerController;
private final CeWorker.ExecutionListener[] executionListeners;
+ private Set<CeWorker> ceWorkers = Collections.emptySet();
/**
* Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container.
@Override
public CeWorker create(int ordinal) {
String uuid = uuidFactory.create();
- ceWorkerUUIDs.add(uuid);
- return new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+ CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+ ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1));
+ return ceWorker;
}
@Override
- public Set<String> getWorkerUUIDs() {
- return copyOf(ceWorkerUUIDs);
+ public Set<CeWorker> getWorkers() {
+ return ceWorkers;
}
}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.junit.Test;
+import org.sonar.ce.taskprocessor.CeWorker;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import org.sonar.process.cluster.hz.HazelcastMember;
private Map workerMap = ImmutableMap.of(
clientUUID1, ImmutableSet.of("1", "2"),
clientUUID2, ImmutableSet.of("3"),
- clientUUID3, ImmutableSet.of("4", "5", "6")
- );
+ clientUUID3, ImmutableSet.of("4", "5", "6"));
private HazelcastMember hzClientWrapper = mock(HazelcastMember.class);
when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap);
CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
- when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(ImmutableSet.of("a10", "a11"));
+ Set<CeWorker> ceWorkers = Stream.of("a10", "a11").map(uuid -> {
+ CeWorker res = mock(CeWorker.class);
+ when(res.getUUID()).thenReturn(uuid);
+ return res;
+ }).collect(Collectors.toSet());
+ when(ceWorkerFactory.getWorkers()).thenReturn(ceWorkers);
CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, ceWorkerFactory);
try {
ceDistributedInformation.broadcastWorkerUUIDs();
assertThat(modifiableWorkerMap).containsExactly(
- entry(clientUUID1, ImmutableSet.of("a10", "a11"))
- );
+ entry(clientUUID1, ImmutableSet.of("a10", "a11")));
} finally {
ceDistributedInformation.stop();
}
ceDistributedInformation.stop();
assertThat(modifiableWorkerMap).containsExactly(
entry(clientUUID2, ImmutableSet.of("3")),
- entry(clientUUID3, ImmutableSet.of("4", "5", "6"))
- );
+ entry(clientUUID3, ImmutableSet.of("4", "5", "6")));
}
}
package org.sonar.ce;
import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.sonar.ce.taskprocessor.CeWorker;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import static org.assertj.core.api.Assertions.assertThat;
StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
ceCluster.broadcastWorkerUUIDs();
- verify(ceWorkerFactory).getWorkerUUIDs();
+
+ verify(ceWorkerFactory).getWorkers();
}
@Test
public void getWorkerUUIDs_must_be_retrieved_from_ceworkerfactory() {
CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
- Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
- when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
+ CeWorker[] ceWorkers = IntStream.range(0, new Random().nextInt(10))
+ .mapToObj(i -> {
+ CeWorker ceWorker = mock(CeWorker.class);
+ when(ceWorker.getUUID()).thenReturn("uuid_" + i);
+ return ceWorker;
+ })
+ .toArray(CeWorker[]::new);
+ when(ceWorkerFactory.getWorkers()).thenReturn(ImmutableSet.copyOf(ceWorkers));
StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
ceCluster.broadcastWorkerUUIDs();
- assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(workerUUIDs);
+ assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(Arrays.stream(ceWorkers).map(CeWorker::getUUID).collect(Collectors.toSet()));
}
@Test
- public void when_broadcastWorkerUUIDs_is_not_called_getWorkerUUIDs_is_null() {
+ public void getWorkerUUIDs_throws_ISE_if_broadcastWorkerUUIDs_has_not_been_called_before() {
CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
- Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
- when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
expectedException.expect(IllegalStateException.class);
*/
package org.sonar.ce.monitoring;
+import com.google.common.collect.ImmutableSet;
import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import javax.annotation.CheckForNull;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
+import org.apache.commons.lang.RandomStringUtils;
import org.junit.Test;
import org.sonar.ce.configuration.CeConfiguration;
+import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
+import org.sonar.core.util.stream.MoreCollectors;
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class CeTasksMBeanImplTest {
private static final long PENDING_COUNT = 2;
private static final long PROCESSING_TIME = 987;
private static final int WORKER_MAX_COUNT = 666;
private static final int WORKER_COUNT = 56;
-
- private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration());
+ private static final Set<CeWorker> WORKERS = IntStream.range(0, 2 + new Random().nextInt(10))
+ .mapToObj(i -> RandomStringUtils.randomAlphabetic(15))
+ .map(uuid -> {
+ CeWorker res = mock(CeWorker.class);
+ when(res.getUUID()).thenReturn(uuid);
+ return res;
+ })
+ .collect(MoreCollectors.toSet());
+
+ private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
+ private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController);
@Test
public void register_and_unregister() throws Exception {
assertThat(underTest.getWorkerMaxCount()).isEqualTo(WORKER_MAX_COUNT);
}
+ @Test
+ public void getWorkerUuids_returns_ordered_list_of_uuids_of_worker_from_CeWorkerFactory_instance() {
+ List<String> workerUuids = underTest.getWorkerUuids();
+
+ assertThat(workerUuids).isEqualTo(WORKERS.stream().map(CeWorker::getUUID).sorted().collect(Collectors.toList()));
+ // ImmutableSet can not be serialized
+ assertThat(workerUuids).isNotInstanceOf(ImmutableSet.class);
+ }
+
+ @Test
+ public void getEnabledWorkerUuids_returns_ordered_list_of_uuids_of_worker_from_CeWorkerFactory_instance_filtered_on_enabled_ones() {
+ int enabledWorkerCount = new Random().nextInt(WORKERS.size());
+ int i = 0;
+ CeWorker[] enabledWorkers = new CeWorker[enabledWorkerCount];
+ for (CeWorker worker : WORKERS) {
+ if (i < enabledWorkerCount) {
+ enabledWorkers[i] = worker;
+ when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true);
+ } else {
+ when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false);
+ }
+ i++;
+ }
+
+ List<String> enabledWorkerUuids = underTest.getEnabledWorkerUuids();
+
+ assertThat(enabledWorkerUuids).isEqualTo(Stream.of(enabledWorkers).map(CeWorker::getUUID).sorted().collect(Collectors.toList()));
+ // ImmutableSet can not be serialized
+ assertThat(enabledWorkerUuids).isNotInstanceOf(ImmutableSet.class);
+ }
+
@Test
public void export_system_info() {
ProtobufSystemInfo.Section section = underTest.toProtobuf();
assertThat(section.getName()).isEqualTo("Compute Engine Tasks");
assertThat(section.getAttributesCount()).isEqualTo(8);
}
+
private static class DumbCEQueueStatus implements CEQueueStatus {
@Override
}
}
+
+ private static class DumbCeWorkerFactory implements CeWorkerFactory {
+ @Override
+ public CeWorker create(int ordinal) {
+ throw new UnsupportedOperationException("create should not be called");
+ }
+
+ @Override
+ public Set<CeWorker> getWorkers() {
+ return WORKERS;
+ }
+ }
}
}
@Override
- public Set<String> getWorkerUUIDs() {
+ public Set<CeWorker> getWorkers() {
return emptySet();
}
}
}
@Test
- public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() {
- assertThat(underTest.getWorkerUUIDs()).isEmpty();
+ public void getWorkers_returns_empty_if_create_has_not_been_called_before() {
+ assertThat(underTest.getWorkers()).isEmpty();
}
@Test
- public void CeWorkerFactory_must_returns_the_uuids_of_worker() {
- Set<String> ceWorkerUUIDs = new HashSet<>();
-
- for (int i = 0; i < 10; i++) {
- ceWorkerUUIDs.add(underTest.create(i).getUUID());
+ public void CeWorkerFactory_must_returns_the_workers_returned_by_created() {
+ Set<CeWorker> expected = new HashSet<>();
+ for (int i = 0; i < 1 + new Random().nextInt(10); i++) {
+ expected.add(underTest.create(i));
}
- assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs);
+ assertThat(underTest.getWorkers()).isEqualTo(expected);
}
}