]> source.dussan.org Git - sonarqube.git/commitdiff
SONAR-10592 ability to pause/resume Compute Engine workers
authorSimon Brandhof <simon.brandhof@sonarsource.com>
Wed, 18 Apr 2018 15:57:03 +0000 (17:57 +0200)
committerSonarTech <sonartech@sonarsource.com>
Thu, 10 May 2018 18:20:52 +0000 (20:20 +0200)
server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineStatus.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java
server/sonar-db-dao/src/main/java/org/sonar/db/property/InternalPropertiesDao.java
server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java
server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
server/sonar-server/src/main/java/org/sonar/server/property/InternalProperties.java
server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java

index af6aa5ab498a44d31ce10674d7c13819d9a7a3e9..396f152428ada6e047cb45ec3ca3c8706dacf9a1 100644 (file)
@@ -19,6 +19,9 @@
  */
 package org.sonar.ce.container;
 
+/**
+ * Status of the Compute Engine local node, but not of cluster of Compute Engine nodes.
+ */
 public interface ComputeEngineStatus {
 
   /**
index 60959bb6e81b2c1f72f9919a779aac52dd6d424c..26a1a558713a1c55c9325ab1bacdb562190cb024 100644 (file)
@@ -36,7 +36,7 @@ public interface InternalCeQueue extends CeQueue {
   /**
    * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
    * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
-   * Does not return anything if the queue is paused (see {@link #isPeekPaused()}.
+   * Does not return anything if workers are paused or being paused (see {@link #getWorkersPause()}.
    *
    * <p>Only a single task can be peeked by project.</p>
    *
index de97563d60e6fd12f8db5f2846157c750f85c350..907a2d477af355c48aca9be695581b2cb29c0daf 100644 (file)
@@ -72,7 +72,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
   public Optional<CeTask> peek(String workerUuid) {
     requireNonNull(workerUuid, "workerUuid can't be null");
 
-    if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED) {
+    if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPause().isPresent()) {
       return Optional.empty();
     }
     try (DbSession dbSession = dbClient.openSession(false)) {
index 6e31c0b3af2fa8b153dee945251d07c9af18c22e..4925c5df697d38a381b5b0d2ef5cdc6db93d2d06 100644 (file)
@@ -303,6 +303,20 @@ public class InternalCeQueueImplTest {
     assertThat(peek.isPresent()).isFalse();
   }
 
+  @Test
+  public void peek_is_paused_then_resumed() {
+    CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+    underTest.pauseWorkers();
+
+    Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+    assertThat(peek).isEmpty();
+
+    underTest.resumeWorkers();
+    peek = underTest.peek(WORKER_UUID_1);
+    assertThat(peek).isPresent();
+    assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
+  }
+
   @Test
   public void peek_overrides_workerUuid_to_argument() {
     db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
index 01e8911efed4cb20f42486ec99a9dacfb794820c..12669b7697d2b24051c3cd6d725b0618a6ba0073 100644 (file)
@@ -89,6 +89,10 @@ public class InternalPropertiesDao implements Dao {
     mapper.insertAsEmpty(key, system2.now());
   }
 
+  public void delete(DbSession dbSession, String key) {
+    getMapper(dbSession).deleteByKey(key);
+  }
+
   /**
    * @return a Map with an {link Optional<String>} for each String in {@code keys}.
    */
index 680a3b851f0e4fbaecb73e45e8b049e50f3c4a7c..97e25b90db61696534b9c6196547b2bf95fc3cf5 100644 (file)
@@ -86,8 +86,35 @@ public interface CeQueue {
    */
   int cancelAll();
 
+  /**
+   * Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused.
+   * The workers that are already processing tasks are not interrupted.
+   * This method is not restricted to the local workers. All the Compute Engine nodes are paused.
+   */
+  void pauseWorkers();
+
+  /**
+   * Resumes workers so that they can peek tasks from queue.
+   * This method is not restricted to the local workers. All the Compute Engine nodes are paused.
+   */
+  void resumeWorkers();
+
+  Optional<WorkersPause> getWorkersPause();
+
   enum SubmitOption {
     UNIQUE_QUEUE_PER_COMPONENT
   }
 
+  enum WorkersPause {
+    /**
+     * Pause triggered but at least one task is still in-progress
+     */
+    PAUSING,
+
+    /**
+     * Paused, no tasks are in-progress. Tasks are pending.
+     */
+    PAUSED
+  }
+
 }
index dcb5b78833ccaa37f3bebd16e921a94724d2449c..5e131bbcd7b866556c5af01a5ace0f2e27e9a7f1 100644 (file)
@@ -33,7 +33,7 @@ import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
-import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.server.ServerSide;
 import org.sonar.core.util.UuidFactory;
 import org.sonar.core.util.stream.MoreCollectors;
 import org.sonar.db.DbClient;
@@ -42,6 +42,7 @@ import org.sonar.db.ce.CeActivityDto;
 import org.sonar.db.ce.CeQueueDto;
 import org.sonar.db.component.ComponentDto;
 import org.sonar.server.organization.DefaultOrganizationProvider;
+import org.sonar.server.property.InternalProperties;
 
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.FluentIterable.from;
@@ -51,7 +52,7 @@ import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT
 import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
 import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
 
-@ComputeEngineSide
+@ServerSide
 public class CeQueueImpl implements CeQueue {
 
   private final DbClient dbClient;
@@ -176,7 +177,7 @@ public class CeQueueImpl implements CeQueue {
       .collect(Collectors.toSet());
     Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao()
       .selectByUuids(dbSession, componentUuids))
-      .uniqueIndex(ComponentDto::uuid);
+        .uniqueIndex(ComponentDto::uuid);
 
     return dtos.stream()
       .map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply)
@@ -213,6 +214,37 @@ public class CeQueueImpl implements CeQueue {
     }
   }
 
