*/
package org.sonar.ce.container;
+/**
+ * Status of the Compute Engine local node, but not of cluster of Compute Engine nodes.
+ */
public interface ComputeEngineStatus {
/**
/**
* Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}.
* The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
- * Does not return anything if the queue is paused (see {@link #isPeekPaused()}.
+ * Does not return anything if workers are paused or being paused (see {@link #getWorkersPause()}.
*
* <p>Only a single task can be peeked by project.</p>
*
public Optional<CeTask> peek(String workerUuid) {
requireNonNull(workerUuid, "workerUuid can't be null");
- if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED) {
+ if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPause().isPresent()) {
return Optional.empty();
}
try (DbSession dbSession = dbClient.openSession(false)) {
assertThat(peek.isPresent()).isFalse();
}
+ @Test
+ public void peek_is_paused_then_resumed() {
+ CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
+ underTest.pauseWorkers();
+
+ Optional<CeTask> peek = underTest.peek(WORKER_UUID_1);
+ assertThat(peek).isEmpty();
+
+ underTest.resumeWorkers();
+ peek = underTest.peek(WORKER_UUID_1);
+ assertThat(peek).isPresent();
+ assertThat(peek.get().getUuid()).isEqualTo(task.getUuid());
+ }
+
@Test
public void peek_overrides_workerUuid_to_argument() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
mapper.insertAsEmpty(key, system2.now());
}
+ public void delete(DbSession dbSession, String key) {
+ getMapper(dbSession).deleteByKey(key);
+ }
+
/**
* @return a Map with an {link Optional<String>} for each String in {@code keys}.
*/
*/
int cancelAll();
+ /**
+ * Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused.
+ * The workers that are already processing tasks are not interrupted.
+ * This method is not restricted to the local workers. All the Compute Engine nodes are paused.
+ */
+ void pauseWorkers();
+
+ /**
+ * Resumes workers so that they can peek tasks from queue.
+ * This method is not restricted to the local workers. All the Compute Engine nodes are paused.
+ */
+ void resumeWorkers();
+
+ Optional<WorkersPause> getWorkersPause();
+
enum SubmitOption {
UNIQUE_QUEUE_PER_COMPONENT
}
+ enum WorkersPause {
+ /**
+ * Pause triggered but at least one task is still in-progress
+ */
+ PAUSING,
+
+ /**
+ * Paused, no tasks are in-progress. Tasks are pending.
+ */
+ PAUSED
+ }
+
}
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
-import org.sonar.api.ce.ComputeEngineSide;
+import org.sonar.api.server.ServerSide;
import org.sonar.core.util.UuidFactory;
import org.sonar.core.util.stream.MoreCollectors;
import org.sonar.db.DbClient;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.component.ComponentDto;
import org.sonar.server.organization.DefaultOrganizationProvider;
+import org.sonar.server.property.InternalProperties;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.FluentIterable.from;
import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
-@ComputeEngineSide
+@ServerSide
public class CeQueueImpl implements CeQueue {
private final DbClient dbClient;
.collect(Collectors.toSet());
Map<String, ComponentDto> componentDtoByUuid = from(dbClient.componentDao()
.selectByUuids(dbSession, componentUuids))
- .uniqueIndex(ComponentDto::uuid);
+ .uniqueIndex(ComponentDto::uuid);
return dtos.stream()
.map(new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid(), componentDtoByUuid)::apply)
}
}
+ @Override
+ public void pauseWorkers() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ dbClient.internalPropertiesDao().save(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE, "true");
+ dbSession.commit();
+ }
+ }
+
+ @Override
+ public void resumeWorkers() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ dbClient.internalPropertiesDao().delete(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE);
+ dbSession.commit();
+ }
+ }
+
+ @Override
+ public java.util.Optional<WorkersPause> getWorkersPause() {
+ try (DbSession dbSession = dbClient.openSession(false)) {
+ java.util.Optional<String> propValue = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE);
+ if (!propValue.isPresent() || !propValue.get().equals("true")) {
+ return java.util.Optional.empty();
+ }
+ int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS);
+ if (countInProgress > 0) {
+ return java.util.Optional.of(WorkersPause.PAUSING);
+ }
+ return java.util.Optional.of(WorkersPause.PAUSED);
+ }
+ }
+
protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) {
dbClient.ceActivityDao().insert(dbSession, activityDto);
dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
String ORGANIZATION_ENABLED = "organization.enabled";
String SERVER_ID_CHECKSUM = "server.idChecksum";
+
+ /**
+ * Compute Engine is pausing/paused if property value is "true".
+ */
+ String COMPUTE_ENGINE_PAUSE = "ce.pause";
+
/**
* Read the value of the specified property.
*
assertThat(history.isPresent()).isFalse();
}
+ @Test
+ public void pauseWorkers_marks_workers_as_paused_if_zero_tasks_in_progress() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ // task is pending
+
+ assertThat(underTest.getWorkersPause()).isEmpty();
+
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED);
+ }
+
+ @Test
+ public void pauseWorkers_marks_workers_as_pausing_if_some_tasks_in_progress() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
+ // task is in-progress
+
+ assertThat(underTest.getWorkersPause()).isEmpty();
+
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING);
+ }
+
+ @Test
+ public void resumeWorkers_does_nothing_if_not_paused() {
+ assertThat(underTest.getWorkersPause()).isEmpty();
+
+ underTest.resumeWorkers();
+
+ assertThat(underTest.getWorkersPause()).isEmpty();
+ }
+
+ @Test
+ public void resumeWorkers_resumes_pausing_workers() {
+ submit(CeTaskTypes.REPORT, "PROJECT_1");
+ db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT);
+ // task is in-progress
+
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING);
+
+ underTest.resumeWorkers();
+ assertThat(underTest.getWorkersPause()).isEmpty();
+ }
+
+ @Test
+ public void resumeWorkers_resumes_paused_workers() {
+ underTest.pauseWorkers();
+ assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED);
+
+ underTest.resumeWorkers();
+ assertThat(underTest.getWorkersPause()).isEmpty();
+ }
+
private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) {
if (componentDto == null) {
assertThat(task.getOrganizationUuid()).isEqualTo(defaultOrganizationProvider.get().getUuid());