@Test
public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() {
dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
- .setUuid("uuid")
- .setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setExecutionCount(0));
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(0));
dbTester.commit();
assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
@Test
public void peek_peeks_pending_tasks_with_executionCount_equal_to_1_and_increases_it() {
dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
- .setUuid("uuid")
- .setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setExecutionCount(1));
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(1));
dbTester.commit();
assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid");
@Test
public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() {
dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
- .setUuid("uuid")
- .setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setExecutionCount(2));
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(2));
dbTester.commit();
assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
@Test
public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() {
dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto()
- .setUuid("uuid")
- .setTaskType("foo")
- .setStatus(CeQueueDto.Status.PENDING)
- .setExecutionCount(2 + Math.abs(new Random().nextInt(100))));
+ .setUuid("uuid")
+ .setTaskType("foo")
+ .setStatus(CeQueueDto.Status.PENDING)
+ .setExecutionCount(2 + Math.abs(new Random().nextInt(100))));
dbTester.commit();
assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse();
assertThat(history.isPresent()).isFalse();
}
+ @Test
+ public void cancelWornOuts_cancels_pending_tasks_with_executionCount_greater_or_equal_to_2() {
+ CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, "worker1");
+ CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1");
+ CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, "worker1");
+ CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker1");
+ CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, "worker1");
+ CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1");
+ CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker1");
+ CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker1");
+
+ underTest.cancelWornOuts();
+
+ verifyUnmodifiedByCancelWornOuts(u1);
+ verifyUnmodifiedByCancelWornOuts(u2);
+ verifyCanceled(u3);
+ verifyCanceled(u4);
+ verifyUnmodifiedByCancelWornOuts(u5);
+ verifyUnmodifiedByCancelWornOuts(u6);
+ verifyUnmodifiedByCancelWornOuts(u7);
+ verifyUnmodifiedByCancelWornOuts(u8);
+ }
+
+ private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) {
+ CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(original.getStatus());
+ assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+ assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt());
+ assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt());
+ }
+
+ private void verifyCanceled(CeQueueDto original) {
+ assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid())).isAbsent();
+ CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), original.getUuid()).get();
+ assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED);
+ assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount());
+ assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid());
+ }
+
+ private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid) {
+ CeQueueDto dto = new CeQueueDto()
+ .setUuid(uuid)
+ .setTaskType("foo")
+ .setStatus(status)
+ .setExecutionCount(executionCount)
+ .setWorkerUuid(workerUuid);
+ dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), dto);
+ dbTester.commit();
+ return dto;
+ }
+
@Test
public void pause_and_resume_submits() throws Exception {
assertThat(underTest.isSubmitPaused()).isFalse();