+  @Override
+  public void pauseWorkers() {
+    try (DbSession dbSession = dbClient.openSession(false)) {
+      dbClient.internalPropertiesDao().save(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE, "true");
+      dbSession.commit();
+    }
+  }
+
+  @Override
+  public void resumeWorkers() {
+    try (DbSession dbSession = dbClient.openSession(false)) {
+      dbClient.internalPropertiesDao().delete(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE);
+      dbSession.commit();
+    }
+  }
+
+  @Override
+  public java.util.Optional<WorkersPause> getWorkersPause() {
+    try (DbSession dbSession = dbClient.openSession(false)) {
+      java.util.Optional<String> propValue = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE);
+      if (!propValue.isPresent() || !propValue.get().equals("true")) {
+        return java.util.Optional.empty();
+      }
+      int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS);
+      if (countInProgress > 0) {
+        return java.util.Optional.of(WorkersPause.PAUSING);
+      }
+      return java.util.Optional.of(WorkersPause.PAUSED);
+    }
+  }
+
   protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) {
     dbClient.ceActivityDao().insert(dbSession, activityDto);
     dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
index e5787cd373ce5839b7961b1f226a98a8883b13a3..544cd6eb870d7436561583e1372dbdd4df131fdd 100644 (file)
@@ -35,6 +35,12 @@ public interface InternalProperties {
   String ORGANIZATION_ENABLED = "organization.enabled";
 
   String SERVER_ID_CHECKSUM = "server.idChecksum";
+
+  /**
+   * Compute Engine is pausing/paused if property value is "true".
+   */
+  String COMPUTE_ENGINE_PAUSE = "ce.pause";
+
   /**
    * Read the value of the specified property.
    *
index 1fc5caff7e36f007b3f11c2db40b51c079ccf4f6..830d8c55255d22a85b01a6635a8d8fde30773a6c 100644 (file)
@@ -383,6 +383,60 @@ public class CeQueueImplTest {
     assertThat(history.isPresent()).isFalse();
   }
 
+  @Test
+  public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() {
+    submit(CeTaskTypes.REPORT, "PROJECT_1");
+    // task is pending
+
+    assertThat(underTest.getWorkersPause()).isEmpty();
+
+    underTest.pauseWorkers();
+    assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED);
+  }
+
+  @Test
+  public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
+    submit(CeTaskTypes.REPORT, "PROJECT_1");
+    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
+    // task is in-progress
+
+    assertThat(underTest.getWorkersPause()).isEmpty();
+
+    underTest.pauseWorkers();
+    assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING);
+  }
+
+  @Test
+  public void resumeWorkers_does_nothing_if_not_paused() {
+    assertThat(underTest.getWorkersPause()).isEmpty();
+
+    underTest.resumeWorkers();
+
+    assertThat(underTest.getWorkersPause()).isEmpty();
+  }
+
+  @Test
+  public void resumeWorkers_resumes_pausing_workers() {
+    submit(CeTaskTypes.REPORT, "PROJECT_1");
+    db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
+    // task is in-progress
+
+    underTest.pauseWorkers();
+    assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING);
+
+    underTest.resumeWorkers();
+    assertThat(underTest.getWorkersPause()).isEmpty();
+  }
+
+  @Test
+  public void resumeWorkers_resumes_paused_workers() {
+    underTest.pauseWorkers();
+    assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED);
+
+    underTest.resumeWorkers();
+    assertThat(underTest.getWorkersPause()).isEmpty();
+  }
+
   private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) {
     if (componentDto == null) {
       assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid());