diff options
16 files changed, 376 insertions, 249 deletions
diff --git a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java index 01b113bd64d..34a8c01f677 100644 --- a/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java +++ b/server/sonar-ce-common/src/main/java/org/sonar/ce/queue/CeQueueImpl.java @@ -154,12 +154,11 @@ public class CeQueueImpl implements CeQueue { dto.setComponentUuid(submission.getComponentUuid()); dto.setStatus(PENDING); dto.setSubmitterUuid(submission.getSubmitterUuid()); - dto.setStartedAt(null); dbClient.ceQueueDao().insert(dbSession, dto); return dto; } - protected CeTask loadTask(DbSession dbSession, CeQueueDto dto) { + CeTask loadTask(DbSession dbSession, CeQueueDto dto) { String componentUuid = dto.getComponentUuid(); if (componentUuid == null) { return new CeQueueDtoToCeTask(defaultOrganizationProvider.get().getUuid()).apply(dto); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java index 6c34cac9baf..b266d4a94ad 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java @@ -56,9 +56,11 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler { Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock(); // If we cannot lock that means that another job is running + // So we skip resetting and cancelling tasks in queue if (ceCleaningJobLock.tryLock()) { try { resetTasksWithUnknownWorkerUUIDs(); + cancelWornOuts(); } finally { ceCleaningJobLock.unlock(); } @@ -73,4 +75,13 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler { LOG.warn("Failed to reset tasks with unknown worker UUIDs", e); } } + + private void cancelWornOuts() { + try { + LOG.debug("Cancelling any worn out task"); + internalCeQueue.cancelWornOuts(); + } catch (Exception e) { + LOG.warn("Failed to cancel worn out tasks", e); + } + } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 9932e2e3986..255e94d7011 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -68,5 +68,7 @@ public interface InternalCeQueue extends CeQueue { */ void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error); + void cancelWornOuts(); + void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 8f9b6eb34a5..743d71336e8 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import java.util.List; import java.util.Optional; import java.util.Set; import javax.annotation.CheckForNull; @@ -78,6 +79,7 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue CeQueueDao ceQueueDao = dbClient.ceQueueDao(); int i = ceQueueDao.resetToPendingForWorker(dbSession, workerUuid); if (i > 0) { + dbSession.commit(); LOG.debug("{} in progress tasks reset for worker uuid {}", i, workerUuid); } Optional<CeQueueDto> dto = ceQueueDao.peek(dbSession, workerUuid); @@ -165,6 +167,19 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } @Override + public void cancelWornOuts() { + try (DbSession dbSession = dbClient.openSession(false)) { + List<CeQueueDto> wornOutTasks = dbClient.ceQueueDao().selectWornout(dbSession); + wornOutTasks.forEach(queueDto -> { + CeActivityDto activityDto = new CeActivityDto(queueDto); + activityDto.setStatus(CeActivityDto.Status.CANCELED); + updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto); + remove(dbSession, queueDto, activityDto); + }); + } + } + + @Override public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) { try (DbSession dbSession = dbClient.openSession(false)) { dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs); diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index ee5680b24b7..74c38b92df5 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -43,6 +43,7 @@ import org.sonar.db.DbSession; import org.sonar.db.DbTester; import org.sonar.db.ce.CeActivityDto; import org.sonar.db.ce.CeQueueDto; +import org.sonar.db.ce.CeQueueTesting; import org.sonar.db.ce.CeTaskTypes; import org.sonar.db.component.ComponentDto; import org.sonar.db.component.ComponentTesting; @@ -259,11 +260,11 @@ public class InternalCeQueueImplTest { @Test public void remove_copies_workerUuid() { - db.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto() .setUuid("uuid") .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setWorkerUuid("Dustin")); + .setStatus(CeQueueDto.Status.PENDING)); + makeInProgress(ceQueueDto, "Dustin"); db.commit(); underTest.remove(new CeTask.Builder() @@ -316,18 +317,15 @@ public class InternalCeQueueImplTest { } @Test - public void peek_overrides_workerUuid_to_argument() { - db.getDbClient().ceQueueDao().insert(session, new CeQueueDto() + public void peek_ignores_in_progress_tasks() { + CeQueueDto dto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto() .setUuid("uuid") .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setWorkerUuid("must be overriden")); + .setStatus(CeQueueDto.Status.PENDING)); + makeInProgress(dto, "foo"); db.commit(); - underTest.peek(WORKER_UUID_1); - - CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().selectByUuid(session, "uuid").get(); - assertThat(ceQueueDto.getWorkerUuid()).isEqualTo(WORKER_UUID_1); + assertThat(underTest.peek(WORKER_UUID_1)).isEmpty(); } @Test @@ -352,8 +350,8 @@ public class InternalCeQueueImplTest { @Test public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_updates_updatedAt() { - insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB - CeQueueDto u1 = insertPending("u1", WORKER_UUID_1);// will be picked-because older than any of the reset ones + insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB + CeQueueDto u1 = insertPending("u1");// will be picked-because older than any of the reset ones CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_1);// will be reset assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("u0"); @@ -364,7 +362,7 @@ public class InternalCeQueueImplTest { @Test public void peek_resets_to_pending_any_task_in_progress_for_specified_worker_uuid_and_only_this_uuid() { - insertPending("u0", "doesn't matter"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB + insertPending("u0"); // add a pending one that will be picked so that u1 isn't peek and status reset is visible in DB CeQueueDto u1 = insertInProgress("u1", WORKER_UUID_1); CeQueueDto u2 = insertInProgress("u2", WORKER_UUID_2); CeQueueDto u3 = insertInProgress("u3", WORKER_UUID_1); @@ -396,19 +394,18 @@ public class InternalCeQueueImplTest { CeQueueDto dto = new CeQueueDto() .setUuid(uuid) .setTaskType("foo") - .setStatus(CeQueueDto.Status.IN_PROGRESS) - .setWorkerUuid(workerUuid); + .setStatus(CeQueueDto.Status.PENDING); db.getDbClient().ceQueueDao().insert(session, dto); + makeInProgress(dto, workerUuid); db.commit(); - return dto; + return db.getDbClient().ceQueueDao().selectByUuid(session, uuid).get(); } - private CeQueueDto insertPending(String uuid, String workerUuid) { + private CeQueueDto insertPending(String uuid) { CeQueueDto dto = new CeQueueDto() .setUuid(uuid) .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setWorkerUuid(workerUuid); + .setStatus(CeQueueDto.Status.PENDING); db.getDbClient().ceQueueDao().insert(session, dto); db.commit(); return dto; @@ -427,21 +424,6 @@ public class InternalCeQueueImplTest { } @Test - public void cancel_copies_workerUuid() { - CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().insert(session, new CeQueueDto() - .setUuid("uuid") - .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setWorkerUuid("Dustin")); - db.commit(); - - underTest.cancel(db.getSession(), ceQueueDto); - - CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), "uuid").get(); - assertThat(dto.getWorkerUuid()).isEqualTo("Dustin"); - } - - @Test public void fail_to_cancel_if_in_progress() { CeTask task = submit(CeTaskTypes.REPORT, "PROJECT_1"); underTest.peek(WORKER_UUID_2); @@ -473,25 +455,19 @@ public class InternalCeQueueImplTest { @Test public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() { - CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null); - CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1"); - CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null); - CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2"); - CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null); - CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1"); - CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2"); - CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3"); + CeQueueDto u1 = insertCeQueueDto("u1"); + CeQueueDto u2 = insertCeQueueDto("u2"); + CeQueueDto u6 = insertInProgress("u6", "worker1"); + CeQueueDto u7 = insertInProgress("u7", "worker2"); + CeQueueDto u8 = insertInProgress("u8", "worker3"); underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3")); // Pending tasks must not be modified even if a workerUUID is not present verifyUnmodified(u1); verifyUnmodified(u2); - verifyUnmodified(u3); - verifyUnmodified(u4); // Unknown worker : null, "worker1" - verifyReset(u5); verifyReset(u6); // Known workers : "worker2", "worker3" @@ -501,25 +477,19 @@ public class InternalCeQueueImplTest { @Test public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() { - CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null); - CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1"); - CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null); - CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2"); - CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null); - CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1"); - CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2"); - CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3"); + CeQueueDto u1 = insertCeQueueDto("u1"); + CeQueueDto u2 = insertCeQueueDto("u2"); + CeQueueDto u6 = insertInProgress("u6", "worker1"); + CeQueueDto u7 = insertInProgress("u7", "worker2"); + CeQueueDto u8 = insertInProgress("u8", "worker3"); underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of()); // Pending tasks must not be modified even if a workerUUID is not present verifyUnmodified(u1); verifyUnmodified(u2); - verifyUnmodified(u3); - verifyUnmodified(u4); // Unknown worker : null, "worker1" - verifyReset(u5); verifyReset(u6); verifyReset(u7); verifyReset(u8); @@ -527,25 +497,19 @@ public class InternalCeQueueImplTest { @Test public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() { - CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, null); - CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, "worker1"); - CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, null); - CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, "worker2"); - CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, null); - CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, "worker1"); - CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, "worker2"); - CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, "worker3"); + CeQueueDto u1 = insertCeQueueDto("u1"); + CeQueueDto u2 = insertCeQueueDto("u2"); + CeQueueDto u6 = insertInProgress("u6", "worker1"); + CeQueueDto u7 = insertInProgress("u7", "worker2"); + CeQueueDto u8 = insertInProgress("u8", "worker3"); underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001")); // Pending tasks must not be modified even if a workerUUID is not present verifyUnmodified(u1); verifyUnmodified(u2); - verifyUnmodified(u3); - verifyUnmodified(u4); // Unknown worker : null, "worker1" - verifyReset(u5); verifyReset(u6); verifyReset(u7); verifyReset(u8); @@ -560,8 +524,7 @@ public class InternalCeQueueImplTest { assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus()); // UpdatedAt must have been updated assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt()); - // StartedAt must be null - assertThat(dto.getStartedAt()).isNull(); + assertThat(dto.getStartedAt()).isEqualTo(original.getStartedAt()); // WorkerUuid must be null assertThat(dto.getWorkerUuid()).isNull(); } @@ -573,19 +536,11 @@ public class InternalCeQueueImplTest { assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt()); } - private void verifyCanceled(CeQueueDto original) { - assertThat(db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid())).isEmpty(); - CeActivityDto dto = db.getDbClient().ceActivityDao().selectByUuid(db.getSession(), original.getUuid()).get(); - assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); - assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); - } - - private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, String workerUuid) { + private CeQueueDto insertCeQueueDto(String uuid) { CeQueueDto dto = new CeQueueDto() .setUuid(uuid) .setTaskType("foo") - .setStatus(status) - .setWorkerUuid(workerUuid); + .setStatus(CeQueueDto.Status.PENDING); db.getDbClient().ceQueueDao().insert(db.getSession(), dto); db.commit(); return dto; @@ -652,6 +607,11 @@ public class InternalCeQueueImplTest { return componentDto; } + private CeQueueDto makeInProgress(CeQueueDto ceQueueDto, String workerUuid) { + CeQueueTesting.makeInProgress(session, workerUuid, system2.now(), ceQueueDto); + return db.getDbClient().ceQueueDao().selectByUuid(session, ceQueueDto.getUuid()).get(); + } + private static String stacktraceToString(Throwable error) { ByteArrayOutputStream out = new ByteArrayOutputStream(); error.printStackTrace(new PrintStream(out)); diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java index c5e47fde5f5..308a90a5cfd 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDao.java @@ -89,6 +89,10 @@ public class CeQueueDao implements Dao { return mapper(dbSession).selectPending(); } + public List<CeQueueDto> selectWornout(DbSession dbSession) { + return mapper(dbSession).selectWornout(); + } + public void resetTasksWithUnknownWorkerUUIDs(DbSession dbSession, Set<String> knownWorkerUUIDs) { if (knownWorkerUUIDs.isEmpty()) { mapper(dbSession).resetAllInProgressTasks(system2.now()); diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java index 640b9c217e4..b31c0bece83 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueDto.java @@ -98,10 +98,11 @@ public class CeQueueDto { return workerUuid; } - public CeQueueDto setWorkerUuid(@Nullable String workerUuid) { - checkArgument(workerUuid == null || workerUuid.length() <= 40, "worker uuid is too long: %s", workerUuid); + /** + * Accessed by MyBatis through reflexion. Field is otherwise read-only. + */ + private void setWorkerUuid(@Nullable String workerUuid) { this.workerUuid = workerUuid; - return this; } @CheckForNull @@ -109,9 +110,11 @@ public class CeQueueDto { return startedAt; } - public CeQueueDto setStartedAt(@Nullable Long l) { + /** + * Accessed by MyBatis through reflexion. Field is otherwise read-only. + */ + private void setStartedAt(@Nullable Long l) { this.startedAt = l; - return this; } public long getCreatedAt() { diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java index 711e3d0bf84..9cccbb2466c 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/CeQueueMapper.java @@ -48,6 +48,11 @@ public interface CeQueueMapper { List<CeQueueDto> selectPending(); /** + * Select all pending tasks which have already been started. + */ + List<CeQueueDto> selectWornout(); + + /** * Select all tasks whose worker UUID is not present in {@code knownWorkerUUIDs} */ void resetTasksWithUnknownWorkerUUIDs(@Param("knownWorkerUUIDs") List<String> knownWorkerUUIDs, @Param("updatedAt") long updatedAt); diff --git a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java index 9b6e11f1704..1c1df6ec8a8 100644 --- a/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java +++ b/server/sonar-db-dao/src/main/java/org/sonar/db/ce/UpdateIf.java @@ -23,6 +23,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; final class UpdateIf { @@ -38,7 +39,8 @@ final class UpdateIf { private final long updatedAt; NewProperties(CeQueueDto.Status status, @Nullable String workerUuid, - Long startedAt, long updatedAt) { + long startedAt, long updatedAt) { + checkArgument(workerUuid == null || workerUuid.length() <= 40, "worker uuid is too long: %s", workerUuid); this.status = requireNonNull(status, "status can't be null"); this.workerUuid = workerUuid; this.startedAt = startedAt; @@ -54,7 +56,6 @@ final class UpdateIf { return workerUuid; } - @CheckForNull public Long getStartedAt() { return startedAt; } diff --git a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml index b2b8fb3e403..602e386ead5 100644 --- a/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml +++ b/server/sonar-db-dao/src/main/resources/org/sonar/db/ce/CeQueueMapper.xml @@ -169,6 +169,7 @@ ce_queue cq where cq.status='PENDING' + and cq.started_at is null and not exists ( select 1 @@ -195,6 +196,16 @@ cq.status = 'PENDING' </select> + <select id="selectWornout" resultType="org.sonar.db.ce.CeQueueDto"> + select + <include refid="columns"/> + from + ce_queue cq + where + cq.status = 'PENDING' + and cq.started_at is not null + </select> + <insert id="insert" parameterType="org.sonar.db.ce.CeQueueDto" useGeneratedKeys="false"> insert into ce_queue ( @@ -203,9 +214,7 @@ component_uuid, status, submitter_uuid, - worker_uuid, execution_count, - started_at, created_at, updated_at ) @@ -215,9 +224,7 @@ #{componentUuid,jdbcType=VARCHAR}, #{status,jdbcType=VARCHAR}, #{submitterUuid,jdbcType=VARCHAR}, - #{workerUuid,jdbcType=VARCHAR}, 0, - #{startedAt,jdbcType=BIGINT}, #{createdAt,jdbcType=BIGINT}, #{updatedAt,jdbcType=BIGINT} ) @@ -226,7 +233,6 @@ <update id="resetToPendingForWorker"> update ce_queue set status='PENDING', - started_at=NULL, updated_at=#{updatedAt,jdbcType=BIGINT} where status <> 'PENDING' @@ -255,7 +261,6 @@ update ce_queue set status='PENDING', worker_uuid=NULL, - started_at=NULL, updated_at=#{updatedAt,jdbcType=BIGINT} where status = 'IN_PROGRESS' @@ -272,7 +277,6 @@ update ce_queue set status='PENDING', worker_uuid=NULL, - started_at=NULL, updated_at=#{updatedAt,jdbcType=BIGINT} where status = 'IN_PROGRESS' diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java index f03acf46547..f24e8b5a14a 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeActivityDaoTest.java @@ -41,6 +41,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.sonar.db.Pagination.forPage; import static org.sonar.db.ce.CeActivityDto.Status.FAILED; import static org.sonar.db.ce.CeActivityDto.Status.SUCCESS; +import static org.sonar.db.ce.CeQueueDto.Status.PENDING; +import static org.sonar.db.ce.CeQueueTesting.makeInProgress; import static org.sonar.db.ce.CeTaskTypes.REPORT; public class CeActivityDaoTest { @@ -64,7 +66,7 @@ public class CeActivityDaoTest { assertThat(dto.getComponentUuid()).isEqualTo("PROJECT_1"); assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.SUCCESS); assertThat(dto.getSubmitterUuid()).isEqualTo("submitter uuid"); - assertThat(dto.getSubmittedAt()).isEqualTo(1_300_000_000_000L); + assertThat(dto.getSubmittedAt()).isEqualTo(1_450_000_000_000L); assertThat(dto.getWorkerUuid()).isEqualTo("worker uuid"); assertThat(dto.getIsLast()).isTrue(); assertThat(dto.getIsLastKey()).isEqualTo("REPORTPROJECT_1"); @@ -344,15 +346,20 @@ public class CeActivityDaoTest { } private CeActivityDto createActivityDto(String uuid, String type, String componentUuid, CeActivityDto.Status status) { - CeQueueDto queueDto = new CeQueueDto(); - queueDto.setUuid(uuid); - queueDto.setTaskType(type); - queueDto.setComponentUuid(componentUuid); - queueDto.setSubmitterUuid("submitter uuid"); - queueDto.setWorkerUuid("worker uuid"); - queueDto.setCreatedAt(1_300_000_000_000L); + CeQueueDto creating = new CeQueueDto(); + creating.setUuid(uuid); + creating.setStatus(PENDING); + creating.setTaskType(type); + creating.setComponentUuid(componentUuid); + creating.setSubmitterUuid("submitter uuid"); + creating.setCreatedAt(1_300_000_000_000L); - CeActivityDto dto = new CeActivityDto(queueDto); + db.getDbClient().ceQueueDao().insert(dbSession, creating); + makeInProgress(dbSession, "worker uuid", 1_400_000_000_000L, creating); + + CeQueueDto ceQueueDto = db.getDbClient().ceQueueDao().selectByUuid(dbSession, uuid).get(); + + CeActivityDto dto = new CeActivityDto(ceQueueDto); dto.setStatus(status); dto.setStartedAt(1_500_000_000_000L); dto.setExecutedAt(1_500_000_000_500L); @@ -385,8 +392,8 @@ public class CeActivityDaoTest { private List<String> selectPageOfUuids(Pagination pagination) { return underTest.selectByQuery(db.getSession(), new CeTaskQuery(), pagination).stream() - .map(CeActivityToUuid.INSTANCE::apply) - .collect(MoreCollectors.toList()); + .map(CeActivityToUuid.INSTANCE::apply) + .collect(MoreCollectors.toList()); } private enum CeActivityToUuid implements Function<CeActivityDto, String> { diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java index a065ed4855b..6ae433012a7 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDaoTest.java @@ -42,11 +42,13 @@ import static com.google.common.collect.Lists.newArrayList; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.assertj.core.groups.Tuple.tuple; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; import static org.sonar.db.ce.CeQueueDto.Status.PENDING; import static org.sonar.db.ce.CeQueueTesting.newCeQueueDto; +import static org.sonar.db.ce.CeQueueTesting.reset; public class CeQueueDaoTest { private static final long INIT_TIME = 1_450_000_000_000L; @@ -66,10 +68,11 @@ public class CeQueueDaoTest { public DbTester db = DbTester.create(system2); private System2 mockedSystem2 = mock(System2.class); + private System2 alwaysIncreasingSystem2 = new AlwaysIncreasingSystem2(); private CeQueueDao underTest = new CeQueueDao(system2); private CeQueueDao underTestWithSystem2Mock = new CeQueueDao(mockedSystem2); - private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(new AlwaysIncreasingSystem2()); + private CeQueueDao underTestAlwaysIncreasingSystem2 = new CeQueueDao(alwaysIncreasingSystem2); @Test public void insert_populates_createdAt_and_updateAt_from_System2_with_same_value_if_any_is_not_set() { @@ -78,8 +81,7 @@ public class CeQueueDaoTest { .setTaskType(CeTaskTypes.REPORT) .setComponentUuid(COMPONENT_UUID_1) .setStatus(PENDING) - .setSubmitterUuid(SUBMITTER_LOGIN) - .setWorkerUuid(WORKER_UUID_1); + .setSubmitterUuid(SUBMITTER_LOGIN); mockSystem2ForSingleCall(now); underTestWithSystem2Mock.insert(db.getSession(), dto.setUuid(TASK_UUID_1)); @@ -100,7 +102,7 @@ public class CeQueueDaoTest { assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1); assertThat(saved.getStatus()).isEqualTo(PENDING); assertThat(saved.getSubmitterUuid()).isEqualTo(SUBMITTER_LOGIN); - assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1); + assertThat(saved.getWorkerUuid()).isNull(); assertThat(saved.getCreatedAt()).isEqualTo(now); assertThat(saved.getUpdatedAt()).isEqualTo(now); assertThat(saved.getStartedAt()).isNull(); @@ -111,7 +113,7 @@ public class CeQueueDaoTest { assertThat(saved.getComponentUuid()).isEqualTo(COMPONENT_UUID_1); assertThat(saved.getStatus()).isEqualTo(PENDING); assertThat(saved.getSubmitterUuid()).isEqualTo(SUBMITTER_LOGIN); - assertThat(saved.getWorkerUuid()).isEqualTo(WORKER_UUID_1); + assertThat(saved.getWorkerUuid()).isNull(); assertThat(saved.getCreatedAt()).isEqualTo(6_888_777L); assertThat(saved.getUpdatedAt()).isEqualTo(8_000_999L); assertThat(saved.getStartedAt()).isNull(); @@ -119,7 +121,7 @@ public class CeQueueDaoTest { @Test public void test_selectByUuid() { - insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); + insertPending(TASK_UUID_1, COMPONENT_UUID_1); assertThat(underTest.selectByUuid(db.getSession(), "TASK_UNKNOWN").isPresent()).isFalse(); CeQueueDto saved = underTest.selectByUuid(db.getSession(), TASK_UUID_1).get(); @@ -136,9 +138,9 @@ public class CeQueueDaoTest { @Test public void test_selectByComponentUuid() { - insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); - insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING); - insert(TASK_UUID_3, "PROJECT_2", PENDING); + insertPending(TASK_UUID_1, COMPONENT_UUID_1); + insertPending(TASK_UUID_2, COMPONENT_UUID_1); + insertPending(TASK_UUID_3, "PROJECT_2"); assertThat(underTest.selectByComponentUuid(db.getSession(), "UNKNOWN")).isEmpty(); assertThat(underTest.selectByComponentUuid(db.getSession(), COMPONENT_UUID_1)).extracting("uuid").containsOnly(TASK_UUID_1, TASK_UUID_2); @@ -147,21 +149,21 @@ public class CeQueueDaoTest { @Test public void test_selectAllInAscOrder() { - insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); - insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING); - insert(TASK_UUID_3, "PROJECT_2", PENDING); + insertPending(TASK_UUID_1, COMPONENT_UUID_1); + insertPending(TASK_UUID_2, COMPONENT_UUID_1); + insertPending(TASK_UUID_3, "PROJECT_2"); assertThat(underTest.selectAllInAscOrder(db.getSession())).extracting("uuid").containsOnly(TASK_UUID_1, TASK_UUID_2, TASK_UUID_3); } @Test public void selectPending_returns_pending_tasks() { - insert("p1", CeQueueDto.Status.PENDING); - insert("p2", CeQueueDto.Status.PENDING); - insert("p3", CeQueueDto.Status.PENDING); - insert("i1", CeQueueDto.Status.IN_PROGRESS); - insert("i2", CeQueueDto.Status.IN_PROGRESS); - insert("i3", CeQueueDto.Status.IN_PROGRESS); + insertPending("p1"); + insertPending("p2"); + insertPending("p3"); + makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i1")); + makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i2")); + makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i3")); assertThat(underTest.selectPending(db.getSession())) .extracting(CeQueueDto::getUuid) @@ -169,8 +171,22 @@ public class CeQueueDaoTest { } @Test + public void selectWornout_returns_task_pending_with_a_non_null_startedAt() { + insertPending("p1"); + makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i1")); + CeQueueDto resetDto = makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i2")); + makeInProgress("w1", alwaysIncreasingSystem2.now(), insertPending("i3")); + reset(db.getSession(), alwaysIncreasingSystem2.now(), resetDto); + + List<CeQueueDto> ceQueueDtos = underTest.selectWornout(db.getSession()); + assertThat(ceQueueDtos) + .extracting(CeQueueDto::getStatus, CeQueueDto::getUuid) + .containsOnly(tuple(PENDING, resetDto.getUuid())); + } + + @Test public void test_delete() { - insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); + insertPending(TASK_UUID_1, COMPONENT_UUID_1); underTest.deleteByUuid(db.getSession(), "UNKNOWN"); assertThat(underTest.selectByUuid(db.getSession(), TASK_UUID_1)).isPresent(); @@ -181,92 +197,91 @@ public class CeQueueDaoTest { @Test public void resetToPendingForWorker_resets_status_of_non_pending_tasks_only_for_specified_workerUuid() { - long startedAt = 2_099_888L; - CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt); - CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt); - CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt); - CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt); - CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt); - CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt); - CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt); - CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt); + CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")}; + CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")}; + long startedAt = alwaysIncreasingSystem2.now(); + makeInProgress(WORKER_UUID_1, startedAt, worker1[0]); + makeInProgress(WORKER_UUID_1, startedAt, worker1[3]); + makeInProgress(WORKER_UUID_2, startedAt, worker2[0]); + makeInProgress(WORKER_UUID_2, startedAt, worker2[3]); underTestAlwaysIncreasingSystem2.resetToPendingForWorker(db.getSession(), WORKER_UUID_1); - verifyResetToPendingForWorker(u1); - verifyUnchangedByResetToPendingForWorker(u2); - verifyUnchangedByResetToPendingForWorker(u3); - verifyResetToPendingForWorker(u4); - verifyUnchangedByResetToPendingForWorker(o1); - verifyUnchangedByResetToPendingForWorker(o2); - verifyUnchangedByResetToPendingForWorker(o3); - verifyUnchangedByResetToPendingForWorker(o4); + verifyResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt); + verifyUnchangedByResetToPendingForWorker(worker1[1]); + verifyUnchangedByResetToPendingForWorker(worker1[2]); + verifyResetToPendingForWorker(worker1[3], WORKER_UUID_1, startedAt); + verifyInProgressUnchangedByResetToPendingForWorker(worker2[0], WORKER_UUID_2, startedAt); + verifyUnchangedByResetToPendingForWorker(worker2[1]); + verifyUnchangedByResetToPendingForWorker(worker2[2]); + verifyInProgressUnchangedByResetToPendingForWorker(worker2[3], WORKER_UUID_2, startedAt); } @Test public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_resets_status_of_all_pending_tasks() { - long startedAt = 2_099_888L; - CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt); - CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt); - CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt); - CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt); - CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt); - CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt); - CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt); - CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt); + CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")}; + CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")}; + long startedAt = alwaysIncreasingSystem2.now(); + makeInProgress(WORKER_UUID_1, startedAt, worker1[0]); + makeInProgress(WORKER_UUID_1, startedAt, worker1[3]); + makeInProgress(WORKER_UUID_2, startedAt, worker2[0]); + makeInProgress(WORKER_UUID_2, startedAt, worker2[3]); underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of()); - verifyResetByResetTasks(u1); - verifyUnchangedByResetToPendingForWorker(u2); - verifyUnchangedByResetToPendingForWorker(u3); - verifyResetByResetTasks(u4); - verifyResetByResetTasks(o1); - verifyUnchangedByResetToPendingForWorker(o2); - verifyUnchangedByResetToPendingForWorker(o3); - verifyResetByResetTasks(o4); + verifyResetByResetTasks(worker1[0], startedAt); + verifyUnchangedByResetToPendingForWorker(worker1[1]); + verifyUnchangedByResetToPendingForWorker(worker1[2]); + verifyResetByResetTasks(worker1[3], startedAt); + verifyResetByResetTasks(worker2[0], startedAt); + verifyUnchangedByResetToPendingForWorker(worker2[1]); + verifyUnchangedByResetToPendingForWorker(worker2[2]); + verifyResetByResetTasks(worker2[3], startedAt); } @Test public void resetTasksWithUnknownWorkerUUIDs_set_resets_status_of_all_pending_tasks_with_unknown_workers() { - long startedAt = 2_099_888L; - CeQueueDto u1 = insert("u1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt); - CeQueueDto u2 = insert("u2", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt); - CeQueueDto u3 = insert("u3", CeQueueDto.Status.PENDING, WORKER_UUID_1, startedAt); - CeQueueDto u4 = insert("u4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_1, startedAt); - CeQueueDto o1 = insert("o1", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt); - CeQueueDto o2 = insert("o2", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt); - CeQueueDto o3 = insert("o3", CeQueueDto.Status.PENDING, WORKER_UUID_2, startedAt); - CeQueueDto o4 = insert("o4", CeQueueDto.Status.IN_PROGRESS, WORKER_UUID_2, startedAt); + CeQueueDto[] worker1 = {insertPending("u1"), insertPending("u2"), insertPending("u3"), insertPending("u4")}; + CeQueueDto[] worker2 = {insertPending("o1"), insertPending("o2"), insertPending("o3"), insertPending("o4")}; + long startedAt = alwaysIncreasingSystem2.now(); + makeInProgress(WORKER_UUID_1, startedAt, worker1[0]); + makeInProgress(WORKER_UUID_1, startedAt, worker1[3]); + makeInProgress(WORKER_UUID_2, startedAt, worker2[0]); + makeInProgress(WORKER_UUID_2, startedAt, worker2[3]); underTestAlwaysIncreasingSystem2.resetTasksWithUnknownWorkerUUIDs(db.getSession(), ImmutableSet.of(WORKER_UUID_1, "unknown")); - verifyUnchangedByResetToPendingForWorker(u1); - verifyUnchangedByResetToPendingForWorker(u2); - verifyUnchangedByResetToPendingForWorker(u3); - verifyUnchangedByResetToPendingForWorker(u4); - verifyResetByResetTasks(o1); - verifyUnchangedByResetToPendingForWorker(o2); - verifyUnchangedByResetToPendingForWorker(o3); - verifyResetByResetTasks(o4); + verifyInProgressUnchangedByResetToPendingForWorker(worker1[0], WORKER_UUID_1, startedAt); + verifyUnchangedByResetToPendingForWorker(worker1[1]); + verifyUnchangedByResetToPendingForWorker(worker1[2]); + verifyInProgressUnchangedByResetToPendingForWorker(worker1[3], WORKER_UUID_1, startedAt); + verifyResetByResetTasks(worker2[0], startedAt); + verifyUnchangedByResetToPendingForWorker(worker2[1]); + verifyUnchangedByResetToPendingForWorker(worker2[2]); + verifyResetByResetTasks(worker2[3], startedAt); + } + + private CeQueueDto makeInProgress(String workerUuid, long startedAt, CeQueueDto ceQueueDto) { + CeQueueTesting.makeInProgress(db.getSession(), workerUuid, startedAt, ceQueueDto); + return underTestAlwaysIncreasingSystem2.selectByUuid(db.getSession(), ceQueueDto.getUuid()).get(); } - private void verifyResetByResetTasks(CeQueueDto original) { + private void verifyResetByResetTasks(CeQueueDto original, long startedAt) { CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); - assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus()); - assertThat(dto.getStartedAt()).isNull(); + assertThat(dto.getStatus()).isEqualTo(PENDING); + assertThat(dto.getStartedAt()).isEqualTo(startedAt); assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); - assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt()); + assertThat(dto.getUpdatedAt()).isGreaterThan(startedAt); assertThat(dto.getWorkerUuid()).isNull(); } - private void verifyResetToPendingForWorker(CeQueueDto original) { + private void verifyResetToPendingForWorker(CeQueueDto original, String workerUuid, long startedAt) { CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); - assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING); - assertThat(dto.getStartedAt()).isNull(); + assertThat(dto.getStatus()).isEqualTo(PENDING); + assertThat(dto.getStartedAt()).isEqualTo(startedAt); assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); - assertThat(dto.getUpdatedAt()).isGreaterThan(original.getUpdatedAt()); - assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); + assertThat(dto.getUpdatedAt()).isGreaterThan(startedAt); + assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid); } private void verifyUnchangedByResetToPendingForWorker(CeQueueDto original) { @@ -278,20 +293,29 @@ public class CeQueueDaoTest { assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); } + private void verifyInProgressUnchangedByResetToPendingForWorker(CeQueueDto original, String workerUuid, long startedAt) { + CeQueueDto dto = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(IN_PROGRESS); + assertThat(dto.getStartedAt()).isEqualTo(startedAt); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isEqualTo(startedAt); + assertThat(dto.getWorkerUuid()).isEqualTo(workerUuid); + } + @Test public void peek_none_if_no_pendings() { assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse(); // not pending, but in progress - insert(TASK_UUID_1, COMPONENT_UUID_1, IN_PROGRESS); + makeInProgress(WORKER_UUID_1, 2_232_222L, insertPending(TASK_UUID_1, COMPONENT_UUID_1)); assertThat(underTest.peek(db.getSession(), WORKER_UUID_1).isPresent()).isFalse(); } @Test public void peek_oldest_pending() { - insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); + insertPending(TASK_UUID_1, COMPONENT_UUID_1); system2.setNow(INIT_TIME + 3_000_000); - insert(TASK_UUID_2, COMPONENT_UUID_2, PENDING); + insertPending(TASK_UUID_2, COMPONENT_UUID_2); assertThat(db.countRowsOfTable("ce_queue")).isEqualTo(2); verifyCeQueueStatuses(TASK_UUID_1, PENDING, TASK_UUID_2, PENDING); @@ -319,9 +343,9 @@ public class CeQueueDaoTest { @Test public void do_not_peek_multiple_tasks_on_same_project_at_the_same_time() { // two pending tasks on the same project - insert(TASK_UUID_1, COMPONENT_UUID_1, PENDING); + insertPending(TASK_UUID_1, COMPONENT_UUID_1); system2.setNow(INIT_TIME + 3_000_000); - insert(TASK_UUID_2, COMPONENT_UUID_1, PENDING); + insertPending(TASK_UUID_2, COMPONENT_UUID_1); Optional<CeQueueDto> peek = underTest.peek(db.getSession(), WORKER_UUID_1); assertThat(peek).isPresent(); @@ -343,35 +367,35 @@ public class CeQueueDaoTest { @Test public void select_by_query() { // task status not in query - insert(newCeQueueDto(TASK_UUID_1) + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(COMPONENT_UUID_1) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(100_000L)); // too early - insert(newCeQueueDto(TASK_UUID_3) + insertPending(newCeQueueDto(TASK_UUID_3) .setComponentUuid(COMPONENT_UUID_1) .setStatus(PENDING) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(90_000L)); // task type not in query - insert(newCeQueueDto("TASK_4") + insertPending(newCeQueueDto("TASK_4") .setComponentUuid("PROJECT_2") .setStatus(PENDING) .setTaskType("ANOTHER_TYPE") .setCreatedAt(100_000L)); // correct - insert(newCeQueueDto(TASK_UUID_2) + insertPending(newCeQueueDto(TASK_UUID_2) .setComponentUuid(COMPONENT_UUID_1) .setStatus(PENDING) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(100_000L)); // correct submitted later - insert(newCeQueueDto("TASK_5") + insertPending(newCeQueueDto("TASK_5") .setComponentUuid(COMPONENT_UUID_1) .setStatus(PENDING) .setTaskType(CeTaskTypes.REPORT) @@ -393,7 +417,7 @@ public class CeQueueDaoTest { @Test public void select_by_query_returns_empty_list_when_only_current() { - insert(newCeQueueDto(TASK_UUID_1) + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(COMPONENT_UUID_1) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) @@ -410,7 +434,7 @@ public class CeQueueDaoTest { @Test public void select_by_query_returns_empty_list_when_max_submitted_at() { - insert(newCeQueueDto(TASK_UUID_1) + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(COMPONENT_UUID_1) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) @@ -427,7 +451,7 @@ public class CeQueueDaoTest { @Test public void select_by_query_returns_empty_list_when_empty_list_of_component_uuid() { - insert(newCeQueueDto(TASK_UUID_1) + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(COMPONENT_UUID_1) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) @@ -445,19 +469,19 @@ public class CeQueueDaoTest { @Test public void count_by_status_and_component_uuid() { // task retrieved in the queue - insert(newCeQueueDto(TASK_UUID_1) + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(COMPONENT_UUID_1) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(100_000L)); // on component uuid 2, not returned - insert(newCeQueueDto(TASK_UUID_2) + insertPending(newCeQueueDto(TASK_UUID_2) .setComponentUuid(COMPONENT_UUID_2) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(100_000L)); // pending status, not returned - insert(newCeQueueDto(TASK_UUID_3) + insertPending(newCeQueueDto(TASK_UUID_3) .setComponentUuid(COMPONENT_UUID_1) .setStatus(PENDING) .setTaskType(CeTaskTypes.REPORT) @@ -470,19 +494,19 @@ public class CeQueueDaoTest { @Test public void count_by_status_and_component_uuids() { // task retrieved in the queue - insert(newCeQueueDto(TASK_UUID_1) + insertPending(newCeQueueDto(TASK_UUID_1) .setComponentUuid(COMPONENT_UUID_1) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(100_000L)); // on component uuid 2, not returned - insert(newCeQueueDto(TASK_UUID_2) + insertPending(newCeQueueDto(TASK_UUID_2) .setComponentUuid(COMPONENT_UUID_2) .setStatus(IN_PROGRESS) .setTaskType(CeTaskTypes.REPORT) .setCreatedAt(100_000L)); // pending status, not returned - insert(newCeQueueDto(TASK_UUID_3) + insertPending(newCeQueueDto(TASK_UUID_3) .setComponentUuid(COMPONENT_UUID_1) .setStatus(PENDING) .setTaskType(CeTaskTypes.REPORT) @@ -497,41 +521,28 @@ public class CeQueueDaoTest { assertThat(underTest.countByStatus(db.getSession(), IN_PROGRESS)).isEqualTo(2); } - private void insert(CeQueueDto dto) { + private void insertPending(CeQueueDto dto) { underTest.insert(db.getSession(), dto); db.commit(); } - private CeQueueDto insert(String uuid, CeQueueDto.Status status) { - CeQueueDto dto = new CeQueueDto(); - dto.setUuid(uuid); - dto.setTaskType(CeTaskTypes.REPORT); - dto.setStatus(status); - dto.setSubmitterUuid("henri"); - underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto); - db.getSession().commit(); - return dto; - } - - private CeQueueDto insert(String uuid, CeQueueDto.Status status, String workerUuid, Long startedAt) { + private CeQueueDto insertPending(String uuid) { CeQueueDto dto = new CeQueueDto(); dto.setUuid(uuid); dto.setTaskType(CeTaskTypes.REPORT); - dto.setStatus(status); + dto.setStatus(PENDING); dto.setSubmitterUuid("henri"); - dto.setWorkerUuid(workerUuid); - dto.setStartedAt(startedAt); underTestAlwaysIncreasingSystem2.insert(db.getSession(), dto); db.getSession().commit(); return dto; } - private CeQueueDto insert(String uuid, String componentUuid, CeQueueDto.Status status) { + private CeQueueDto insertPending(String uuid, String componentUuid) { CeQueueDto dto = new CeQueueDto(); dto.setUuid(uuid); dto.setTaskType(CeTaskTypes.REPORT); dto.setComponentUuid(componentUuid); - dto.setStatus(status); + dto.setStatus(PENDING); dto.setSubmitterUuid("henri"); underTest.insert(db.getSession(), dto); db.getSession().commit(); diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java index 85724236dac..b93c6c90fd7 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueDtoTest.java @@ -93,22 +93,4 @@ public class CeQueueDtoTest { underTest.setSubmitterUuid(str_256_chars); } - - @Test - public void setWorkerUuid_accepts_null_empty_and_string_40_chars_or_less() { - underTest.setWorkerUuid(null); - underTest.setWorkerUuid(""); - underTest.setWorkerUuid("bar"); - underTest.setWorkerUuid(STR_40_CHARS); - } - - @Test - public void setWorkerUuid_throws_IAE_if_value_is_41_chars() { - String str_41_chars = STR_40_CHARS + "a"; - - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("worker uuid is too long: " + str_41_chars); - - underTest.setWorkerUuid(str_41_chars); - } } diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java index 14e2ab30db0..16330122d55 100644 --- a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/CeQueueTesting.java @@ -19,8 +19,15 @@ */ package org.sonar.db.ce; +import java.util.stream.Stream; +import org.sonar.db.DbSession; + +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang.RandomStringUtils.randomAlphanumeric; import static org.apache.commons.lang.math.RandomUtils.nextLong; +import static org.assertj.core.api.Assertions.assertThat; +import static org.sonar.db.ce.CeQueueDto.Status.IN_PROGRESS; +import static org.sonar.db.ce.CeQueueDto.Status.PENDING; public class CeQueueTesting { private CeQueueTesting() { @@ -35,9 +42,29 @@ public class CeQueueTesting { .setTaskType(CeTaskTypes.REPORT) .setSubmitterUuid(randomAlphanumeric(255)) .setCreatedAt(nextLong()) - .setUpdatedAt(nextLong()) - .setStartedAt(nextLong()); + .setUpdatedAt(nextLong()); + } + + public static void makeInProgress(DbSession dbSession, String workerUuid, long now, CeQueueDto... ceQueueDtos) { + Stream.of(ceQueueDtos).forEach(ceQueueDto -> { + CeQueueMapper mapper = dbSession.getMapper(CeQueueMapper.class); + int touchedRows = mapper.updateIf(ceQueueDto.getUuid(), + new UpdateIf.NewProperties(IN_PROGRESS, workerUuid, now, now), + new UpdateIf.OldProperties(PENDING)); + assertThat(touchedRows).isEqualTo(1); + }); } + public static void reset(DbSession dbSession, long now, CeQueueDto... ceQueueDtos) { + Stream.of(ceQueueDtos).forEach(ceQueueDto -> { + checkArgument(ceQueueDto.getStatus() == IN_PROGRESS); + checkArgument(ceQueueDto.getWorkerUuid() != null); + CeQueueMapper mapper = dbSession.getMapper(CeQueueMapper.class); + int touchedRows = mapper.updateIf(ceQueueDto.getUuid(), + new UpdateIf.NewProperties(PENDING, ceQueueDto.getUuid(), now, now), + new UpdateIf.OldProperties(IN_PROGRESS)); + assertThat(touchedRows).isEqualTo(1); + }); + } } diff --git a/server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java new file mode 100644 index 00000000000..0b10e749863 --- /dev/null +++ b/server/sonar-db-dao/src/test/java/org/sonar/db/ce/UpdateIfTest.java @@ -0,0 +1,91 @@ +/* + * SonarQube + * Copyright (C) 2009-2018 SonarSource SA + * mailto:info AT sonarsource DOT com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package org.sonar.db.ce; + +import com.tngtech.java.junit.dataprovider.DataProvider; +import com.tngtech.java.junit.dataprovider.DataProviderRunner; +import com.tngtech.java.junit.dataprovider.UseDataProvider; +import java.util.Random; +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(DataProviderRunner.class) +public class UpdateIfTest { + private static final String STR_40_CHARS = "0123456789012345678901234567890123456789"; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void newProperties_constructor_accepts_null_workerUuid() { + UpdateIf.NewProperties newProperties = new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, null, 123, 456); + + assertThat(newProperties.getWorkerUuid()).isNull(); + } + + @Test + public void newProperties_constructor_fails_with_NPE_if_status_is_null() { + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("status can't be null"); + + new UpdateIf.NewProperties(null, "foo", 123, 456); + } + + @Test + public void newProperties_constructor_fails_with_IAE_if_workerUuid_is_41_or_more() { + String workerUuid = RandomStringUtils.randomAlphanumeric(41 + new Random().nextInt(5)); + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("worker uuid is too long: " + workerUuid); + + new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, workerUuid, 123, 456); + } + + @Test + @UseDataProvider("workerUuidValidValues") + public void newProperties_constructor_accepts_null_empty_and_string_40_chars_or_less(String workerUuid) { + new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, workerUuid, 123, 345); + } + + @DataProvider + public static Object[][] workerUuidValidValues() { + return new Object[][] { + {null}, + {""}, + {"bar"}, + {STR_40_CHARS} + }; + } + + @Test + public void newProperties_constructor_IAE_if_workerUuid_is_41_chars() { + String str_41_chars = STR_40_CHARS + "a"; + + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("worker uuid is too long: " + str_41_chars); + + new UpdateIf.NewProperties(CeQueueDto.Status.PENDING, str_41_chars, 123, 345); + } +} diff --git a/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java b/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java index 42a81c4a6d5..bcd8b84e7f1 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/ce/ws/TaskFormatterTest.java @@ -39,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.tuple; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.sonar.db.ce.CeQueueTesting.makeInProgress; public class TaskFormatterTest { @@ -85,13 +86,15 @@ public class TaskFormatterTest { CeQueueDto dto = new CeQueueDto(); dto.setUuid("UUID"); dto.setTaskType("TYPE"); - dto.setStatus(CeQueueDto.Status.IN_PROGRESS); + dto.setStatus(CeQueueDto.Status.PENDING); dto.setCreatedAt(1_450_000_000_000L); - dto.setStartedAt(1_451_000_000_000L); dto.setComponentUuid(uuid); dto.setSubmitterUuid(user.getUuid()); + db.getDbClient().ceQueueDao().insert(db.getSession(), dto); + makeInProgress(db.getSession(), "workerUuid", 1_958_000_000_000L, dto); + CeQueueDto inProgress = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), dto.getUuid()).get(); - Ce.Task wsTask = underTest.formatQueue(db.getSession(), dto); + Ce.Task wsTask = underTest.formatQueue(db.getSession(), inProgress); assertThat(wsTask.getType()).isEqualTo("TYPE"); assertThat(wsTask.getId()).isEqualTo("UUID"); @@ -130,12 +133,14 @@ public class TaskFormatterTest { CeQueueDto dto = new CeQueueDto(); dto.setUuid("UUID"); dto.setTaskType("TYPE"); - dto.setStatus(CeQueueDto.Status.IN_PROGRESS); + dto.setStatus(CeQueueDto.Status.PENDING); dto.setCreatedAt(1_450_000_000_000L); - dto.setStartedAt(startedAt); + db.getDbClient().ceQueueDao().insert(db.getSession(), dto); + makeInProgress(db.getSession(), "workerUuid", startedAt, dto); + CeQueueDto inProgress = db.getDbClient().ceQueueDao().selectByUuid(db.getSession(), dto.getUuid()).get(); when(system2.now()).thenReturn(now); - Ce.Task wsTask = underTest.formatQueue(db.getSession(), dto); + Ce.Task wsTask = underTest.formatQueue(db.getSession(), inProgress); assertThat(wsTask.getExecutionTimeMs()).isEqualTo(now - startedAt); } |