diff options
author | Eric Hartmann <hartmann.eric@gmail.com> | 2017-04-14 17:20:26 +0200 |
---|---|---|
committer | Eric Hartmann <hartmann.eric@gmail.Com> | 2017-04-27 09:23:18 +0200 |
commit | d0c877fee5701b7e690a2e721cd9da13fa2ee5f2 (patch) | |
tree | 73eb2b229c8c0f45bbde7b13ed7c2112ae8537ed /server/sonar-ce | |
parent | 0274020f896128cbb59e4b716ab9e998011ea3b6 (diff) | |
download | sonarqube-d0c877fee5701b7e690a2e721cd9da13fa2ee5f2.tar.gz sonarqube-d0c877fee5701b7e690a2e721cd9da13fa2ee5f2.zip |
SONAR-9040 Implement a lock mechanism for cleaning jobs
SONAR-8986 Add guava dependency
Diffstat (limited to 'server/sonar-ce')
6 files changed, 166 insertions, 27 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java index a989272d215..a201382bf94 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java @@ -21,6 +21,7 @@ package org.sonar.ce; import java.util.Set; +import java.util.concurrent.locks.Lock; /** * CeDistributedInformation is the interface to be implemented in order @@ -34,4 +35,9 @@ public interface CeDistributedInformation { * are up so that they are shared with other Compute Engine nodes */ void broadcastWorkerUUIDs(); + + /** + * Acquire a lock among all the Compute Engines + */ + Lock acquireCleanJobLock(); } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java index a94b9d87681..cfb52e7cc95 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java @@ -22,9 +22,11 @@ package org.sonar.ce; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; import org.picocontainer.Startable; import org.sonar.ce.cluster.HazelcastClientWrapper; import org.sonar.ce.taskprocessor.CeWorkerFactory; +import org.sonar.process.cluster.ClusterObjectKeys; import static org.sonar.core.util.stream.MoreCollectors.toSet; import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS; @@ -58,6 +60,11 @@ public class CeDistributedInformationImpl implements CeDistributedInformation, S } @Override + public Lock acquireCleanJobLock() { + return hazelcastClientWrapper.getLock(ClusterObjectKeys.CE_CLEANING_JOB_LOCK); + } + + @Override public void start() { // Nothing to do here } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java b/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java index da83705d7fc..7fcbd2b842a 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java @@ -21,6 +21,9 @@ package org.sonar.ce; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import org.sonar.ce.taskprocessor.CeWorkerFactory; import static com.google.common.base.Preconditions.checkState; @@ -32,6 +35,8 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat private final CeWorkerFactory ceCeWorkerFactory; private Set<String> workerUUIDs; + private Lock cleanJobLock = new NonConcurrentLock(); + public StandaloneCeDistributedInformation(CeWorkerFactory ceCeWorkerFactory) { this.ceCeWorkerFactory = ceCeWorkerFactory; } @@ -46,4 +51,47 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat public void broadcastWorkerUUIDs() { workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs(); } + + /** + * Since StandaloneCeDistributedInformation in fact does not provide any distribution support, the lock returned by + * this method provides no concurrency support at all. + */ + @Override + public Lock acquireCleanJobLock() { + return cleanJobLock; + } + + private static class NonConcurrentLock implements Lock { + @Override + public void lock() { + // return immediately and never block + } + + @Override + public void lockInterruptibly() throws InterruptedException { + // return immediately and never block + } + + @Override + public boolean tryLock() { + // always succeed + return true; + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + // always succeed + return true; + } + + @Override + public void unlock() { + // nothing to do + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException("newCondition not supported"); + } + } } diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java index 1fdb2c5a236..8c3e811eb67 100644 --- a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java +++ b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java @@ -19,6 +19,7 @@ */ package org.sonar.ce.cleaning; +import java.util.concurrent.locks.Lock; import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Loggers; import org.sonar.ce.CeDistributedInformation; @@ -52,8 +53,18 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler { } private void cleanCeQueue() { - cancelWornOuts(); - resetTasksWithUnknownWorkerUUIDs(); + Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock(); + + // If we cannot lock that means that another job is running + // So we skip the cancelWornOuts() method + if (ceCleaningJobLock.tryLock()) { + try { + cancelWornOuts(); + resetTasksWithUnknownWorkerUUIDs(); + } finally { + ceCleaningJobLock.unlock(); + } + } } private void cancelWornOuts() { diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java index 1585bcce051..83383eb07f9 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java @@ -20,13 +20,18 @@ package org.sonar.ce; import com.google.common.collect.ImmutableSet; +import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.stream.IntStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.sonar.ce.taskprocessor.CeWorkerFactory; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -67,4 +72,30 @@ public class StandaloneCeDistributedInformationTest { ceCluster.getWorkerUUIDs(); } + + @Test + public void acquireCleanJobLock_returns_a_non_current_lock() { + StandaloneCeDistributedInformation underTest = new StandaloneCeDistributedInformation(mock(CeWorkerFactory.class)); + + Lock lock = underTest.acquireCleanJobLock(); + + IntStream.range(0, 5 + Math.abs(new Random().nextInt(50))) + .forEach(i -> { + try { + assertThat(lock.tryLock()).isTrue(); + assertThat(lock.tryLock(1, TimeUnit.MINUTES)).isTrue(); + lock.lock(); + lock.lockInterruptibly(); + lock.unlock(); + } catch (InterruptedException e) { + fail("no InterruptedException should be thrown"); + } + try { + lock.newCondition(); + fail("a UnsupportedOperationException should have been thrown"); + } catch (UnsupportedOperationException e) { + assertThat(e.getMessage()).isEqualTo("newCondition not supported"); + } + }); + } } diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java index bf91352b379..f9169ffbba9 100644 --- a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java +++ b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; import org.junit.Test; import org.sonar.ce.CeDistributedInformation; import org.sonar.ce.configuration.CeConfiguration; @@ -37,21 +38,19 @@ import static org.assertj.core.api.Assertions.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class CeCleaningSchedulerImplTest { + + private Lock jobLock = mock(Lock.class); + @Test public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); - CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - // synchronously execute command - command.run(); - return null; - } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock); + CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation); Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts"); doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts(); doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); @@ -65,14 +64,8 @@ public class CeCleaningSchedulerImplTest { @Test public void startScheduling_fails_if_cancelWornOuts_send_an_Error() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); - CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - // synchronously execute command - command.run(); - return null; - } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock); + CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation); Error expected = new Error("faking Error thrown by cancelWornOuts"); doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts(); @@ -88,14 +81,8 @@ public class CeCleaningSchedulerImplTest { @Test public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); - CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() { - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - // synchronously execute command - command.run(); - return null; - } - }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class)); + CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock); + CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation); Error expected = new Error("faking Error thrown by cancelWornOuts"); doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any()); @@ -109,6 +96,36 @@ public class CeCleaningSchedulerImplTest { } @Test + public void startScheduling_must_call_the_lock_methods() { + InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); + CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock); + CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation); + underTest.startScheduling(); + + verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock(); + verify(jobLock, times(1)).tryLock(); + verify(jobLock, times(1)).unlock(); + } + + @Test + public void startScheduling_must_not_execute_method_if_lock_is_already_acquired() { + InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); + CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock); + when(jobLock.tryLock()).thenReturn(false); + + CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation); + underTest.startScheduling(); + + verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock(); + verify(jobLock, times(1)).tryLock(); + // since lock cannot be locked, unlock method is not been called + verify(jobLock, times(0)).unlock(); + // since lock cannot be locked, cleaning job methods must not be called + verify(mockedInternalCeQueue, times(0)).resetTasksWithUnknownWorkerUUIDs(any()); + verify(mockedInternalCeQueue, times(0)).cancelWornOuts(); + } + + @Test public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() { InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class); long wornOutInitialDelay = 10L; @@ -139,7 +156,8 @@ public class CeCleaningSchedulerImplTest { return null; } }; - CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class)); + CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, + mockedInternalCeQueue, mockCeDistributedInformation(jobLock)); underTest.startScheduling(); assertThat(executorService.schedulerCounter).isEqualTo(1); @@ -153,6 +171,24 @@ public class CeCleaningSchedulerImplTest { return mockedCeConfiguration; } + private CeCleaningSchedulerImpl mockCeCleaningSchedulerImpl(InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) { + return new CeCleaningSchedulerImpl(new CeCleaningAdapter() { + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + // synchronously execute command + command.run(); + return null; + } + }, mockCeConfiguration(1, 10), internalCeQueue, ceDistributedInformation); + } + + private CeDistributedInformation mockCeDistributedInformation(Lock result) { + CeDistributedInformation mocked = mock(CeDistributedInformation.class); + when(mocked.acquireCleanJobLock()).thenReturn(result); + when(result.tryLock()).thenReturn(true); + return mocked; + } + /** * Implementation of {@link CeCleaningExecutorService} which throws {@link UnsupportedOperationException} for every * method. |