import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import javax.annotation.Nullable;
import org.sonar.ce.task.CeTask;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeQueueDto;
*/
int cancelAll();
+ /**
+ * Mark a task in status {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} as failed. An unchecked
+ * exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}.
+ *
+ * The {@code dbSession} is committed.
+
+ * @throws RuntimeException if the task is concurrently removed from the queue
+ */
+ void fail(DbSession dbSession, CeQueueDto ceQueueDto, @Nullable String errorType, @Nullable String errorMessage);
+
/**
* Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused.
* The workers that are already processing tasks are not interrupted.
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.sonar.api.server.ServerSide;
+import org.sonar.api.utils.System2;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.task.CeTask;
import org.sonar.core.util.UuidFactory;
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
import static org.sonar.core.util.stream.MoreCollectors.toEnumSet;
import static org.sonar.core.util.stream.MoreCollectors.uniqueIndex;
+import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
@ServerSide
public class CeQueueImpl implements CeQueue {
+ private final System2 system2;
private final DbClient dbClient;
private final UuidFactory uuidFactory;
private final DefaultOrganizationProvider defaultOrganizationProvider;
- public CeQueueImpl(DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) {
+ public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) {
+ this.system2 = system2;
this.dbClient = dbClient;
this.uuidFactory = uuidFactory;
this.defaultOrganizationProvider = defaultOrganizationProvider;
remove(dbSession, q, activityDto);
}
+ @Override
+ public void fail(DbSession dbSession, CeQueueDto task, @Nullable String errorType, @Nullable String errorMessage) {
+ checkState(IN_PROGRESS.equals(task.getStatus()), "Task is not in-progress and can't be marked as failed [uuid=%s]", task.getUuid());
+ CeActivityDto activityDto = new CeActivityDto(task);
+ activityDto.setStatus(CeActivityDto.Status.FAILED);
+ activityDto.setErrorType(errorType);
+ activityDto.setErrorMessage(errorMessage);
+ updateExecutionFields(activityDto);
+ remove(dbSession, task, activityDto);
+ }
+
+ protected long updateExecutionFields(CeActivityDto activityDto) {
+ Long startedAt = activityDto.getStartedAt();
+ if (startedAt == null) {
+ return 0L;
+ }
+ long now = system2.now();
+ long executionTimeInMs = now - startedAt;
+ activityDto.setExecutedAt(now);
+ activityDto.setExecutionTimeMs(executionTimeInMs);
+ return executionTimeInMs;
+ }
+
protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) {
String taskUuid = queueDto.getUuid();
CeQueueDto.Status expectedQueueDtoStatus = queueDto.getStatus();
int count = 0;
try (DbSession dbSession = dbClient.openSession(false)) {
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
- if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
+ if (includeInProgress || !queueDto.getStatus().equals(IN_PROGRESS)) {
cancelImpl(dbSession, queueDto);
count++;
}
if (!propValue.isPresent() || !propValue.get().equals("true")) {
return WorkersPauseStatus.RESUMED;
}
- int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS);
+ int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, IN_PROGRESS);
if (countInProgress > 0) {
return WorkersPauseStatus.PAUSING;
}
import static java.util.Collections.emptyMap;
import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
import static org.assertj.core.api.Assertions.tuple;
import static org.hamcrest.Matchers.startsWith;
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT;
public class CeQueueImplTest {
private static final String WORKER_UUID = "workerUuid";
+ private static final long NOW = 1_450_000_000_000L;
- private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L);
+ private System2 system2 = new TestSystem2().setNow(NOW);
@Rule
public ExpectedException expectedException = ExpectedException.none();
private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE;
private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db);
- private CeQueue underTest = new CeQueueImpl(db.getDbClient(), uuidFactory, defaultOrganizationProvider);
+ private CeQueue underTest = new CeQueueImpl(system2, db.getDbClient(), uuidFactory, defaultOrganizationProvider);
@Test
public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() {
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED);
}
+ @Test
+ public void fail_in_progress_task() {
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
+ CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID).get();
+
+ underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout");
+
+ Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid());
+ assertThat(activity.isPresent()).isTrue();
+ assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.FAILED);
+ assertThat(activity.get().getErrorType()).isEqualTo("TIMEOUT");
+ assertThat(activity.get().getErrorMessage()).isEqualTo("Failed on timeout");
+ assertThat(activity.get().getExecutedAt()).isEqualTo(NOW);
+ assertThat(activity.get().getWorkerUuid()).isEqualTo(WORKER_UUID);
+ }
+
+ @Test
+ public void fail_throws_exception_if_task_is_pending() {
+ CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12)));
+ CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get();
+
+ Throwable thrown = catchThrowable(() -> underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"));
+
+ assertThat(thrown)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Task is not in-progress and can't be marked as failed [uuid=" + task.getUuid() + "]");
+ }
private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, UserDto userDto) {
verifyCeTask(taskSubmit, task, componentDto, componentDto, userDto);
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue {
private static final Logger LOG = Loggers.get(InternalCeQueueImpl.class);
- private final System2 system2;
private final DbClient dbClient;
private final CEQueueStatus queueStatus;
private final ComputeEngineStatus computeEngineStatus;
public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus,
DefaultOrganizationProvider defaultOrganizationProvider, ComputeEngineStatus computeEngineStatus) {
- super(dbClient, uuidFactory, defaultOrganizationProvider);
- this.system2 = system2;
+ super(system2, dbClient, uuidFactory, defaultOrganizationProvider);
this.dbClient = dbClient;
this.queueStatus = queueStatus;
this.computeEngineStatus = computeEngineStatus;
}
}
- private long updateExecutionFields(CeActivityDto activityDto) {
- Long startedAt = activityDto.getStartedAt();
- if (startedAt == null) {
- return 0L;
- }
- long now = system2.now();
- long executionTimeInMs = now - startedAt;
- activityDto.setExecutedAt(now);
- activityDto.setExecutionTimeMs(executionTimeInMs);
- return executionTimeInMs;
- }
-
@Override
public void cancelWornOuts() {
try (DbSession dbSession = dbClient.openSession(false)) {
return mapper(dbSession).selectWornout();
}
+ public List<CeQueueDto> selectInProgressStartedBefore(DbSession dbSession, long date) {
+ return mapper(dbSession).selectInProgressStartedBefore(date);
+ }
+
public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) {
if (knownWorkerUUIDs.isEmpty()) {
mapper(dbSession).resetAllInProgressTasks(system2.now());
return workerUuid;
}
- /**
- * Accessed by MyBatis through reflexion. Field is otherwise read-only.
- */
- protected CeQueueDto setWorkerUuid(@Nullable String workerUuid) {
+ public CeQueueDto setWorkerUuid(@Nullable String workerUuid) {
this.workerUuid = workerUuid;
return this;
}
return startedAt;
}
- /**
- * Accessed by MyBatis through reflexion. Field is otherwise read-only.
- */
- protected CeQueueDto setStartedAt(@Nullable Long l) {
+ public CeQueueDto setStartedAt(@Nullable Long l) {
this.startedAt = l;
return this;
}
*/
List<CeQueueDto> selectWornout();
+ /**
+ * The tasks that are in the in-progress status for too long
+ */
+ List<CeQueueDto> selectInProgressStartedBefore(@Param("date") long date);
+
/**
* Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs}
*/
private boolean onlyCurrents = false;
// SONAR-7681 a public implementation of List must be used in MyBatis - potential concurrency exceptions otherwise
+ @Nullable
private ArrayList<String> mainComponentUuids;
+ @Nullable
private ArrayList<String> statuses;
+ @Nullable
private String type;
+ @Nullable
private Long minSubmittedAt;
+ @Nullable
private Long maxExecutedAt;
+ @Nullable
+ private Long minExecutedAt;
+ @Nullable
+ private ArrayList<String> errorTypes;
@CheckForNull
public List<String> getMainComponentUuids() {
return this;
}
+ @CheckForNull
+ public List<String> getErrorTypes() {
+ return errorTypes;
+ }
+
+ public CeTaskQuery setErrorTypes(@Nullable List<String> l) {
+ this.errorTypes = l == null ? null : newArrayList(l);
+ return this;
+ }
+
@CheckForNull
public String getType() {
return type;
return this;
}
+ @CheckForNull
+ public Long getMinExecutedAt() {
+ return minExecutedAt;
+ }
+
+ public CeTaskQuery setMinExecutedAt(@Nullable Long l) {
+ this.minExecutedAt = l;
+ return this;
+ }
+
@CheckForNull
public Long getMinSubmittedAt() {
return minSubmittedAt;
#{status,jdbcType=VARCHAR}
</foreach>
</if>
+ <if test="query.errorTypes != null and !query.errorTypes.isEmpty()">
+ and ca.error_type in
+ <foreach collection="query.errorTypes" open="(" close=")" item="errorType" separator=",">
+ #{errorType,jdbcType=VARCHAR}
+ </foreach>
+ </if>
<if test="query.type != null">
and ca.task_type=#{query.type,jdbcType=VARCHAR}
</if>
<if test="query.maxExecutedAt != null">
and ca.executed_at <= #{query.maxExecutedAt,jdbcType=BIGINT}
</if>
+ <if test="query.minExecutedAt != null">
+ and ca.executed_at >= #{query.minExecutedAt,jdbcType=BIGINT}
+ </if>
</where>
</sql>
and cq.started_at is not null
</select>
+ <select id="selectInProgressStartedBefore" resultType="org.sonar.db.ce.CeQueueDto">
+ select
+ <include refid="columns"/>
+ from
+ ce_queue cq
+ where
+ cq.status = 'IN_PROGRESS'
+ and cq.started_at is not null
+ and cq.started_at < #{date,jdbcType=BIGINT}
+ </select>
+
<insert id="insert" parameterType="org.sonar.db.ce.CeQueueDto" useGeneratedKeys="false">
insert into ce_queue
(
main_component_uuid,
status,
submitter_uuid,
+ worker_uuid,
execution_count,
created_at,
- updated_at
+ updated_at,
+ started_at
)
values (
#{uuid,jdbcType=VARCHAR},
#{mainComponentUuid,jdbcType=VARCHAR},
#{status,jdbcType=VARCHAR},
#{submitterUuid,jdbcType=VARCHAR},
+ #{workerUuid,jdbcType=VARCHAR},
0,
#{createdAt,jdbcType=BIGINT},
- #{updatedAt,jdbcType=BIGINT}
+ #{updatedAt,jdbcType=BIGINT},
+ #{startedAt,jdbcType=BIGINT}
)
</insert>
.setMinSubmittedAt(1_400_000_000_000L)
.setMaxExecutedAt(1_475_000_000_000L);
assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).extracting("uuid").containsOnly("UUID1");
+ }
+
+ @Test
+ public void select_by_minExecutedAt() {
+ insertWithDates("UUID1", 1_450_000_000_000L, 1_470_000_000_000L);
+ insertWithDates("UUID2", 1_460_000_000_000L, 1_480_000_000_000L);
+
+ CeTaskQuery query = new CeTaskQuery().setMinExecutedAt(1_460_000_000_000L);
+ assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).extracting("uuid").containsExactlyInAnyOrder("UUID1", "UUID2");
+
+ query = new CeTaskQuery().setMinExecutedAt(1_475_000_000_000L);
+ assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).extracting("uuid").containsExactlyInAnyOrder("UUID2");
+ query = new CeTaskQuery().setMinExecutedAt(1_485_000_000_000L);
+ assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).isEmpty();
}
private void insertWithDates(String uuid, long submittedAt, long executedAt) {
assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2);
}
+ @Test
+ public void selectInProgressStartedBefore() {
+ // pending task is ignored
+ insertPending(newCeQueueDto(TASK_UUID_1)
+ .setStatus(PENDING)
+ .setStartedAt(1_000L));
+ // in-progress tasks
+ insertPending(newCeQueueDto(TASK_UUID_2)
+ .setStatus(IN_PROGRESS)
+ .setStartedAt(1_000L));
+ insertPending(newCeQueueDto(TASK_UUID_3)
+ .setStatus(IN_PROGRESS)
+ .setStartedAt(2_000L));
+
+ assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 500L)).isEmpty();
+ assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 1_000L)).isEmpty();
+ assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 1_500L)).extracting(CeQueueDto::getUuid).containsExactly(TASK_UUID_2);
+ assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 3_000L)).extracting(CeQueueDto::getUuid).containsExactlyInAnyOrder(TASK_UUID_2, TASK_UUID_3);
+ }
+
private void insertPending(CeQueueDto dto) {
underTest.insert(db.getSession(), dto);
db.commit();
}
private int pendingComponentUuidGenerator = new Random().nextInt(200);
+
private CeQueueDto insertPending(String uuid, String mainComponentUuid) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
}
private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2) {
- verifyCeQueueStatuses(new String[] {taskUuid1, taskUuid2}, new CeQueueDto.Status[] {taskStatus1, taskStatus2});
+ verifyCeQueueStatuses(new String[]{taskUuid1, taskUuid2}, new CeQueueDto.Status[]{taskStatus1, taskStatus2});
}
private void verifyCeQueueStatuses(String[] taskUuids, CeQueueDto.Status[] statuses) {
assertThat(underTest.getMainComponentUuids()).hasSize(CeTaskQuery.MAX_COMPONENT_UUIDS + 2);
assertThat(underTest.isShortCircuitedByMainComponentUuids()).isTrue();
}
+
+ @Test
+ public void test_errorTypes() {
+ assertThat(underTest.getErrorTypes()).isNull();
+
+ underTest.setErrorTypes(asList("foo", "bar"));
+ assertThat(underTest.getErrorTypes()).containsExactlyInAnyOrder("foo", "bar");
+ }
+
+ @Test
+ public void test_minExecutedAt() {
+ assertThat(underTest.getMinExecutedAt()).isNull();
+
+ underTest.setMinExecutedAt(1_000L);
+ assertThat(underTest.getMinExecutedAt()).isEqualTo(1_000L);
+ }
}
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.sonar.api.utils.System2;
+import org.sonar.api.utils.internal.TestSystem2;
import org.sonar.api.web.UserRole;
import org.sonar.ce.queue.CeQueue;
import org.sonar.ce.queue.CeQueueImpl;
public DbTester db = DbTester.create();
private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db);
- private CeQueue queue = new CeQueueImpl(db.getDbClient(), UuidFactoryFast.getInstance(), defaultOrganizationProvider);
+ private System2 system2 = new TestSystem2();
+ private CeQueue queue = new CeQueueImpl(system2, db.getDbClient(), UuidFactoryFast.getInstance(), defaultOrganizationProvider);
private CancelAction underTest = new CancelAction(userSession, db.getDbClient(), queue);
private WsActionTester tester = new WsActionTester(underTest);