@@ -19,6 +19,9 @@ | |||
*/ | |||
package org.sonar.ce.monitoring; | |||
import org.sonar.api.ce.ComputeEngineSide; | |||
@ComputeEngineSide | |||
public interface CEQueueStatus { | |||
/** | |||
@@ -85,4 +88,6 @@ public interface CEQueueStatus { | |||
* Time spent processing batch reports since startup, in milliseconds. | |||
*/ | |||
long getProcessingTime(); | |||
boolean areWorkersPaused(); | |||
} |
@@ -19,10 +19,12 @@ | |||
*/ | |||
package org.sonar.ce.monitoring; | |||
import java.util.Optional; | |||
import java.util.concurrent.atomic.AtomicLong; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.server.property.InternalProperties; | |||
import static com.google.common.base.Preconditions.checkArgument; | |||
@@ -69,6 +71,14 @@ public class CEQueueStatusImpl implements CEQueueStatus { | |||
} | |||
} | |||
@Override | |||
public boolean areWorkersPaused() { | |||
try (DbSession dbSession = dbClient.openSession(false)) { | |||
Optional<String> val = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE); | |||
return "true".equals(val.orElse(null)); | |||
} | |||
} | |||
@Override | |||
public long getInProgressCount() { | |||
return inProgress.get(); |
@@ -93,6 +93,7 @@ public class CeTasksMBeanImpl implements CeTasksMBean, Startable, SystemInfoSect | |||
builder.addAttributesBuilder().setKey("Processing Time (ms)").setLongValue(getProcessingTime()).build(); | |||
builder.addAttributesBuilder().setKey("Worker Count").setLongValue(getWorkerCount()).build(); | |||
builder.addAttributesBuilder().setKey("Max Worker Count").setLongValue(getWorkerMaxCount()).build(); | |||
builder.addAttributesBuilder().setKey("Workers Paused").setBooleanValue(queueStatus.areWorkersPaused()).build(); | |||
return builder.build(); | |||
} | |||
} |
@@ -19,14 +19,15 @@ | |||
*/ | |||
package org.sonar.ce.monitoring; | |||
import java.util.Optional; | |||
import java.util.Random; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
import org.mockito.Mockito; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.server.property.InternalProperties; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.mockito.ArgumentMatchers.any; | |||
@@ -135,8 +136,20 @@ public class CEQueueStatusImplTest { | |||
@Test | |||
public void count_Pending_from_database() { | |||
when(dbClient.ceQueueDao().countByStatus(any(DbSession.class), eq(CeQueueDto.Status.PENDING))).thenReturn(42); | |||
when(dbClient.ceQueueDao().countByStatus(any(), eq(CeQueueDto.Status.PENDING))).thenReturn(42); | |||
assertThat(underTest.getPendingCount()).isEqualTo(42); | |||
} | |||
@Test | |||
public void workers_pause_is_loaded_from_db() { | |||
when(dbClient.internalPropertiesDao().selectByKey(any(), eq(InternalProperties.COMPUTE_ENGINE_PAUSE))).thenReturn(Optional.of("true")); | |||
assertThat(underTest.areWorkersPaused()).isTrue(); | |||
} | |||
@Test | |||
public void workers_pause_is_false_by_default() { | |||
assertThat(underTest.areWorkersPaused()).isFalse(); | |||
} | |||
} |
@@ -88,11 +88,10 @@ public class CeTasksMBeanImplTest { | |||
public void export_system_info() { | |||
ProtobufSystemInfo.Section section = underTest.toProtobuf(); | |||
assertThat(section.getName()).isEqualTo("Compute Engine Tasks"); | |||
assertThat(section.getAttributesCount()).isEqualTo(7); | |||
assertThat(section.getAttributesCount()).isEqualTo(8); | |||
} | |||
private static class DumbCEQueueStatus implements CEQueueStatus { | |||
@Override | |||
public long getPendingCount() { | |||
return PENDING_COUNT; | |||
@@ -132,6 +131,12 @@ public class CeTasksMBeanImplTest { | |||
public long getProcessingTime() { | |||
return PROCESSING_TIME; | |||
} | |||
@Override | |||
public boolean areWorkersPaused() { | |||
return false; | |||
} | |||
private long methodNotImplemented() { | |||
throw new UnsupportedOperationException("Not Implemented"); | |||
} |
@@ -28,6 +28,7 @@ import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.process.systeminfo.Global; | |||
import org.sonar.process.systeminfo.SystemInfoSection; | |||
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo; | |||
import org.sonar.server.property.InternalProperties; | |||
import static org.sonar.process.systeminfo.SystemInfoUtils.setAttribute; | |||
@@ -57,6 +58,7 @@ public class CeQueueGlobalSection implements SystemInfoSection, Global { | |||
setAttribute(protobuf, "Total Pending", dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.PENDING)); | |||
setAttribute(protobuf, "Total In Progress", dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS)); | |||
setAttribute(protobuf, "Max Workers per Node", workerCountProvider == null ? DEFAULT_NB_OF_WORKERS : workerCountProvider.get()); | |||
setAttribute(protobuf, "Workers Paused", "true".equals(dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE).orElse(null))); | |||
} | |||
return protobuf.build(); | |||
} |
@@ -19,13 +19,14 @@ | |||
*/ | |||
package org.sonar.server.platform.monitoring.cluster; | |||
import java.util.Optional; | |||
import org.junit.Test; | |||
import org.mockito.Mockito; | |||
import org.sonar.ce.configuration.WorkerCountProvider; | |||
import org.sonar.db.DbClient; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeQueueDto; | |||
import org.sonar.process.systeminfo.protobuf.ProtobufSystemInfo; | |||
import org.sonar.server.property.InternalProperties; | |||
import static org.mockito.ArgumentMatchers.any; | |||
import static org.mockito.ArgumentMatchers.eq; | |||
@@ -36,11 +37,12 @@ import static org.sonar.server.platform.monitoring.SystemInfoTesting.assertThatA | |||
public class CeQueueGlobalSectionTest { | |||
private DbClient dbClient = mock(DbClient.class, Mockito.RETURNS_DEEP_STUBS); | |||
private WorkerCountProvider workerCountProvider = mock(WorkerCountProvider.class); | |||
@Test | |||
public void test_queue_state_with_default_settings() { | |||
when(dbClient.ceQueueDao().countByStatus(any(DbSession.class), eq(CeQueueDto.Status.PENDING))).thenReturn(10); | |||
when(dbClient.ceQueueDao().countByStatus(any(DbSession.class), eq(CeQueueDto.Status.IN_PROGRESS))).thenReturn(1); | |||
when(dbClient.ceQueueDao().countByStatus(any(), eq(CeQueueDto.Status.PENDING))).thenReturn(10); | |||
when(dbClient.ceQueueDao().countByStatus(any(), eq(CeQueueDto.Status.IN_PROGRESS))).thenReturn(1); | |||
CeQueueGlobalSection underTest = new CeQueueGlobalSection(dbClient); | |||
ProtobufSystemInfo.Section section = underTest.toProtobuf(); | |||
@@ -52,9 +54,8 @@ public class CeQueueGlobalSectionTest { | |||
@Test | |||
public void test_queue_state_with_overridden_settings() { | |||
when(dbClient.ceQueueDao().countByStatus(any(DbSession.class), eq(CeQueueDto.Status.PENDING))).thenReturn(10); | |||
when(dbClient.ceQueueDao().countByStatus(any(DbSession.class), eq(CeQueueDto.Status.IN_PROGRESS))).thenReturn(2); | |||
WorkerCountProvider workerCountProvider = mock(WorkerCountProvider.class); | |||
when(dbClient.ceQueueDao().countByStatus(any(), eq(CeQueueDto.Status.PENDING))).thenReturn(10); | |||
when(dbClient.ceQueueDao().countByStatus(any(), eq(CeQueueDto.Status.IN_PROGRESS))).thenReturn(2); | |||
when(workerCountProvider.get()).thenReturn(5); | |||
CeQueueGlobalSection underTest = new CeQueueGlobalSection(dbClient, workerCountProvider); | |||
@@ -65,5 +66,21 @@ public class CeQueueGlobalSectionTest { | |||
assertThatAttributeIs(section, "Max Workers per Node", 5); | |||
} | |||
@Test | |||
public void test_workers_not_paused() { | |||
CeQueueGlobalSection underTest = new CeQueueGlobalSection(dbClient, workerCountProvider); | |||
ProtobufSystemInfo.Section section = underTest.toProtobuf(); | |||
assertThatAttributeIs(section, "Workers Paused", false); | |||
} | |||
@Test | |||
public void test_workers_paused() { | |||
when(dbClient.internalPropertiesDao().selectByKey(any(), eq(InternalProperties.COMPUTE_ENGINE_PAUSE))).thenReturn(Optional.of("true")); | |||
CeQueueGlobalSection underTest = new CeQueueGlobalSection(dbClient, workerCountProvider); | |||
ProtobufSystemInfo.Section section = underTest.toProtobuf(); | |||
assertThatAttributeIs(section, "Workers Paused", true); | |||
} | |||
} |