diff options
author | Eric Hartmann <hartmann.eric@gmail.Com> | 2017-04-14 11:58:04 +0200 |
---|---|---|
committer | Eric Hartmann <hartmann.eric@gmail.Com> | 2017-04-27 09:23:18 +0200 |
commit | a5fb1f9e0ce662aa7f8a8d6e79645dc9438cee82 (patch) | |
tree | 9eca961289d1a5319e6f79a7223eb669e9e8ca10 /server/sonar-ce | |
parent | e4d3426880a5c50d6e9cf9736c786a564e5ca777 (diff) | |
download | sonarqube-a5fb1f9e0ce662aa7f8a8d6e79645dc9438cee82.tar.gz sonarqube-a5fb1f9e0ce662aa7f8a8d6e79645dc9438cee82.zip |
SONAR-8986 add purge of tasks of non existing workers to ce clean job
Diffstat (limited to 'server/sonar-ce')
7 files changed, 209 insertions, 41 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java index 82675c693d6..1fdb2c5a236 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java @@ -21,6 +21,7 @@ package org.sonar.ce.cleaning; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; +import org.sonar.ce.CeDistributedInformation; import org.sonar.ce.configuration.CeConfiguration; import org.sonar.ce.queue.InternalCeQueue; @@ -32,27 +33,44 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler { private final CeCleaningExecutorService executorService; private final CeConfiguration ceConfiguration; private final InternalCeQueue internalCeQueue; + private final CeDistributedInformation ceDistributedInformation; - public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, InternalCeQueue internalCeQueue) { + public CeCleaningSchedulerImpl(CeCleaningExecutorService executorService, CeConfiguration ceConfiguration, + InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) { this.executorService = executorService; this.internalCeQueue = internalCeQueue; this.ceConfiguration = ceConfiguration; + this.ceDistributedInformation = ceDistributedInformation; } @Override public void startScheduling() { - executorService.scheduleWithFixedDelay(this::cancelWornOuts, - ceConfiguration.getCancelWornOutsInitialDelay(), - ceConfiguration.getCancelWornOutsDelay(), + executorService.scheduleWithFixedDelay(this::cleanCeQueue, + ceConfiguration.getCleanCeTasksInitialDelay(), + ceConfiguration.getCleanCeTasksDelay(), MINUTES); } + private void cleanCeQueue() { + cancelWornOuts(); + resetTasksWithUnknownWorkerUUIDs(); + } + private void cancelWornOuts() { try { - LOG.info("Deleting any worn out task"); + LOG.debug("Deleting any worn out task"); internalCeQueue.cancelWornOuts(); } catch (Exception e) { LOG.warn("Failed to cancel worn out tasks", e); } } + + private void resetTasksWithUnknownWorkerUUIDs() { + try { + LOG.debug("Resetting state of tasks with unknown worker UUIDs"); + internalCeQueue.resetTasksWithUnknownWorkerUUIDs(ceDistributedInformation.getWorkerUUIDs()); + } catch (Exception e) { + LOG.warn("Failed to reset tasks with unknown worker UUIDs", e); + } + } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java index 42447633920..9cdc5243de9 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueue.java @@ -20,10 +20,8 @@ package org.sonar.ce.queue; import java.util.Optional; +import java.util.Set; import javax.annotation.Nullable; -import org.sonar.ce.queue.CeQueue; -import org.sonar.ce.queue.CeTask; -import org.sonar.ce.queue.CeTaskResult; import org.sonar.db.ce.CeActivityDto.Status; /** @@ -70,6 +68,8 @@ public interface InternalCeQueue extends CeQueue { void cancelWornOuts(); + void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs); + void pausePeek(); void resumePeek(); diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java index 8e5bc19bf8f..8a2009dc25b 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/queue/InternalCeQueueImpl.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.io.PrintWriter; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -86,7 +87,6 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue queueStatus.addInProgress(); } return Optional.ofNullable(task); - } } @@ -173,6 +173,14 @@ public class InternalCeQueueImpl extends CeQueueImpl implements InternalCeQueue } @Override + public void resetTasksWithUnknownWorkerUUIDs(Set<String> knownWorkerUUIDs) { + try (DbSession dbSession = dbClient.openSession(false)) { + dbClient.ceQueueDao().resetTasksWithUnknownWorkerUUIDs(dbSession, knownWorkerUUIDs); + dbSession.commit(); + } + } + + @Override public void pausePeek() { this.peekPaused.set(true); } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java index 5449865fc7e..bf91352b379 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java @@ -28,11 +28,13 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.Test; +import org.sonar.ce.CeDistributedInformation; import org.sonar.ce.configuration.CeConfiguration; import org.sonar.ce.queue.InternalCeQueue; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -40,7 +42,7 @@ import static org.mockito.Mockito.when; public class CeCleaningSchedulerImplTest { @Test - public void startScheduling_does_not_fail_if_cancelWornOuts_send_even_an_Exception() { + public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { @Override @@ -49,16 +51,19 @@ public class CeCleaningSchedulerImplTest { command.run(); return null; } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue); - doThrow(new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts")).when(mockedInternalCeQueue).cancelWornOuts(); + }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts"); + doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts(); + doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); underTest.startScheduling(); verify(mockedInternalCeQueue).cancelWornOuts(); + verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); } @Test - public void startScheduling_fails_if_cancelWornOuts_send_even_an_Error() { + public void startScheduling_fails_if_cancelWornOuts_send_an_Error() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { @Override @@ -67,7 +72,7 @@ public class CeCleaningSchedulerImplTest { command.run(); return null; } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue); + }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); Error expected = new Error("faking Error thrown by cancelWornOuts"); doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts(); @@ -77,36 +82,74 @@ public class CeCleaningSchedulerImplTest { } catch (Error e) { assertThat(e).isSameAs(expected); } + verify(mockedInternalCeQueue).cancelWornOuts(); } @Test - public void startScheduling_calls_cancelWornOuts_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() { + public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); - long initialDelay = 10L; - long delay = 20L; - CeConfiguration mockedCeConfiguration = mockCeConfiguration(initialDelay, delay); + CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + // synchronously execute command + command.run(); + return null; + } + }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + Error expected = new Error("faking Error thrown by cancelWornOuts"); + doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); + + try { + underTest.startScheduling(); + fail("the error should have been thrown"); + } catch (Error e) { + assertThat(e).isSameAs(expected); + } + verify(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); + } + + @Test + public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() { + InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); + long wornOutInitialDelay = 10L; + long wornOutDelay = 20L; + long unknownWorkerInitialDelay = 11L; + long unknownWorkerDelay = 21L; + CeConfiguration mockedCeConfiguration = mockCeConfiguration(wornOutInitialDelay, wornOutDelay); CeCleaningAdapter executorService = new CeCleaningAdapter() { @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initDelay, long period, TimeUnit unit) { - assertThat(initDelay).isEqualTo(initialDelay); - assertThat(period).isEqualTo(delay); - assertThat(unit).isEqualTo(TimeUnit.MINUTES); + schedulerCounter++; + switch(schedulerCounter) { + case 1: + assertThat(initDelay).isEqualTo(wornOutInitialDelay); + assertThat(period).isEqualTo(wornOutDelay); + assertThat(unit).isEqualTo(TimeUnit.MINUTES); + break; + case 2: + assertThat(initDelay).isEqualTo(unknownWorkerInitialDelay); + assertThat(period).isEqualTo(unknownWorkerDelay); + assertThat(unit).isEqualTo(TimeUnit.MINUTES); + break; + default: + fail("Unknwon call of scheduleWithFixedDelay"); + } // synchronously execute command command.run(); return null; } }; - CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue); + CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class)); underTest.startScheduling(); - + assertThat(executorService.schedulerCounter).isEqualTo(1); verify(mockedInternalCeQueue).cancelWornOuts(); } - private CeConfiguration mockCeConfiguration(long initialDelay, long delay) { + private CeConfiguration mockCeConfiguration(long cleanCeTasksInitialDelay, long cleanCeTasksDelay) { CeConfiguration mockedCeConfiguration = mock(CeConfiguration.class); - when(mockedCeConfiguration.getCancelWornOutsInitialDelay()).thenReturn(initialDelay); - when(mockedCeConfiguration.getCancelWornOutsDelay()).thenReturn(delay); + when(mockedCeConfiguration.getCleanCeTasksInitialDelay()).thenReturn(cleanCeTasksInitialDelay); + when(mockedCeConfiguration.getCleanCeTasksDelay()).thenReturn(cleanCeTasksDelay); return mockedCeConfiguration; } @@ -115,6 +158,7 @@ public class CeCleaningSchedulerImplTest { * method. */ private static class CeCleaningAdapter implements CeCleaningExecutorService { + protected int schedulerCounter = 0; @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java index 1c69bb40c25..61e22fbbdc8 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/configuration/CeConfigurationRule.java @@ -54,21 +54,21 @@ public class CeConfigurationRule extends ExternalResource implements CeConfigura } @Override - public long getCancelWornOutsInitialDelay() { + public long getCleanCeTasksInitialDelay() { return cancelWornOutsInitialDelay; } - public void setCancelWornOutsInitialDelay(long cancelWornOutsInitialDelay) { + public void setCleanCeTasksInitialDelay(long cancelWornOutsInitialDelay) { checkArgument(cancelWornOutsInitialDelay > 0, "cancel worn-outs polling initial delay must be >= 1"); this.cancelWornOutsInitialDelay = cancelWornOutsInitialDelay; } @Override - public long getCancelWornOutsDelay() { + public long getCleanCeTasksDelay() { return cancelWornOutsDelay; } - public void setCancelWornOutsDelay(long cancelWornOutsDelay) { + public void setCleanCeTasksDelay(long cancelWornOutsDelay) { checkArgument(cancelWornOutsDelay > 0, "cancel worn-outs polling delay must be >= 1"); this.cancelWornOutsDelay = cancelWornOutsDelay; } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java index 1ec6f932115..0fd5c4de753 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/monitoring/CeTasksMBeanImplTest.java @@ -135,14 +135,15 @@ public class CeTasksMBeanImplTest { } @Override - public long getCancelWornOutsInitialDelay() { - throw new UnsupportedOperationException("getCancelWornOutsInitialDelay is not implemented"); + public long getCleanCeTasksInitialDelay() { + throw new UnsupportedOperationException("getCleanCeTasksInitialDelay is not implemented"); } @Override - public long getCancelWornOutsDelay() { - throw new UnsupportedOperationException("getCancelWornOutsDelay is not implemented"); + public long getCleanCeTasksDelay() { + throw new UnsupportedOperationException("getCleanCeTasksDelay is not implemented"); } + } @CheckForNull diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java index 93340a39758..7162d2967a9 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/queue/InternalCeQueueImplTest.java @@ -19,6 +19,7 @@ */ package org.sonar.ce.queue; +import com.google.common.collect.ImmutableSet; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.List; @@ -530,17 +531,113 @@ public class InternalCeQueueImplTest { underTest.cancelWornOuts(); - verifyUnmodifiedByCancelWornOuts(u1); - verifyUnmodifiedByCancelWornOuts(u2); + verifyUnmodified(u1); + verifyUnmodified(u2); verifyCanceled(u3); verifyCanceled(u4); - verifyUnmodifiedByCancelWornOuts(u5); - verifyUnmodifiedByCancelWornOuts(u6); - verifyUnmodifiedByCancelWornOuts(u7); - verifyUnmodifiedByCancelWornOuts(u8); + verifyUnmodified(u5); + verifyUnmodified(u6); + verifyUnmodified(u7); + verifyUnmodified(u8); } - private void verifyUnmodifiedByCancelWornOuts(CeQueueDto original) { + @Test + public void resetTasksWithUnknownWorkerUUIDs_reset_only_in_progress_tasks() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3"); + + underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker2", "worker3")); + + // Pending tasks must not be modified even if a workerUUID is not present + verifyUnmodified(u1); + verifyUnmodified(u2); + verifyUnmodified(u3); + verifyUnmodified(u4); + + // Unknown worker : null, "worker1" + verifyReset(u5); + verifyReset(u6); + + // Known workers : "worker2", "worker3" + verifyUnmodified(u7); + verifyUnmodified(u8); + } + + @Test + public void resetTasksWithUnknownWorkerUUIDs_with_empty_set_will_reset_all_in_progress_tasks() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3"); + + underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of()); + + // Pending tasks must not be modified even if a workerUUID is not present + verifyUnmodified(u1); + verifyUnmodified(u2); + verifyUnmodified(u3); + verifyUnmodified(u4); + + // Unknown worker : null, "worker1" + verifyReset(u5); + verifyReset(u6); + verifyReset(u7); + verifyReset(u8); + } + + @Test + public void resetTasksWithUnknownWorkerUUIDs_with_worker_without_tasks_will_reset_all_in_progress_tasks() { + CeQueueDto u1 = insertCeQueueDto("u1", CeQueueDto.Status.PENDING, 0, null); + CeQueueDto u2 = insertCeQueueDto("u2", CeQueueDto.Status.PENDING, 1, "worker1"); + CeQueueDto u3 = insertCeQueueDto("u3", CeQueueDto.Status.PENDING, 2, null); + CeQueueDto u4 = insertCeQueueDto("u4", CeQueueDto.Status.PENDING, 3, "worker2"); + CeQueueDto u5 = insertCeQueueDto("u5", CeQueueDto.Status.IN_PROGRESS, 0, null); + CeQueueDto u6 = insertCeQueueDto("u6", CeQueueDto.Status.IN_PROGRESS, 1, "worker1"); + CeQueueDto u7 = insertCeQueueDto("u7", CeQueueDto.Status.IN_PROGRESS, 2, "worker2"); + CeQueueDto u8 = insertCeQueueDto("u8", CeQueueDto.Status.IN_PROGRESS, 3, "worker3"); + + underTest.resetTasksWithUnknownWorkerUUIDs(ImmutableSet.of("worker1000", "worker1001")); + + // Pending tasks must not be modified even if a workerUUID is not present + verifyUnmodified(u1); + verifyUnmodified(u2); + verifyUnmodified(u3); + verifyUnmodified(u4); + + // Unknown worker : null, "worker1" + verifyReset(u5); + verifyReset(u6); + verifyReset(u7); + verifyReset(u8); + } + + private void verifyReset(CeQueueDto original) { + CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); + // We do not touch ExecutionCount nor CreatedAt + assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); + assertThat(dto.getCreatedAt()).isEqualTo(original.getCreatedAt()); + + // Status must have changed to PENDING and must not be equal to previous status + assertThat(dto.getStatus()).isEqualTo(CeQueueDto.Status.PENDING).isNotEqualTo(original.getStatus()); + // UpdatedAt must have been updated + assertThat(dto.getUpdatedAt()).isNotEqualTo(original.getUpdatedAt()); + // StartedAt must be null + assertThat(dto.getStartedAt()).isNull(); + // WorkerUuid must be null + assertThat(dto.getWorkerUuid()).isNull(); + } + + private void verifyUnmodified(CeQueueDto original) { CeQueueDto dto = dbTester.getDbClient().ceQueueDao().selectByUuid(dbTester.getSession(), original.getUuid()).get(); assertThat(dto.getStatus()).isEqualTo(original.getStatus()); assertThat(dto.getExecutionCount()).isEqualTo(original.getExecutionCount()); |