diff options
author | Simon Brandhof <simon.brandhof@sonarsource.com> | 2018-12-19 22:16:05 +0100 |
---|---|---|
committer | SonarTech <sonartech@sonarsource.com> | 2019-01-09 20:21:07 +0100 |
commit | 0c376153dd00a6d06efc2d60fe47801d7a9fae96 (patch) | |
tree | 7670d6897ac7b8058ad8ceb2653db27eee433f14 /server/sonar-ce-common | |
parent | e428c8d0491dcc6b0b5cab846156d5a542668708 (diff) | |
download | sonarqube-0c376153dd00a6d06efc2d60fe47801d7a9fae96.tar.gz sonarqube-0c376153dd00a6d06efc2d60fe47801d7a9fae96.zip |
SONARCLOUD-310 add WS api/ce/timeout_tasks
Diffstat (limited to 'server/sonar-ce-common')
3 files changed, 72 insertions, 5 deletions
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java index f73758a04ac..bb1b7281c02 100644 --- a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java @@ -22,6 +22,7 @@ package org.sonar.ce.queue; import java.util.Collection; import java.util.List; import java.util.Optional; +import javax.annotation.Nullable; import org.sonar.ce.task.CeTask; import org.sonar.db.DbSession; import org.sonar.db.ce.CeQueueDto; @@ -87,6 +88,16 @@ public interface CeQueue { int cancelAll(); /** + * Mark a task in status {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} as failed. An unchecked + * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. + * + * The {@code dbSession} is committed. + + * @throws RuntimeException if the task is concurrently removed from the queue + */ + void fail(DbSession dbSession, CeQueueDto ceQueueDto, @Nullable String errorType, @Nullable String errorMessage); + + /** * Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused. * The workers that are already processing tasks are not interrupted. * This method is not restricted to the local workers. All the Compute Engine nodes are paused. diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java index cb375669392..39f832f8f30 100644 --- a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java @@ -36,6 +36,7 @@ import java.util.stream.Stream; import javax.annotation.CheckForNull; import javax.annotation.Nullable; import org.sonar.api.server.ServerSide; +import org.sonar.api.utils.System2; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.task.CeTask; import org.sonar.core.util.UuidFactory; @@ -59,16 +60,19 @@ import static java.util.Optional.ofNullable; import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT; import static org.sonar.core.util.stream.MoreCollectors.toEnumSet; import static org.sonar.core.util.stream.MoreCollectors.uniqueIndex; +import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; import static org.sonar.db.ce.CeQueueDto.Status.PENDING; @ServerSide public class CeQueueImpl implements CeQueue { + private final System2 system2; private final DbClient dbClient; private final UuidFactory uuidFactory; private final DefaultOrganizationProvider defaultOrganizationProvider; - public CeQueueImpl(DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) { + public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) { + this.system2 = system2; this.dbClient = dbClient; this.uuidFactory = uuidFactory; this.defaultOrganizationProvider = defaultOrganizationProvider; @@ -246,6 +250,29 @@ public class CeQueueImpl implements CeQueue { remove(dbSession, q, activityDto); } + @Override + public void fail(DbSession dbSession, CeQueueDto task, @Nullable String errorType, @Nullable String errorMessage) { + checkState(IN_PROGRESS.equals(task.getStatus()), "Task is not in-progress and can't be marked as failed [uuid=%s]", task.getUuid()); + CeActivityDto activityDto = new CeActivityDto(task); + activityDto.setStatus(CeActivityDto.Status.FAILED); + activityDto.setErrorType(errorType); + activityDto.setErrorMessage(errorMessage); + updateExecutionFields(activityDto); + remove(dbSession, task, activityDto); + } + + protected long updateExecutionFields(CeActivityDto activityDto) { + Long startedAt = activityDto.getStartedAt(); + if (startedAt == null) { + return 0L; + } + long now = system2.now(); + long executionTimeInMs = now - startedAt; + activityDto.setExecutedAt(now); + activityDto.setExecutionTimeMs(executionTimeInMs); + return executionTimeInMs; + } + protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) { String taskUuid = queueDto.getUuid(); CeQueueDto.Status expectedQueueDtoStatus = queueDto.getStatus(); @@ -273,7 +300,7 @@ public class CeQueueImpl implements CeQueue { int count = 0; try (DbSession dbSession = dbClient.openSession(false)) { for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { - if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) { + if (includeInProgress || !queueDto.getStatus().equals(IN_PROGRESS)) { cancelImpl(dbSession, queueDto); count++; } @@ -305,7 +332,7 @@ public class CeQueueImpl implements CeQueue { if (!propValue.isPresent() || !propValue.get().equals("true")) { return WorkersPauseStatus.RESUMED; } - int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS); + int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, IN_PROGRESS); if (countInProgress > 0) { return WorkersPauseStatus.PAUSING; } diff --git a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java index fb557a8293e..6f44b7e42b0 100644 --- a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java +++ b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java @@ -52,6 +52,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; import static org.assertj.core.api.Assertions.tuple; import static org.hamcrest.Matchers.startsWith; import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT; @@ -59,8 +60,9 @@ import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMP public class CeQueueImplTest { private static final String WORKER_UUID = "workerUuid"; + private static final long NOW = 1_450_000_000_000L; - private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); + private System2 system2 = new TestSystem2().setNow(NOW); @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -72,7 +74,7 @@ public class CeQueueImplTest { private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db); - private CeQueue underTest = new CeQueueImpl(db.getDbClient(), uuidFactory, defaultOrganizationProvider); + private CeQueue underTest = new CeQueueImpl(system2, db.getDbClient(), uuidFactory, defaultOrganizationProvider); @Test public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() { @@ -476,6 +478,33 @@ public class CeQueueImplTest { assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); } + @Test + public void fail_in_progress_task() { + CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); + CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID).get(); + + underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"); + + Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid()); + assertThat(activity.isPresent()).isTrue(); + assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.FAILED); + assertThat(activity.get().getErrorType()).isEqualTo("TIMEOUT"); + assertThat(activity.get().getErrorMessage()).isEqualTo("Failed on timeout"); + assertThat(activity.get().getExecutedAt()).isEqualTo(NOW); + assertThat(activity.get().getWorkerUuid()).isEqualTo(WORKER_UUID); + } + + @Test + public void fail_throws_exception_if_task_is_pending() { + CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); + CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get(); + + Throwable thrown = catchThrowable(() -> underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout")); + + assertThat(thrown) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Task is not in-progress and can't be marked as failed [uuid=" + task.getUuid() + "]"); + } private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, UserDto userDto) { verifyCeTask(taskSubmit, task, componentDto, componentDto, userDto); |