aboutsummaryrefslogtreecommitdiffstats
path: root/server/sonar-ce-common
diff options
context:
space:
mode:
authorSimon Brandhof <simon.brandhof@sonarsource.com>2018-12-19 22:16:05 +0100
committerSonarTech <sonartech@sonarsource.com>2019-01-09 20:21:07 +0100
commit0c376153dd00a6d06efc2d60fe47801d7a9fae96 (patch)
tree7670d6897ac7b8058ad8ceb2653db27eee433f14 /server/sonar-ce-common
parente428c8d0491dcc6b0b5cab846156d5a542668708 (diff)
downloadsonarqube-0c376153dd00a6d06efc2d60fe47801d7a9fae96.tar.gz
sonarqube-0c376153dd00a6d06efc2d60fe47801d7a9fae96.zip
SONARCLOUD-310 add WS api/ce/timeout_tasks
Diffstat (limited to 'server/sonar-ce-common')
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java11
-rw-r--r--server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java33
-rw-r--r--server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java33
3 files changed, 72 insertions, 5 deletions
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java
index f73758a04ac..bb1b7281c02 100644
--- a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueue.java
@@ -22,6 +22,7 @@ package org.sonar.ce.queue;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import javax.annotation.Nullable;
import org.sonar.ce.task.CeTask;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeQueueDto;
@@ -87,6 +88,16 @@ public interface CeQueue {
int cancelAll();
/**
+ * Mark a task in status {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} as failed. An unchecked
+ * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
+ *
+ * The {@code dbSession} is committed.
+
+ * @throws RuntimeException if the task is concurrently removed from the queue
+ */
+ void fail(DbSession dbSession, CeQueueDto ceQueueDto, @Nullable String errorType, @Nullable String errorMessage);
+
+ /**
* Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused.
* The workers that are already processing tasks are not interrupted.
* This method is not restricted to the local workers. All the Compute Engine nodes are paused.
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
index cb375669392..39f832f8f30 100644
--- a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
+++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java
@@ -36,6 +36,7 @@ import java.util.stream.Stream;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.sonar.api.server.ServerSide;
+import org.sonar.api.utils.System2;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.task.CeTask;
import org.sonar.core.util.UuidFactory;
@@ -59,16 +60,19 @@ import static java.util.Optional.ofNullable;
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
import static org.sonar.core.util.stream.MoreCollectors.uniqueIndex;
+import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
@ServerSide
public class CeQueueImpl implements CeQueue {
+ private final System2 system2;
private final DbClient dbClient;
private final UuidFactory uuidFactory;
private final DefaultOrganizationProvider defaultOrganizationProvider;
- public CeQueueImpl(DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) {
+ public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) {
+ this.system2 = system2;
this.dbClient = dbClient;
this.uuidFactory = uuidFactory;
this.defaultOrganizationProvider = defaultOrganizationProvider;
@@ -246,6 +250,29 @@ public class CeQueueImpl implements CeQueue {
remove(dbSession, q, activityDto);
}
+ @Override
+ public void fail(DbSession dbSession, CeQueueDto task, @Nullable String errorType, @Nullable String errorMessage) {
+ checkState(IN_PROGRESS.equals(task.getStatus()), "Task is not in-progress and can't be marked as failed [uuid=%s]", task.getUuid());
+ CeActivityDto activityDto = new CeActivityDto(task);
+ activityDto.setStatus(CeActivityDto.Status.FAILED);
+ activityDto.setErrorType(errorType);
+ activityDto.setErrorMessage(errorMessage);
+ updateExecutionFields(activityDto);
+ remove(dbSession, task, activityDto);
+ }
+
+ protected long updateExecutionFields(CeActivityDto activityDto) {
+ Long startedAt = activityDto.getStartedAt();
+ if (startedAt == null) {
+ return 0L;
+ }
+ long now = system2.now();
+ long executionTimeInMs = now - startedAt;
+ activityDto.setExecutedAt(now);
+ activityDto.setExecutionTimeMs(executionTimeInMs);
+ return executionTimeInMs;
+ }
+
protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) {
String taskUuid = queueDto.getUuid();
CeQueueDto.Status expectedQueueDtoStatus = queueDto.getStatus();
@@ -273,7 +300,7 @@ public class CeQueueImpl implements CeQueue {
int count = 0;
try (DbSession dbSession = dbClient.openSession(false)) {
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
- if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
+ if (includeInProgress || !queueDto.getStatus().equals(IN_PROGRESS)) {
cancelImpl(dbSession, queueDto);
count++;
}
@@ -305,7 +332,7 @@ public class CeQueueImpl implements CeQueue {
if (!propValue.isPresent() || !propValue.get().equals("true")) {
return WorkersPauseStatus.RESUMED;
}
- int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS);
+ int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, IN_PROGRESS);
if (countInProgress > 0) {
return WorkersPauseStatus.PAUSING;
}
diff --git a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
index fb557a8293e..6f44b7e42b0 100644
--- a/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
+++ b/server/sonar-ce-common/src/test/java/org/sonar/ce/queue/CeQueueImplTest.java
@@ -52,6 +52,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
import static org.assertj.core.api.Assertions.tuple;
import static org.hamcrest.Matchers.startsWith;
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
@@ -59,8 +60,9 @@ import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMP
public class CeQueueImplTest {
private static final String WORKER_UUID = "workerUuid";
+ private static final long NOW = 1_450_000_000_000L;
- private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+ private System2 system2 = new TestSystem2().setNow(NOW);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -72,7 +74,7 @@ public class CeQueueImplTest {
private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db);
- private CeQueue underTest = new CeQueueImpl(db.getDbClient(), uuidFactory, defaultOrganizationProvider);
+ private CeQueue underTest = new CeQueueImpl(system2, db.getDbClient(), uuidFactory, defaultOrganizationProvider);
@Test
public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
@@ -476,6 +478,33 @@ public class CeQueueImplTest {
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
}
+ @Test
+ public void fail_in_progress_task() {
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
+ CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID).get();
+
+ underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
+
+ Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
+ assertThat(activity.isPresent()).isTrue();
+ assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.FAILED);
+ assertThat(activity.get().getErrorType()).isEqualTo("TIMEOUT");
+ assertThat(activity.get().getErrorMessage()).isEqualTo("Failed on timeout");
+ assertThat(activity.get().getExecutedAt()).isEqualTo(NOW);
+ assertThat(activity.get().getWorkerUuid()).isEqualTo(WORKER_UUID);
+ }
+
+ @Test
+ public void fail_throws_exception_if_task_is_pending() {
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
+ CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
+
+ Throwable thrown = catchThrowable(() -> underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"));
+
+ assertThat(thrown)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Task is not in-progress and can't be marked as failed [uuid=" + task.getUuid() + "]");
+ }
private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, UserDto userDto) {
verifyCeTask(taskSubmit, task, componentDto, componentDto, userDto);