@@ -36,7 +36,7 @@ public interface InternalCeQueue extends CeQueue { | |||
/** | |||
* Peek the oldest task in status {@link org.sonar.db.ce.CeQueueDto.Status#PENDING}. | |||
* The task status is changed to {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. | |||
* Does not return anything if workers are paused or being paused (see {@link #getWorkersPause()}. | |||
* Does not return anything if workers are paused or being paused (see {@link #getWorkersPauseStatus()}. | |||
* | |||
* <p>Only a single task can be peeked by project.</p> | |||
* |
@@ -72,7 +72,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue | |||
public Optional<CeTask> peek(String workerUuid) { | |||
requireNonNull(workerUuid, "workerUuid can't be null"); | |||
if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPause().isPresent()) { | |||
if (computeEngineStatus.getStatus() != ComputeEngineStatus.Status.STARTED || getWorkersPauseStatus() != WorkersPauseStatus.RESUMED) { | |||
return Optional.empty(); | |||
} | |||
try (DbSession dbSession = dbClient.openSession(false)) { |
@@ -99,13 +99,13 @@ public interface CeQueue { | |||
*/ | |||
void resumeWorkers(); | |||
Optional<WorkersPause> getWorkersPause(); | |||
WorkersPauseStatus getWorkersPauseStatus(); | |||
enum SubmitOption { | |||
UNIQUE_QUEUE_PER_COMPONENT | |||
} | |||
enum WorkersPause { | |||
enum WorkersPauseStatus { | |||
/** | |||
* Pause triggered but at least one task is still in-progress | |||
*/ | |||
@@ -114,7 +114,12 @@ public interface CeQueue { | |||
/** | |||
* Paused, no tasks are in-progress. Tasks are pending. | |||
*/ | |||
PAUSED | |||
PAUSED, | |||
/** | |||
* Not paused nor pausing | |||
*/ | |||
RESUMED | |||
} | |||
} |
@@ -231,17 +231,17 @@ public class CeQueueImpl implements CeQueue { | |||
} | |||
@Override | |||
public java.util.Optional<WorkersPause> getWorkersPause() { | |||
public WorkersPauseStatus getWorkersPauseStatus() { | |||
try (DbSession dbSession = dbClient.openSession(false)) { | |||
java.util.Optional<String> propValue = dbClient.internalPropertiesDao().selectByKey(dbSession, InternalProperties.COMPUTE_ENGINE_PAUSE); | |||
if (!propValue.isPresent() || !propValue.get().equals("true")) { | |||
return java.util.Optional.empty(); | |||
return WorkersPauseStatus.RESUMED; | |||
} | |||
int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS); | |||
if (countInProgress > 0) { | |||
return java.util.Optional.of(WorkersPause.PAUSING); | |||
return WorkersPauseStatus.PAUSING; | |||
} | |||
return java.util.Optional.of(WorkersPause.PAUSED); | |||
return WorkersPauseStatus.PAUSED; | |||
} | |||
} | |||
@@ -19,7 +19,6 @@ | |||
*/ | |||
package org.sonar.server.ce.ws; | |||
import java.util.Optional; | |||
import org.sonar.api.server.ws.Request; | |||
import org.sonar.api.server.ws.Response; | |||
import org.sonar.api.server.ws.WebService; | |||
@@ -60,17 +59,21 @@ public class InfoAction implements CeWsAction { | |||
} | |||
Ce.InfoWsResponse.Builder builder = Ce.InfoWsResponse.newBuilder(); | |||
Optional<CeQueue.WorkersPause> pause = ceQueue.getWorkersPause(); | |||
builder.setWorkersPaused(isPaused(pause)); | |||
builder.setWorkersPauseRequested(isPauseRequested(pause)); | |||
CeQueue.WorkersPauseStatus status = ceQueue.getWorkersPauseStatus(); | |||
builder.setWorkersPauseStatus(convert(status)); | |||
WsUtils.writeProtobuf(builder.build(), request, response); | |||
} | |||
private static boolean isPaused(Optional<CeQueue.WorkersPause> pause) { | |||
return pause.isPresent() && pause.get() == CeQueue.WorkersPause.PAUSED; | |||
} | |||
private static boolean isPauseRequested(Optional<CeQueue.WorkersPause> pause) { | |||
return pause.isPresent() && pause.get() == CeQueue.WorkersPause.PAUSING; | |||
private Ce.WorkersPauseStatus convert(CeQueue.WorkersPauseStatus status) { | |||
switch (status) { | |||
case PAUSING: | |||
return Ce.WorkersPauseStatus.PAUSING; | |||
case PAUSED: | |||
return Ce.WorkersPauseStatus.PAUSED; | |||
case RESUMED: | |||
return Ce.WorkersPauseStatus.RESUMED; | |||
default: | |||
throw new IllegalStateException("Unsupported WorkersPauseStatus: " + status); | |||
} | |||
} | |||
} |
@@ -1,4 +1,3 @@ | |||
{ | |||
"workersPaused": false, | |||
"workersPauseRequested": true | |||
"workersPauseStatus": "PAUSING" | |||
} |
@@ -388,10 +388,10 @@ public class CeQueueImplTest { | |||
submit(CeTaskTypes.REPORT, "PROJECT_1"); | |||
// task is pending | |||
assertThat(underTest.getWorkersPause()).isEmpty(); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
underTest.pauseWorkers(); | |||
assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED); | |||
} | |||
@Test | |||
@@ -400,19 +400,19 @@ public class CeQueueImplTest { | |||
db.getDbClient().ceQueueDao().peek(session, WORKER_UUID, MAX_EXECUTION_COUNT); | |||
// task is in-progress | |||
assertThat(underTest.getWorkersPause()).isEmpty(); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
underTest.pauseWorkers(); | |||
assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING); | |||
} | |||
@Test | |||
public void resumeWorkers_does_nothing_if_not_paused() { | |||
assertThat(underTest.getWorkersPause()).isEmpty(); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
underTest.resumeWorkers(); | |||
assertThat(underTest.getWorkersPause()).isEmpty(); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
} | |||
@Test | |||
@@ -422,19 +422,19 @@ public class CeQueueImplTest { | |||
// task is in-progress | |||
underTest.pauseWorkers(); | |||
assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSING); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSING); | |||
underTest.resumeWorkers(); | |||
assertThat(underTest.getWorkersPause()).isEmpty(); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
} | |||
@Test | |||
public void resumeWorkers_resumes_paused_workers() { | |||
underTest.pauseWorkers(); | |||
assertThat(underTest.getWorkersPause()).hasValue(CeQueue.WorkersPause.PAUSED); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.PAUSED); | |||
underTest.resumeWorkers(); | |||
assertThat(underTest.getWorkersPause()).isEmpty(); | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
} | |||
private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto) { |
@@ -19,7 +19,6 @@ | |||
*/ | |||
package org.sonar.server.ce.ws; | |||
import java.util.Optional; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
@@ -60,7 +59,7 @@ public class InfoActionTest { | |||
@Test | |||
public void test_example_of_response() { | |||
userSession.logIn().setSystemAdministrator(); | |||
when(ceQueue.getWorkersPause()).thenReturn(Optional.of(CeQueue.WorkersPause.PAUSING)); | |||
when(ceQueue.getWorkersPauseStatus()).thenReturn(CeQueue.WorkersPauseStatus.PAUSING); | |||
ws.newRequest().execute().assertJson(ws.getDef().responseExampleAsString()); | |||
} | |||
@@ -68,31 +67,28 @@ public class InfoActionTest { | |||
@Test | |||
public void test_workers_in_pausing_state() { | |||
userSession.logIn().setSystemAdministrator(); | |||
when(ceQueue.getWorkersPause()).thenReturn(Optional.of(CeQueue.WorkersPause.PAUSING)); | |||
when(ceQueue.getWorkersPauseStatus()).thenReturn(CeQueue.WorkersPauseStatus.PAUSING); | |||
Ce.InfoWsResponse response = ws.newRequest().executeProtobuf(Ce.InfoWsResponse.class); | |||
assertThat(response.getWorkersPaused()).isFalse(); | |||
assertThat(response.getWorkersPauseRequested()).isTrue(); | |||
assertThat(response.getWorkersPauseStatus()).isEqualTo(Ce.WorkersPauseStatus.PAUSING); | |||
} | |||
@Test | |||
public void test_workers_in_paused_state() { | |||
userSession.logIn().setSystemAdministrator(); | |||
when(ceQueue.getWorkersPause()).thenReturn(Optional.of(CeQueue.WorkersPause.PAUSED)); | |||
when(ceQueue.getWorkersPauseStatus()).thenReturn(CeQueue.WorkersPauseStatus.PAUSED); | |||
Ce.InfoWsResponse response = ws.newRequest().executeProtobuf(Ce.InfoWsResponse.class); | |||
assertThat(response.getWorkersPaused()).isTrue(); | |||
assertThat(response.getWorkersPauseRequested()).isFalse(); | |||
assertThat(response.getWorkersPauseStatus()).isEqualTo(Ce.WorkersPauseStatus.PAUSED); | |||
} | |||
@Test | |||
public void test_workers_in_resumed_state() { | |||
userSession.logIn().setSystemAdministrator(); | |||
when(ceQueue.getWorkersPause()).thenReturn(Optional.empty()); | |||
when(ceQueue.getWorkersPauseStatus()).thenReturn(CeQueue.WorkersPauseStatus.RESUMED); | |||
Ce.InfoWsResponse response = ws.newRequest().executeProtobuf(Ce.InfoWsResponse.class); | |||
assertThat(response.getWorkersPaused()).isFalse(); | |||
assertThat(response.getWorkersPauseRequested()).isFalse(); | |||
assertThat(response.getWorkersPauseStatus()).isEqualTo(Ce.WorkersPauseStatus.RESUMED); | |||
} | |||
@Test | |||
@@ -120,10 +116,9 @@ public class InfoActionTest { | |||
public void authenticate_with_passcode() { | |||
userSession.anonymous(); | |||
when(passcode.isValid(any())).thenReturn(true); | |||
when(ceQueue.getWorkersPause()).thenReturn(Optional.empty()); | |||
when(ceQueue.getWorkersPauseStatus()).thenReturn(CeQueue.WorkersPauseStatus.RESUMED); | |||
Ce.InfoWsResponse response = ws.newRequest().executeProtobuf(Ce.InfoWsResponse.class); | |||
assertThat(response.getWorkersPaused()).isFalse(); | |||
assertThat(response.getWorkersPauseRequested()).isFalse(); | |||
assertThat(response.getWorkersPauseStatus()).isEqualTo(Ce.WorkersPauseStatus.RESUMED); | |||
} | |||
} |
@@ -59,8 +59,13 @@ message ComponentResponse { | |||
// GET api/ce/info | |||
message InfoWsResponse { | |||
optional bool workersPaused = 1; | |||
optional bool workersPauseRequested = 2; | |||
optional WorkersPauseStatus workersPauseStatus = 1; | |||
} | |||
enum WorkersPauseStatus { | |||
RESUMED = 0; | |||
PAUSING = 1; | |||
PAUSED = 2; | |||
} | |||
// GET api/ce/task_types |
@@ -63,8 +63,7 @@ public class CeWorkersPauseTest { | |||
public void pause_and_resume_workers() throws IOException { | |||
tester.wsClient().ce().pause(); | |||
// no in-progress tasks --> already paused | |||
assertThat(tester.wsClient().ce().info().getWorkersPaused()).isTrue(); | |||
assertThat(tester.wsClient().ce().info().getWorkersPauseRequested()).isFalse(); | |||
assertThat(tester.wsClient().ce().info().getWorkersPauseStatus()).isEqualTo(Ce.WorkersPauseStatus.PAUSED); | |||
// run analysis | |||
File projectDir = temp.newFolder(); | |||
@@ -79,8 +78,7 @@ public class CeWorkersPauseTest { | |||
// workers are resumed | |||
tester.wsClient().ce().resume(); | |||
assertThat(tester.wsClient().ce().info().getWorkersPaused()).isFalse(); | |||
assertThat(tester.wsClient().ce().info().getWorkersPauseRequested()).isFalse(); | |||
assertThat(tester.wsClient().ce().info().getWorkersPauseStatus()).isEqualTo(Ce.WorkersPauseStatus.RESUMED); | |||
while (!isQueueEmpty()) { | |||
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |