@@ -22,6 +22,7 @@ package org.sonar.ce.queue; | |||
import java.util.Collection; | |||
import java.util.List; | |||
import java.util.Optional; | |||
import javax.annotation.Nullable; | |||
import org.sonar.ce.task.CeTask; | |||
import org.sonar.db.DbSession; | |||
import org.sonar.db.ce.CeQueueDto; | |||
@@ -86,6 +87,16 @@ public interface CeQueue { | |||
*/ | |||
int cancelAll(); | |||
/** | |||
* Mark a task in status {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} as failed. An unchecked | |||
* exception is thrown if the status is not {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS}. | |||
* | |||
* The {@code dbSession} is committed. | |||
* @throws RuntimeException if the task is concurrently removed from the queue | |||
*/ | |||
void fail(DbSession dbSession, CeQueueDto ceQueueDto, @Nullable String errorType, @Nullable String errorMessage); | |||
/** | |||
* Requests workers to stop peeking tasks from queue. Does nothing if workers are already paused or being paused. | |||
* The workers that are already processing tasks are not interrupted. |
@@ -36,6 +36,7 @@ import java.util.stream.Stream; | |||
import javax.annotation.CheckForNull; | |||
import javax.annotation.Nullable; | |||
import org.sonar.api.server.ServerSide; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.api.utils.log.Loggers; | |||
import org.sonar.ce.task.CeTask; | |||
import org.sonar.core.util.UuidFactory; | |||
@@ -59,16 +60,19 @@ import static java.util.Optional.ofNullable; | |||
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT; | |||
import static org.sonar.core.util.stream.MoreCollectors.toEnumSet; | |||
import static org.sonar.core.util.stream.MoreCollectors.uniqueIndex; | |||
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; | |||
import static org.sonar.db.ce.CeQueueDto.Status.PENDING; | |||
@ServerSide | |||
public class CeQueueImpl implements CeQueue { | |||
private final System2 system2; | |||
private final DbClient dbClient; | |||
private final UuidFactory uuidFactory; | |||
private final DefaultOrganizationProvider defaultOrganizationProvider; | |||
public CeQueueImpl(DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) { | |||
public CeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, DefaultOrganizationProvider defaultOrganizationProvider) { | |||
this.system2 = system2; | |||
this.dbClient = dbClient; | |||
this.uuidFactory = uuidFactory; | |||
this.defaultOrganizationProvider = defaultOrganizationProvider; | |||
@@ -246,6 +250,29 @@ public class CeQueueImpl implements CeQueue { | |||
remove(dbSession, q, activityDto); | |||
} | |||
@Override | |||
public void fail(DbSession dbSession, CeQueueDto task, @Nullable String errorType, @Nullable String errorMessage) { | |||
checkState(IN_PROGRESS.equals(task.getStatus()), "Task is not in-progress and can't be marked as failed [uuid=%s]", task.getUuid()); | |||
CeActivityDto activityDto = new CeActivityDto(task); | |||
activityDto.setStatus(CeActivityDto.Status.FAILED); | |||
activityDto.setErrorType(errorType); | |||
activityDto.setErrorMessage(errorMessage); | |||
updateExecutionFields(activityDto); | |||
remove(dbSession, task, activityDto); | |||
} | |||
protected long updateExecutionFields(CeActivityDto activityDto) { | |||
Long startedAt = activityDto.getStartedAt(); | |||
if (startedAt == null) { | |||
return 0L; | |||
} | |||
long now = system2.now(); | |||
long executionTimeInMs = now - startedAt; | |||
activityDto.setExecutedAt(now); | |||
activityDto.setExecutionTimeMs(executionTimeInMs); | |||
return executionTimeInMs; | |||
} | |||
protected void remove(DbSession dbSession, CeQueueDto queueDto, CeActivityDto activityDto) { | |||
String taskUuid = queueDto.getUuid(); | |||
CeQueueDto.Status expectedQueueDtoStatus = queueDto.getStatus(); | |||
@@ -273,7 +300,7 @@ public class CeQueueImpl implements CeQueue { | |||
int count = 0; | |||
try (DbSession dbSession = dbClient.openSession(false)) { | |||
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) { | |||
if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) { | |||
if (includeInProgress || !queueDto.getStatus().equals(IN_PROGRESS)) { | |||
cancelImpl(dbSession, queueDto); | |||
count++; | |||
} | |||
@@ -305,7 +332,7 @@ public class CeQueueImpl implements CeQueue { | |||
if (!propValue.isPresent() || !propValue.get().equals("true")) { | |||
return WorkersPauseStatus.RESUMED; | |||
} | |||
int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, CeQueueDto.Status.IN_PROGRESS); | |||
int countInProgress = dbClient.ceQueueDao().countByStatus(dbSession, IN_PROGRESS); | |||
if (countInProgress > 0) { | |||
return WorkersPauseStatus.PAUSING; | |||
} |
@@ -52,6 +52,7 @@ import static java.util.Arrays.asList; | |||
import static java.util.Collections.emptyMap; | |||
import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic; | |||
import static org.assertj.core.api.Assertions.assertThat; | |||
import static org.assertj.core.api.Assertions.catchThrowable; | |||
import static org.assertj.core.api.Assertions.tuple; | |||
import static org.hamcrest.Matchers.startsWith; | |||
import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMPONENT; | |||
@@ -59,8 +60,9 @@ import static org.sonar.ce.queue.CeQueue.SubmitOption.UNIQUE_QUEUE_PER_MAIN_COMP | |||
public class CeQueueImplTest { | |||
private static final String WORKER_UUID = "workerUuid"; | |||
private static final long NOW = 1_450_000_000_000L; | |||
private System2 system2 = new TestSystem2().setNow(1_450_000_000_000L); | |||
private System2 system2 = new TestSystem2().setNow(NOW); | |||
@Rule | |||
public ExpectedException expectedException = ExpectedException.none(); | |||
@@ -72,7 +74,7 @@ public class CeQueueImplTest { | |||
private UuidFactory uuidFactory = UuidFactoryImpl.INSTANCE; | |||
private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db); | |||
private CeQueue underTest = new CeQueueImpl(db.getDbClient(), uuidFactory, defaultOrganizationProvider); | |||
private CeQueue underTest = new CeQueueImpl(system2, db.getDbClient(), uuidFactory, defaultOrganizationProvider); | |||
@Test | |||
public void submit_returns_task_populated_from_CeTaskSubmit_and_creates_CeQueue_row() { | |||
@@ -476,6 +478,33 @@ public class CeQueueImplTest { | |||
assertThat(underTest.getWorkersPauseStatus()).isEqualTo(CeQueue.WorkersPauseStatus.RESUMED); | |||
} | |||
@Test | |||
public void fail_in_progress_task() { | |||
CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); | |||
CeQueueDto queueDto = db.getDbClient().ceQueueDao().peek(db.getSession(), WORKER_UUID).get(); | |||
underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout"); | |||
Optional<CeActivityDto> activity = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), task.getUuid()); | |||
assertThat(activity.isPresent()).isTrue(); | |||
assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.FAILED); | |||
assertThat(activity.get().getErrorType()).isEqualTo("TIMEOUT"); | |||
assertThat(activity.get().getErrorMessage()).isEqualTo("Failed on timeout"); | |||
assertThat(activity.get().getExecutedAt()).isEqualTo(NOW); | |||
assertThat(activity.get().getWorkerUuid()).isEqualTo(WORKER_UUID); | |||
} | |||
@Test | |||
public void fail_throws_exception_if_task_is_pending() { | |||
CeTask task = submit(CeTaskTypes.REPORT, newComponent(randomAlphabetic(12))); | |||
CeQueueDto queueDto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), task.getUuid()).get(); | |||
Throwable thrown = catchThrowable(() -> underTest.fail(db.getSession(), queueDto, "TIMEOUT", "Failed on timeout")); | |||
assertThat(thrown) | |||
.isInstanceOf(IllegalStateException.class) | |||
.hasMessage("Task is not in-progress and can't be marked as failed [uuid=" + task.getUuid() + "]"); | |||
} | |||
private void verifyCeTask(CeTaskSubmit taskSubmit, CeTask task, @Nullable ComponentDto componentDto, UserDto userDto) { | |||
verifyCeTask(taskSubmit, task, componentDto, componentDto, userDto); |
@@ -60,15 +60,13 @@ import static org.sonar.core.util.stream.MoreCollectors.uniqueIndex; | |||
public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue { | |||
private static final Logger LOG = Loggers.get(InternalCeQueueImpl.class); | |||
private final System2 system2; | |||
private final DbClient dbClient; | |||
private final CEQueueStatus queueStatus; | |||
private final ComputeEngineStatus computeEngineStatus; | |||
public InternalCeQueueImpl(System2 system2, DbClient dbClient, UuidFactory uuidFactory, CEQueueStatus queueStatus, | |||
DefaultOrganizationProvider defaultOrganizationProvider, ComputeEngineStatus computeEngineStatus) { | |||
super(dbClient, uuidFactory, defaultOrganizationProvider); | |||
this.system2 = system2; | |||
super(system2, dbClient, uuidFactory, defaultOrganizationProvider); | |||
this.dbClient = dbClient; | |||
this.queueStatus = queueStatus; | |||
this.computeEngineStatus = computeEngineStatus; | |||
@@ -176,18 +174,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue | |||
} | |||
} | |||
private long updateExecutionFields(CeActivityDto activityDto) { | |||
Long startedAt = activityDto.getStartedAt(); | |||
if (startedAt == null) { | |||
return 0L; | |||
} | |||
long now = system2.now(); | |||
long executionTimeInMs = now - startedAt; | |||
activityDto.setExecutedAt(now); | |||
activityDto.setExecutionTimeMs(executionTimeInMs); | |||
return executionTimeInMs; | |||
} | |||
@Override | |||
public void cancelWornOuts() { | |||
try (DbSession dbSession = dbClient.openSession(false)) { |
@@ -93,6 +93,10 @@ public class CeQueueDao implements Dao { | |||
return mapper(dbSession).selectWornout(); | |||
} | |||
public List<CeQueueDto> selectInProgressStartedBefore(DbSession dbSession, long date) { | |||
return mapper(dbSession).selectInProgressStartedBefore(date); | |||
} | |||
public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) { | |||
if (knownWorkerUUIDs.isEmpty()) { | |||
mapper(dbSession).resetAllInProgressTasks(system2.now()); |
@@ -145,10 +145,7 @@ public class CeQueueDto { | |||
return workerUuid; | |||
} | |||
/** | |||
* Accessed by MyBatis through reflexion. Field is otherwise read-only. | |||
*/ | |||
protected CeQueueDto setWorkerUuid(@Nullable String workerUuid) { | |||
public CeQueueDto setWorkerUuid(@Nullable String workerUuid) { | |||
this.workerUuid = workerUuid; | |||
return this; | |||
} | |||
@@ -158,10 +155,7 @@ public class CeQueueDto { | |||
return startedAt; | |||
} | |||
/** | |||
* Accessed by MyBatis through reflexion. Field is otherwise read-only. | |||
*/ | |||
protected CeQueueDto setStartedAt(@Nullable Long l) { | |||
public CeQueueDto setStartedAt(@Nullable Long l) { | |||
this.startedAt = l; | |||
return this; | |||
} |
@@ -52,6 +52,11 @@ public interface CeQueueMapper { | |||
*/ | |||
List<CeQueueDto> selectWornout(); | |||
/** | |||
* The tasks that are in the in-progress status for too long | |||
*/ | |||
List<CeQueueDto> selectInProgressStartedBefore(@Param("date") long date); | |||
/** | |||
* Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs} | |||
*/ |
@@ -36,11 +36,20 @@ public class CeTaskQuery { | |||
private boolean onlyCurrents = false; | |||
// SONAR-7681 a public implementation of List must be used in MyBatis - potential concurrency exceptions otherwise | |||
@Nullable | |||
private ArrayList<String> mainComponentUuids; | |||
@Nullable | |||
private ArrayList<String> statuses; | |||
@Nullable | |||
private String type; | |||
@Nullable | |||
private Long minSubmittedAt; | |||
@Nullable | |||
private Long maxExecutedAt; | |||
@Nullable | |||
private Long minExecutedAt; | |||
@Nullable | |||
private ArrayList<String> errorTypes; | |||
@CheckForNull | |||
public List<String> getMainComponentUuids() { | |||
@@ -84,6 +93,16 @@ public class CeTaskQuery { | |||
return this; | |||
} | |||
@CheckForNull | |||
public List<String> getErrorTypes() { | |||
return errorTypes; | |||
} | |||
public CeTaskQuery setErrorTypes(@Nullable List<String> l) { | |||
this.errorTypes = l == null ? null : newArrayList(l); | |||
return this; | |||
} | |||
@CheckForNull | |||
public String getType() { | |||
return type; | |||
@@ -104,6 +123,16 @@ public class CeTaskQuery { | |||
return this; | |||
} | |||
@CheckForNull | |||
public Long getMinExecutedAt() { | |||
return minExecutedAt; | |||
} | |||
public CeTaskQuery setMinExecutedAt(@Nullable Long l) { | |||
this.minExecutedAt = l; | |||
return this; | |||
} | |||
@CheckForNull | |||
public Long getMinSubmittedAt() { | |||
return minSubmittedAt; |
@@ -111,6 +111,12 @@ | |||
#{status,jdbcType=VARCHAR} | |||
</foreach> | |||
</if> | |||
<if test="query.errorTypes != null and !query.errorTypes.isEmpty()"> | |||
and ca.error_type in | |||
<foreach collection="query.errorTypes" open="(" close=")" item="errorType" separator=","> | |||
#{errorType,jdbcType=VARCHAR} | |||
</foreach> | |||
</if> | |||
<if test="query.type != null"> | |||
and ca.task_type=#{query.type,jdbcType=VARCHAR} | |||
</if> | |||
@@ -120,6 +126,9 @@ | |||
<if test="query.maxExecutedAt != null"> | |||
and ca.executed_at <= #{query.maxExecutedAt,jdbcType=BIGINT} | |||
</if> | |||
<if test="query.minExecutedAt != null"> | |||
and ca.executed_at >= #{query.minExecutedAt,jdbcType=BIGINT} | |||
</if> | |||
</where> | |||
</sql> | |||
@@ -207,6 +207,17 @@ | |||
and cq.started_at is not null | |||
</select> | |||
<select id="selectInProgressStartedBefore" resultType="org.sonar.db.ce.CeQueueDto"> | |||
select | |||
<include refid="columns"/> | |||
from | |||
ce_queue cq | |||
where | |||
cq.status = 'IN_PROGRESS' | |||
and cq.started_at is not null | |||
and cq.started_at < #{date,jdbcType=BIGINT} | |||
</select> | |||
<insert id="insert" parameterType="org.sonar.db.ce.CeQueueDto" useGeneratedKeys="false"> | |||
insert into ce_queue | |||
( | |||
@@ -216,9 +227,11 @@ | |||
main_component_uuid, | |||
status, | |||
submitter_uuid, | |||
worker_uuid, | |||
execution_count, | |||
created_at, | |||
updated_at | |||
updated_at, | |||
started_at | |||
) | |||
values ( | |||
#{uuid,jdbcType=VARCHAR}, | |||
@@ -227,9 +240,11 @@ | |||
#{mainComponentUuid,jdbcType=VARCHAR}, | |||
#{status,jdbcType=VARCHAR}, | |||
#{submitterUuid,jdbcType=VARCHAR}, | |||
#{workerUuid,jdbcType=VARCHAR}, | |||
0, | |||
#{createdAt,jdbcType=BIGINT}, | |||
#{updatedAt,jdbcType=BIGINT} | |||
#{updatedAt,jdbcType=BIGINT}, | |||
#{startedAt,jdbcType=BIGINT} | |||
) | |||
</insert> | |||
@@ -557,7 +557,21 @@ public class CeActivityDaoTest { | |||
.setMinSubmittedAt(1_400_000_000_000L) | |||
.setMaxExecutedAt(1_475_000_000_000L); | |||
assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).extracting("uuid").containsOnly("UUID1"); | |||
} | |||
@Test | |||
public void select_by_minExecutedAt() { | |||
insertWithDates("UUID1", 1_450_000_000_000L, 1_470_000_000_000L); | |||
insertWithDates("UUID2", 1_460_000_000_000L, 1_480_000_000_000L); | |||
CeTaskQuery query = new CeTaskQuery().setMinExecutedAt(1_460_000_000_000L); | |||
assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).extracting("uuid").containsExactlyInAnyOrder("UUID1", "UUID2"); | |||
query = new CeTaskQuery().setMinExecutedAt(1_475_000_000_000L); | |||
assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).extracting("uuid").containsExactlyInAnyOrder("UUID2"); | |||
query = new CeTaskQuery().setMinExecutedAt(1_485_000_000_000L); | |||
assertThat(underTest.selectByQuery(db.getSession(), query, forPage(1).andSize(5))).isEmpty(); | |||
} | |||
private void insertWithDates(String uuid, long submittedAt, long executedAt) { |
@@ -557,6 +557,26 @@ public class CeQueueDaoTest { | |||
assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2); | |||
} | |||
@Test | |||
public void selectInProgressStartedBefore() { | |||
// pending task is ignored | |||
insertPending(newCeQueueDto(TASK_UUID_1) | |||
.setStatus(PENDING) | |||
.setStartedAt(1_000L)); | |||
// in-progress tasks | |||
insertPending(newCeQueueDto(TASK_UUID_2) | |||
.setStatus(IN_PROGRESS) | |||
.setStartedAt(1_000L)); | |||
insertPending(newCeQueueDto(TASK_UUID_3) | |||
.setStatus(IN_PROGRESS) | |||
.setStartedAt(2_000L)); | |||
assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 500L)).isEmpty(); | |||
assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 1_000L)).isEmpty(); | |||
assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 1_500L)).extracting(CeQueueDto::getUuid).containsExactly(TASK_UUID_2); | |||
assertThat(underTest.selectInProgressStartedBefore(db.getSession(), 3_000L)).extracting(CeQueueDto::getUuid).containsExactlyInAnyOrder(TASK_UUID_2, TASK_UUID_3); | |||
} | |||
private void insertPending(CeQueueDto dto) { | |||
underTest.insert(db.getSession(), dto); | |||
db.commit(); | |||
@@ -574,6 +594,7 @@ public class CeQueueDaoTest { | |||
} | |||
private int pendingComponentUuidGenerator = new Random().nextInt(200); | |||
private CeQueueDto insertPending(String uuid, String mainComponentUuid) { | |||
CeQueueDto dto = new CeQueueDto(); | |||
dto.setUuid(uuid); | |||
@@ -608,7 +629,7 @@ public class CeQueueDaoTest { | |||
} | |||
private void verifyCeQueueStatuses(String taskUuid1, CeQueueDto.Status taskStatus1, String taskUuid2, CeQueueDto.Status taskStatus2) { | |||
verifyCeQueueStatuses(new String[] {taskUuid1, taskUuid2}, new CeQueueDto.Status[] {taskStatus1, taskStatus2}); | |||
verifyCeQueueStatuses(new String[]{taskUuid1, taskUuid2}, new CeQueueDto.Status[]{taskStatus1, taskStatus2}); | |||
} | |||
private void verifyCeQueueStatuses(String[] taskUuids, CeQueueDto.Status[] statuses) { |
@@ -76,4 +76,20 @@ public class CeTaskQueryTest { | |||
assertThat(underTest.getMainComponentUuids()).hasSize(CeTaskQuery.MAX_COMPONENT_UUIDS + 2); | |||
assertThat(underTest.isShortCircuitedByMainComponentUuids()).isTrue(); | |||
} | |||
@Test | |||
public void test_errorTypes() { | |||
assertThat(underTest.getErrorTypes()).isNull(); | |||
underTest.setErrorTypes(asList("foo", "bar")); | |||
assertThat(underTest.getErrorTypes()).containsExactlyInAnyOrder("foo", "bar"); | |||
} | |||
@Test | |||
public void test_minExecutedAt() { | |||
assertThat(underTest.getMinExecutedAt()).isNull(); | |||
underTest.setMinExecutedAt(1_000L); | |||
assertThat(underTest.getMinExecutedAt()).isEqualTo(1_000L); | |||
} | |||
} |
@@ -23,6 +23,8 @@ import javax.annotation.Nullable; | |||
import org.junit.Rule; | |||
import org.junit.Test; | |||
import org.junit.rules.ExpectedException; | |||
import org.sonar.api.utils.System2; | |||
import org.sonar.api.utils.internal.TestSystem2; | |||
import org.sonar.api.web.UserRole; | |||
import org.sonar.ce.queue.CeQueue; | |||
import org.sonar.ce.queue.CeQueueImpl; | |||
@@ -53,7 +55,8 @@ public class CancelActionTest { | |||
public DbTester db = DbTester.create(); | |||
private DefaultOrganizationProvider defaultOrganizationProvider = TestDefaultOrganizationProvider.from(db); | |||
private CeQueue queue = new CeQueueImpl(db.getDbClient(), UuidFactoryFast.getInstance(), defaultOrganizationProvider); | |||
private System2 system2 = new TestSystem2(); | |||
private CeQueue queue = new CeQueueImpl(system2, db.getDbClient(), UuidFactoryFast.getInstance(), defaultOrganizationProvider); | |||
private CancelAction underTest = new CancelAction(userSession, db.getDbClient(), queue); | |||
private WsActionTester tester = new WsActionTester(underTest); |