Browse Source

SONAR-10917 automatically cancel tasks already executed once

also: make CeQueueDto#started_at and CeQueueDto#workerUuid read only fields as a new task can be inserted in queue only with these two fields null
tags/7.5
Sébastien Lesaint 6 years ago
parent
commit
5b947f0d78

+ 1
- 2
server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java View File

@@ -154,12 +154,11 @@ public class CeQueueImpl implements CeQueue {
dto.setComponentUuid(submission.getComponentUuid());
dto.setStatus(PENDING);
dto.setSubmitterUuid(submission.getSubmitterUuid());
dto.setStartedAt(null);
dbClient.ceQueueDao().insert(dbSession, dto);
return dto;
}

protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
CeTask loadTask(DbSession dbSession, CeQueueDto dto) {
String componentUuid = dto.getComponentUuid();
if (componentUuid == null) {
return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto);

+ 11
- 0
server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java View File

@@ -56,9 +56,11 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock();

// If we cannot lock that means that another job is running
// So we skip resetting and cancelling tasks in queue
if (ceCleaningJobLock.tryLock()) {
try {
resetTasksWithUnknownWorkerUUIDs();
cancelWornOuts();
} finally {
ceCleaningJobLock.unlock();
}
@@ -73,4 +75,13 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
LOG.warn("Failed to reset tasks with unknown worker UUIDs", e);
}
}

private void cancelWornOuts() {
try {
LOG.debug("Cancelling any worn out task");
internalCeQueue.cancelWornOuts();
} catch (Exception e) {
LOG.warn("Failed to cancel worn out tasks", e);
}
}
}

+ 2
- 0
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java View File

@@ -68,5 +68,7 @@ public interface InternalCeQueue extends CeQueue {
*/
void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error);

void cancelWornOuts();

void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs);
}

+ 15
- 0
server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java View File

@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.CheckForNull;
@@ -78,6 +79,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
CeQueueDao ceQueueDao = dbClient.ceQueueDao();
int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid);
if (i > 0) {
dbSession.commit();
LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid);
}
Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid);
@@ -164,6 +166,19 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue
}
}

@Override
public void cancelWornOuts() {
try (DbSession dbSession = dbClient.openSession(false)) {
List<CeQueueDto> wornOutTasks = dbClient.ceQueueDao().selectWornout(dbSession);
wornOutTasks.forEach(queueDto -> {
CeActivityDto activityDto = new CeActivityDto(queueDto);
activityDto.setStatus(CeActivityDto.Status.CANCELED);
updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto);
remove(dbSession, queueDto, activityDto);
});
}
}

@Override
public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) {
try (DbSession dbSession = dbClient.openSession(false)) {

+ 40
- 80
server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java View File

@@ -43,6 +43,7 @@ import org.sonar.db.DbSession;
import org.sonar.db.DbTester;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeQueueTesting;
import org.sonar.db.ce.CeTaskTypes;
import org.sonar.db.component.ComponentDto;
import org.sonar.db.component.ComponentTesting;
@@ -259,11 +260,11 @@ public class InternalCeQueueImplTest {

@Test
public void remove_copies_workerUuid() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid("Dustin"));
.setStatus(CeQueueDto.Status.PENDING));
makeInProgress(ceQueueDto, "Dustin");
db.commit();

underTest.remove(new CeTask.Builder()
@@ -316,18 +317,15 @@ public class InternalCeQueueImplTest {
}

@Test
public void peek_overrides_workerUuid_to_argument() {
db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
public void peek_ignores_in_progress_tasks() {
CeQueueDto dto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid("must be overriden"));
.setStatus(CeQueueDto.Status.PENDING));
makeInProgress(dto, "foo");
db.commit();

underTest.peek(WORKER_UUID_1);

CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get();
assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(underTest.peek(WORKER_UUID_1)).isEmpty();
}

@Test
@@ -352,8 +350,8 @@ public class InternalCeQueueImplTest {

@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt() {
insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertPending("u1", WORKER_UUID_1);// will be picked-because older than any of the reset ones
insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset

assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0");
@@ -364,7 +362,7 @@ public class InternalCeQueueImplTest {

@Test
public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() {
insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB
CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1);
CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2);
CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1);
@@ -396,19 +394,18 @@ public class InternalCeQueueImplTest {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
.setStatus(CeQueueDto.Status.IN_PROGRESS)
.setWorkerUuid(workerUuid);
.setStatus(CeQueueDto.Status.PENDING);
db.getDbClient().ceQueueDao().insert(session, dto);
makeInProgress(dto, workerUuid);
db.commit();
return dto;
return db.getDbClient().ceQueueDao().selectByUuid(session, uuid).get();
}

private CeQueueDto insertPending(String uuid, String workerUuid) {
private CeQueueDto insertPending(String uuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid(workerUuid);
.setStatus(CeQueueDto.Status.PENDING);
db.getDbClient().ceQueueDao().insert(session, dto);
db.commit();
return dto;
@@ -426,21 +423,6 @@ public class InternalCeQueueImplTest {
assertThat(activity.get().getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
}

@Test
public void cancel_copies_workerUuid() {
CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
.setUuid("uuid")
.setTaskType("foo")
.setStatus(CeQueueDto.Status.PENDING)
.setWorkerUuid("Dustin"));
db.commit();

underTest.cancel(db.getSession(), ceQueueDto);

CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), "uuid").get();
assertThat(dto.getWorkerUuid()).isEqualTo("Dustin");
}

@Test
public void fail_to_cancel_if_in_progress() {
CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1");
@@ -473,25 +455,19 @@ public class InternalCeQueueImplTest {

@Test
public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");
CeQueueDto u1 = insertCeQueueDto("u1");
CeQueueDto u2 = insertCeQueueDto("u2");
CeQueueDto u6 = insertInProgress("u6", "worker1");
CeQueueDto u7 = insertInProgress("u7", "worker2");
CeQueueDto u8 = insertInProgress("u8", "worker3");

underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3"));

// Pending tasks must not be modified even if a workerUUID is not present
verifyUnmodified(u1);
verifyUnmodified(u2);
verifyUnmodified(u3);
verifyUnmodified(u4);

// Unknown worker : null, "worker1"
verifyReset(u5);
verifyReset(u6);

// Known workers : "worker2", "worker3"
@@ -501,25 +477,19 @@ public class InternalCeQueueImplTest {

@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");
CeQueueDto u1 = insertCeQueueDto("u1");
CeQueueDto u2 = insertCeQueueDto("u2");
CeQueueDto u6 = insertInProgress("u6", "worker1");
CeQueueDto u7 = insertInProgress("u7", "worker2");
CeQueueDto u8 = insertInProgress("u8", "worker3");

underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of());

// Pending tasks must not be modified even if a workerUUID is not present
verifyUnmodified(u1);
verifyUnmodified(u2);
verifyUnmodified(u3);
verifyUnmodified(u4);

// Unknown worker : null, "worker1"
verifyReset(u5);
verifyReset(u6);
verifyReset(u7);
verifyReset(u8);
@@ -527,25 +497,19 @@ public class InternalCeQueueImplTest {

@Test
public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() {
CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null);
CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1");
CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null);
CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2");
CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null);
CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1");
CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2");
CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3");
CeQueueDto u1 = insertCeQueueDto("u1");
CeQueueDto u2 = insertCeQueueDto("u2");
CeQueueDto u6 = insertInProgress("u6", "worker1");
CeQueueDto u7 = insertInProgress("u7", "worker2");
CeQueueDto u8 = insertInProgress("u8", "worker3");

underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001"));

// Pending tasks must not be modified even if a workerUUID is not present
verifyUnmodified(u1);
verifyUnmodified(u2);
verifyUnmodified(u3);
verifyUnmodified(u4);

// Unknown worker : null, "worker1"
verifyReset(u5);
verifyReset(u6);
verifyReset(u7);
verifyReset(u8);
@@ -560,8 +524,7 @@ public class InternalCeQueueImplTest {
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
// UpdatedAt must have been updated
assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt());
// StartedAt must be null
assertThat(dto.getStartedAt()).isNull();
assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt());
// WorkerUuid must be null
assertThat(dto.getWorkerUuid()).isNull();
}
@@ -573,19 +536,11 @@ public class InternalCeQueueImplTest {
assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
}

private void verifyCanceled(CeQueueDto original) {
assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid())).isEmpty();
CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
}

private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, String workerUuid) {
private CeQueueDto insertCeQueueDto(String uuid) {
CeQueueDto dto = new CeQueueDto()
.setUuid(uuid)
.setTaskType("foo")
.setStatus(status)
.setWorkerUuid(workerUuid);
.setStatus(CeQueueDto.Status.PENDING);
db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
db.commit();
return dto;
@@ -652,6 +607,11 @@ public class InternalCeQueueImplTest {
return componentDto;
}

private CeQueueDto makeInProgress(CeQueueDto ceQueueDto, String workerUuid) {
CeQueueTesting.makeInProgress(session, workerUuid, system2.now(), ceQueueDto);
return db.getDbClient().ceQueueDao().selectByUuid(session, ceQueueDto.getUuid()).get();
}

private static String stacktraceToString(Throwable error) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
error.printStackTrace(new PrintStream(out));

+ 4
- 0
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java View File

@@ -89,6 +89,10 @@ public class CeQueueDao implements Dao {
return mapper(dbSession).selectPending();
}

public List<CeQueueDto> selectWornout(DbSession dbSession) {
return mapper(dbSession).selectWornout();
}

public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) {
if (knownWorkerUUIDs.isEmpty()) {
mapper(dbSession).resetAllInProgressTasks(system2.now());

+ 8
- 5
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java View File

@@ -98,10 +98,11 @@ public class CeQueueDto {
return workerUuid;
}

public CeQueueDto setWorkerUuid(@Nullable String workerUuid) {
checkArgument(workerUuid == null || workerUuid.length() <= 40, "worker uuid is too long: %s", workerUuid);
/**
* Accessed by MyBatis through reflexion. Field is otherwise read-only.
*/
private void setWorkerUuid(@Nullable String workerUuid) {
this.workerUuid = workerUuid;
return this;
}

@CheckForNull
@@ -109,9 +110,11 @@ public class CeQueueDto {
return startedAt;
}

public CeQueueDto setStartedAt(@Nullable Long l) {
/**
* Accessed by MyBatis through reflexion. Field is otherwise read-only.
*/
private void setStartedAt(@Nullable Long l) {
this.startedAt = l;
return this;
}

public long getCreatedAt() {

+ 5
- 0
server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java View File

@@ -47,6 +47,11 @@ public interface CeQueueMapper {
*/
List<CeQueueDto> selectPending();

/**
* Select all pending tasks which have already been started.
*/
List<CeQueueDto> selectWornout();

/**
* Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs}
*/

+ 3
- 2
server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java View File

@@ -23,6 +23,7 @@ import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

final class UpdateIf {
@@ -38,7 +39,8 @@ final class UpdateIf {
private final long updatedAt;

NewProperties(CeQueueDto.Status status, @Nullable String workerUuid,
Long startedAt, long updatedAt) {
long startedAt, long updatedAt) {
checkArgument(workerUuid == null || workerUuid.length() <= 40, "worker uuid is too long: %s", workerUuid);
this.status = requireNonNull(status, "status can't be null");
this.workerUuid = workerUuid;
this.startedAt = startedAt;
@@ -54,7 +56,6 @@ final class UpdateIf {
return workerUuid;
}

@CheckForNull
public Long getStartedAt() {
return startedAt;
}

+ 11
- 7
server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml View File

@@ -169,6 +169,7 @@
ce_queue cq
where
cq.status='PENDING'
and cq.started_at is null
and not exists (
select
1
@@ -195,6 +196,16 @@
cq.status = 'PENDING'
</select>

<select id="selectWornout" resultType="org.sonar.db.ce.CeQueueDto">
select
<include refid="columns"/>
from
ce_queue cq
where
cq.status = 'PENDING'
and cq.started_at is not null
</select>

<insert id="insert" parameterType="org.sonar.db.ce.CeQueueDto" useGeneratedKeys="false">
insert into ce_queue
(
@@ -203,9 +214,7 @@
component_uuid,
status,
submitter_uuid,
worker_uuid,
execution_count,
started_at,
created_at,
updated_at
)
@@ -215,9 +224,7 @@
#{componentUuid,jdbcType=VARCHAR},
#{status,jdbcType=VARCHAR},
#{submitterUuid,jdbcType=VARCHAR},
#{workerUuid,jdbcType=VARCHAR},
0,
#{startedAt,jdbcType=BIGINT},
#{createdAt,jdbcType=BIGINT},
#{updatedAt,jdbcType=BIGINT}
)
@@ -226,7 +233,6 @@
<update id="resetToPendingForWorker">
update ce_queue set
status='PENDING',
started_at=NULL,
updated_at=#{updatedAt,jdbcType=BIGINT}
where
status &lt;&gt; 'PENDING'
@@ -255,7 +261,6 @@
update ce_queue set
status='PENDING',
worker_uuid=NULL,
started_at=NULL,
updated_at=#{updatedAt,jdbcType=BIGINT}
where
status = 'IN_PROGRESS'
@@ -272,7 +277,6 @@
update ce_queue set
status='PENDING',
worker_uuid=NULL,
started_at=NULL,
updated_at=#{updatedAt,jdbcType=BIGINT}
where
status = 'IN_PROGRESS'

+ 18
- 11
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java View File

@@ -41,6 +41,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.sonar.db.Pagination.forPage;
import static org.sonar.db.ce.CeActivityDto.Status.FAILED;
import static org.sonar.db.ce.CeActivityDto.Status.SUCCESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
import static org.sonar.db.ce.CeQueueTesting.makeInProgress;
import static org.sonar.db.ce.CeTaskTypes.REPORT;

public class CeActivityDaoTest {
@@ -64,7 +66,7 @@ public class CeActivityDaoTest {
assertThat(dto.getComponentUuid()).isEqualTo("PROJECT_1");
assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS);
assertThat(dto.getSubmitterUuid()).isEqualTo("submitter uuid");
assertThat(dto.getSubmittedAt()).isEqualTo(1_300_000_000_000L);
assertThat(dto.getSubmittedAt()).isEqualTo(1_450_000_000_000L);
assertThat(dto.getWorkerUuid()).isEqualTo("worker uuid");
assertThat(dto.getIsLast()).isTrue();
assertThat(dto.getIsLastKey()).isEqualTo("REPORTPROJECT_1");
@@ -344,15 +346,20 @@ public class CeActivityDaoTest {
}

private CeActivityDto createActivityDto(String uuid, String type, String componentUuid, CeActivityDto.Status status) {
CeQueueDto queueDto = new CeQueueDto();
queueDto.setUuid(uuid);
queueDto.setTaskType(type);
queueDto.setComponentUuid(componentUuid);
queueDto.setSubmitterUuid("submitter uuid");
queueDto.setWorkerUuid("worker uuid");
queueDto.setCreatedAt(1_300_000_000_000L);
CeQueueDto creating = new CeQueueDto();
creating.setUuid(uuid);
creating.setStatus(PENDING);
creating.setTaskType(type);
creating.setComponentUuid(componentUuid);
creating.setSubmitterUuid("submitter uuid");
creating.setCreatedAt(1_300_000_000_000L);

CeActivityDto dto = new CeActivityDto(queueDto);
db.getDbClient().ceQueueDao().insert(dbSession, creating);
makeInProgress(dbSession, "worker uuid", 1_400_000_000_000L, creating);

CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().selectByUuid(dbSession, uuid).get();

CeActivityDto dto = new CeActivityDto(ceQueueDto);
dto.setStatus(status);
dto.setStartedAt(1_500_000_000_000L);
dto.setExecutedAt(1_500_000_000_500L);
@@ -385,8 +392,8 @@ public class CeActivityDaoTest {

private List<String> selectPageOfUuids(Pagination pagination) {
return underTest.selectByQuery(db.getSession(), new CeTaskQuery(), pagination).stream()
.map(CeActivityToUuid.INSTANCE::apply)
.collect(MoreCollectors.toList());
.map(CeActivityToUuid.INSTANCE::apply)
.collect(MoreCollectors.toList());
}

private enum CeActivityToUuid implements Function<CeActivityDto, String> {

+ 127
- 116
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java View File

@@ -42,11 +42,13 @@ import static com.google.common.collect.Lists.newArrayList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.assertj.core.groups.Tuple.tuple;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;
import static org.sonar.db.ce.CeQueueTesting.newCeQueueDto;
import static org.sonar.db.ce.CeQueueTesting.reset;

public class CeQueueDaoTest {
private static final long INIT_TIME = 1_450_000_000_000L;
@@ -66,10 +68,11 @@ public class CeQueueDaoTest {
public DbTester db = DbTester.create(system2);

private System2 mockedSystem2 = mock(System2.class);
private System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2();

private CeQueueDao underTest = new CeQueueDao(system2);
private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2);
private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(new AlwaysIncreasingSystem2());
private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2);

@Test
public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() {
@@ -78,8 +81,7 @@ public class CeQueueDaoTest {
.setTaskType(CeTaskTypes.REPORT)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setSubmitterUuid(SUBMITTER_LOGIN)
.setWorkerUuid(WORKER_UUID_1);
.setSubmitterUuid(SUBMITTER_LOGIN);

mockSystem2ForSingleCall(now);
underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1));
@@ -100,7 +102,7 @@ public class CeQueueDaoTest {
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterUuid()).isEqualTo(SUBMITTER_LOGIN);
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(saved.getWorkerUuid()).isNull();
assertThat(saved.getCreatedAt()).isEqualTo(now);
assertThat(saved.getUpdatedAt()).isEqualTo(now);
assertThat(saved.getStartedAt()).isNull();
@@ -111,7 +113,7 @@ public class CeQueueDaoTest {
assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1);
assertThat(saved.getStatus()).isEqualTo(PENDING);
assertThat(saved.getSubmitterUuid()).isEqualTo(SUBMITTER_LOGIN);
assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1);
assertThat(saved.getWorkerUuid()).isNull();
assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L);
assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L);
assertThat(saved.getStartedAt()).isNull();
@@ -119,7 +121,7 @@ public class CeQueueDaoTest {

@Test
public void test_selectByUuid() {
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
insertPending(TASK_UUID_1, COMPONENT_UUID_1);

assertThat(underTest.selectByUuid(db.getSession(), "TASK_UNKNOWN").isPresent()).isFalse();
CeQueueDto saved = underTest.selectByUuid(db.getSession(), TASK_UUID_1).get();
@@ -136,9 +138,9 @@ public class CeQueueDaoTest {

@Test
public void test_selectByComponentUuid() {
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
insert(TASK_UUID_3, "PROJECT_2", PENDING);
insertPending(TASK_UUID_1, COMPONENT_UUID_1);
insertPending(TASK_UUID_2, COMPONENT_UUID_1);
insertPending(TASK_UUID_3, "PROJECT_2");

assertThat(underTest.selectByComponentUuid(db.getSession(), "UNKNOWN")).isEmpty();
assertThat(underTest.selectByComponentUuid(db.getSession(), COMPONENT_UUID_1)).extracting("uuid").containsOnly(TASK_UUID_1, TASK_UUID_2);
@@ -147,30 +149,44 @@ public class CeQueueDaoTest {

@Test
public void test_selectAllInAscOrder() {
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
insert(TASK_UUID_3, "PROJECT_2", PENDING);
insertPending(TASK_UUID_1, COMPONENT_UUID_1);
insertPending(TASK_UUID_2, COMPONENT_UUID_1);
insertPending(TASK_UUID_3, "PROJECT_2");

assertThat(underTest.selectAllInAscOrder(db.getSession())).extracting("uuid").containsOnly(TASK_UUID_1, TASK_UUID_2, TASK_UUID_3);
}

@Test
public void selectPending_returns_pending_tasks() {
insert("p1", CeQueueDto.Status.PENDING);
insert("p2", CeQueueDto.Status.PENDING);
insert("p3", CeQueueDto.Status.PENDING);
insert("i1", CeQueueDto.Status.IN_PROGRESS);
insert("i2", CeQueueDto.Status.IN_PROGRESS);
insert("i3", CeQueueDto.Status.IN_PROGRESS);
insertPending("p1");
insertPending("p2");
insertPending("p3");
makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i1"));
makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i2"));
makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i3"));

assertThat(underTest.selectPending(db.getSession()))
.extracting(CeQueueDto::getUuid)
.containsOnly("p1", "p2", "p3");
}

@Test
public void selectWornout_returns_task_pending_with_a_non_null_startedAt() {
insertPending("p1");
makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i1"));
CeQueueDto resetDto = makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i2"));
makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i3"));
reset(db.getSession(), alwaysIncreasingSystem2.now(), resetDto);

List<CeQueueDto> ceQueueDtos = underTest.selectWornout(db.getSession());
assertThat(ceQueueDtos)
.extracting(CeQueueDto::getStatus, CeQueueDto::getUuid)
.containsOnly(tuple(PENDING, resetDto.getUuid()));
}

@Test
public void test_delete() {
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
insertPending(TASK_UUID_1, COMPONENT_UUID_1);

underTest.deleteByUuid(db.getSession(), "UNKNOWN");
assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1)).isPresent();
@@ -181,92 +197,91 @@ public class CeQueueDaoTest {

@Test
public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() {
long startedAt = 2_099_888L;
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")};
long startedAt = alwaysIncreasingSystem2.now();
makeInProgress(WORKER_UUID_1, startedAt, worker1[0]);
makeInProgress(WORKER_UUID_1, startedAt, worker1[3]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);

underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1);

verifyResetToPendingForWorker(u1);
verifyUnchangedByResetToPendingForWorker(u2);
verifyUnchangedByResetToPendingForWorker(u3);
verifyResetToPendingForWorker(u4);
verifyUnchangedByResetToPendingForWorker(o1);
verifyUnchangedByResetToPendingForWorker(o2);
verifyUnchangedByResetToPendingForWorker(o3);
verifyUnchangedByResetToPendingForWorker(o4);
verifyResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt);
verifyUnchangedByResetToPendingForWorker(worker1[1]);
verifyUnchangedByResetToPendingForWorker(worker1[2]);
verifyResetToPendingForWorker(worker1[3], WORKER_UUID_1, startedAt);
verifyInProgressUnchangedByResetToPendingForWorker(worker2[0], WORKER_UUID_2, startedAt);
verifyUnchangedByResetToPendingForWorker(worker2[1]);
verifyUnchangedByResetToPendingForWorker(worker2[2]);
verifyInProgressUnchangedByResetToPendingForWorker(worker2[3], WORKER_UUID_2, startedAt);
}

@Test
public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() {
long startedAt = 2_099_888L;
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")};
long startedAt = alwaysIncreasingSystem2.now();
makeInProgress(WORKER_UUID_1, startedAt, worker1[0]);
makeInProgress(WORKER_UUID_1, startedAt, worker1[3]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);

underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of());

verifyResetByResetTasks(u1);
verifyUnchangedByResetToPendingForWorker(u2);
verifyUnchangedByResetToPendingForWorker(u3);
verifyResetByResetTasks(u4);
verifyResetByResetTasks(o1);
verifyUnchangedByResetToPendingForWorker(o2);
verifyUnchangedByResetToPendingForWorker(o3);
verifyResetByResetTasks(o4);
verifyResetByResetTasks(worker1[0], startedAt);
verifyUnchangedByResetToPendingForWorker(worker1[1]);
verifyUnchangedByResetToPendingForWorker(worker1[2]);
verifyResetByResetTasks(worker1[3], startedAt);
verifyResetByResetTasks(worker2[0], startedAt);
verifyUnchangedByResetToPendingForWorker(worker2[1]);
verifyUnchangedByResetToPendingForWorker(worker2[2]);
verifyResetByResetTasks(worker2[3], startedAt);
}

@Test
public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() {
long startedAt = 2_099_888L;
CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt);
CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt);
CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt);
CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt);
CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")};
CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")};
long startedAt = alwaysIncreasingSystem2.now();
makeInProgress(WORKER_UUID_1, startedAt, worker1[0]);
makeInProgress(WORKER_UUID_1, startedAt, worker1[3]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[0]);
makeInProgress(WORKER_UUID_2, startedAt, worker2[3]);

underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown"));

verifyUnchangedByResetToPendingForWorker(u1);
verifyUnchangedByResetToPendingForWorker(u2);
verifyUnchangedByResetToPendingForWorker(u3);
verifyUnchangedByResetToPendingForWorker(u4);
verifyResetByResetTasks(o1);
verifyUnchangedByResetToPendingForWorker(o2);
verifyUnchangedByResetToPendingForWorker(o3);
verifyResetByResetTasks(o4);
verifyInProgressUnchangedByResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt);
verifyUnchangedByResetToPendingForWorker(worker1[1]);
verifyUnchangedByResetToPendingForWorker(worker1[2]);
verifyInProgressUnchangedByResetToPendingForWorker(worker1[3], WORKER_UUID_1, startedAt);
verifyResetByResetTasks(worker2[0], startedAt);
verifyUnchangedByResetToPendingForWorker(worker2[1]);
verifyUnchangedByResetToPendingForWorker(worker2[2]);
verifyResetByResetTasks(worker2[3], startedAt);
}

private CeQueueDto makeInProgress(String workerUuid, long startedAt, CeQueueDto ceQueueDto) {
CeQueueTesting.makeInProgress(db.getSession(), workerUuid, startedAt, ceQueueDto);
return underTestAlwaysIncreasingSystem2.selectByUuid(db.getSession(), ceQueueDto.getUuid()).get();
}

private void verifyResetByResetTasks(CeQueueDto original) {
private void verifyResetByResetTasks(CeQueueDto original, long startedAt) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus());
assertThat(dto.getStartedAt()).isNull();
assertThat(dto.getStatus()).isEqualTo(PENDING);
assertThat(dto.getStartedAt()).isEqualTo(startedAt);
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
assertThat(dto.getUpdatedAt()).isGreaterThan(startedAt);
assertThat(dto.getWorkerUuid()).isNull();
}

private void verifyResetToPendingForWorker(CeQueueDto original) {
private void verifyResetToPendingForWorker(CeQueueDto original, String workerUuid, long startedAt) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING);
assertThat(dto.getStartedAt()).isNull();
assertThat(dto.getStatus()).isEqualTo(PENDING);
assertThat(dto.getStartedAt()).isEqualTo(startedAt);
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt());
assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
assertThat(dto.getUpdatedAt()).isGreaterThan(startedAt);
assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid);
}

private void verifyUnchangedByResetToPendingForWorker(CeQueueDto original) {
@@ -278,20 +293,29 @@ public class CeQueueDaoTest {
assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
}

private void verifyInProgressUnchangedByResetToPendingForWorker(CeQueueDto original, String workerUuid, long startedAt) {
CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get();
assertThat(dto.getStatus()).isEqualTo(IN_PROGRESS);
assertThat(dto.getStartedAt()).isEqualTo(startedAt);
assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
assertThat(dto.getUpdatedAt()).isEqualTo(startedAt);
assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid);
}

@Test
public void peek_none_if_no_pendings() {
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();

// not pending, but in progress
insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS);
makeInProgress(WORKER_UUID_1, 2_232_222L, insertPending(TASK_UUID_1, COMPONENT_UUID_1));
assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse();
}

@Test
public void peek_oldest_pending() {
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
insertPending(TASK_UUID_1, COMPONENT_UUID_1);
system2.setNow(INIT_TIME + 3_000_000);
insert(TASK_UUID_2, COMPONENT_UUID_2, PENDING);
insertPending(TASK_UUID_2, COMPONENT_UUID_2);

assertThat(db.countRowsOfTable("ce_queue")).isEqualTo(2);
verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING);
@@ -319,9 +343,9 @@ public class CeQueueDaoTest {
@Test
public void do_not_peek_multiple_tasks_on_same_project_at_the_same_time() {
// two pending tasks on the same project
insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING);
insertPending(TASK_UUID_1, COMPONENT_UUID_1);
system2.setNow(INIT_TIME + 3_000_000);
insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING);
insertPending(TASK_UUID_2, COMPONENT_UUID_1);

Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1);
assertThat(peek).isPresent();
@@ -343,35 +367,35 @@ public class CeQueueDaoTest {
@Test
public void select_by_query() {
// task status not in query
insert(newCeQueueDto(TASK_UUID_1)
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));

// too early
insert(newCeQueueDto(TASK_UUID_3)
insertPending(newCeQueueDto(TASK_UUID_3)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(90_000L));

// task type not in query
insert(newCeQueueDto("TASK_4")
insertPending(newCeQueueDto("TASK_4")
.setComponentUuid("PROJECT_2")
.setStatus(PENDING)
.setTaskType("ANOTHER_TYPE")
.setCreatedAt(100_000L));

// correct
insert(newCeQueueDto(TASK_UUID_2)
insertPending(newCeQueueDto(TASK_UUID_2)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));

// correct submitted later
insert(newCeQueueDto("TASK_5")
insertPending(newCeQueueDto("TASK_5")
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
@@ -393,7 +417,7 @@ public class CeQueueDaoTest {

@Test
public void select_by_query_returns_empty_list_when_only_current() {
insert(newCeQueueDto(TASK_UUID_1)
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
@@ -410,7 +434,7 @@ public class CeQueueDaoTest {

@Test
public void select_by_query_returns_empty_list_when_max_submitted_at() {
insert(newCeQueueDto(TASK_UUID_1)
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
@@ -427,7 +451,7 @@ public class CeQueueDaoTest {

@Test
public void select_by_query_returns_empty_list_when_empty_list_of_component_uuid() {
insert(newCeQueueDto(TASK_UUID_1)
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
@@ -445,19 +469,19 @@ public class CeQueueDaoTest {
@Test
public void count_by_status_and_component_uuid() {
// task retrieved in the queue
insert(newCeQueueDto(TASK_UUID_1)
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// on component uuid 2, not returned
insert(newCeQueueDto(TASK_UUID_2)
insertPending(newCeQueueDto(TASK_UUID_2)
.setComponentUuid(COMPONENT_UUID_2)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// pending status, not returned
insert(newCeQueueDto(TASK_UUID_3)
insertPending(newCeQueueDto(TASK_UUID_3)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
@@ -470,19 +494,19 @@ public class CeQueueDaoTest {
@Test
public void count_by_status_and_component_uuids() {
// task retrieved in the queue
insert(newCeQueueDto(TASK_UUID_1)
insertPending(newCeQueueDto(TASK_UUID_1)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// on component uuid 2, not returned
insert(newCeQueueDto(TASK_UUID_2)
insertPending(newCeQueueDto(TASK_UUID_2)
.setComponentUuid(COMPONENT_UUID_2)
.setStatus(IN_PROGRESS)
.setTaskType(CeTaskTypes.REPORT)
.setCreatedAt(100_000L));
// pending status, not returned
insert(newCeQueueDto(TASK_UUID_3)
insertPending(newCeQueueDto(TASK_UUID_3)
.setComponentUuid(COMPONENT_UUID_1)
.setStatus(PENDING)
.setTaskType(CeTaskTypes.REPORT)
@@ -497,41 +521,28 @@ public class CeQueueDaoTest {
assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2);
}

private void insert(CeQueueDto dto) {
private void insertPending(CeQueueDto dto) {
underTest.insert(db.getSession(), dto);
db.commit();
}

private CeQueueDto insert(String uuid, CeQueueDto.Status status) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setStatus(status);
dto.setSubmitterUuid("henri");
underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
db.getSession().commit();
return dto;
}

private CeQueueDto insert(String uuid, CeQueueDto.Status status, String workerUuid, Long startedAt) {
private CeQueueDto insertPending(String uuid) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setStatus(status);
dto.setStatus(PENDING);
dto.setSubmitterUuid("henri");
dto.setWorkerUuid(workerUuid);
dto.setStartedAt(startedAt);
underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto);
db.getSession().commit();
return dto;
}

private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) {
private CeQueueDto insertPending(String uuid, String componentUuid) {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(uuid);
dto.setTaskType(CeTaskTypes.REPORT);
dto.setComponentUuid(componentUuid);
dto.setStatus(status);
dto.setStatus(PENDING);
dto.setSubmitterUuid("henri");
underTest.insert(db.getSession(), dto);
db.getSession().commit();

+ 0
- 18
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java View File

@@ -93,22 +93,4 @@ public class CeQueueDtoTest {

underTest.setSubmitterUuid(str_256_chars);
}

@Test
public void setWorkerUuid_accepts_null_empty_and_string_40_chars_or_less() {
underTest.setWorkerUuid(null);
underTest.setWorkerUuid("");
underTest.setWorkerUuid("bar");
underTest.setWorkerUuid(STR_40_CHARS);
}

@Test
public void setWorkerUuid_throws_IAE_if_value_is_41_chars() {
String str_41_chars = STR_40_CHARS + "a";

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("worker uuid is too long: " + str_41_chars);

underTest.setWorkerUuid(str_41_chars);
}
}

+ 29
- 2
server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java View File

@@ -19,8 +19,15 @@
*/
package org.sonar.db.ce;

import java.util.stream.Stream;
import org.sonar.db.DbSession;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric;
import static org.apache.commons.lang.math.RandomUtils.nextLong;
import static org.assertj.core.api.Assertions.assertThat;
import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS;
import static org.sonar.db.ce.CeQueueDto.Status.PENDING;

public class CeQueueTesting {
private CeQueueTesting() {
@@ -35,9 +42,29 @@ public class CeQueueTesting {
.setTaskType(CeTaskTypes.REPORT)
.setSubmitterUuid(randomAlphanumeric(255))
.setCreatedAt(nextLong())
.setUpdatedAt(nextLong())
.setStartedAt(nextLong());
.setUpdatedAt(nextLong());
}

public static void makeInProgress(DbSession dbSession, String workerUuid, long now, CeQueueDto... ceQueueDtos) {
Stream.of(ceQueueDtos).forEach(ceQueueDto -> {
CeQueueMapper mapper = dbSession.getMapper(CeQueueMapper.class);
int touchedRows = mapper.updateIf(ceQueueDto.getUuid(),
new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, now, now),
new UpdateIf.OldProperties(PENDING));
assertThat(touchedRows).isEqualTo(1);
});
}

public static void reset(DbSession dbSession, long now, CeQueueDto... ceQueueDtos) {
Stream.of(ceQueueDtos).forEach(ceQueueDto -> {
checkArgument(ceQueueDto.getStatus() == IN_PROGRESS);
checkArgument(ceQueueDto.getWorkerUuid() != null);

CeQueueMapper mapper = dbSession.getMapper(CeQueueMapper.class);
int touchedRows = mapper.updateIf(ceQueueDto.getUuid(),
new UpdateIf.NewProperties(PENDING, ceQueueDto.getUuid(), now, now),
new UpdateIf.OldProperties(IN_PROGRESS));
assertThat(touchedRows).isEqualTo(1);
});
}
}

+ 91
- 0
server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java View File

@@ -0,0 +1,91 @@
/*
* SonarQube
* Copyright (C) 2009-2018 SonarSource SA
* mailto:info AT sonarsource DOT com
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.db.ce;

import com.tngtech.java.junit.dataprovider.DataProvider;
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import java.util.Random;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(DataProviderRunner.class)
public class UpdateIfTest {
private static final String STR_40_CHARS = "0123456789012345678901234567890123456789";

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void newProperties_constructor_accepts_null_workerUuid() {
UpdateIf.NewProperties newProperties = new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, null, 123, 456);

assertThat(newProperties.getWorkerUuid()).isNull();
}

@Test
public void newProperties_constructor_fails_with_NPE_if_status_is_null() {
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("status can't be null");

new UpdateIf.NewProperties(null, "foo", 123, 456);
}

@Test
public void newProperties_constructor_fails_with_IAE_if_workerUuid_is_41_or_more() {
String workerUuid = RandomStringUtils.randomAlphanumeric(41 + new Random().nextInt(5));

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("worker uuid is too long: " + workerUuid);

new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, workerUuid, 123, 456);
}

@Test
@UseDataProvider("workerUuidValidValues")
public void newProperties_constructor_accepts_null_empty_and_string_40_chars_or_less(String workerUuid) {
new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, workerUuid, 123, 345);
}

@DataProvider
public static Object[][] workerUuidValidValues() {
return new Object[][] {
{null},
{""},
{"bar"},
{STR_40_CHARS}
};
}

@Test
public void newProperties_constructor_IAE_if_workerUuid_is_41_chars() {
String str_41_chars = STR_40_CHARS + "a";

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("worker uuid is too long: " + str_41_chars);

new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, str_41_chars, 123, 345);
}
}

+ 11
- 6
server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java View File

@@ -39,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.sonar.db.ce.CeQueueTesting.makeInProgress;

public class TaskFormatterTest {

@@ -85,13 +86,15 @@ public class TaskFormatterTest {
CeQueueDto dto = new CeQueueDto();
dto.setUuid("UUID");
dto.setTaskType("TYPE");
dto.setStatus(CeQueueDto.Status.IN_PROGRESS);
dto.setStatus(CeQueueDto.Status.PENDING);
dto.setCreatedAt(1_450_000_000_000L);
dto.setStartedAt(1_451_000_000_000L);
dto.setComponentUuid(uuid);
dto.setSubmitterUuid(user.getUuid());
db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
makeInProgress(db.getSession(), "workerUuid", 1_958_000_000_000L, dto);
CeQueueDto inProgress = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), dto.getUuid()).get();

Ce.Task wsTask = underTest.formatQueue(db.getSession(), dto);
Ce.Task wsTask = underTest.formatQueue(db.getSession(), inProgress);

assertThat(wsTask.getType()).isEqualTo("TYPE");
assertThat(wsTask.getId()).isEqualTo("UUID");
@@ -130,12 +133,14 @@ public class TaskFormatterTest {
CeQueueDto dto = new CeQueueDto();
dto.setUuid("UUID");
dto.setTaskType("TYPE");
dto.setStatus(CeQueueDto.Status.IN_PROGRESS);
dto.setStatus(CeQueueDto.Status.PENDING);
dto.setCreatedAt(1_450_000_000_000L);
dto.setStartedAt(startedAt);
db.getDbClient().ceQueueDao().insert(db.getSession(), dto);
makeInProgress(db.getSession(), "workerUuid", startedAt, dto);
CeQueueDto inProgress = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), dto.getUuid()).get();
when(system2.now()).thenReturn(now);

Ce.Task wsTask = underTest.formatQueue(db.getSession(), dto);
Ce.Task wsTask = underTest.formatQueue(db.getSession(), inProgress);

assertThat(wsTask.getExecutionTimeMs()).isEqualTo(now - startedAt);
}

Loading…
Cancel
Save