From: Simon Brandhof Date: Wed, 18 Apr 2018 15:57:03 +0000 (+0200) Subject: SONAR-10592 ability to pause/resume Compute Engine workers X-Git-Tag: 7.5~1235 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=a756fd6650759b0b6983913489f50a03d684d8bc;p=sonarqube.git SONAR-10592 ability to pause/resume Compute Engine workers --- diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineStatus.java b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineStatus.java index af6aa5ab498..396f152428a 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineStatus.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/container/ComputeEngineStatus.java @@ -19,6 +19,9 @@ */ package org.sonar.ce.container; +/** + * Status of the Compute Engine local node, but not of cluster of Compute Engine nodes. + */ public interface ComputeEngineStatus { /** diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 60959bb6e81..26a1a558713 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -36,7 +36,7 @@ public interface InternalCeQueue extends CeQueue { /** * Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. * The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. - * Does not return anything if the queue is paused (see {@link #isPeekPaused()}. + * Does not return anything if workers are paused or being paused (see {@link #getWorkersPause()}. * *

Only a single task can be peeked by project.

* diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index de97563d60e..907a2d477af 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -72,7 +72,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue public Optional peek(String workerUuid) { requireNonNull(workerUuid, "workerUuid can't be null"); - if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED) { + if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPause().isPresent()) { return Optional.empty(); } try (DbSession dbSession = dbClient.openSession(false)) { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 6e31c0b3af2..4925c5df697 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -303,6 +303,20 @@ public class InternalCeQueueImplTest { assertThat(peek.isPresent()).isFalse(); } + @Test + public void peek_is_paused_then_resumed() { + CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); + underTest.pauseWorkers(); + + Optional peek = underTest.peek(WORKER_UUID_1); + assertThat(peek).isEmpty(); + + underTest.resumeWorkers(); + peek = underTest.peek(WORKER_UUID_1); + assertThat(peek).isPresent(); + assertThat(peek.get().getUuid()).isEqualTo(task.getUuid()); + } + @Test public void peek_overrides_workerUuid_to_argument() { db.getDbClient().ceQueueDao().insert(session, new CeQueueDto() diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/property/InternalPropertiesDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/property/InternalPropertiesDao.java index 01e8911efed..12669b7697d 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/property/InternalPropertiesDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/property/InternalPropertiesDao.java @@ -89,6 +89,10 @@ public class InternalPropertiesDao implements Dao { mapper.insertAsEmpty(key, system2.now()); } + public void delete(DbSession dbSession, String key) { + getMapper(dbSession).deleteByKey(key); + } + /** * @return a Map with an {link Optional} for each String in {@code keys}. */ diff --git a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java index 680a3b851f0..97e25b90db6 100644 --- a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueue.java @@ -86,8 +86,35 @@ public interface CeQueue { */ int cancelAll(); + /** + * Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused. + * The workers that are already processing tasks are not interrupted. + * This method is not restricted to the local workers. All the Compute Engine nodes are paused. + */ + void pauseWorkers(); + + /** + * Resumes workers so that they can peek tasks from queue. + * This method is not restricted to the local workers. All the Compute Engine nodes are paused. + */ + void resumeWorkers(); + + Optional getWorkersPause(); + enum SubmitOption { UNIQUE_QUEUE_PER_COMPONENT } + enum WorkersPause { + /** + * Pause triggered but at least one task is still in-progress + */ + PAUSING, + + /** + * Paused, no tasks are in-progress. Tasks are pending. + */ + PAUSED + } + } diff --git a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java index dcb5b78833c..5e131bbcd7b 100644 --- a/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/ce/queue/CeQueueImpl.java @@ -33,7 +33,7 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.Nonnull; -import org.sonar.api.ce.ComputeEngineSide; +import org.sonar.api.server.ServerSide; import org.sonar.core.util.UuidFactory; import org.sonar.core.util.stream.MoreCollectors; import org.sonar.db.DbClient; @@ -42,6 +42,7 @@ import org.sonar.db.ce.CeActivityDto; import org.sonar.db.ce.CeQueueDto; import org.sonar.db.component.ComponentDto; import org.sonar.server.organization.DefaultOrganizationProvider; +import org.sonar.server.property.InternalProperties; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.FluentIterable.from; @@ -51,7 +52,7 @@ import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_COMPONENT import static org.sonar.core.util.stream.MoreCollectors.toEnumSet; import static org.sonar.db.ce.CeQueueDto.Status.PENDING; -@ComputeEngineSide +@ServerSide public class CeQueueImpl implements CeQueue { private final DbClient dbClient; @@ -176,7 +177,7 @@ public class CeQueueImpl implements CeQueue { .collect(Collectors.toSet()); Map componentDtoByUuid = from(dbClient.componentDao() .selectByUuids(dbSession, componentUuids)) - .uniqueIndex(ComponentDto::uuid); + .uniqueIndex(ComponentDto::uuid); return dtos.stream() .map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply) @@ -213,6 +214,37 @@ public class CeQueueImpl implements CeQueue { } } + @Override + public void pauseWorkers() { + try (DbSession dbSession = dbClient.openSession(false)) { + dbClient.internalPropertiesDao().save(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE, "true"); + dbSession.commit(); + } + } + + @Override + public void resumeWorkers() { + try (DbSession dbSession = dbClient.openSession(false)) { + dbClient.internalPropertiesDao().delete(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE); + dbSession.commit(); + } + } + + @Override + public java.util.Optional getWorkersPause() { + try (DbSession dbSession = dbClient.openSession(false)) { + java.util.Optional propValue = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE); + if (!propValue.isPresent() || !propValue.get().equals("true")) { + return java.util.Optional.empty(); + } + int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS); + if (countInProgress > 0) { + return java.util.Optional.of(WorkersPause.PAUSING); + } + return java.util.Optional.of(WorkersPause.PAUSED); + } + } + protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) { dbClient.ceActivityDao().insert(dbSession, activityDto); dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid()); diff --git a/server/sonar-server/src/main/java/org/sonar/server/property/InternalProperties.java b/server/sonar-server/src/main/java/org/sonar/server/property/InternalProperties.java index e5787cd373c..544cd6eb870 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/property/InternalProperties.java +++ b/server/sonar-server/src/main/java/org/sonar/server/property/InternalProperties.java @@ -35,6 +35,12 @@ public interface InternalProperties { String ORGANIZATION_ENABLED = "organization.enabled"; String SERVER_ID_CHECKSUM = "server.idChecksum"; + + /** + * Compute Engine is pausing/paused if property value is "true". + */ + String COMPUTE_ENGINE_PAUSE = "ce.pause"; + /** * Read the value of the specified property. * diff --git a/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java index 1fc5caff7e3..830d8c55255 100644 --- a/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java @@ -383,6 +383,60 @@ public class CeQueueImplTest { assertThat(history.isPresent()).isFalse(); } + @Test + public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + // task is pending + + assertThat(underTest.getWorkersPause()).isEmpty(); + + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED); + } + + @Test + public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT); + // task is in-progress + + assertThat(underTest.getWorkersPause()).isEmpty(); + + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING); + } + + @Test + public void resumeWorkers_does_nothing_if_not_paused() { + assertThat(underTest.getWorkersPause()).isEmpty(); + + underTest.resumeWorkers(); + + assertThat(underTest.getWorkersPause()).isEmpty(); + } + + @Test + public void resumeWorkers_resumes_pausing_workers() { + submit(CeTaskTypes.REPORT, "PROJECT_1"); + db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT); + // task is in-progress + + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING); + + underTest.resumeWorkers(); + assertThat(underTest.getWorkersPause()).isEmpty(); + } + + @Test + public void resumeWorkers_resumes_paused_workers() { + underTest.pauseWorkers(); + assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED); + + underTest.resumeWorkers(); + assertThat(underTest.getWorkersPause()).isEmpty(); + } + private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) { if (componentDto == null) { assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid());