summaryrefslogtreecommitdiffstats
path: root/server/sonar-ce
diff options
context:
space:
mode:
authorEric Hartmann <hartmann.eric@gmail.com>2017-04-14 17:20:26 +0200
committerEric Hartmann <hartmann.eric@gmail.Com>2017-04-27 09:23:18 +0200
commitd0c877fee5701b7e690a2e721cd9da13fa2ee5f2 (patch)
tree73eb2b229c8c0f45bbde7b13ed7c2112ae8537ed /server/sonar-ce
parent0274020f896128cbb59e4b716ab9e998011ea3b6 (diff)
downloadsonarqube-d0c877fee5701b7e690a2e721cd9da13fa2ee5f2.tar.gz
sonarqube-d0c877fee5701b7e690a2e721cd9da13fa2ee5f2.zip
SONAR-9040 Implement a lock mechanism for cleaning jobs
SONAR-8986 Add guava dependency
Diffstat (limited to 'server/sonar-ce')
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java6
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java7
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java48
-rw-r--r--server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java15
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java31
-rw-r--r--server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java86
6 files changed, 166 insertions, 27 deletions
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java
index a989272d215..a201382bf94 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformation.java
@@ -21,6 +21,7 @@
package org.sonar.ce;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
/**
* CeDistributedInformation is the interface to be implemented in order
@@ -34,4 +35,9 @@ public interface CeDistributedInformation {
* are up so that they are shared with other Compute Engine nodes
*/
void broadcastWorkerUUIDs();
+
+ /**
+ * Acquire a lock among all the Compute Engines
+ */
+ Lock acquireCleanJobLock();
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java
index a94b9d87681..cfb52e7cc95 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/CeDistributedInformationImpl.java
@@ -22,9 +22,11 @@ package org.sonar.ce;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
import org.picocontainer.Startable;
import org.sonar.ce.cluster.HazelcastClientWrapper;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
+import org.sonar.process.cluster.ClusterObjectKeys;
import static org.sonar.core.util.stream.MoreCollectors.toSet;
import static org.sonar.process.cluster.ClusterObjectKeys.WORKER_UUIDS;
@@ -58,6 +60,11 @@ public class CeDistributedInformationImpl implements CeDistributedInformation, S
}
@Override
+ public Lock acquireCleanJobLock() {
+ return hazelcastClientWrapper.getLock(ClusterObjectKeys.CE_CLEANING_JOB_LOCK);
+ }
+
+ @Override
public void start() {
// Nothing to do here
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java b/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java
index da83705d7fc..7fcbd2b842a 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/StandaloneCeDistributedInformation.java
@@ -21,6 +21,9 @@
package org.sonar.ce;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import static com.google.common.base.Preconditions.checkState;
@@ -32,6 +35,8 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat
private final CeWorkerFactory ceCeWorkerFactory;
private Set<String> workerUUIDs;
+ private Lock cleanJobLock = new NonConcurrentLock();
+
public StandaloneCeDistributedInformation(CeWorkerFactory ceCeWorkerFactory) {
this.ceCeWorkerFactory = ceCeWorkerFactory;
}
@@ -46,4 +51,47 @@ public class StandaloneCeDistributedInformation implements CeDistributedInformat
public void broadcastWorkerUUIDs() {
workerUUIDs = ceCeWorkerFactory.getWorkerUUIDs();
}
+
+ /**
+ * Since StandaloneCeDistributedInformation in fact does not provide any distribution support, the lock returned by
+ * this method provides no concurrency support at all.
+ */
+ @Override
+ public Lock acquireCleanJobLock() {
+ return cleanJobLock;
+ }
+
+ private static class NonConcurrentLock implements Lock {
+ @Override
+ public void lock() {
+ // return immediately and never block
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ // return immediately and never block
+ }
+
+ @Override
+ public boolean tryLock() {
+ // always succeed
+ return true;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ // always succeed
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ // nothing to do
+ }
+
+ @Override
+ public Condition newCondition() {
+ throw new UnsupportedOperationException("newCondition not supported");
+ }
+ }
}
diff --git a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
index 1fdb2c5a236..8c3e811eb67 100644
--- a/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
+++ b/server/sonar-ce/src/main/java/org/sonar/ce/cleaning/CeCleaningSchedulerImpl.java
@@ -19,6 +19,7 @@
*/
package org.sonar.ce.cleaning;
+import java.util.concurrent.locks.Lock;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.ce.CeDistributedInformation;
@@ -52,8 +53,18 @@ public class CeCleaningSchedulerImpl implements CeCleaningScheduler {
}
private void cleanCeQueue() {
- cancelWornOuts();
- resetTasksWithUnknownWorkerUUIDs();
+ Lock ceCleaningJobLock = ceDistributedInformation.acquireCleanJobLock();
+
+ // If we cannot lock that means that another job is running
+ // So we skip the cancelWornOuts() method
+ if (ceCleaningJobLock.tryLock()) {
+ try {
+ cancelWornOuts();
+ resetTasksWithUnknownWorkerUUIDs();
+ } finally {
+ ceCleaningJobLock.unlock();
+ }
+ }
}
private void cancelWornOuts() {
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java
index 1585bcce051..83383eb07f9 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/StandaloneCeDistributedInformationTest.java
@@ -20,13 +20,18 @@
package org.sonar.ce;
import com.google.common.collect.ImmutableSet;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.IntStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.sonar.ce.taskprocessor.CeWorkerFactory;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -67,4 +72,30 @@ public class StandaloneCeDistributedInformationTest {
ceCluster.getWorkerUUIDs();
}
+
+ @Test
+ public void acquireCleanJobLock_returns_a_non_current_lock() {
+ StandaloneCeDistributedInformation underTest = new StandaloneCeDistributedInformation(mock(CeWorkerFactory.class));
+
+ Lock lock = underTest.acquireCleanJobLock();
+
+ IntStream.range(0, 5 + Math.abs(new Random().nextInt(50)))
+ .forEach(i -> {
+ try {
+ assertThat(lock.tryLock()).isTrue();
+ assertThat(lock.tryLock(1, TimeUnit.MINUTES)).isTrue();
+ lock.lock();
+ lock.lockInterruptibly();
+ lock.unlock();
+ } catch (InterruptedException e) {
+ fail("no InterruptedException should be thrown");
+ }
+ try {
+ lock.newCondition();
+ fail("a UnsupportedOperationException should have been thrown");
+ } catch (UnsupportedOperationException e) {
+ assertThat(e.getMessage()).isEqualTo("newCondition not supported");
+ }
+ });
+ }
}
diff --git a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java
index bf91352b379..f9169ffbba9 100644
--- a/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java
+++ b/server/sonar-ce/src/test/java/org/sonar/ce/cleaning/CeCleaningSchedulerImplTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
import org.junit.Test;
import org.sonar.ce.CeDistributedInformation;
import org.sonar.ce.configuration.CeConfiguration;
@@ -37,21 +38,19 @@ import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CeCleaningSchedulerImplTest {
+
+ private Lock jobLock = mock(Lock.class);
+
@Test
public void startScheduling_does_not_fail_if_cleaning_methods_send_even_an_Exception() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // synchronously execute command
- command.run();
- return null;
- }
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Exception exception = new IllegalArgumentException("faking unchecked exception thrown by cancelWornOuts");
doThrow(exception).when(mockedInternalCeQueue).cancelWornOuts();
doThrow(exception).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
@@ -65,14 +64,8 @@ public class CeCleaningSchedulerImplTest {
@Test
public void startScheduling_fails_if_cancelWornOuts_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // synchronously execute command
- command.run();
- return null;
- }
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).cancelWornOuts();
@@ -88,14 +81,8 @@ public class CeCleaningSchedulerImplTest {
@Test
public void startScheduling_fails_if_resetTasksWithUnknownWorkerUUIDs_send_an_Error() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- // synchronously execute command
- command.run();
- return null;
- }
- }, mockCeConfiguration(1, 10), mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
Error expected = new Error("faking Error thrown by cancelWornOuts");
doThrow(expected).when(mockedInternalCeQueue).resetTasksWithUnknownWorkerUUIDs(any());
@@ -109,6 +96,36 @@ public class CeCleaningSchedulerImplTest {
}
@Test
+ public void startScheduling_must_call_the_lock_methods() {
+ InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
+ underTest.startScheduling();
+
+ verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock();
+ verify(jobLock, times(1)).tryLock();
+ verify(jobLock, times(1)).unlock();
+ }
+
+ @Test
+ public void startScheduling_must_not_execute_method_if_lock_is_already_acquired() {
+ InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
+ CeDistributedInformation mockedCeDistributedInformation = mockCeDistributedInformation(jobLock);
+ when(jobLock.tryLock()).thenReturn(false);
+
+ CeCleaningSchedulerImpl underTest = mockCeCleaningSchedulerImpl(mockedInternalCeQueue, mockedCeDistributedInformation);
+ underTest.startScheduling();
+
+ verify(mockedCeDistributedInformation, times(1)).acquireCleanJobLock();
+ verify(jobLock, times(1)).tryLock();
+ // since lock cannot be locked, unlock method is not been called
+ verify(jobLock, times(0)).unlock();
+ // since lock cannot be locked, cleaning job methods must not be called
+ verify(mockedInternalCeQueue, times(0)).resetTasksWithUnknownWorkerUUIDs(any());
+ verify(mockedInternalCeQueue, times(0)).cancelWornOuts();
+ }
+
+ @Test
public void startScheduling_calls_cleaning_methods_of_internalCeQueue_at_fixed_rate_with_value_from_CeConfiguration() {
InternalCeQueue mockedInternalCeQueue = mock(InternalCeQueue.class);
long wornOutInitialDelay = 10L;
@@ -139,7 +156,8 @@ public class CeCleaningSchedulerImplTest {
return null;
}
};
- CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration, mockedInternalCeQueue, mock(CeDistributedInformation.class));
+ CeCleaningSchedulerImpl underTest = new CeCleaningSchedulerImpl(executorService, mockedCeConfiguration,
+ mockedInternalCeQueue, mockCeDistributedInformation(jobLock));
underTest.startScheduling();
assertThat(executorService.schedulerCounter).isEqualTo(1);
@@ -153,6 +171,24 @@ public class CeCleaningSchedulerImplTest {
return mockedCeConfiguration;
}
+ private CeCleaningSchedulerImpl mockCeCleaningSchedulerImpl(InternalCeQueue internalCeQueue, CeDistributedInformation ceDistributedInformation) {
+ return new CeCleaningSchedulerImpl(new CeCleaningAdapter() {
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ // synchronously execute command
+ command.run();
+ return null;
+ }
+ }, mockCeConfiguration(1, 10), internalCeQueue, ceDistributedInformation);
+ }
+
+ private CeDistributedInformation mockCeDistributedInformation(Lock result) {
+ CeDistributedInformation mocked = mock(CeDistributedInformation.class);
+ when(mocked.acquireCleanJobLock()).thenReturn(result);
+ when(result.tryLock()).thenReturn(true);
+ return mocked;
+ }
+
/**
* Implementation of {@link CeCleaningExecutorService} which throws {@link UnsupportedOperationException} for every
* method.