]> source.dussan.org Git - sonarqube.git/commitdiff
SONARCLOUD-192 add (enabled) worker uuids to CE MBean
authorSébastien Lesaint <sebastien.lesaint@sonarsource.com>
Wed, 5 Dec 2018 16:05:52 +0000 (17:05 +0100)
committerSonarTech <sonartech@sonarsource.com>
Fri, 21 Dec 2018 19:21:02 +0000 (20:21 +0100)
server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java
server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBean.java
server/sonar-ce/src/main/java/org/sonar/ce/monitoring/CeTasksMBeanImpl.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactory.java
server/sonar-ce/src/main/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/CeDistributedInformationImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java
server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeProcessingSchedulerImplTest.java
server/sonar-ce/src/test/java/org/sonar/ce/taskprocessor/CeWorkerFactoryImplTest.java

index 9880abf59563a0c9c2cca9f6d895de1e8ec5f269..bfc2eff67b0b81cc2d762316ce7c92ba2510d0c7 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.logging.Loggers;
 import org.picocontainer.Startable;
+import org.sonar.ce.taskprocessor.CeWorker;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
 import org.sonar.process.cluster.hz.HazelcastMember;
 import org.sonar.process.cluster.hz.HazelcastObjects;
@@ -64,7 +65,9 @@ public class CeDistributedInformationImpl implements CeDistributedInformation, S
 
   @Override
   public void broadcastWorkerUUIDs() {
-    getClusteredWorkerUUIDs().put(hazelcastMember.getUuid(), ceCeWorkerFactory.getWorkerUUIDs());
+    Set<CeWorker> workers = ceCeWorkerFactory.getWorkers();
+    Set<String> workerUuids = workers.stream().map(CeWorker::getUUID).collect(toSet(workers.size()));
+    getClusteredWorkerUUIDs().put(hazelcastMember.getUuid(), workerUuids);
   }
 
   @Override
index 4063ac676a907bd9d001c702e6e6ac06d8ec68b1..c60b44d002759e440bd568d6602d2e59e6da03ee 100644 (file)
@@ -23,9 +23,11 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import org.sonar.ce.taskprocessor.CeWorker;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.sonar.core.util.stream.MoreCollectors.toSet;
 
 /**
  * Provide the set of worker's UUID in a non clustered SonarQube instance
@@ -48,7 +50,8 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat
 
   @Override
   public void broadcastWorkerUUIDs() {
-    workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
+    Set<CeWorker> workers = ceCeWorkerFactory.getWorkers();
+    workerUUIDs = workers.stream().map(CeWorker::getUUID).collect(toSet(workers.size()));
   }
 
   /**
index 0c0df6deb2a88074ac0b315945b61e6c3d3f9d1a..df5be4ad7cbd7dc4d799286c0e8cf3295339f690 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.sonar.ce.monitoring;
 
+import java.util.List;
+
 public interface CeTasksMBean {
 
   String OBJECT_NAME = "SonarQube:name=ComputeEngineTasks";
@@ -57,4 +59,8 @@ public interface CeTasksMBean {
    * Configured number of Workers.
    */
   int getWorkerCount();
+
+  List<String> getWorkerUuids();
+
+  List<String> getEnabledWorkerUuids();
 }
index ed62ab4bb21a7f9f22f20146e709daf2a5778f0d..08cd1464c457a90aa74328a5b8fec25ce33f5c4a 100644 (file)
  */
 package org.sonar.ce.monitoring;
 
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.picocontainer.Startable;
+import org.sonar.ce.configuration.CeConfiguration;
+import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
 import org.sonar.process.Jmx;
 import org.sonar.process.systeminfo.SystemInfoSection;
 import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
-import org.sonar.ce.configuration.CeConfiguration;
 
 public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSection {
   private final CEQueueStatus queueStatus;
   private final CeConfiguration ceConfiguration;
+  private final CeWorkerFactory ceWorkerFactory;
+  private final EnabledCeWorkerController enabledCeWorkerController;
 
-  public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration) {
+  public CeTasksMBeanImpl(CEQueueStatus queueStatus, CeConfiguration ceConfiguration, CeWorkerFactory ceWorkerFactory, EnabledCeWorkerController enabledCeWorkerController) {
     this.queueStatus = queueStatus;
     this.ceConfiguration = ceConfiguration;
+    this.ceWorkerFactory = ceWorkerFactory;
+    this.enabledCeWorkerController = enabledCeWorkerController;
   }
 
   @Override
@@ -82,6 +92,25 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect
     return ceConfiguration.getWorkerCount();
   }
 
