From dd09efbab2e48ee1b11a98d868493d4c5457ca69 Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=C3=A9bastien=20Lesaint?= Date: Thu, 30 Mar 2017 17:38:03 +0200 Subject: [PATCH] SONAR-9041 add InternalCeQueue#cancelWornOuts --- .../computation/queue/InternalCeQueue.java | 2 + .../queue/InternalCeQueueImpl.java | 14 ++++ .../queue/InternalCeQueueImplTest.java | 83 +++++++++++++++---- 3 files changed, 83 insertions(+), 16 deletions(-) diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java index f06d997fa64..a010aa54035 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueue.java @@ -68,6 +68,8 @@ public interface InternalCeQueue extends CeQueue { */ void remove(CeTask task, Status status, @Nullable CeTaskResult taskResult, @Nullable Throwable error); + void cancelWornOuts(); + void pausePeek(); void resumePeek(); diff --git a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java index 869dc970f09..4d291d8aacb 100644 --- a/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java +++ b/server/sonar-server/src/main/java/org/sonar/server/computation/queue/InternalCeQueueImpl.java @@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -153,6 +154,19 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } } + @Override + public void cancelWornOuts() { + try (DbSession dbSession = dbClient.openSession(false)) { + List wornOutTasks = dbClient.ceQueueDao().selectPendingByMinimumExecutionCount(dbSession, MAX_EXECUTION_COUNT); + wornOutTasks.forEach(queueDto -> { + CeActivityDto activityDto = new CeActivityDto(queueDto); + activityDto.setStatus(CeActivityDto.Status.CANCELED); + updateQueueStatus(CeActivityDto.Status.CANCELED, activityDto); + remove(dbSession, queueDto, activityDto); + }); + } + } + @Override public void pausePeek() { this.peekPaused.set(true); diff --git a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java index 869f3076612..5c8017273cc 100644 --- a/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java +++ b/server/sonar-server/src/test/java/org/sonar/server/computation/queue/InternalCeQueueImplTest.java @@ -294,10 +294,10 @@ public class InternalCeQueueImplTest { @Test public void peek_peeks_pending_tasks_with_executionCount_equal_to_0_and_increases_it() { dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() - .setUuid("uuid") - .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setExecutionCount(0)); + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(0)); dbTester.commit(); assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid"); @@ -307,10 +307,10 @@ public class InternalCeQueueImplTest { @Test public void peek_peeks_pending_tasks_with_executionCount_equal_to_1_and_increases_it() { dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() - .setUuid("uuid") - .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setExecutionCount(1)); + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(1)); dbTester.commit(); assertThat(underTest.peek(WORKER_UUID_1).get().getUuid()).isEqualTo("uuid"); @@ -320,10 +320,10 @@ public class InternalCeQueueImplTest { @Test public void peek_ignores_pending_tasks_with_executionCount_equal_to_2() { dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() - .setUuid("uuid") - .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setExecutionCount(2)); + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(2)); dbTester.commit(); assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse(); @@ -332,10 +332,10 @@ public class InternalCeQueueImplTest { @Test public void peek_ignores_pending_tasks_with_executionCount_greater_than_2() { dbTester.getDbClient().ceQueueDao().insert(session, new CeQueueDto() - .setUuid("uuid") - .setTaskType("foo") - .setStatus(CeQueueDto.Status.PENDING) - .setExecutionCount(2 + Math.abs(new Random().nextInt(100)))); + .setUuid("uuid") + .setTaskType("foo") + .setStatus(CeQueueDto.Status.PENDING) + .setExecutionCount(2 + Math.abs(new Random().nextInt(100)))); dbTester.commit(); assertThat(underTest.peek(WORKER_UUID_1).isPresent()).isFalse(); @@ -402,6 +402,57 @@ public class InternalCeQueueImplTest { assertThat(history.isPresent()).isFalse(); } + @Test + public void cancelWornOuts_cancels_pending_tasks_with_executionCount_greater_or_equal_to_2() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, "worker1"); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, "worker1"); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker1"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, "worker1"); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker1"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker1"); + + underTest.cancelWornOuts(); + + verifyUnmodifiedByCancelWornOuts(u1); + verifyUnmodifiedByCancelWornOuts(u2); + verifyCanceled(u3); + verifyCanceled(u4); + verifyUnmodifiedByCancelWornOuts(u5); + verifyUnmodifiedByCancelWornOuts(u6); + verifyUnmodifiedByCancelWornOuts(u7); + verifyUnmodifiedByCancelWornOuts(u8); + } + + private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(original.getStatus()); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + assertThat(dto.getUpdatedAt()).isEqualTo(original.getUpdatedAt()); + } + + private void verifyCanceled(CeQueueDto original) { + assertThat(dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid())).isAbsent(); + CeActivityDto dto = dbTester.getDbClient().ceActivityDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); + assertThat(dto.getStatus()).isEqualTo(CeActivityDto.Status.CANCELED); + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getWorkerUuid()).isEqualTo(original.getWorkerUuid()); + } + + private CeQueueDto insertCeQueueDto(String uuid, CeQueueDto.Status status, int executionCount, String workerUuid) { + CeQueueDto dto = new CeQueueDto() + .setUuid(uuid) + .setTaskType("foo") + .setStatus(status) + .setExecutionCount(executionCount) + .setWorkerUuid(workerUuid); + dbTester.getDbClient().ceQueueDao().insert(dbTester.getSession(), dto); + dbTester.commit(); + return dto; + } + @Test public void pause_and_resume_submits() throws Exception { assertThat(underTest.isSubmitPaused()).isFalse(); -- 2.39.5