+  @Override
+  public List<String> getWorkerUuids() {
+    Set<CeWorker> workers = ceWorkerFactory.getWorkers();
+    return workers.stream()
+      .map(CeWorker::getUUID)
+      .sorted()
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<String> getEnabledWorkerUuids() {
+    Set<CeWorker> workers = ceWorkerFactory.getWorkers();
+    return workers.stream()
+      .filter(enabledCeWorkerController::isEnabled)
+      .map(CeWorker::getUUID)
+      .sorted()
+      .collect(Collectors.toList());
+  }
+
   @Override
   public ProtobufSystemInfo.Section toProtobuf() {
     ProtobufSystemInfo.Section.Builder builder = ProtobufSystemInfo.Section.newBuilder();
index 7ba1e5713c13631ab23f86240cb456afa6d94eb9..a51ff406c4bb3b999b6ba162b10c28b86d8bd492 100644 (file)
@@ -27,14 +27,15 @@ import java.util.Set;
 public interface CeWorkerFactory {
   /**
    * Create a new CeWorker object with the specified ordinal.
-   * Each {@link CeWorker} returned by this method will have a different UUID from the others and all of these UUIDS will be returned by {@link #getWorkerUUIDs()}.
+   * Each {@link CeWorker} returned by this method will have a different UUID from the others.
+   * All returned {@link CeWorker} will be returned by {@link #getWorkers()}.
    *
    * @return the CeWorker
    */
   CeWorker create(int ordinal);
 
   /**
-   * @return  the UUIDs of each {@link CeWorker} object returned by {@link #create}.
+   * @return each {@link CeWorker} object returned by {@link #create}.
    */
-  Set<String> getWorkerUUIDs();
+  Set<CeWorker> getWorkers();
 }
index 8257efe686c8a6bfc19387d19939c11f92e28880..5ddff9078225964cd4e3964484dfd6c79fb397c2 100644 (file)
  */
 package org.sonar.ce.taskprocessor;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.Set;
+import java.util.stream.Stream;
 import org.sonar.ce.queue.InternalCeQueue;
 import org.sonar.core.util.UuidFactory;
-
-import static com.google.common.collect.ImmutableSet.copyOf;
+import org.sonar.core.util.stream.MoreCollectors;
 
 public class CeWorkerFactoryImpl implements CeWorkerFactory {
   private final UuidFactory uuidFactory;
-  private final Set<String> ceWorkerUUIDs = new HashSet<>();
   private final InternalCeQueue queue;
   private final CeTaskProcessorRepository taskProcessorRepository;
   private final EnabledCeWorkerController enabledCeWorkerController;
   private final CeWorker.ExecutionListener[] executionListeners;
+  private Set<CeWorker> ceWorkers = Collections.emptySet();
 
   /**
    * Used by Pico when there is no {@link CeWorker.ExecutionListener} in the container.
@@ -55,12 +55,13 @@ public class CeWorkerFactoryImpl implements CeWorkerFactory {
   @Override
   public CeWorker create(int ordinal) {
     String uuid = uuidFactory.create();
-    ceWorkerUUIDs.add(uuid);
-    return new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+    CeWorkerImpl ceWorker = new CeWorkerImpl(ordinal, uuid, queue, taskProcessorRepository, enabledCeWorkerController, executionListeners);
+    ceWorkers = Stream.concat(ceWorkers.stream(), Stream.of(ceWorker)).collect(MoreCollectors.toSet(ceWorkers.size() + 1));
+    return ceWorker;
   }
 
   @Override
-  public Set<String> getWorkerUUIDs() {
-    return copyOf(ceWorkerUUIDs);
+  public Set<CeWorker> getWorkers() {
+    return ceWorkers;
   }
 }
index b020cc7460071f270ac71ff564b0bcdca18d796d..67ebc8c76b6db84b725c9d9adc11d63d4908c419 100644 (file)
@@ -25,7 +25,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.junit.Test;
+import org.sonar.ce.taskprocessor.CeWorker;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
 import org.sonar.process.cluster.hz.HazelcastMember;
 
@@ -42,8 +45,7 @@ public class CeDistributedInformationImplTest {
   private Map workerMap = ImmutableMap.of(
     clientUUID1, ImmutableSet.of("1", "2"),
     clientUUID2, ImmutableSet.of("3"),
-    clientUUID3, ImmutableSet.of("4", "5", "6")
-  );
+    clientUUID3, ImmutableSet.of("4", "5", "6"));
 
   private HazelcastMember hzClientWrapper = mock(HazelcastMember.class);
 
@@ -79,14 +81,18 @@ public class CeDistributedInformationImplTest {
     when(hzClientWrapper.getReplicatedMap(WORKER_UUIDS)).thenReturn(modifiableWorkerMap);
 
     CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
-    when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(ImmutableSet.of("a10", "a11"));
+    Set<CeWorker> ceWorkers = Stream.of("a10", "a11").map(uuid -> {
+      CeWorker res = mock(CeWorker.class);
+      when(res.getUUID()).thenReturn(uuid);
+      return res;
+    }).collect(Collectors.toSet());
+    when(ceWorkerFactory.getWorkers()).thenReturn(ceWorkers);
     CeDistributedInformationImpl ceDistributedInformation = new CeDistributedInformationImpl(hzClientWrapper, ceWorkerFactory);
 
     try {
       ceDistributedInformation.broadcastWorkerUUIDs();
       assertThat(modifiableWorkerMap).containsExactly(
-        entry(clientUUID1, ImmutableSet.of("a10", "a11"))
-      );
+        entry(clientUUID1, ImmutableSet.of("a10", "a11")));
     } finally {
       ceDistributedInformation.stop();
     }
@@ -109,7 +115,6 @@ public class CeDistributedInformationImplTest {
     ceDistributedInformation.stop();
     assertThat(modifiableWorkerMap).containsExactly(
       entry(clientUUID2, ImmutableSet.of("3")),
-      entry(clientUUID3, ImmutableSet.of("4", "5", "6"))
-    );
+      entry(clientUUID3, ImmutableSet.of("4", "5", "6")));
   }
 }
index 62d4f64109589eb6542055dc493de3ba8e0cadf9..13bfa348fe229057d90e01bd9fb10231c5172931 100644 (file)
 package org.sonar.ce;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.sonar.ce.taskprocessor.CeWorker;
 import org.sonar.ce.taskprocessor.CeWorkerFactory;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -46,25 +48,30 @@ public class StandaloneCeDistributedInformationTest {
     StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
 
     ceCluster.broadcastWorkerUUIDs();
-    verify(ceWorkerFactory).getWorkerUUIDs();
+
+    verify(ceWorkerFactory).getWorkers();
   }
 
   @Test
   public void getWorkerUUIDs_must_be_retrieved_from_ceworkerfactory() {
     CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
-    Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
-    when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
+    CeWorker[] ceWorkers = IntStream.range(0, new Random().nextInt(10))
+      .mapToObj(i -> {
+        CeWorker ceWorker = mock(CeWorker.class);
+        when(ceWorker.getUUID()).thenReturn("uuid_" + i);
+        return ceWorker;
+      })
+      .toArray(CeWorker[]::new);
+    when(ceWorkerFactory.getWorkers()).thenReturn(ImmutableSet.copyOf(ceWorkers));
     StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
 
     ceCluster.broadcastWorkerUUIDs();
-    assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(workerUUIDs);
+    assertThat(ceCluster.getWorkerUUIDs()).isEqualTo(Arrays.stream(ceWorkers).map(CeWorker::getUUID).collect(Collectors.toSet()));
   }
 
   @Test
-  public void when_broadcastWorkerUUIDs_is_not_called_getWorkerUUIDs_is_null() {
+  public void getWorkerUUIDs_throws_ISE_if_broadcastWorkerUUIDs_has_not_been_called_before() {
     CeWorkerFactory ceWorkerFactory = mock(CeWorkerFactory.class);
-    Set<String> workerUUIDs = ImmutableSet.of("1", "2", "3");
-    when(ceWorkerFactory.getWorkerUUIDs()).thenReturn(workerUUIDs);
     StandaloneCeDistributedInformation ceCluster = new StandaloneCeDistributedInformation(ceWorkerFactory);
 
     expectedException.expect(IllegalStateException.class);
index cada6ab59aab24a427c41be1ab7118c49c9ebf9e..18cc1c0462a210a5d5c3a2f4c676bd78d807acaa 100644 (file)
  */
 package org.sonar.ce.monitoring;
 
+import com.google.common.collect.ImmutableSet;
 import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import javax.annotation.CheckForNull;
 import javax.management.InstanceNotFoundException;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
+import org.apache.commons.lang.RandomStringUtils;
 import org.junit.Test;
 import org.sonar.ce.configuration.CeConfiguration;
+import org.sonar.ce.taskprocessor.CeWorker;
+import org.sonar.ce.taskprocessor.CeWorkerFactory;
+import org.sonar.ce.taskprocessor.EnabledCeWorkerController;
+import org.sonar.core.util.stream.MoreCollectors;
 import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class CeTasksMBeanImplTest {
   private static final long PENDING_COUNT = 2;
@@ -38,8 +52,17 @@ public class CeTasksMBeanImplTest {
   private static final long PROCESSING_TIME = 987;
   private static final int WORKER_MAX_COUNT = 666;
   private static final int WORKER_COUNT = 56;
-
-  private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration());
+  private static final Set<CeWorker> WORKERS = IntStream.range(0, 2 + new Random().nextInt(10))
+    .mapToObj(i -> RandomStringUtils.randomAlphabetic(15))
+    .map(uuid -> {
+      CeWorker res = mock(CeWorker.class);
+      when(res.getUUID()).thenReturn(uuid);
+      return res;
+    })
+    .collect(MoreCollectors.toSet());
+
+  private EnabledCeWorkerController enabledCeWorkerController = mock(EnabledCeWorkerController.class);
+  private CeTasksMBeanImpl underTest = new CeTasksMBeanImpl(new DumbCEQueueStatus(), new DumbCeConfiguration(), new DumbCeWorkerFactory(), enabledCeWorkerController);
 
   @Test
   public void register_and_unregister() throws Exception {
@@ -84,12 +107,44 @@ public class CeTasksMBeanImplTest {
     assertThat(underTest.getWorkerMaxCount()).isEqualTo(WORKER_MAX_COUNT);
   }
 
+  @Test
+  public void getWorkerUuids_returns_ordered_list_of_uuids_of_worker_from_CeWorkerFactory_instance() {
+    List<String> workerUuids = underTest.getWorkerUuids();
+
+    assertThat(workerUuids).isEqualTo(WORKERS.stream().map(CeWorker::getUUID).sorted().collect(Collectors.toList()));
+    // ImmutableSet can not be serialized
+    assertThat(workerUuids).isNotInstanceOf(ImmutableSet.class);
+  }
+
+  @Test
+  public void getEnabledWorkerUuids_returns_ordered_list_of_uuids_of_worker_from_CeWorkerFactory_instance_filtered_on_enabled_ones() {
+    int enabledWorkerCount = new Random().nextInt(WORKERS.size());
+    int i = 0;
+    CeWorker[] enabledWorkers = new CeWorker[enabledWorkerCount];
+    for (CeWorker worker : WORKERS) {
+      if (i < enabledWorkerCount) {
+        enabledWorkers[i] = worker;
+        when(enabledCeWorkerController.isEnabled(worker)).thenReturn(true);
+      } else {
+        when(enabledCeWorkerController.isEnabled(worker)).thenReturn(false);
+      }
+      i++;
+    }
+
+    List<String> enabledWorkerUuids = underTest.getEnabledWorkerUuids();
+
+    assertThat(enabledWorkerUuids).isEqualTo(Stream.of(enabledWorkers).map(CeWorker::getUUID).sorted().collect(Collectors.toList()));
+    // ImmutableSet can not be serialized
+    assertThat(enabledWorkerUuids).isNotInstanceOf(ImmutableSet.class);
+  }
+
   @Test
   public void export_system_info() {
     ProtobufSystemInfo.Section section = underTest.toProtobuf();
     assertThat(section.getName()).isEqualTo("Compute Engine Tasks");
     assertThat(section.getAttributesCount()).isEqualTo(8);
   }
+
   private static class DumbCEQueueStatus implements CEQueueStatus {
 
     @Override
@@ -175,4 +230,16 @@ public class CeTasksMBeanImplTest {
     }
 
   }
+
+  private static class DumbCeWorkerFactory implements CeWorkerFactory {
+    @Override
+    public CeWorker create(int ordinal) {
+      throw new UnsupportedOperationException("create should not be called");
+    }
+
+    @Override
+    public Set<CeWorker> getWorkers() {
+      return WORKERS;
+    }
+  }
 }
index 096d31fae3f244bf816b0deebd3c5b9d01f8639d..9c1f68683e1b4d70d6da056e8562732afcb85816 100644 (file)
@@ -257,7 +257,7 @@ public class CeProcessingSchedulerImplTest {
     }
 
     @Override
-    public Set<String> getWorkerUUIDs() {
+    public Set<CeWorker> getWorkers() {
       return emptySet();
     }
   }
index b61edd3b1d6df24d3d06cfa764a4a47246c9e129..4f0603ef59f283146daa294b368572bf423b3d54 100644 (file)
@@ -91,18 +91,17 @@ public class CeWorkerFactoryImplTest {
   }
 
   @Test
-  public void CeWorkerFactory_has_an_empty_set_of_uuids_when_created() {
-    assertThat(underTest.getWorkerUUIDs()).isEmpty();
+  public void getWorkers_returns_empty_if_create_has_not_been_called_before() {
+    assertThat(underTest.getWorkers()).isEmpty();
   }
 
   @Test
-  public void CeWorkerFactory_must_returns_the_uuids_of_worker() {
-    Set<String> ceWorkerUUIDs = new HashSet<>();
-
-    for (int i = 0; i < 10; i++) {
-      ceWorkerUUIDs.add(underTest.create(i).getUUID());
+  public void CeWorkerFactory_must_returns_the_workers_returned_by_created() {
+    Set<CeWorker> expected = new HashSet<>();
+    for (int i = 0; i < 1 + new Random().nextInt(10); i++) {
+      expected.add(underTest.create(i));
     }
 
-    assertThat(underTest.getWorkerUUIDs()).isEqualTo(ceWorkerUUIDs);
+    assertThat(underTest.getWorkers()).isEqualTo(expected);
   }
 }